
Kafka Connect JDBC Sink - setting the key field name

Published Feb 25, 2020 by Robin Moffatt in Kafka Connect, JDBC Sink at https://rmoff.net/2020/02/25/kafka-connect-jdbc-sink-setting-the-key-field-name/

I wanted to get some data from a Kafka topic:

ksql> PRINT PERSON_STATS FROM BEGINNING;
Key format: KAFKA (STRING)
Value format: AVRO
rowtime: 2/25/20 1:12:51 PM UTC, key: robin, value: {"PERSON": "robin", "LOCATION_CHANGES":1, "UNIQUE_LOCATIONS": 1}

into Postgres, so I did the easy thing and used Kafka Connect with the JDBC Sink connector.
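For reference, a minimal configuration to land the topic in Postgres looks something like this, created through the Kafka Connect REST API. The hostnames, credentials, and converter settings here are illustrative assumptions for a typical local stack; adjust them for your environment. The upsert settings discussed next get layered on top of this:

curl -X PUT http://localhost:8083/connectors/sink-postgres-person-stats/config \
     -H "Content-Type: application/json" -d '{
       "connector.class"                     : "io.confluent.connect.jdbc.JdbcSinkConnector",
       "connection.url"                      : "jdbc:postgresql://postgres:5432/",
       "connection.user"                     : "postgres",
       "connection.password"                 : "postgres",
       "topics"                              : "PERSON_STATS",
       "auto.create"                         : "true",
       "key.converter"                       : "org.apache.kafka.connect.storage.StringConverter",
       "value.converter"                     : "io.confluent.connect.avro.AvroConverter",
       "value.converter.schema.registry.url" : "http://schema-registry:8081"
     }'

Note the key.converter is the StringConverter, since (per the ksqlDB PRINT output above) the message key is a plain string.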

I wanted to use UPSERT behaviour based on the key of the Kafka message (as shown above, robin), so I set insert.mode = upsert and pk.mode = record_key. Unfortunately this didn't work, and errored out with:

Need exactly one PK column defined since the key schema for records is a primitive type, defined columns are: []

Hmmmmm, I was puzzled. How can I specify a field name for something that’s the message key?

Turns out that if you've got a primitive message key, you need to use pk.fields to specify the name of the column that should be created for it on the target table.

So this works:

…
pk.mode = record_key
pk.fields = 'PERSON'
…

Which then creates a table in the target database like this:

postgres=# \d "PERSON_STATS"
                Table "public.PERSON_STATS"
      Column      |  Type  | Collation | Nullable | Default
------------------+--------+-----------+----------+---------
 PERSON           | text   |           | not null |
 LOCATION_CHANGES | bigint |           |          |
 UNIQUE_LOCATIONS | bigint |           |          |
Indexes:
    "PERSON_STATS_pkey" PRIMARY KEY, btree ("PERSON")

with data that updates in place as changes are made to the topic in Kafka.
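As an illustration, based on the sample message shown at the top of this post, querying the table shows a single row per key, which is updated in place as new messages arrive for that key:

postgres=# SELECT * FROM "PERSON_STATS";
 PERSON | LOCATION_CHANGES | UNIQUE_LOCATIONS
--------+------------------+------------------
 robin  |                1 |                1
(1 row)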


