
Primitive Keys in ksqlDB

Published Feb 7, 2020 in KsqlDB, Debezium, Single Message Transform, Kafka Connect, Kcat (Kafkacat) at https://rmoff.net/2020/02/07/primitive-keys-in-ksqldb/

ksqlDB 0.7 will add support for message keys as primitive data types beyond just STRING (which is all we’ve had to date). That means that Kafka messages are going to be much easier to work with, and will need less wrangling to get into the form you need. Take the example of a database table that you’ve ingested into a Kafka topic and want to join to a stream of events. Previously you’d have had to run a ksqlDB processor over that topic to re-key the messages before ksqlDB could join on them. Friends, I am here to tell you that this is no longer needed!
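
For context, here’s a sketch of the sort of re-keying dance that was needed before 0.7 (the object names are illustrative and the exact syntax varied between versions; the point is the extra hop, not the specifics):

-- Declare a stream over the ingested topic
CREATE STREAM CUSTOMERS_SRC
  WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

-- Write the data back out to a new topic, re-keyed on the join column
-- (keys could only be strings, so the id ended up as its string form)
CREATE STREAM CUSTOMERS_REKEYED AS
  SELECT * FROM CUSTOMERS_SRC
  PARTITION BY ID;

-- Only then could a table be declared over the re-keyed topic
CREATE TABLE CUSTOMERS_TABLE
  WITH (KAFKA_TOPIC='CUSTOMERS_REKEYED', VALUE_FORMAT='AVRO');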

Note
ksqlDB 0.7 is not yet (7 February 2020) released - my notes here are based on the latest build from master on GitHub. You can find Docker images for the latest build here: https://hub.docker.com/r/rmoff/ksqldb-server

Let’s take the example from above, ingesting data from a database. I’m going to use Debezium (of course), and stream in data from MySQL. Create the connector using the Kafka Connect REST API:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-00/config \
    -d '{
    "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname" : "mysql",
    "database.port" : "3306",
    "database.user" : "debezium",
    "database.password" : "dbz",
    "database.server.id" : "42",
    "database.server.name" : "asgard",
    "table.whitelist" : "demo.customers",
    "database.history.kafka.bootstrap.servers" : "kafka:29092",
    "database.history.kafka.topic" : "dbhistory.demo" ,
    "include.schema.changes" : "false",
    "transforms": "unwrap,extractkey",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.extractkey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractkey.field": "id",
    "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
    }'

Check that it’s running:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort

source  |  source-debezium-mysql-00  |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector
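
At this point the data from the customers table should be landing in a Kafka topic. If you’re already in the ksqlDB CLI, a quick way to confirm the topic exists (this just lists the topics on the broker):

SHOW TOPICS;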

Now let’s peel back the covers just a bit. The extremely eagle-eyed amongst you will have noticed that in the connector above I specified:

"key.converter": "org.apache.kafka.connect.converters.IntegerConverter"

This uses the new set of primitive converters that were added in KIP-305 as part of Apache Kafka 2.0. Debezium sets the primary key of the table (id) as the key of the Kafka message, wrapped in a struct. I use the ExtractField Single Message Transform (SMT) to lift the id out of that struct, and the IntegerConverter then serialises it as the message key, as a signed 32-bit integer. In effect the key goes from a struct such as {"id": 3} to the bare integer 3.

I also use the ExtractNewRecordState SMT to flatten the value part of the message to just the current DB record state.

We can inspect the payload, which shows that things are working as we want them to. I’m using the -s setting to specify the serde for reading the key and value:

kafkacat -b kafka:29092 \
         -t asgard.demo.CUSTOMERS \
         -C \
         -c1 \
         -s key=i \
         -s value=avro \
         -r http://schema-registry:8081 \
         -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n' 

Topic asgard.demo.CUSTOMERS / Partition 0 / Offset: 2 / Timestamp: 1581069692127
Headers:
Key (4 bytes): 3
Payload (155 bytes): {"id": 3, "first_name": {"string": "Mariejeanne"}, "last_name": {"string": "Cocci"}, "email": {"string": "mcocci2@techcrunch.com"}, "gender": {"string": "Female"}, "club_status": {"string": "bronze"}, "comments": {"string": "Multi-tiered bandwidth-monitored capability"}, "create_ts": {"string": "2020-02-07T09:35:27Z"}, "update_ts": {"string": "2020-02-07T09:35:27Z"}}
--

What are all these arguments for kafkacat?

  • -b is the broker connection, and -t is the topic

  • -C means run as a consumer, and -c1 means just read one message and then exit

  • -f is the format string to use when rendering the message - here we’re showing a ton of useful metadata as well as the key and value themselves.

  • -s tells kafkacat how to deserialise the message’s key and/or value

    • key=i deserialises the key as a signed 32-bit integer

    • value=avro deserialises the value as Avro, using the Schema Registry specified with -r to fetch the schema
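
As an aside, you can do much the same check without leaving ksqlDB, assuming ksqlDB is configured with the Schema Registry so that it can deserialise the Avro value. A sketch:

-- Dump a single message from the topic, showing its key and value
PRINT 'asgard.demo.CUSTOMERS' FROM BEGINNING LIMIT 1;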

Use it in ksqlDB


So we’ve got the Kafka topic populated correctly. Now we can declare a table over it in ksqlDB, using the new ROWKEY … KEY syntax with the appropriate primitive type. Note that, at the moment, if you declare the key’s type you also have to declare the value schema explicitly, even when it’s Avro.

CREATE TABLE CUSTOMERS (ROWKEY INT KEY, 
                        FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR, GENDER VARCHAR, CLUB_STATUS VARCHAR, COMMENTS VARCHAR, CREATE_TS VARCHAR, UPDATE_TS VARCHAR) 
                WITH    (KAFKA_TOPIC='asgard.demo.CUSTOMERS', 
                        VALUE_FORMAT='AVRO');

Check out the schema - note the INTEGER key:

ksql> DESCRIBE CUSTOMERS;

Name                 : CUSTOMERS
 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | INTEGER          (system)
 FIRST_NAME  | VARCHAR(STRING)
 LAST_NAME   | VARCHAR(STRING)
 EMAIL       | VARCHAR(STRING)
 GENDER      | VARCHAR(STRING)
 CLUB_STATUS | VARCHAR(STRING)
 COMMENTS    | VARCHAR(STRING)
 CREATE_TS   | VARCHAR(STRING)
 UPDATE_TS   | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

Query the TABLE:

ksql> SELECT * FROM CUSTOMERS EMIT CHANGES LIMIT 5;
+----------------+---------+-------------+-----------+------------------------+--------+------------+--------------------------+--------------------------+--------------------------+
|ROWTIME         |ROWKEY   |FIRST_NAME   |LAST_NAME  |EMAIL                   |GENDER  |CLUB_STATUS |COMMENTS                  |CREATE_TS                 |UPDATE_TS                 |
+----------------+---------+-------------+-----------+------------------------+--------+------------+--------------------------+--------------------------+--------------------------+
|1581069692127   |1        |Rica         |Blaisdell  |rblaisdell0@rambler.ru  |Female  |bronze      |Universal optimal hierarch|2020-02-07T09:35:27Z      |2020-02-07T09:35:27Z      |
|                |         |             |           |                        |        |            |y                         |                          |                          |
|1581069692127   |2        |Ruthie       |Brockherst |rbrockherst1@ow.ly      |Female  |platinum    |Reverse-engineered tangibl|2020-02-07T09:35:27Z      |2020-02-07T09:35:27Z      |
|                |         |             |           |                        |        |            |e interface               |                          |                          |
|1581069692127   |3        |Mariejeanne  |Cocci      |mcocci2@techcrunch.com  |Female  |bronze      |Multi-tiered bandwidth-mon|2020-02-07T09:35:27Z      |2020-02-07T09:35:27Z      |
|                |         |             |           |                        |        |            |itored capability         |                          |                          |
|1581069692128   |4        |Hashim       |Rumke      |hrumke3@sohu.com        |Male    |platinum    |Self-enabling 24/7 firmwar|2020-02-07T09:35:27Z      |2020-02-07T09:35:27Z      |
|                |         |             |           |                        |        |            |e                         |                          |                          |
|1581069692128   |5        |Hansiain     |Coda       |hcoda4@senate.gov       |Male    |platinum    |Centralized full-range app|2020-02-07T09:35:27Z      |2020-02-07T09:35:27Z      |
|                |         |             |           |                        |        |            |roach                     |                          |                          |
Limit Reached
Query terminated

Now let’s take a stream of events that have a foreign key (USER_ID) to the customer data above:

ksql> DESCRIBE RATINGS;

Name                 : RATINGS
 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------

ksql> SELECT USER_ID, STARS, MESSAGE FROM RATINGS EMIT CHANGES;
+----------+--------+------------------------------------------------------------------------------+
|USER_ID   |STARS   |MESSAGE                                                                       |
+----------+--------+------------------------------------------------------------------------------+
|10        |4       |your team here rocks!                                                         |
|6         |1       |more peanuts please                                                           |
|19        |4       |why is it so difficult to keep the bathrooms clean ?                          |
|18        |3       |Exceeded all my expectations. Thank you !                                     |
|1         |1       |more peanuts please                                                           |
…
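
For reference, a stream like this would have been declared over its source topic with something along these lines (a sketch; the topic name ratings is an assumption on my part, not taken from this post):

CREATE STREAM RATINGS
  WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');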

Join this stream to the customer data, on the common key:

ksql> SELECT C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER, 
             R.STARS, 
             R.MESSAGE 
        FROM RATINGS R 
             INNER JOIN CUSTOMERS C 
                ON R.USER_ID = C.ROWKEY 
        EMIT CHANGES;
+----------------+-------+-----------------------------------------------------+
|CUSTOMER        |STARS  |MESSAGE                                              |
+----------------+-------+-----------------------------------------------------+
|Brena Tollerton |4      |your team here rocks!                                |
|Robinet Leheude |1      |more peanuts please                                  |
|Josiah Brockett |4      |why is it so difficult to keep the bathrooms clean ? |
|Waldon Keddey   |3      |Exceeded all my expectations. Thank you !            |
|Rica Blaisdell  |1      |more peanuts please                                  |
…
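
If you want this enriched data as a Kafka topic in its own right, rather than just the output of a transient query, you could persist the join with CREATE STREAM … AS SELECT. A sketch (the stream name RATINGS_ENRICHED is mine, not from the original):

CREATE STREAM RATINGS_ENRICHED AS
    SELECT C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER,
           R.STARS,
           R.MESSAGE
      FROM RATINGS R
           INNER JOIN CUSTOMERS C
              ON R.USER_ID = C.ROWKEY;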

What if I’m using ksqlDB <0.7 (Confluent Platform <5.5)? (Or if I don’t want to type in the whole value schema if it’s in Avro?)

The option you’ve got here is to serialise the key as a string, and then in ksqlDB cast the foreign key to the same type when you join.

Here’s a new version of the connector, using the StringConverter. Note that it’s still using the ExtractField$Key SMT.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-02/config \
    -d '{
    "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname" : "mysql",
    "database.port" : "3306",
    "database.user" : "debezium",
    "database.password" : "dbz",
    "database.server.id" : "43",
    "database.server.name" : "asgard2",
    "table.whitelist" : "demo.customers",
    "database.history.kafka.bootstrap.servers" : "kafka:29092",
    "database.history.kafka.topic" : "dbhistory.demo" ,
    "include.schema.changes" : "false",
    "transforms": "unwrap,extractkey",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.extractkey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractkey.field": "id",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
    }'

Create a new version of the table - note that we don’t have to enter the full schema :)

ksql> CREATE TABLE CUSTOMERS2 WITH (KAFKA_TOPIC='asgard2.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

 Message
---------------
 Table created
---------------
ksql> DESCRIBE CUSTOMERS2;

Name                 : CUSTOMERS2
 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 ID          | INTEGER
 FIRST_NAME  | VARCHAR(STRING)
 LAST_NAME   | VARCHAR(STRING)
 EMAIL       | VARCHAR(STRING)
 GENDER      | VARCHAR(STRING)
 CLUB_STATUS | VARCHAR(STRING)
 COMMENTS    | VARCHAR(STRING)
 CREATE_TS   | VARCHAR(STRING)
 UPDATE_TS   | VARCHAR(STRING)
-----------------------------------------

Now we work around the fact that the foreign key USER_ID is an INT on the events we’re joining from, whilst ROWKEY is a STRING on the table (per the DESCRIBE output above), by CASTing the datatype on the left-hand side of the join:

ksql> SELECT C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER, 
             R.STARS, 
             R.MESSAGE 
        FROM RATINGS R 
             INNER JOIN CUSTOMERS2 C 
                ON CAST(R.USER_ID AS STRING) = C.ROWKEY 
        EMIT CHANGES;
+-----------------+-------+-----------------------------------------------------+
|CUSTOMER         |STARS  |MESSAGE                                              |
+-----------------+-------+-----------------------------------------------------+
|Brena Tollerton  |4      |your team here rocks!                                |
|Robinet Leheude  |1      |more peanuts please                                  |
|Josiah Brockett  |4      |why is it so difficult to keep the bathrooms clean ? |
|Waldon Keddey    |3      |Exceeded all my expectations. Thank you !            |
|Rica Blaisdell   |1      |more peanuts please                                  |
…

But my Single Message Transform doesn’t work…

With the Debezium connector and ExtractField$Key SMT you might hit this error when you run the connector:

java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)

This is detailed here, but in short: with schema-change events included, Debezium also emits messages whose key has no id field to extract, which is what trips up the ExtractField SMT. You therefore need to make sure that you’ve set this in the Debezium connector config:

"include.schema.changes": "false",

