rmoff's random ramblings
about talks

Setting key value when piping from jq to kafkacat

Published Sep 30, 2020 by in Jq, Kcat (Kafkacat) at https://rmoff.net/2020/09/30/setting-key-value-when-piping-from-jq-to-kafkacat/

One of my favourite hacks for getting data into Kafka is using kafkacat and stdin, often from jq. You can see this in action with Wi-Fi data, IoT data, and data from a REST endpoint. This is fine for getting values into a Kafka message - but Kafka messages are key/value, and being able to specify a key is can often be important.

Here’s a way to do that, using a separator and some jq magic. Note that at the moment kafkacat only supports single byte separator characters, so you need to choose carefully. If you pick a separator that also appears in your data, it’s possibly going to have unintended consequences.

Here’s a simple payload

[{
  "orderId": "X94",
  "orderTotal": "42",
  "productName": "ACME yak shaver"
},
{
  "orderId": "X95",
  "orderTotal": "2",
  "productName": "ACME TNT"
}]

We want to stream this onto a Kafka topic as one message per object in the array, rather than multiple lines of input per object (or one single long line of the entire array). Let’s break this down and see how to do that. We’ll start by running it through jq with the --compact-output flag (or -c) to put each object on a single line, and a jq filter of [] to explode the array:

echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
jq --compact-output '.[]'
{"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
{"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}

and now we can pipe it to Kafka:

echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
jq --compact-output '.[]' | \
kafkacat -b localhost:9092 -t orders01 -P

Consuming it back we can see this has worked — but that the keys are currently null:

kafkacat -b localhost:9092 \
         -t orders01 \
         -C \
         -f 'Key: %k, payload: %s\n'
Key: , payload: {"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
Key: , payload: {"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}

You may be fine with a null key - but often you will want one, whether to ensure that data for a particular instance of an entity always routes to the same partition, or just because it’s useful data to have in the message key for when it comes to process it (e.g. with ksqlDB).

So let’s set the key now. We have two key (!) ingredients to the method:

  1. We’re going to set the -K parameter in kafkacat to declare the key/value delineator. This can be a straightforward printable character (such as :), but what if your key value includes that character? Mayhem would ensue. Instead we can use a character that we would hope not to find in our actual key value - just make sure that it’s a single byte (so fancy stuff like §, and emojis are out ☠️ 😿 ☠️ ).

    To specify a non-printable character you can use the bash syntax of $'\x00' to specify the hex value of the character - in this case 00, which is a NULL. I’ve chosen to use \x1c in the example below.

  2. We’re going to use the same character in the jq filter and get jq to concatenate it with the field that we want to use as the key and prefix it to the full payload value that we had originally. To pass in the character value without quote mark and escaping hell we can set it as a variable at invocation using the arg parameter.

    • We also set the --raw-output flag so that the string output doesn’t get written as a JSON string by jq - this is important for it to work as the subsequent stdin

Our jq invocation now looks like this:

echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
jq --compact-output \
   --raw-output \
   --arg sep $'\x1c' \
   '.[] | [.orderId + $sep, tostring] | join("")'
X94{"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
X95{"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}

The outpuut shows out key value has been correctly prepended - but where’s our separator? That’s the joy of non-printable characters :) We can run it through hexdump to prove that it is there:

echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
    jq --compact-output \
    --raw-output \
    --arg sep $'\x1c' \
    '.[] | [.orderId + $sep, tostring] | join("")' | \
   hexdump -C
There's the separator character |
                   ||------------
                   ||
                   VV
00000000  58 39 34 1c 7b 22 6f 72  64 65 72 49 64 22 3a 22  |X94.{"orderId":"|
00000010  58 39 34 22 2c 22 6f 72  64 65 72 54 6f 74 61 6c  |X94","orderTotal|
00000020  22 3a 22 34 32 22 2c 22  70 72 6f 64 75 63 74 4e  |":"42","productN|
00000030  61 6d 65 22 3a 22 41 43  4d 45 20 79 61 6b 20 73  |ame":"ACME yak s|
00000040  68 61 76 65 72 22 7d 0a  58 39 35 1c 7b 22 6f 72  |haver"}.X95.{"or|
00000050  64 65 72 49 64 22 3a 22  58 39 35 22 2c 22 6f 72  |derId":"X95","or|
00000060  64 65 72 54 6f 74 61 6c  22 3a 22 32 22 2c 22 70  |derTotal":"2","p|
00000070  72 6f 64 75 63 74 4e 61  6d 65 22 3a 22 41 43 4d  |roductName":"ACM|
00000080  45 20 54 4e 54 22 7d 0a                           |E TNT"}.|
00000088

So, let’s hook this up to kafkacat:

echo '[{ "orderId": "X94", "orderTotal": "42", "productName": "ACME yak shaver" }, { "orderId": "X95", "orderTotal": "2", "productName": "ACME TNT" }]' | \
    jq --compact-output \
    --raw-output \
    --arg sep $'\x1c' \
    '.[] | [.orderId + $sep, tostring] | join("")' | \
   kafkacat -b localhost:9092 -t orders02 -K$'\x1c' -P

Let’s see what the data now looks like on the topic:

kafkacat -b localhost:9092 \
         -t orders02 \
         -C \
         -f 'Key: %k, payload: %s\n'
Key: X94, payload: {"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
Key: X95, payload: {"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}

We can double check with ksqlDB too — there’s our keys:

ksql> PRINT orders02 FROM BEGINNING LIMIT 2;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/09/30 20:35:00.646 Z, key: X94, value: {"orderId":"X94","orderTotal":"42","productName":"ACME yak shaver"}
rowtime: 2020/09/30 20:35:00.646 Z, key: X95, value: {"orderId":"X95","orderTotal":"2","productName":"ACME TNT"}

via GIPHY

Footnote - what is that jq filter doing? 🔗

Kinda unintelligible, right?

'.[] | [.orderId + $sep, tostring] | join("")'

Let’s check it out.

This is the actual filter that we want to use with the data.
We're using [] to explode the array. If you want a noop then just
use . on its own
 |
 |   Now we pipe it to the next section
 |   |
 |   |                      |- This forces the object from the
 |   |                      |  previous section to a string
 V   V                      V
'.[] | [.orderId + $sep, tostring] | join("")'
            ^        ^                 ^
            |        |--------         |--- Joins the array that the [   ] created
This is the field that       |              so that the output is on a single line
we want to use as the     This is the separator
message key               character variable,
                          set in the --arg paramter

Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025