This is based on using Confluent Cloud to provide your managed Kafka and Schema Registry. All that you run yourself is the Kafka Connect worker.
Optionally, you can use this Docker Compose to run the worker and a sample MySQL database.
What you need
A Confluent Cloud account, with the host names and API keys for both Kafka and Schema Registry. Write these to a .env file.
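Before going further it can help to confirm that the values in .env are actually set. The following is a minimal sketch and not part of the original setup: `require_vars` is a hypothetical helper, and the two variable names are the ones passed to kafkacat later in this article. Add whichever other names your own .env uses for the broker and Schema Registry details.

```shell
# Hypothetical helper: report every named variable that is unset or empty.
# Returns 0 if all of them are set, 1 otherwise.
require_vars() {
  local missing=0 name value
  for name in "$@"; do
    # Look up the variable whose name is stored in $name
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "Missing value for $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Load .env if it exists in the current directory, then check the names.
if [ -f .env ]; then
  . ./.env
fi
require_vars CCLOUD_API_KEY CCLOUD_API_SECRET ||
  echo "Fix .env before continuing" >&2
```

Running this from the directory that holds .env prints one line per missing value and nothing at all when everything is set.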
Install Debezium connector
This article assumes that you’re running your own Kafka Connect worker with the appropriate configuration done to hook it up to Confluent Cloud’s brokers and Schema Registry.
You need to install the Debezium connector on the Kafka Connect worker:
confluent-hub install --no-prompt debezium/debezium-connector-mysql:0.10.0
You can also do this as part of your Docker Compose:
- bash
- -c
- |
  echo "Installing connector plugins"
  confluent-hub install --no-prompt debezium/debezium-connector-mysql:0.10.0
  echo "Launching Kafka Connect worker"
  /etc/confluent/docker/run &
  sleep infinity
Pre-create the topics to which you’ll be writing
Make sure your environment is using the correct Confluent Cloud cluster:
$ ccloud kafka cluster list
      Id      |       Name        | Provider |  Region   | Durability | Status
+-------------+-------------------+----------+-----------+------------+--------+
    lkc-42p8m | pipeline-to-cloud | aws      | us-east-1 | HIGH       | UP
  * lkc-43xgj | race-mapper       | aws      | us-east-1 | LOW        | UP
$ ccloud kafka cluster use lkc-42p8m
Create the required topics:
Database history topic. The name is set in the configuration property database.history.kafka.topic. It must not be partitioned.
ccloud kafka topic create --partitions 1 dbz_dbhistory.asgard-01
If you don’t create this topic in advance, Debezium will do so for you, but with a hardcoded timeout of 3 seconds which is often not long enough in a Cloud environment—hence it’s best to create it in advance.
Schema changes topic. Enabled by default; set the corresponding configuration property to false if not required. The name is taken from the server name configuration property (asgard). In this example I’m using the RegexRouter Single Message Transform, which prepends a mysql-01- prefix to the topic name too. This is optional. Note that this topic must not be partitioned (thanks to Terry Franklin for this 👍).
ccloud kafka topic create --partitions 1 mysql-01-asgard
One topic per table ingested. The topic name is made up of the server name (asgard), the database name (demo), and the table name. In this example I’m using the RegexRouter Single Message Transform, which prepends a mysql-01- prefix to the topic name too. This is optional.
ccloud kafka topic create mysql-01-asgard.demo.customers
ccloud kafka topic create mysql-01-asgard.demo.transactions
If you don’t pre-create your topics, you’ll get repeating errors in your Kafka Connect worker log.
You can create the topics afterwards if you forget, but it’s easier up-front.
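The per-table naming rule above (optional prefix, then server name, database name and table name joined by dots) can be sketched as a small shell function. This is an illustration rather than anything Debezium provides: `topic_for_table` is a hypothetical helper, and the values passed to it are the ones from this article's example. The loop only prints the `ccloud` commands so that they can be reviewed before being run.

```shell
# Hypothetical helper: build the topic name for one table.
# Arguments: <prefix> <server name> <database> <table>
# Pass "" as the prefix if you are not using a RegexRouter transform.
topic_for_table() {
  local prefix="$1" server="$2" database="$3" table="$4"
  printf '%s%s.%s.%s\n' "$prefix" "$server" "$database" "$table"
}

# Print (not run) the create command for each table in the example.
for table in customers transactions; do
  echo "ccloud kafka topic create $(topic_for_table "mysql-01-" "asgard" "demo" "$table")"
done
```

For the two example tables this prints the same two `ccloud kafka topic create` commands shown above; piping the output to `sh` would execute them.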
Create the connector
Now create the connector itself, substituting your MySQL details below as indicated. The Confluent Cloud details and credentials will be picked up from the file /data/ local to the Kafka Connect worker, which if you’re using Docker can be mapped from the same .env file as above. Or just hardcode the values if you’d prefer 🤷.
The configuration is the same as for a normal Debezium connector, except for the additional details that let the connector connect to Confluent Cloud to write and read the database schema history topic.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-debezium-mysql-01/config \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"": "asgard",
"database.history.kafka.bootstrap.servers": "${file:/data/}",
"database.history.kafka.topic": "dbz_dbhistory.asgard-01",
"": "SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm": "https",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": " required username=\"${file:/data/}\" password=\"${file:/data/}\";",
"": "SASL_SSL",
"database.history.producer.ssl.endpoint.identification.algorithm": "https",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": " required username=\"${file:/data/}\" password=\"${file:/data/}\";",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
Check that the connector is running:
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '. | to_entries[] | [, .key, .value.status.connector.state,.value.status.tasks[].state,"connector.class"]|join(":|:")' | column -s : -t| sed 's/\"//g'| sort
source | source-debezium-mysql-01 | RUNNING | RUNNING | io.debezium.connector.mysql.MySqlConnector
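If you want a scriptable check rather than a table to read, the same status information can be reduced to a pass/fail result. The sketch below assumes the worker's REST API is on localhost:8083 as in the rest of this article, and that curl and jq are installed; `all_running` and `connector_states` are hypothetical helper names. It relies on the Kafka Connect `/connectors/<name>/status` endpoint, which reports one state for the connector and one for each task.

```shell
# Succeed only if at least one state was given and every one is RUNNING.
all_running() {
  [ "$#" -gt 0 ] || return 1
  local state
  for state in "$@"; do
    [ "$state" = "RUNNING" ] || return 1
  done
  return 0
}

# Print the connector state followed by each task state, one per line.
# Argument: <connector name>
connector_states() {
  curl -s "http://localhost:8083/connectors/$1/status" |
    jq -r '.connector.state, .tasks[].state'
}

# Usage (needs a running Kafka Connect worker):
#   all_running $(connector_states source-debezium-mysql-01) && echo "healthy"
```

Keeping the comparison separate from the HTTP call means the pass/fail logic can be exercised without a worker, for example with `all_running RUNNING FAILED`, which returns a non-zero status.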
Consume the data
Confluent Cloud GUI

kafkacat
# Set the variables, either from this script or manually
source .env
# Use kafkacat to pull Avro messages from Confluent Cloud
# deserialised using the Schema Registry hosted on Confluent Cloud
docker run --rm edenhill/kafkacat:1.5.0 \
-X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-X -X api.version.request=true \
-X sasl.username="${CCLOUD_API_KEY}" \
-X sasl.password="${CCLOUD_API_SECRET}" \
-s avro \
-t mysql-01-asgard.demo.transactions \
-C -o beginning
{"txn_id": {"int": 996}, "customer_id": {"int": 4}, "amount": {"double": 69.819999999999993}, "currency": {"string": "CNY"}, "txn_timestamp": {"string": "2018-04-10T10:23:41Z"}}
{"txn_id": {"int": 997}, "customer_id": {"int": 1}, "amount": {"double": 74.170000000000002}, "currency": {"string": "PEN"}, "txn_timestamp": {"string": "2018-11-19T15:29:14Z"}}
{"txn_id": {"int": 998}, "customer_id": {"int": 2}, "amount": {"double": -92.920000000000002}, "currency": {"string": "JPY"}, "txn_timestamp": {"string": "2018-05-25T19:43:48Z"}}
{"txn_id": {"int": 999}, "customer_id": {"int": 1}, "amount": {"double": 71.159999999999997}, "currency": {"string": "EUR"}, "txn_timestamp": {"string": "2018-11-15T07:24:44Z"}}
{"txn_id": {"int": 1000}, "customer_id": {"int": 5}, "amount": {"double": 28.149999999999999}, "currency": {"string": "IRR"}, "txn_timestamp": {"string": "2018-01-12T14:53:49Z"}}
{"txn_id": {"int": 603}, "customer_id": {"int": 4}, "amount": {"double": -85.510000000000005}, "currency": {"string": "CNY"}, "txn_timestamp": {"string": "2018-11-08T22:06:49Z"}}
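Each value in this output is wrapped in an object naming its type, such as {"int": 996}. That is how Avro's JSON encoding represents a field declared as a union, which is the usual shape for a nullable column. If you would rather see flat JSON, the wrappers can be stripped with sed. This is a rough sketch under stated assumptions: `unwrap_avro_unions` is a hypothetical helper, it only handles the primitive type names listed in the pattern, and a string value containing a closing brace would be cut short.

```shell
# Replace each {"int": 1}, {"double": 1.5}, {"string": "x"} (and similar)
# wrapper with the bare value. Reads one JSON message per line on stdin.
unwrap_avro_unions() {
  sed -E 's/\{"(int|long|float|double|boolean|string)": ([^}]*)\}/\2/g'
}

# Try it on a shortened sample message.
echo '{"txn_id": {"int": 996}, "amount": {"double": 69.82}, "currency": {"string": "CNY"}}' |
  unwrap_avro_unions
```

The sample line comes out as {"txn_id": 996, "amount": 69.82, "currency": "CNY"}. To use it for real, append `| unwrap_avro_unions` to the kafkacat command above.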