rmoff's random ramblings

ksqlDB - How to model a variable number of fields in a nested value (STRUCT)

Published Oct 7, 2020 by Robin Moffatt in ksqlDB at https://rmoff.net/2020/10/07/ksqldb-how-to-model-a-variable-number-of-fields-in-a-nested-value-struct/

There was a good question on StackOverflow recently in which someone was struggling to find the appropriate ksqlDB DDL to model a source topic in which there was a variable number of fields in a STRUCT.

This was their example:

{
  "device" : "REDACTED",
  "context" : {
    "2" : "25",
    "3" : "0",
    "4" : "60",
    "1" : "REDACTED"
  }
}
{
  "device" : "REDACTED",
  "context" : {
    "2" : "2020-10-07T07:02:48.0000000Z",
    "1" : "REDACTED"
  }
}

So context looks like a STRUCT. But when they tried that, they found that ksqlDB wouldn’t return the data unless all four fields were included in the payload. Because the set of fields varies from message to message, a STRUCT wouldn’t work.
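For illustration, a STRUCT-based declaration of the kind described might look roughly like the sketch below. This is an assumption about the shape of the attempt, not the DDL from the original question, and the stream name "events_struct" is made up for this example. The point is that a STRUCT fixes its set of fields in the schema up front.

```sql
-- Hypothetical sketch, not taken from the original question.
-- "events_struct" is an illustrative name. Every field of the STRUCT
-- has to be declared in the schema, so the set of fields is fixed.
CREATE STREAM "events_struct" (
    "device" VARCHAR,
    "context" STRUCT<"1" VARCHAR, "2" VARCHAR, "3" VARCHAR, "4" VARCHAR>
) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
```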

The Answer - MAP

You need to use a MAP rather than a STRUCT.

Here’s a working example using ksqlDB 0.12.

  • Load the sample data into a topic

    kafkacat -b localhost:9092 -P -t events <<EOF
    { "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } }
    { "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } }
    EOF
  • In ksqlDB, declare the stream:

    CREATE STREAM "events" (
        "device" VARCHAR,
        "event" VARCHAR,
        "firstOccurrenceTime" VARCHAR,
        "lastOccurrenceTime" VARCHAR,
        "occurrenceCount" INTEGER,
        "receiveTime" VARCHAR,
        "persistTime" VARCHAR,
        "state" VARCHAR,
        "context" MAP < VARCHAR, VARCHAR >
    ) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
  • Query the stream to check things work:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    
    ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES;
    +----------+--------+--------------------------+--------+------------------------------------+
    |device    |event   |receiveTime               |state   |context                             |
    +----------+--------+--------------------------+--------+------------------------------------+
    |REDACTED  |403151  |2020-09-30T11:03:50.000Z  |open    |{1=REDACTED, 2=25, 3=0, 4=60}       |
    |REDACTED  |402004  |2020-10-07T07:02:48Z      |open    |{1=REDACTED, 2=2020-10-07T07:02:48.0|
    |          |        |                          |        |000000Z}                            |
  • Use the ['key'] syntax to access specific keys within the map. A key that is missing from a message comes back as null:

    ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES;
    +-----------+--------+------------------------------------+-----------+-----------+
    |device     |event   |context                             |CONTEXT_1  |CONTEXT_3  |
    +-----------+--------+------------------------------------+-----------+-----------+
    |REDACTED   |403151  |{1=REDACTED, 2=25, 3=0, 4=60}       |REDACTED   |0          |
    |REDACTED   |402004  |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED   |null       |
    |           |        |000000Z}                            |           |           |
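Since the map is declared as MAP<VARCHAR, VARCHAR>, every value comes back as a string, and a missing key comes back as null. As a minimal sketch of how you might handle both, the query below casts one value to an integer and substitutes a placeholder for a missing key. The aliases CONTEXT_3_INT and CONTEXT_3_OR_DEFAULT and the 'n/a' placeholder are illustrative choices, not part of the original example.

```sql
-- Sketch only: the aliases and the 'n/a' placeholder are illustrative.
SELECT "device",
       "event",
       -- Map values are VARCHAR, so cast when a number is needed
       CAST("context"['3'] AS INT) AS CONTEXT_3_INT,
       -- Replace the null returned for a missing key with a placeholder
       IFNULL("context"['3'], 'n/a') AS CONTEXT_3_OR_DEFAULT
  FROM "events"
  EMIT CHANGES;
```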

Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
