rmoff's random ramblings

Creating topics with Kafka Connect

Published Jan 6, 2021 by Robin Moffatt in Kafka Connect at https://rmoff.net/2021/01/06/creating-topics-with-kafka-connect/

When Kafka Connect ingests data from a source system into Kafka, it writes it to a topic. If you have set auto.create.topics.enable = true on your broker, the topic will be created when it is first written to. If auto.create.topics.enable = false (as it is on Confluent Cloud and in many self-managed environments, for good reasons), you can tell Kafka Connect to create those topics first. This ability was added in Apache Kafka 2.6 (Confluent Platform 6.0); before that, you had to create the topics manually, otherwise the connector would fail.

Configuring Kafka Connect to create topics

Kafka Connect (as of Apache Kafka 2.6) ships with a new worker configuration, topic.creation.enable, which is set to true by default. As long as it is left enabled, you can specify the defaults for new topics created by a connector in the connector configuration:

[…]
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
[…]
Without these two settings present in the connector configuration, Kafka Connect will not create the target topic for you.

Defining custom topic properties with Kafka Connect source connectors

When topics are created automatically by the broker, they use the broker's defaults for topic creation, including num.partitions and default.replication.factor. There are many other topic-level configurations which you may want to set for topics that are automatically created by Kafka Connect. This is particularly true for connectors which create a large number of topics, or where the topic name is not known in advance (e.g. when using a regex to select objects from the source system) and thus the topic cannot be pre-created with the desired settings. Common settings for a topic that you may want to customise include cleanup.policy, min.insync.replicas, and compression.type.

KIP-158 was implemented in Apache Kafka 2.6 (available with Confluent Platform 6.0), and adds the ability to customise topic-level configurations for topics created by Kafka Connect source connectors.

Just the defaults, ma’am.

Here’s a very simple Kafka Connect source connector, reading data in from a file:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-00/config \
     -d '{
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"  : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic"          : "testdata-00",
        "file"           : "/data/test.txt"
        }'

Since the broker is configured to automagically create new topics (auto.create.topics.enable = true), it does so using the defaults - one partition, replication factor of 1, etc. We can examine this using various tools:

  • kafka-topics

    $ kafka-topics --bootstrap-server broker:29092  --topic testdata-00 --describe
    Topic: testdata-00      PartitionCount: 1       ReplicationFactor: 1    Configs:
            Topic: testdata-00      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
  • kafkacat

    $ kafkacat -b broker:29092 -L -J | jq '.topics[] | select(.topic == "testdata-00")'
    {
      "topic": "testdata-00",
      "partitions": [
        {
          "partition": 0,
          "leader": 1,
          "replicas": [
            {
              "id": 1
            }
          ],
          "isrs": [
            {
              "id": 1
            }
          ]
        }
      ]
    }

Setting the configuration for auto-created topics

Let’s see how we can use the new options in Apache Kafka 2.6 (Confluent Platform 6.0) to change some of the topic configurations that are set when a topic is created.

Attempt 1 … Crashed and burned, Mav

In my sandbox I just have a single broker so I’m going to leave the number of replicas as a sensible setting of 1, but I’m going to change the number of partitions to four, as well as the cleanup policy from its default of delete to instead compact.

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-01/config \
     -d '{
        "connector.class"                          : "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"                            : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                          : "org.apache.kafka.connect.storage.StringConverter",
        "topic"                                    : "testdata-01",
        "file"                                     : "/data/test.txt",
        "topic.creation.default.partitions"        : 4,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.cleanup.policy"    : "compact"
        }'
If you are setting topic creation overrides you must include replication.factor and partitions even if you’re not specifying a value that’s different from the broker default.
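
Since it's easy to forget one of the two mandatory settings, here's a rough pre-flight check you could run before submitting a config. Treat it as a sketch only: the function name has_topic_creation_defaults is my own invention (it is not part of Kafka Connect), and it simply greps for the keys rather than parsing the JSON properly.

```shell
#!/usr/bin/env bash

# Hypothetical helper (not part of Kafka Connect): succeeds only if the
# connector config passed as $1 contains both mandatory default-group settings.
has_topic_creation_defaults() {
  local config_json="$1"
  local key
  for key in \
    "topic.creation.default.replication.factor" \
    "topic.creation.default.partitions"; do
    # -F treats the key as a fixed string, so the dots are not regex wildcards.
    if ! printf '%s' "$config_json" | grep -Fq "\"${key}\""; then
      echo "Missing required setting: ${key}" >&2
      return 1
    fi
  done
}

# Example: this config sets a compression type but forgets the partition count.
incomplete_config='{
  "topic.creation.default.replication.factor": 1,
  "topic.creation.default.compression.type": "snappy"
}'

if has_topic_creation_defaults "$incomplete_config"; then
  echo "OK to submit"
else
  echo "Not submitting: add the missing setting first"
fi
```

A proper JSON parser would be more robust than grep here, but this keeps the check dependency-free.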

In the broker log you can see that the cleanup.policy configuration has been honoured ({cleanup.policy=compact}):

[2021-01-06 12:03:04,184] INFO Creating topic testdata-01 with configuration {cleanup.policy=compact} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1), 3 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)

🤯 ☠️ 💀 But, alas! The connector fails:

[2021-01-06 12:03:04,346] ERROR WorkerSourceTask{id=source-txt-file-01-0} failed to send record to testdata-01:  (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.

The FileStreamSourceConnector sends records with no key set. Compaction works on the record key, so a keyless record makes no sense for a compacted topic; the broker rejects it and we get org.apache.kafka.common.InvalidRecordException.
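
If you'd rather not dig through the worker log, the Kafka Connect REST API exposes a status endpoint for each connector, and a task that has died should be reported there with a state of FAILED. Here's a minimal sketch of checking it. The worker address is the same localhost:8083 used above, while the two function names are hypothetical helpers of mine, and the counting is a simple grep rather than real JSON parsing.

```shell
#!/usr/bin/env bash

# Assumption: the Connect worker's REST API is reachable here, as in the curl calls above.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Fetch the status document for the connector named in $1.
connector_status() {
  curl -s "${CONNECT_URL}/connectors/$1/status"
}

# Read a status document on stdin and print how many "state" fields are FAILED
# (this counts the connector itself as well as its tasks).
count_failed_states() {
  grep -Eo '"state"[[:space:]]*:[[:space:]]*"FAILED"' | wc -l | tr -d '[:space:]'
}

# Usage against a live worker:
#   connector_status source-txt-file-01 | count_failed_states
```

Anything other than 0 means it's time to look at the trace field in the status document, or at the worker log.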

Attempt 2… I don’t know, but uh, it’s looking good so far.

Let’s try a different variation just to prove out the topic configuration:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" \
     http://localhost:8083/connectors/source-txt-file-02/config \
     -d '{
        "connector.class"                          : "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "key.converter"                            : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                          : "org.apache.kafka.connect.storage.StringConverter",
        "topic"                                    : "testdata-02",
        "file"                                     : "/data/test.txt",
        "topic.creation.default.partitions"        : 4,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.compression.type"  : "snappy"
        }'

In the Kafka Connect worker log you can see the settings used (under the covers it’s done through TopicCreationGroup):

[2021-01-06 12:11:29,256] INFO Created topic '(name=testdata-02, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={compression.type=snappy})' using creation group TopicCreationGroup{name='default', inclusionPattern=.*, exclusionPattern=, numPartitions=4, replicationFactor=1, otherConfigs={compression.type=snappy}} (org.apache.kafka.connect.runtime.WorkerSourceTask)

Checking out the topic details we can see it’s as we wanted it - four partitions, and using snappy compression 💥

$ kafka-topics --bootstrap-server broker:29092  --topic testdata-02 --describe

Topic: testdata-02      PartitionCount: 4       ReplicationFactor: 1    Configs: compression.type=snappy
        Topic: testdata-02      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-02      Partition: 3    Leader: 1       Replicas: 1     Isr: 1
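
If you want to check this from a script rather than by eye, the describe output is easy enough to pick apart. The two functions below are a sketch of my own (the names are made up), and they assume the output layout shown above, with PartitionCount and Configs on the summary line.

```shell
#!/usr/bin/env bash

# Print the topic-level overrides (the text after "Configs:") from
# `kafka-topics --describe` output supplied on stdin.
topic_overrides() {
  awk -F'Configs:' '/Configs:/ { sub(/^[ \t]+/, "", $2); print $2 }'
}

# Print the partition count from the same output.
partition_count() {
  awk '{ for (i = 1; i < NF; i++) if ($i == "PartitionCount:") print $(i + 1) }'
}

# Usage against a live cluster:
#   kafka-topics --bootstrap-server broker:29092 --topic testdata-02 --describe | topic_overrides
#   kafka-topics --bootstrap-server broker:29092 --topic testdata-02 --describe | partition_count
```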

Bonus - doing it through ksqlDB

ksqlDB can be used to create Kafka Connect connectors, either against an existing Kafka Connect cluster or using ksqlDB’s embedded Connect worker. Here’s an example of creating a connector that overrides the min.insync.replicas, partition count, and replication factor for a created topic:

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.14.0-rc732, Server v0.14.0-rc732 located at http://ksqldb:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> CREATE SOURCE CONNECTOR SOURCE_TXT_FILE_03 WITH (
        'connector.class'                            = 'org.apache.kafka.connect.file.FileStreamSourceConnector',
        'key.converter'                              = 'org.apache.kafka.connect.storage.StringConverter',
        'value.converter'                            = 'org.apache.kafka.connect.storage.StringConverter',
        'topic'                                      = 'testdata-03',
        'file'                                       = '/data/test.txt',
        'topic.creation.default.partitions'          = 4,
        'topic.creation.default.replication.factor'  = 1,
        'topic.creation.default.min.insync.replicas' = 1
      );

 Message
--------------------------------------
 Created connector SOURCE_TXT_FILE_03
--------------------------------------

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 testdata-03                           | 4          | 1
-------------------------------------------------------------------------

ksql> PRINT 'testdata-03' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: KAFKA_STRING
rowtime: 2021/01/06 14:09:27.522 Z, key: <null>, value: Hello world!

Topic details:

kafka-topics --bootstrap-server broker:29092  --topic testdata-03 --describe
Topic: testdata-03      PartitionCount: 4       ReplicationFactor: 1    Configs: min.insync.replicas=1
        Topic: testdata-03      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: testdata-03      Partition: 3    Leader: 1       Replicas: 1     Isr: 1

Topic Creation Groups

In the examples above I used just the default topic creation group, but you can define multiple groups of configuration that are applied based on the topic name.

I can see this being really useful if you want to override topic configuration for just some of the topics that a connector creates but not all of them, or you want to override configuration for all topics but vary it by topic based on the topic name.
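
Here's a sketch of what a group could look like, following the same pattern as the earlier connectors. I haven't run this exact config: the connector name source-txt-file-05, the topic testdata-05, and the group alias "compressed" are all made up for illustration. The idea is that topics whose name matches the group's include pattern pick up that group's settings, and, as I understand it, anything the group doesn't set (here the replication factor) falls back to the default group.

```shell
#!/usr/bin/env bash

# Assumption: the Connect worker's REST API is on localhost:8083, as elsewhere in this post.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the connector config. The "compressed" group applies to any topic
# matching testdata-.* ; everything else would use the default group.
grouped_connector_config() {
  cat <<'EOF'
{
  "connector.class"                            : "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "key.converter"                              : "org.apache.kafka.connect.storage.StringConverter",
  "value.converter"                            : "org.apache.kafka.connect.storage.StringConverter",
  "topic"                                      : "testdata-05",
  "file"                                       : "/data/test.txt",
  "topic.creation.default.partitions"          : 1,
  "topic.creation.default.replication.factor"  : 1,
  "topic.creation.groups"                      : "compressed",
  "topic.creation.compressed.include"          : "testdata-.*",
  "topic.creation.compressed.partitions"       : 4,
  "topic.creation.compressed.compression.type" : "snappy"
}
EOF
}

# Send the config to the worker, reading the request body from stdin.
submit_grouped_connector() {
  grouped_connector_config | curl -i -X PUT \
    -H "Accept:application/json" -H "Content-Type:application/json" \
    "${CONNECT_URL}/connectors/source-txt-file-05/config" -d @-
}

# Nothing is sent unless the script is run with "submit" as its first argument.
if [ "${1:-}" = "submit" ]; then
  submit_grouped_connector
fi
```

Note that the default group settings for partitions and replication factor are still present, since those are required whenever topic creation is configured.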

More examples

Check out the docs page for some nicely documented examples of using this feature further.


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
