Creating topics with Kafka Connect

Published in Kafka Connect at https://rmoff.net/2021/01/06/creating-topics-with-kafka-connect/

When Kafka Connect ingests data from a source system into Kafka it writes it to a topic. If you have set auto.create.topics.enable = true on your broker then the topic will be created when it is first written to. If auto.create.topics.enable = false (as it is on Confluent Cloud and many self-managed environments, for good reasons) then you can tell Kafka Connect to create those topics first. This capability was added in Apache Kafka 2.6 (Confluent Platform 6.0); prior to that you had to create the topics yourself, otherwise the connector would fail.
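
If you're not sure what your broker is set to, you can check. Here's one way of doing it, assuming the kafka-configs tool that ships with Apache Kafka/Confluent Platform, and the single broker (id 1) used in the examples below:

$ kafka-configs --bootstrap-server broker:29092 \
    --describe --entity-type brokers --entity-name 1 --all \
    | grep auto.create.topics.enable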

Configuring Kafka Connect to create topics 🔗

Kafka Connect (as of Apache Kafka 2.6) ships with a new worker configuration, topic.creation.enable, which is set to true by default. So long as this is enabled, you can specify the defaults for new topics created by a connector in the connector configuration:

[…]
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
[…]
Without these two settings present in the connector configuration, Kafka Connect will not create the target topic for you.

Defining custom topic properties with Kafka Connect source connectors 🔗

When topics are auto-created, this is done using the defaults configured on the broker for topic creation, including num.partitions and default.replication.factor. There are many other topic-level configurations that you may want to set for topics automatically created by Kafka Connect. This is particularly true for connectors that create a large number of topics, or where the topic name is not known in advance (e.g. when using a regex to select objects from the source system) and so the topic cannot be pre-created with the desired settings. Common settings for a topic that you may want to customise include cleanup.policy, min.insync.replicas, and compression.type.
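
For contrast: when the topic name is known in advance you can of course pre-create the topic yourself with whatever settings you want - something like this (the topic name here is just a placeholder):

$ kafka-topics --bootstrap-server broker:29092 --create \
    --topic my-precreated-topic \
    --partitions 4 \
    --replication-factor 1 \
    --config cleanup.policy=compact \
    --config min.insync.replicas=1

It's exactly when you can't do this that the feature described below earns its keep.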

KIP-158 was implemented in Apache Kafka 2.6 (available with Confluent Platform 6.0), and adds the ability to customise topic-level configurations for topics created by Kafka Connect source connectors.

Just the defaults, ma’am. 🔗

Here’s a very simple Kafka Connect source connector, reading data in from a file:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-00/config \
     -d '{
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"  : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic"          : "testdata-00",
        "file"           : "/data/test.txt"
        }'

Since the broker is configured to automagically create new topics (auto.create.topics.enable = true), it does so using the defaults - one partition, replication factor of 1, and so on. We can examine this using various tools:

  • kafka-topics

    $ kafka-topics --bootstrap-server broker:29092  --topic testdata-00 --describe
    Topic: testdata-00      PartitionCount: 1       ReplicationFactor: 1    Configs:
            Topic: testdata-00      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
  • kafkacat

    $ kafkacat -b broker:29092 -L -J | jq '.topics[] | select(.topic=="testdata-00")'
    {
      "topic": "testdata-00",
      "partitions": [
        {
          "partition": 0,
          "leader": 1,
          "replicas": [
            {
              "id": 1
            }
          ],
          "isrs": [
            {
              "id": 1
            }
          ]
        }
      ]
    }

Setting the configuration for auto-created topics 🔗

Let’s see how we can use the new options in Apache Kafka 2.6 (Confluent Platform 6.0) to change some of the topic configurations that are set when it’s created.

Attempt 1 … Crashed and burned, Mav 🔗

In my sandbox I just have a single broker, so I'm going to leave the number of replicas at the sensible setting of 1, but I'm going to change the number of partitions to four, and the cleanup policy from its default of delete to compact.

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-01/config \
     -d '{
        "connector.class"                          : "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"                            : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                          : "org.apache.kafka.connect.storage.StringConverter",
        "topic"                                    : "testdata-01",
        "file"                                     : "/data/test.txt",
        "topic.creation.default.partitions"        : 4,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.cleanup.policy"    : "compact"
        }'

If you are setting topic creation overrides you must include replication.factor and partitions, even if you're not specifying a value that's different from the broker default.

In the broker log you can see that the cleanup.policy configuration has been honoured ({cleanup.policy=compact}):

[2021-01-06 12:03:04,184] INFO Creating topic testdata-01 with configuration {cleanup.policy=compact} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1), 3 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)

🤯 ☠️ 💀 But, alas! The connector fails:

[2021-01-06 12:03:04,346] ERROR WorkerSourceTask{id=source-txt-file-01-0} failed to send record to testdata-01:  (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.

The FileStreamSourceConnector sends records with no key set, which for a compacted topic makes no sense, and hence we get org.apache.kafka.common.InvalidRecordException.
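
You can reproduce this outside of Kafka Connect too. Producing a message straight to the compacted topic with kafkacat (which, without a -K key delimiter, sends a null key) gets rejected by the broker in just the same way - the exact error text you see will vary by client and broker version:

$ echo 'no key here' | kafkacat -b broker:29092 -t testdata-01 -P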

Attempt 2… I don’t know, but uh, it’s looking good so far. 🔗

Let’s try a different variation just to prove out the topic configuration:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-02/config \
     -d '{
        "connector.class"                          : "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"                            : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                          : "org.apache.kafka.connect.storage.StringConverter",
        "topic"                                    : "testdata-02",
        "file"                                     : "/data/test.txt",
        "topic.creation.default.partitions"        : 4,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.compression.type"  : "snappy"
        }'

In the Kafka Connect worker log you can see the settings used (under the covers it’s done through TopicCreationGroup):

[2021-01-06 12:11:29,256] INFO Created topic '(name=testdata-02, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={compression.type=snappy})' using creation group TopicCreationGroup{name='default', inclusionPattern=.*, exclusionPattern=, numPartitions=4, replicationFactor=1, otherConfigs={compression.type=snappy}} (org.apache.kafka.connect.runtime.WorkerSourceTask)

Checking out the topic details, we can see it's just as we wanted - four partitions, and snappy compression 💥

$ kafka-topics --bootstrap-server broker:29092  --topic testdata-02 --describe

Topic: testdata-02      PartitionCount: 4       ReplicationFactor: 1    Configs: compression.type=snappy
        Topic: testdata-02      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 3    Leader: 1       Replicas: 1     Isr: 1

Bonus - doing it through ksqlDB 🔗

ksqlDB can be used to create Kafka Connect connectors, either against an existing Kafka Connect cluster or using ksqlDB’s embedded Connect worker. Here’s an example of creating a connector that overrides the min.insync.replicas, partition count, and replication factor for a created topic:

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.14.0-rc732, Server v0.14.0-rc732 located at http://ksqldb:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> CREATE SOURCE CONNECTOR SOURCE_TXT_FILE_03 WITH (
        'connector.class'                            = 'org.apache.kafka.connect.file.FileStreamSourceConnector',
        'key.converter'                              = 'org.apache.kafka.connect.storage.StringConverter',
        'value.converter'                            = 'org.apache.kafka.connect.storage.StringConverter',
        'topic'                                      = 'testdata-03',
        'file'                                       = '/data/test.txt',
        'topic.creation.default.partitions'          = 4,
        'topic.creation.default.replication.factor'  = 1,
        'topic.creation.default.min.insync.replicas' = 1
      );

 Message
--------------------------------------
 Created connector SOURCE_TXT_FILE_03
--------------------------------------

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 testdata-03                           | 4          | 1
-------------------------------------------------------------------------

ksql> PRINT 'testdata-03' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: KAFKA_STRING
rowtime: 2021/01/06 14:09:27.522 Z, key: <null>, value: Hello world!

Topic details:

$ kafka-topics --bootstrap-server broker:29092  --topic testdata-03 --describe
Topic: testdata-03      PartitionCount: 4       ReplicationFactor: 1    Configs: min.insync.replicas=1
        Topic: testdata-03      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 3    Leader: 1       Replicas: 1     Isr: 1

Topic Creation Groups 🔗

In the example above I used just the default topic creation group, but you can create multiple groups of configuration based on the topic name.

I can see this being really useful if you want to override topic configuration for just some of the topics that a connector creates but not all of them, or if you want to override configuration for all topics but vary it based on the topic name.
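
Here's a rough sketch of what that could look like, based on the configuration properties from KIP-158 - the group name compacted and the topic name pattern are made up for illustration. You declare any additional groups in topic.creation.groups, and each group's include (and optional exclude) regex controls which new topics it applies to; anything a group doesn't set explicitly falls back to the default group:

"topic.creation.groups"                     : "compacted",

"topic.creation.default.partitions"         : 4,
"topic.creation.default.replication.factor" : 1,

"topic.creation.compacted.include"          : "compact\\..*",
"topic.creation.compacted.cleanup.policy"   : "compact"

With this in a connector's configuration, any new topic whose name matches compact\..* would be created as a compacted topic (inheriting the four partitions and replication factor of 1 from the default group), while every other topic gets just the defaults.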

More examples 🔗

Check out the docs page for some more nicely documented examples of using this feature.
