Config

Config Details

DMux reads its basic config from conf.json during start-up. The config contains a list of dmuxItems; for each dmuxItem, DMux reads its configuration at run time to create and start the corresponding connection.

Basic config includes:

* name
* dmuxItems
* logging

Sample config file which DMux reads during start-up:

```json
{
  "name": "myDmux",
  "dmuxItems": [
    {
      "name": "sample_name",
      "disabled": false,
      "connectionType": "kafka_http",
      "connection": {
        "dmux": {
          "size": 250,
          "distributor_type": "Hash",
          "batch_size": 1
        },
        "source": {
          "name": "source_name",
          "zk_path": "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/zk-path",
          "kafka_version_major": 2,
          "topic": "source_topic",
          "force_restart": false,
          "read_newest": true
        },
        "pending_acks": 1000000,
        "sink": {
          "endpoint": "http://elb/1.0/api/consume",
          "timeout": "10s",
          "retry_interval": "100ms",
          "headers": [
            {
              "name": "X-Client",
              "value": "go-dumx"
            },
            {
              "name": "Content-Type",
              "value": "application/json"
            }
          ],
          "method": "POST"
        }
      }
    }
  ],
  "logging": {
    "path": "default.log",
    "enable_debug": false,
    "rotation": {
      "size_in_mb": 256,
      "retention_count": 5,
      "retention_days": 90,
      "compress": true
    }
  }
}
```
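
For illustration only, here is a minimal sketch of how a config of this shape could be loaded in Go. The Config and DmuxItem structs, their field set, and the loadConfig helper are assumptions made for this example, not go-dmux's actual types:

```go
package main

import (
	"encoding/json"
	"log"
	"os"
)

// Hypothetical types mirroring the JSON above; go-dmux's real structs may differ.
type Config struct {
	Name      string          `json:"name"`
	DmuxItems []DmuxItem      `json:"dmuxItems"`
	Logging   json.RawMessage `json:"logging"`
}

type DmuxItem struct {
	Name           string          `json:"name"`
	Disabled       bool            `json:"disabled"`
	ConnectionType string          `json:"connectionType"`
	Connection     json.RawMessage `json:"connection"`
}

// loadConfig reads and unmarshals conf.json from the given path.
func loadConfig(path string) (*Config, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var c Config
	if err := json.Unmarshal(data, &c); err != nil {
		return nil, err
	}
	return &c, nil
}

func main() {
	cfg, err := loadConfig("conf.json")
	if err != nil {
		log.Fatal(err)
	}
	// One connection is created and started per enabled dmuxItem.
	for _, item := range cfg.DmuxItems {
		if item.Disabled {
			continue
		}
		log.Printf("starting %s connection %q", item.ConnectionType, item.Name)
	}
}
```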

Config Details:

| Config Key | Default | Comment |
|------------|---------|---------|
| name | NA | The name given to this dmux instance |
| dmuxItems | NA | dmuxItems are dmux connections. Each connection has a name and a connectionType; the name is used to refer to its config, and connectionType can be kafka_http or kafka_foxtrot |
| dmux.size | 10 | Demultiplex size. If size = 10, one source will connect to 10 sinks. Use this to increase throughput until the client box's resources are saturated |
| dmux.distributor_type | Hash | Type of distributor; the other option is RoundRobin |
| dmux.batch_size | 1 | Set this value > 1 to enable batching |
| source.name | NA | consumer_group_name for the Kafka consumer. This is used in ZooKeeper offset tracking |
| source.zk_path | NA | Kafka ZooKeeper path |
| source.topic | NA | Kafka topic you want to consume |
| source.force_restart | false | Set to true to reset the consumer to consume from the start |
| source.read_newest | false | Read from the head if this value is set. This config takes effect only if force_restart is true |
| source.kafka_version_major | 0 | Set to 2 if the source is a Kafka 2.x.x cluster, 1 if it is a Kafka 1.x.x cluster; otherwise leave it unset for the default (0.8.2) |
| sink.endpoint | NA | HTTP endpoint to hit. If connectionType == kafka_http, the URL given here is appended with /{topic}/{partition}/{key}/{offset} and called via POST with byte[] in the body. If connectionType == kafka_foxtrot, the expected URL is of the form http://foxtrot.com:10000/foxtrot/v1/document/KEY_NAME, where KEY_NAME is replaced by the Kafka key and the body is JSON. Note: if batch_size > 1, batching results in a byte[][] payload for kafka_http connections and a []json payload for foxtrot connections (see the sketch following this table) |
| sink.timeout | 10s | HTTP round-trip timeout |
| sink.retry_interval | 100ms | Time interval to sleep before retrying a failed HTTP call. Note: go-dmux has no concept of a sideline; it retries infinitely. The client is expected to build sidelining, if needed, in the sink application being hit |
| sink.headers | NA | Static headers to be added to the HTTP call. Note: Content-Type: application/octet-stream is added for kafka_http POST calls and application/json for kafka_foxtrot |
| pending_acks | 10000 | Number of unordered acks acceptable before go-dmux starts to apply backpressure to the source. Increase this if QPS does not increase when increasing size and you see a warning log in go-dmux that this threshold has been hit. The cost of increasing this is memory and a larger number of records replayed if go-dmux crashes |
| logging.path | /var/log/go-dmux/default.log | Log file path |
| logging.enable_debug | false | Boolean flag to enable debug logging |
| logging.rotation.size_in_mb | 256 | Log size |
| logging.rotation.retention_count | 5 | Number of log files to keep; the rest are archived |
| logging.rotation.retention_days | 90 | Number of days to keep log files; the rest are archived |
| logging.rotation.compress | true | Compress log files which are rotated |

Note: whichever condition between retention_count and retention_days is met first will apply.
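
To make the kafka_http sink behaviour described in the table concrete, the sketch below shows how the per-message URL could be composed and the POST issued. The buildKafkaHTTPURL helper and the example topic/partition/key/offset values are hypothetical, not go-dmux's actual code:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// buildKafkaHTTPURL is a hypothetical helper: for kafka_http connections the
// configured sink.endpoint is appended with /{topic}/{partition}/{key}/{offset}.
func buildKafkaHTTPURL(endpoint, topic string, partition int32, key string, offset int64) string {
	return fmt.Sprintf("%s/%s/%d/%s/%d", endpoint, topic, partition, key, offset)
}

func main() {
	url := buildKafkaHTTPURL("http://elb/1.0/api/consume", "source_topic", 3, "order-42", 1078)
	// -> http://elb/1.0/api/consume/source_topic/3/order-42/1078

	// The message bytes are sent as the POST body, along with the configured
	// static headers. With batch_size > 1 the body would carry a batch
	// (byte[][] for kafka_http, []json for kafka_foxtrot) instead of one record.
	client := &http.Client{Timeout: 10 * time.Second} // sink.timeout
	req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte(`{"orderId":42}`)))
	req.Header.Set("X-Client", "go-dmux")
	req.Header.Set("Content-Type", "application/octet-stream")

	if resp, err := client.Do(req); err == nil {
		resp.Body.Close()
		fmt.Println("sink responded with", resp.Status)
	}
}
```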