When Kafka Connect ingests data from a source system into Kafka it writes it to a topic. Assuming you have set auto.create.topics.enable = true
then it will create these topics for you, and do so using the defaults configured on the broker for topic creation, including num.partitions
and default.replication.factor
.
There are many other topic-level configurations which you may want to set for topics that are automatically created by Kafka Connect. This is particularly true for connectors which are creating a large number of topics, or where the topic name is not known in advance (e.g. when using a regex to select objects from the source system) and thus cannot be pre-created with the desired settings. Common settings for a topic that you may want to customise include cleanup.policy, min.insync.replicas, and compression.type.
KIP-158 was implemented in Apache Kafka 2.6 (available with Confluent Platform 6.0), and adds the ability to set topic-level configurations for topics created by Kafka Connect source connectors.
Just the defaults, ma’am.
Here’s a very simply Kafka Connect source connector, reading data in from a file:
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-txt-file-00/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"topic" : "testdata-00",
"file" : "/data/test.txt"
}'
Since the broker is configured to automagically create new topics (auto.create.topics.enable = true
), it does so and using the defaults - one partition, replication factor of 1, etc. We can examine this using various tools:
-
kafka-topics
$ kafka-topics --bootstrap-server broker:29092 --topic testdata-00 --describe Topic: testdata-00 PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: testdata-00 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
-
kafkacat
$ kafkacat -b broker:29092 -L -J|jq '.topics[] | select(.topic =="testdata")'
{ "topic": "testdata-00", "partitions": [ { "partition": 0, "leader": 1, "replicas": [ { "id": 1 } ], "isrs": [ { "id": 1 } ] } ] }
Setting the configuration for auto-created topics
Let’s see how we can use the new options in Apache Kafka 2.6 (Confluent Platform 6.0) to change some of the topic configurations that are set when it’s created.
Attempt 1 … Crashed and burned, Mav
In my sandbox I just have a single broker so I’m going to leave the number of replicas as a sensible setting of 1, but I’m going to change the number of partitions to four, as well as the cleanup policy from its default of delete
to instead compact
.
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-txt-file-01/config \
-d '{
"connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topic" : "testdata-01",
"file" : "/data/test.txt",
"topic.creation.default.partitions" : 4,
"topic.creation.default.replication.factor": 1,
"topic.creation.default.cleanup.policy" : "compact"
}'
If you are setting topic creation overrides you must include replication.factor and partitions even if you’re not specifying a value that’s different from the broker default.
|
In the broker log you can see that the cleanup.policy
configuration has been honoured ({cleanup.policy=compact}
):
[2021-01-06 12:03:04,184] INFO Creating topic testdata-01 with configuration {cleanup.policy=compact} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1), 3 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
🤯 ☠️ 💀 But, alas! The connector fails:
[2021-01-06 12:03:04,346] ERROR WorkerSourceTask{id=source-txt-file-01-0} failed to send record to testdata-01: (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.
The FileStreamSourceConnector
sends records with no key set, which for a compacted topic makes no sense, and hence we get org.apache.kafka.common.InvalidRecordException
.
Attempt 2… I don’t know, but uh, it’s looking good so far.
Let’s try a different variation just to prove out the topic configuration:
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-txt-file-02/config \
-d '{
"connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topic" : "testdata-02",
"file" : "/data/test.txt",
"topic.creation.default.partitions" : 4,
"topic.creation.default.replication.factor": 1,
"topic.creation.default.compression.type" : "snappy"
}'
In the Kafka Connect worker log you can see the settings used (under the covers it’s done through TopicCreationGroup
):
[2021-01-06 12:11:29,256] INFO Created topic '(name=testdata-02, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={compression.type=snappy})' using creation group TopicCreationGroup{name='default', inclusionPattern=.*, exclusionPattern=, numPartitions=4, replicationFactor=1, otherConfigs={compression.type=snappy}} (org.apache.kafka.connect.runtime.WorkerSourceTask)
Checking out the topic details we can see it’s as we wanted it - four partitions, and using snappy compression 💥
$ kafka-topics --bootstrap-server broker:29092 --topic testdata-02 --describe
Topic: testdata-02 PartitionCount: 4 ReplicationFactor: 1 Configs: compression.type=snappy
Topic: testdata-02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-02 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-02 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-02 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Bonus - doing it through ksqlDB
ksqlDB can be used to create Kafka Connect connectors, either against an existing Kafka Connect cluster or using ksqlDB’s embedded Connect worker. Here’s an example of creating a connector that overrides the min.insync.replicas
, partition count, and replication factor for a created topic:
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.14.0-rc732, Server v0.14.0-rc732 located at http://ksqldb:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql> CREATE SOURCE CONNECTOR SOURCE_TXT_FILE_03 WITH (
'connector.class' = 'org.apache.kafka.connect.file.FileStreamSourceConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topic' = 'testdata-03',
'file' = '/data/test.txt',
'topic.creation.default.partitions' = 4,
'topic.creation.default.replication.factor' = 1,
'topic.creation.default.min.insync.replicas' = 1
);
Message
--------------------------------------
Created connector SOURCE_TXT_FILE_03
--------------------------------------
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------------
testdata-04 | 4 | 1
-------------------------------------------------------------------------
ksql> PRINT 'testdata-04' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: KAFKA_STRING
rowtime: 2021/01/06 14:09:27.522 Z, key: <null>, value: Hello world!
Topic details:
kafka-topics --bootstrap-server broker:29092 --topic testdata-03 --describe
Topic: testdata-03 PartitionCount: 4 ReplicationFactor: 1 Configs: min.insync.replicas=1
Topic: testdata-03 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-03 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-03 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: testdata-03 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic Creation Groups
In the example above I used just the default
topic creation group, but you can create multiple groups of configuration based on the topic name.
I can see this being really useful if you want to override topic configuration for just some of the topics that a connector creates but not all of them, or you want to override configuration for all topics but vary it by topic based on the topic name.
More examples
Check out the docs page for some nicely documented examples of using this feature further.