
Defining custom topic properties with Kafka Connect source connectors

Published Jan 6, 2021 by Robin Moffatt in Kafka Connect at https://rmoff.net/2021/01/06/defining-custom-topic-properties-with-kafka-connect-source-connectors/

When Kafka Connect ingests data from a source system into Kafka it writes it to a topic. Assuming you have set auto.create.topics.enable = true then it will create these topics for you, and do so using the defaults configured on the broker for topic creation, including num.partitions and default.replication.factor.
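If you're curious what those defaults actually are on your broker, you can dump its effective configuration. A quick sketch, assuming the single sandbox broker used in this article has id 1 (and noting that the --all flag needs Apache Kafka 2.5 or later):

$ kafka-configs --bootstrap-server broker:29092 \
                --entity-type brokers --entity-name 1 \
                --describe --all | grep -E 'num.partitions|default.replication.factor'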

There are many other topic-level configurations which you may want to set for topics that are automatically created by Kafka Connect. This is particularly true for connectors which are creating a large number of topics, or where the topic name is not known in advance (e.g. when using a regex to select objects from the source system) and thus cannot be pre-created with the desired settings. Common settings for a topic that you may want to customise include cleanup.policy, min.insync.replicas, and compression.type.
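For contrast, when you do know the topic name up front you can simply pre-create it with the settings that you want. A sketch (the topic name here is made up for illustration):

$ kafka-topics --bootstrap-server broker:29092 --create \
               --topic testdata-manual \
               --partitions 4 --replication-factor 1 \
               --config cleanup.policy=compact \
               --config min.insync.replicas=1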

KIP-158, implemented in Apache Kafka 2.6 (available with Confluent Platform 6.0), adds the ability to set topic-level configurations for topics created by Kafka Connect source connectors.
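All of the new settings go in the connector configuration itself, under a topic.creation. prefix. As a rough sketch of the general shape (default is the built-in group that applies to every topic; custom groups are covered at the end of this article):

topic.creation.default.replication.factor    <- required if any overrides are set
topic.creation.default.partitions            <- required if any overrides are set
topic.creation.default.<topic config>        <- any topic-level config, e.g. cleanup.policy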

Just the defaults, ma’am.

Here’s a very simple Kafka Connect source connector, reading data in from a file:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-00/config \
     -d '{
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"  : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic"          : "testdata-00",
        "file"           : "/data/test.txt"
        }'

Since the broker is configured to automagically create new topics (auto.create.topics.enable = true), it does so using the defaults - one partition, a replication factor of 1, and so on. We can examine this using various tools:

  • kafka-topics

    $ kafka-topics --bootstrap-server broker:29092  --topic testdata-00 --describe
    Topic: testdata-00      PartitionCount: 1       ReplicationFactor: 1    Configs:
            Topic: testdata-00      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
  • kafkacat

    $ kafkacat -b broker:29092 -L -J|jq '.topics[] | select(.topic=="testdata-00")'
    {
      "topic": "testdata-00",
      "partitions": [
        {
          "partition": 0,
          "leader": 1,
          "replicas": [
            {
              "id": 1
            }
          ],
          "isrs": [
            {
              "id": 1
            }
          ]
        }
      ]
    }

Setting the configuration for auto-created topics

Let’s see how we can use the new options in Apache Kafka 2.6 (Confluent Platform 6.0) to change some of the configurations that are set when a topic is created.

Attempt 1 … Crashed and burned, Mav


In my sandbox I have just a single broker, so I’m going to leave the replication factor at 1, but I’m going to change the number of partitions to four, as well as the cleanup policy from its default of delete to compact.

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-01/config \
     -d '{
        "connector.class"                          : "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"                            : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                          : "org.apache.kafka.connect.storage.StringConverter",
        "topic"                                    : "testdata-01",
        "file"                                     : "/data/test.txt",
        "topic.creation.default.partitions"        : 4,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.cleanup.policy"    : "compact"
        }'
If you are setting topic creation overrides you must include replication.factor and partitions even if you’re not specifying a value that’s different from the broker default.

In the broker log you can see that the cleanup.policy configuration has been honoured ({cleanup.policy=compact}):

[2021-01-06 12:03:04,184] INFO Creating topic testdata-01 with configuration {cleanup.policy=compact} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1), 3 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)

🤯 ☠️ 💀 But, alas! The connector fails:

[2021-01-06 12:03:04,346] ERROR WorkerSourceTask{id=source-txt-file-01-0} failed to send record to testdata-01:  (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.

The FileStreamSourceConnector sends records with no key set, which makes no sense for a compacted topic, and hence we get org.apache.kafka.common.InvalidRecordException.
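To prove the point, you can produce a keyed record to the same topic by hand - compaction retains the latest value per key, so keyed records are accepted just fine. A quick sketch with kafkacat, using : as the key/value delimiter:

$ echo 'mykey:Hello world!' | kafkacat -b broker:29092 -t testdata-01 -P -K: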

Attempt 2… I don’t know, but uh, it’s looking good so far.


Let’s try a different variation just to prove out the topic configuration:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-02/config \
     -d '{
        "connector.class"                          : "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"                            : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                          : "org.apache.kafka.connect.storage.StringConverter",
        "topic"                                    : "testdata-02",
        "file"                                     : "/data/test.txt",
        "topic.creation.default.partitions"        : 4,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.compression.type"  : "snappy"
        }'

In the Kafka Connect worker log you can see the settings used (under the covers it’s done through TopicCreationGroup):

[2021-01-06 12:11:29,256] INFO Created topic '(name=testdata-02, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={compression.type=snappy})' using creation group TopicCreationGroup{name='default', inclusionPattern=.*, exclusionPattern=, numPartitions=4, replicationFactor=1, otherConfigs={compression.type=snappy}} (org.apache.kafka.connect.runtime.WorkerSourceTask)

Checking the topic details, we can see it’s just as we wanted - four partitions, and snappy compression 💥

$ kafka-topics --bootstrap-server broker:29092  --topic testdata-02 --describe

Topic: testdata-02      PartitionCount: 4       ReplicationFactor: 1    Configs: compression.type=snappy
        Topic: testdata-02      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 3    Leader: 1       Replicas: 1     Isr: 1

Bonus - doing it through ksqlDB


ksqlDB can be used to create Kafka Connect connectors, either against an existing Kafka Connect cluster or using ksqlDB’s embedded Connect worker. Here’s an example of creating a connector that overrides the min.insync.replicas, partition count, and replication factor for a created topic:

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.14.0-rc732, Server v0.14.0-rc732 located at http://ksqldb:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> CREATE SOURCE CONNECTOR SOURCE_TXT_FILE_03 WITH (
        'connector.class'                            = 'org.apache.kafka.connect.file.FileStreamSourceConnector',
        'key.converter'                              = 'org.apache.kafka.connect.storage.StringConverter',
        'value.converter'                            = 'org.apache.kafka.connect.storage.StringConverter',
        'topic'                                      = 'testdata-03',
        'file'                                       = '/data/test.txt',
        'topic.creation.default.partitions'          = 4,
        'topic.creation.default.replication.factor'  = 1,
        'topic.creation.default.min.insync.replicas' = 1
      );

 Message
--------------------------------------
 Created connector SOURCE_TXT_FILE_03
--------------------------------------

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 testdata-03                           | 4          | 1
-------------------------------------------------------------------------

ksql> PRINT 'testdata-03' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: KAFKA_STRING
rowtime: 2021/01/06 14:09:27.522 Z, key: <null>, value: Hello world!

Topic details:

$ kafka-topics --bootstrap-server broker:29092  --topic testdata-03 --describe
Topic: testdata-03      PartitionCount: 4       ReplicationFactor: 1    Configs: min.insync.replicas=1
        Topic: testdata-03      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 3    Leader: 1       Replicas: 1     Isr: 1

Topic Creation Groups

In the examples above I used just the default topic creation group, but you can also create multiple groups of configuration, applied based on the topic name.

I can see this being really useful if you want to override the topic configuration for just some of the topics that a connector creates, or if you want to override it for all topics but vary the settings depending on the topic name.
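The docs cover the exact syntax, but here’s a rough sketch of what a custom group could look like (the group name shortlived, the tmp-.* pattern, and the connector and topic names are all made up for illustration). Topics matching a group’s include pattern pick up that group’s settings, whilst everything else falls back to the default group:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-04/config \
     -d '{
        "connector.class"                              : "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"                                : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                              : "org.apache.kafka.connect.storage.StringConverter",
        "topic"                                        : "tmp-testdata-04",
        "file"                                         : "/data/test.txt",
        "topic.creation.groups"                        : "shortlived",
        "topic.creation.default.partitions"            : 4,
        "topic.creation.default.replication.factor"    : 1,
        "topic.creation.shortlived.include"            : "tmp-.*",
        "topic.creation.shortlived.partitions"         : 1,
        "topic.creation.shortlived.replication.factor" : 1,
        "topic.creation.shortlived.retention.ms"       : 3600000
        }'

Since tmp-testdata-04 matches tmp-.*, it would be created with a single partition and a one-hour retention; any other topic this connector created would fall under the default group and get four partitions.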

More examples

Check out the docs page for some more nicely documented examples of using this feature.


Robin Moffatt

Robin Moffatt is a Senior Developer Advocate at Confluent, and an Oracle ACE Director (Alumnus). He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
