Question from the Confluent Community Slack group:
How can I access the data in object in an array like below using ksqlDB stream
"Total": [ { "TotalType": "Standard", "TotalAmount": 15.99 }, { "TotalType": "Old Standard", "TotalAmount": 16, " STID":56 } ]
Let’s take a look at this using ksqlDB 0.9 (the latest version as of May 2020). First, spin up a ksqlDB environment using this Docker Compose.
Send the sample message to a Kafka topic, first wrapping it in curly braces to make it valid JSON:
docker exec -i kafkacat kafkacat \
        -b kafka:29092 -P \
        -t my_topic <<EOF
{ "Total": [ { "TotalType": "Standard", "TotalAmount": 15.99 }, { "TotalType": "Old Standard", "TotalAmount": 16, "STID": 56 } ] }
EOF
Fire up the ksqlDB CLI:
$ ksql http://ksqldb:8088
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.9.0, Server v0.9.0 located at http://ksqldb:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Set the offset to earliest so that we’re querying all the data in the topic:
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
Model the input data as a ksqlDB stream:
ksql> CREATE STREAM my_stream (TOTAL ARRAY<STRUCT<TotalType   VARCHAR, 
                                                  TotalAmount VARCHAR, 
                                                  STID        VARCHAR>>) 
                         WITH (KAFKA_TOPIC='my_topic', 
                               VALUE_FORMAT='JSON');
 Message
----------------
 Stream created
----------------
Play with the data:
- Select specific array entry

Note: ksqlDB arrays are 1-based

ksql> SELECT TOTAL[1] FROM my_stream EMIT CHANGES LIMIT 1;
+----------------------------------------------------+
|KSQL_COL_0                                          |
+----------------------------------------------------+
|{TOTALTYPE=Standard, TOTALAMOUNT=15.99, STID=null}  |

- Select nested array element

ksql> SELECT TOTAL[1]->TotalType, TOTAL[1]->totalamount FROM my_stream EMIT CHANGES;
+--------------------+--------------------+
|TOTALTYPE           |TOTALAMOUNT         |
+--------------------+--------------------+
|Standard            |15.99               |

- Explode the array

ksql> SELECT EXPLODE(TOTAL) FROM my_stream EMIT CHANGES;
+----------------------------------------------------+
|KSQL_COL_0                                          |
+----------------------------------------------------+
|{TOTALTYPE=Standard, TOTALAMOUNT=15.99, STID=null}  |
|{TOTALTYPE=Old Standard, TOTALAMOUNT=16, STID=56}   |

- Explode, un-nest, and change the field names of the resulting fields

ksql> SELECT EXPLODE(TOTAL)->TOTALTYPE AS TOTAL_TYPE, EXPLODE(TOTAL)->TOTALAMOUNT AS TOTALAMOUNT, EXPLODE(TOTAL)->STID AS STID FROM my_stream EMIT CHANGES;
+--------------------+--------------------+--------------------+
|TOTAL_TYPE          |TOTALAMOUNT         |STID                |
+--------------------+--------------------+--------------------+
|Standard            |15.99               |null                |
|Old Standard        |16                  |56                  |
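To make the semantics of these queries concrete, here is a minimal plain-Python sketch (not ksqlDB code) that mimics them on the sample message. The helpers `to_struct`, `element_at`, and `explode` are hypothetical names chosen for illustration. They only emulate what the query output shows: 1-based indexing, a null STID where the field is absent, and one output row per array element. The sketch keeps JSON's native types and does not model the VARCHAR casts or ksqlDB's handling of out-of-range indexes.

```python
import json

# The sample message sent to my_topic earlier in this post.
message = json.loads(
    '{ "Total": [ { "TotalType": "Standard", "TotalAmount": 15.99 }, '
    '{ "TotalType": "Old Standard", "TotalAmount": 16, "STID": 56 } ] }'
)

# Field names declared in the STRUCT of the CREATE STREAM statement.
FIELDS = ("TotalType", "TotalAmount", "STID")


def to_struct(entry):
    """Emulate the declared STRUCT: every field present, missing ones set to None."""
    return {name: entry.get(name) for name in FIELDS}


def element_at(array, index):
    """Hypothetical helper: 1-based access, mimicking TOTAL[1] (no bounds handling)."""
    return array[index - 1]


def explode(array):
    """Hypothetical helper: yield one row per array element, mimicking EXPLODE(TOTAL)."""
    for entry in array:
        yield entry


total = [to_struct(e) for e in message["Total"]]

# Equivalent of selecting TOTAL[1]->TotalType, TOTAL[1]->TotalAmount
first = element_at(total, 1)
print(first["TotalType"], first["TotalAmount"])  # Standard 15.99

# Equivalent of exploding the array and un-nesting each field
rows = [(r["TotalType"], r["TotalAmount"], r["STID"]) for r in explode(total)]
for row in rows:
    print(row)
```

The single input message yields two rows, matching the two rows returned by the EXPLODE queries.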
Persist this to a new stream (backed by a Kafka topic):
ksql> CREATE STREAM new_stream AS
         SELECT EXPLODE(TOTAL)->TOTALTYPE AS TOTAL_TYPE, 
                EXPLODE(TOTAL)->TOTALAMOUNT AS TOTALAMOUNT, 
                EXPLODE(TOTAL)->STID AS STID 
            FROM my_stream EMIT CHANGES;
 Message
-----------------------------------------
 Created query with ID CSAS_NEW_STREAM_0
-----------------------------------------
ksql> SHOW TOPICS;
 Kafka Topic | Partitions | Partition Replicas
-----------------------------------------------
 NEW_STREAM  | 1          | 1
 my_topic    | 1          | 1
-----------------------------------------------
ksql> PRINT NEW_STREAM FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2020/05/26 08:58:35.691 Z, key: <null>, value: {"TOTAL_TYPE":"Standard","TOTALAMOUNT":"15.99","STID":null}
rowtime: 2020/05/26 08:58:35.691 Z, key: <null>, value: {"TOTAL_TYPE":"Old Standard","TOTALAMOUNT":"16","STID":"56"}
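Note that TOTALAMOUNT and STID come out as JSON strings, since the stream declares them as VARCHAR. A consumer that wants numbers has to convert them. Here is a minimal Python sketch of that conversion, using the two record values printed above as literal input. The `parse` helper is hypothetical, and the choice of `Decimal` for amounts and `int` for STID is an assumption for illustration, not something the stream definition prescribes.

```python
import json
from decimal import Decimal

# The two record values printed from the NEW_STREAM topic above.
records = [
    '{"TOTAL_TYPE":"Standard","TOTALAMOUNT":"15.99","STID":null}',
    '{"TOTAL_TYPE":"Old Standard","TOTALAMOUNT":"16","STID":"56"}',
]


def parse(value):
    """Hypothetical helper: decode one record and convert string fields to numbers."""
    row = json.loads(value)
    return {
        "TOTAL_TYPE": row["TOTAL_TYPE"],
        # Decimal avoids binary floating-point rounding on amounts (an assumption).
        "TOTALAMOUNT": Decimal(row["TOTALAMOUNT"]),
        # STID is null when the source array element had no STID field.
        "STID": None if row["STID"] is None else int(row["STID"]),
    }


parsed = [parse(v) for v in records]
total_amount = sum(r["TOTALAMOUNT"] for r in parsed)
print(total_amount)  # 31.99
```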