Using Kafka Connect and Debezium with Confluent Cloud

Published Oct 16, 2019 by Robin Moffatt in Kafka Connect, Debezium, Kafkacat, Confluent Cloud at https://rmoff.net/2019/10/16/using-kafka-connect-and-debezium-with-confluent-cloud/

This is based on using Confluent Cloud to provide your managed Kafka and Schema Registry. All that you run yourself is the Kafka Connect worker.

Optionally, you can use this Docker Compose to run the worker and a sample MySQL database.

What you need

A Confluent Cloud account, plus the API host names and keys for both Kafka and Schema Registry. Write these to a .env file:

CCLOUD_BROKER_HOST=
CCLOUD_API_KEY=
CCLOUD_API_SECRET=
CCLOUD_SCHEMA_REGISTRY_URL=
CCLOUD_SCHEMA_REGISTRY_API_KEY=
CCLOUD_SCHEMA_REGISTRY_API_SECRET=

Install Debezium connector

This article assumes that you’re running your own Kafka Connect worker with the appropriate configuration done to hook it up to Confluent Cloud’s brokers and Schema Registry.
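
If you're setting the worker up yourself, a minimal sketch of the relevant worker properties looks like this (the values come from the .env file above; exact settings may vary with your Connect version, so treat this as a starting point rather than a complete configuration):

# Connect to the Confluent Cloud brokers
bootstrap.servers=<CCLOUD_BROKER_HOST>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CCLOUD_API_KEY>" password="<CCLOUD_API_SECRET>";
ssl.endpoint.identification.algorithm=https
# The worker's internal clients need the same overrides, prefixed producer./consumer.
producer.security.protocol=SASL_SSL
consumer.security.protocol=SASL_SSL
# Avro converter pointing at the Confluent Cloud Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<CCLOUD_SCHEMA_REGISTRY_URL>
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<CCLOUD_SCHEMA_REGISTRY_API_KEY>:<CCLOUD_SCHEMA_REGISTRY_API_SECRET>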

You need to install the Debezium connector on the Kafka Connect worker:

confluent-hub install --no-prompt debezium/debezium-connector-mysql:0.10.0

You can also do this as part of your Docker Compose:

    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:0.10.0
        #
        echo "Launching Kafka Connect worker"
        # Run the worker in the background so that this script can carry on
        /etc/confluent/docker/run &
        #
        # Keep the container alive once the worker is running in the background
        sleep infinity
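
Once the worker has started, you can check that the plugin was installed by listing the available connector plugins through the Kafka Connect REST API:

$ curl -s localhost:8083/connector-plugins | jq '.[].class' | grep -i debezium
"io.debezium.connector.mysql.MySqlConnector"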

Pre-create the topics to which you’ll be writing

  • Make sure your ccloud environment is using the correct Confluent Cloud cluster

    $ ccloud kafka cluster list
          Id      |       Name        | Provider |  Region   | Durability | Status
    +-------------+-------------------+----------+-----------+------------+--------+
        lkc-42p8m | pipeline-to-cloud | aws      | us-east-1 | HIGH       | UP
      * lkc-43xgj | race-mapper       | aws      | us-east-1 | LOW        | UP
    
    $ ccloud kafka cluster use lkc-42p8m
  • Create the required topics:

    • Database history.

      The name is set in the configuration property database.history.kafka.topic. The topic must have a single partition.

      ccloud kafka topic create --partitions 1 dbz_dbhistory.asgard-01

      If you don’t create this topic in advance, Debezium will do so for you, but with a hardcoded timeout of 3 seconds which is often not long enough in a Cloud environment—hence it’s best to create it in advance.

    • Schema changes.

      This is enabled by default; set include.schema.changes to false if you don't need it.

      The name is taken from the configuration property database.server.name (asgard). In this example I'm using the RegexRouter Single Message Transform to prepend a mysql-01- prefix to the topic name too; this is optional.

      Note that this topic must not be partitioned - Thanks to Terry Franklin for this 👍

      ccloud kafka topic create --partitions 1 mysql-01-asgard
    • One topic per table ingested. The topic name is made up of the database.server.name (asgard), the database name (demo), and the table name.

      In this example I’m using the RegexRouter Single Message Transform which prepends a mysql-01- prefix to the topic name too. This is optional.

      ccloud kafka topic create mysql-01-asgard.demo.customers
      ccloud kafka topic create mysql-01-asgard.demo.transactions
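
Having created the topics, it's worth a quick sanity check that they all exist before starting the connector (the exact output format of the CLI may differ):

$ ccloud kafka topic list | grep -E 'dbz_dbhistory|mysql-01'
dbz_dbhistory.asgard-01
mysql-01-asgard
mysql-01-asgard.demo.customers
mysql-01-asgard.demo.transactions
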
Note

If you don’t pre-create your topics, you’ll get repeating errors in your Kafka Connect worker log:

Error while fetching metadata with correlation id … : {<…topic…>=UNKNOWN_TOPIC_OR_PARTITION} 

You can create the topics afterwards if you forget, but it's easier to do it up front.

Create the connector

Now create the connector itself, substituting your MySQL details below as indicated. The Confluent Cloud details and credentials will be picked up from the file /data/credentials.properties local to the Kafka Connect worker, which if you're using Docker can be mapped from the same .env file as above. Or, just hardcode the values if you'd prefer 🤷.
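
As a sketch of how that file can get into place, assuming the Docker Compose setup above, you can simply mount the .env file into the worker container under the expected path:

    volumes:
      - ./.env:/data/credentials.properties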

The configuration is the same as for a normal Debezium connector, except for the additional details that the connector needs to connect to Confluent Cloud when writing and reading the database history topic. (The unwrap transform, ExtractNewRecordState, flattens Debezium's change-event envelope so that the topics receive just the new state of each row, as you'll see in the consumed records below.)
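
Note that the ${file:…} placeholders rely on Kafka's FileConfigProvider, which must be enabled on the worker with these two standard Kafka Connect properties (in the Confluent Docker images, set them via the CONNECT_CONFIG_PROVIDERS and CONNECT_CONFIG_PROVIDERS_FILE_CLASS environment variables):

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider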

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-debezium-mysql-01/config \
    -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.name": "asgard",
    "database.history.kafka.bootstrap.servers": "${file:/data/credentials.properties:CCLOUD_BROKER_HOST}",
    "database.history.kafka.topic": "dbz_dbhistory.asgard-01",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:CCLOUD_API_KEY}\" password=\"${file:/data/credentials.properties:CCLOUD_API_SECRET}\";",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.ssl.endpoint.identification.algorithm": "https",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:CCLOUD_API_KEY}\" password=\"${file:/data/credentials.properties:CCLOUD_API_SECRET}\";",
    "table.whitelist":"demo.transactions,demo.customers",
    "decimal.handling.mode":"double",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex":"(.*)",
    "transforms.addTopicPrefix.replacement":"mysql-01-$1"
    }'

Check that the connector is running:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | column -s : -t| sed 's/\"//g'| sort

source  |  source-debezium-mysql-01  |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector
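
If the connector or its task shows FAILED instead of RUNNING, the status endpoint includes a stack trace that usually pinpoints the problem (the jq filter here assumes a single task):

curl -s "http://localhost:8083/connectors/source-debezium-mysql-01/status" | \
    jq '.tasks[0].trace'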

Consume the data

Confluent Cloud GUI

[Screenshot: browsing the topic data in the Confluent Cloud GUI]

kafkacat

# Set the variables, either from this script or manually
source .env

# Use kafkacat to pull Avro messages from Confluent Cloud 
#  deserialised using the Schema Registry hosted on Confluent Cloud

docker run --rm edenhill/kafkacat:1.5.0 \
      -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
      -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
      -b ${CCLOUD_BROKER_HOST} \
      -X sasl.username="${CCLOUD_API_KEY}" \
      -X sasl.password="${CCLOUD_API_SECRET}" \
      -r https://"${CCLOUD_SCHEMA_REGISTRY_API_KEY}":"${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"@${CCLOUD_SCHEMA_REGISTRY_URL} \
      -s avro \
      -t mysql-01-asgard.demo.transactions \
      -C -o beginning

{"txn_id": {"int": 996}, "customer_id": {"int": 4}, "amount": {"double": 69.819999999999993}, "currency": {"string": "CNY"}, "txn_timestamp": {"string": "2018-04-10T10:23:41Z"}}
{"txn_id": {"int": 997}, "customer_id": {"int": 1}, "amount": {"double": 74.170000000000002}, "currency": {"string": "PEN"}, "txn_timestamp": {"string": "2018-11-19T15:29:14Z"}}
{"txn_id": {"int": 998}, "customer_id": {"int": 2}, "amount": {"double": -92.920000000000002}, "currency": {"string": "JPY"}, "txn_timestamp": {"string": "2018-05-25T19:43:48Z"}}
{"txn_id": {"int": 999}, "customer_id": {"int": 1}, "amount": {"double": 71.159999999999997}, "currency": {"string": "EUR"}, "txn_timestamp": {"string": "2018-11-15T07:24:44Z"}}
{"txn_id": {"int": 1000}, "customer_id": {"int": 5}, "amount": {"double": 28.149999999999999}, "currency": {"string": "IRR"}, "txn_timestamp": {"string": "2018-01-12T14:53:49Z"}}
{"txn_id": {"int": 603}, "customer_id": {"int": 4}, "amount": {"double": -85.510000000000005}, "currency": {"string": "CNY"}, "txn_timestamp": {"string": "2018-11-08T22:06:49Z"}}
