Streaming Data from MongoDB into Kafka with Kafka Connect and Debezium

Disclaimer: I am not a MongoDB person. These steps may or may not be appropriate and proper. But they worked for me :) Feel free to post in comments if I'm doing something wrong

MongoDB config - enabling replica sets

For Debezium to be able to stream changes from MongoDB, Mongo needs to have replication configured:

Docs: Replication / Convert a Standalone to a Replica Set

Stop Mongo:

rmoff@proxmox01 ~> sudo service mongod stop  

Add replica set config to /etc/mongod.conf:

[source,yaml]
----
replication:  
   replSetName: mongo01
----

Optionally, also set the bindIp so that it listens on all IPs, not just loopback:

[source,yaml]
----
net:  
  bindIp: 0.0.0.0
----

Start Mongo, and initiate the replica set:

rmoff@proxmox01 ~> sudo service mongod start  
rmoff@proxmox01 ~> mongo --host 127.0.0.1:27017  
MongoDB shell version v3.6.3  
connecting to: mongodb://127.0.0.1:27017/  
> rs.initiate()
{
        "info2" : "no configuration specified. Using a default configuration for the set",
        "me" : "127.0.0.1:27017",
        "ok" : 1,
        "operationTime" : Timestamp(1520428346, 1),
        "$clusterTime" : {
                "clusterTime" : Timestamp(1520428346, 1),
                "signature" : {
                        "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
                        "keyId" : NumberLong(0)
                }
        }
}
mongo01:PRIMARY>  

Check /var/log/mongodb/mongod.log, should see replica set config success and oplog get created:

2018-03-07T13:12:26.007+0000 I REPL     [conn1] replSetInitiate admin command received from client  
2018-03-07T13:12:26.007+0000 I REPL     [conn1] creating replication oplog of size: 2864MB...  
2018-03-07T13:12:26.048+0000 I REPL     [conn1] New replica set config in use: { _id: "mongo01", version: 1, protocolVersion: 1, members: [ { _id: 0, host: "127.0.0.1:27017", arbiterOnly: false, buildIndexes: true, hidden: false, priority: 1.0, tags: {}, slaveDelay: 0, votes: 1 } ], settings: { chainingAllowed: true, heartbeatIntervalMillis: 2000, heartbeatTimeoutSecs: 10, electionTimeoutMillis: 10000, catchUpTimeoutMillis: -1, catchUpTakeoverDelayMillis: 30000, getLastErrorModes: {}, getLastErrorDefaults: { w: 1, wtimeout: 0 }, replicaSetId: ObjectId('5a9fe53ac81eed28a3bf207a') } }  

Setting up Debezium to stream changes from MongoDB into Apache Kafka

There's a detailed explanation of how Debezium CDC works with Debezium on the Debezium doc site.

Install Debezium Mongo plugin

mkdir ~/connect-jars  
cd ~/connect-jars/  
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/0.7.4/debezium-connector-mongodb-0.7.4-plugin.tar.gz  
tar -xf debezium-connector-mongodb-0.7.4-plugin.tar.gz  

Add the plugin folder (e.g. /home/rmoff/connect-jars) to the Connect worker config file (e.g. ./etc/schema-registry/connect-avro-distributed.properties)

plugin.path=share/java,/home/rmoff/connect-jars  

Configure Debezium MongoDB connector

Config file (/home/rmoff/connect-config/mongodb.json):

{
  "name": "mongodb-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/localhost:27017",
    "mongodb.name": "ubnt",
    "database.whitelist": "ace"
  }
}

Load connector:

curl -i -X POST -H "Accept:application/json" \  
    -H  "Content-Type:application/json" http://localhost:8084/connectors/ \
    -d @/home/rmoff/connect-config/mongodb.json

Check Connect worker stdout:

[2018-03-07 13:33:35,303] INFO Beginning initial sync of 'mongo01' at {sec=1520429608, ord=1, h=-4885990198351632203} (io.debezium.connector.mongodb.Replicator:247)
[2018-03-07 13:33:35,324] INFO Preparing to use 1 thread(s) to sync 39 collection(s): mongo01.ace.wlangroup, mongo01.ace.extension, mongo01.ace.stat, mongo01.ace.networkconf, mongo01.ace.voucher, mongo01.ace.hotspotpackage, mongo01.ace.hotspotop, mongo01.ace.alarm, mongo01.ace.heatmap, mongo01.ace.map, mongo01.ace.dpigroup, mongo01.ace.setting, mongo01.ace.usergroup, mongo01.ace.verification, mongo01.ace.payment, mongo01.ace.heatmappoint, mongo01.ace.scheduletask, mongo01.ace.guest, mongo01.ace.admin, mongo01.ace.radiusprofile, mongo01.ace.portalfile, mongo01.ace.mediafile, mongo01.ace.device, mongo01.ace.firewallgroup, mongo01.ace.site, mongo01.ace.task, mongo01.ace.dynamicdns, mongo01.ace.portconf, mongo01.ace.wlanconf, mongo01.ace.rogue, mongo01.ace.routing, mongo01.ace.firewallrule, mongo01.ace.event, mongo01.ace.hotspot2conf, mongo01.ace.broadcastgroup, mongo01.ace.portforward, mongo01.ace.privilege, mongo01.ace.account, mongo01.ace.user (io.debezium.connector.mongodb.Replicator:276)
[2018-03-07 13:33:35,326] INFO Creating thread debezium-mongodbconnector-ubnt-copy-mongo01-0 (io.debezium.util.Threads:247)
[2018-03-07 13:33:35,327] INFO Starting initial sync of 'mongo01.ace.wlangroup' (io.debezium.connector.mongodb.Replicator:286)

Check topics:

rmoff@proxmox01 ~/connect-jars> kafka-topics --zookeeper localhost:2181 --list  
ubnt.ace.admin  
ubnt.ace.alarm  
ubnt.ace.broadcastgroup  
ubnt.ace.device  
ubnt.ace.dpigroup  
ubnt.ace.event  
ubnt.ace.guest  
ubnt.ace.map  
ubnt.ace.networkconf  
ubnt.ace.portconf  
ubnt.ace.portforward  
ubnt.ace.privilege  
ubnt.ace.rogue  
ubnt.ace.scheduletask  
ubnt.ace.setting  
ubnt.ace.site  
ubnt.ace.user  
ubnt.ace.usergroup  
ubnt.ace.wlanconf  
ubnt.ace.wlangroup  

Check data

{
  "after": {\"_id\" : {\"$oid\" : \"58385328e4b001431e4e497a\"},\"adopted\" : true,\"board_rev\" : 18,\"cfgversion\" : \"xxxxxxxxxxxxx\",\"config_network\" : {\"ip\" : \"192.168.10.12\",\"type\" : \"dhcp\"},\"ethernet_table\" : [{\"mac\" : \"xx:xx:xx:xx:xx:xx\",\"num_port\" : 1,\"name\" : \"eth0\"}],\"fw_caps\" : 75,\"has_eth1\" : false,\"has_speaker\" : false,\"inform_ip\" : \"192.168.10.172\",\"inform_url\" : \"http://192.168.10.172:8080/inform\",\"ip\" : \"192.168.10.68\",\"led_override\" : \"on\",\"mac\" : \"xx:xx:xx:xx:xx:xx\",\"model\" : \"BZ2\",\"name\" : \"Unifi AP - Study\",\"port_table\" : [],\"radio_table\" : [{\"radio\" : \"ng\",\"min_txpower\" : 5,\"max_txpower\" : 23,\"builtin_antenna\" : true,\"builtin_ant_gain\" : 0,\"nss\" : 2,\"name\" : \"wifi0\"}],\"serial\" : \"xxx\",\"site_id\" : \"xxx\",\"type\" : \"uap\",\"version\" : \"3.7.40.6115\",\"vwire_table\" : [],\"wifi_caps\" : 117,\"wlangroup_id_ng\" : \"xx\",\"x_authkey\" : \"xx\",\"x_fingerprint\" : \"xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:3e:39:08:45\",\"x_ssh_hostkey\" : \"xx/xx=\",\"x_ssh_hostkey_fingerprint\" : \"xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx\",\"x_vwirekey\" : \"xx\",\"map_id\" : \"xx\",\"x\" : 880.3939455186993,\"y\" : 966.2397514041841,\"locked\" : true,\"wlan_overrides\" : []}"
  },
  "patch": null,
  "source": {
    "version": {
      "string": "0.7.4"
    },
    "name": "ubnt",
    "rs": "mongo01",
    "ns": "ace.device",

Note that the MongoDB document is not as fields in the Kafka message, but instead everything is in the payload as a string field as escaped JSON.

Debezium does provide a Single Message Transform (SMT) to flatten the MongoDB record out like this, but in using it I hit a bug (DBZ-649) that seems to be down to the MongoDB collection documents having different fields between documents. The reported error was org.apache.kafka.connect.errors.DataException: <field> is not a valid field name.

However, using KSQL's EXTRACTJSONFIELD you can still work with the data as-is:

ksql> CREATE STREAM DEVICE (after VARCHAR) WITH (KAFKA_TOPIC='ubnt.ace.device-07',VALUE_FORMAT='JSON');  
ksql> select EXTRACTJSONFIELD(after,'$.name'),EXTRACTJSONFIELD(after,'$.ip') from device;  
Unifi AP - Study | 192.168.10.68  
Unifi AP - Attic | 192.168.10.67  
ubnt.moffatt.me | 77.102.5.159  
Unifi AP - Pantry | 192.168.10.71  

Robin Moffatt

Read more posts by this author.

Yorkshire, UK