rmoff's random ramblings
about talks

A poor man’s KSQL EXPLODE/UNNEST technique

Published May 9, 2019 by in KsqlDB at https://rmoff.net/2019/05/09/a-poor-mans-ksql-explode/unnest-technique/

There is an open issue for support of EXPLODE/UNNEST functionality in KSQL, and if you need it then do up-vote the issue. Here I detail a hacky, but effective, workaround for exploding arrays into multiple messages—so long as you know the upper-bound on your array.

Populate some test data into a Kafka topic

$ curl "https://api.mockaroo.com/api/440970e0?count=5&key=ff7856d0" | \
    kafkacat -P -b localhost -t car_data_01

Examine the data in KSQL:

ksql> PRINT 'car_data_01' FROM BEGINNING;
Format:JSON
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1533200557","car":"Oldsmobile","value":[68.93,53.58]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1548442477","car":"Mercury","value":[60.09,69.07,63.77,63.13]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1544928225","car":"Volkswagen","value":[59.77,6.94,97.7,30.86,16.9]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1545383393","car":"Nissan","value":[13.32]}
{"ROWTIME":1557392065412,"ROWKEY":"null","timestamp":"1552825010","car":"Hyundai","value":[12.92]}

Note the value element is an array:

"value":[68.93,53.58]

Create a stream over the data:

CREATE STREAM CAR_DATA (timestamp BIGINT, CAR VARCHAR, VALUE ARRAY<DOUBLE>) WITH (KAFKA_TOPIC='car_data_01', VALUE_FORMAT='JSON');
ksql> SELECT TIMESTAMP, CAR, VALUE[0], VALUE[1], VALUE[2] FROM CAR_DATA;
1533200557 | Oldsmobile | 68.93 | 53.58 | null
1548442477 | Mercury | 60.09 | 69.07 | 63.77
1544928225 | Volkswagen | 59.77 | 6.94 | 97.7
1545383393 | Nissan | 13.32 | null | null
1552825010 | Hyundai | 12.92 | null | null

Create the output stream, to start with just containing the zero-index elements of the array:

CREATE STREAM CAR_DATA_EXPLODED AS SELECT TIMESTAMP, CAR, 'Sensor 00' AS SOURCE, VALUE[0] AS VALUE FROM CAR_DATA WHERE VALUE[0] IS NOT NULL;
ksql> SELECT * FROM CAR_DATA_EXPLODED;
1557392065409 | null | 1544928225 | Volkswagen | Sensor 00 | 59.77
1557392065409 | null | 1545383393 | Nissan | Sensor 00 | 13.32
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 00 | 68.93
1557392065412 | null | 1552825010 | Hyundai | Sensor 00 | 12.92
1557392065409 | null | 1548442477 | Mercury | Sensor 00 | 60.09

Insert the remaining array indices to the new stream:

CREATE STREAM CAR_DATA_EXPLODED_00 AS SELECT TIMESTAMP, CAR, 'Sensor 00' AS SOURCE, VALUE[0] AS VALUE FROM CAR_DATA
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 01' AS SOURCE, VALUE[1] AS VALUE FROM CAR_DATA WHERE  VALUE[1] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 02' AS SOURCE, VALUE[2] AS VALUE FROM CAR_DATA WHERE  VALUE[2] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 03' AS SOURCE, VALUE[3] AS VALUE FROM CAR_DATA WHERE  VALUE[3] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 04' AS SOURCE, VALUE[4] AS VALUE FROM CAR_DATA WHERE  VALUE[4] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 05' AS SOURCE, VALUE[5] AS VALUE FROM CAR_DATA WHERE  VALUE[5] IS NOT NULL;

Examine the exploded data:

ksql> SELECT * FROM CAR_DATA_EXPLODED_00;
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 00 | 68.93
1557392065409 | null | 1545383393 | Nissan | Sensor 00 | 13.32
1557392065409 | null | 1544928225 | Volkswagen | Sensor 00 | 59.77
1557392065409 | null | 1548442477 | Mercury | Sensor 02 | 63.77
1557392065409 | null | 1544928225 | Volkswagen | Sensor 03 | 30.86
1557392065409 | null | 1544928225 | Volkswagen | Sensor 04 | 16.9
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 01 | 53.58
1557392065409 | null | 1544928225 | Volkswagen | Sensor 02 | 97.7
1557392065412 | null | 1552825010 | Hyundai | Sensor 00 | 12.92
1557392065409 | null | 1548442477 | Mercury | Sensor 01 | 69.07
1557392065409 | null | 1548442477 | Mercury | Sensor 00 | 60.09
1557392065409 | null | 1544928225 | Volkswagen | Sensor 01 | 6.94
1557392065409 | null | 1548442477 | Mercury | Sensor 03 | 63.13
ksql> SELECT * FROM CAR_DATA_EXPLODED_00 WHERE SOURCE='Sensor 03';
1557392065409 | null | 1544928225 | Volkswagen | Sensor 03 | 30.86
1557392065409 | null | 1548442477 | Mercury | Sensor 03 | 63.13

Remember that this is a workaround - if you actually need this functionality then do be sure to upvote the open issue for support of EXPLODE/UNNEST functionality in KSQL.


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025