
Pivoting Aggregates in KSQL

Published Apr 17, 2019 by Robin Moffatt in KsqlDB, Data Wrangling at https://rmoff.net/2019/04/17/pivoting-aggregates-in-ksql/

Prompted by a question on StackOverflow, the requirement is to take a series of events related to a common key and for each key output a series of aggregates derived from a changing value in the events. I’ll use the data from the question, based on ticket statuses. Each ticket can go through various stages, and the requirement was to show, per customer, how many tickets are currently at each stage.

Here’s the source data:

Ticket ID   Customer   Ticket Status
2216        1472       closed
8945        1472       waiting
8945        1472       processing
8945        1472       waiting
8952        1472       new
8952        1472       close-request

By eyeballing the data we can see that for this one customer there are three tickets, currently in the states closed, waiting, and close-request, and so the desired output is:

Customer   Tickets closed   Tickets waiting   Tickets processing   Tickets new   Tickets close-request
1472       1                1                 0                    0             1

In RDBMS SQL this would be a fairly trivial PIVOT operation. In KSQL we can achieve the same using the CASE statement, which was added in version 5.2. Along the way we also need to reason about the difference between a stream of events and its current state.
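
For comparison, here is roughly what that PIVOT could look like in an RDBMS that supports it. This is a hypothetical sketch in SQL Server syntax, and it assumes a TICKETS table that already holds one row per ticket with its current status:

-- Count tickets per customer, pivoting the STATUS values out into columns
SELECT CONTACT_ID,
       [new], [processing], [waiting], [close-request], [closed]
  FROM (SELECT CONTACT_ID, ID, STATUS FROM TICKETS) t
 PIVOT (COUNT(ID)
        FOR STATUS IN ([new], [processing], [waiting], [close-request], [closed])) p;

Note the assumption baked into that sketch: one row per ticket, i.e. state. Our Kafka topic holds every status-change event instead, which is exactly the stream/table distinction we'll need to deal with below.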


It’s possible to do this in KSQL by building a table (to hold the current state of each ticket) and then running the aggregate on that table.

  1. Set up the test data

     kafkacat -b localhost -t tickets -P <<EOF
     {"ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
     {"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
     {"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
     {"ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
     {"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
     {"ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
     EOF
    
  2. Preview the topic data

     ksql> PRINT 'tickets' FROM BEGINNING;
     Format:JSON
     {"ROWTIME":1555511270573,"ROWKEY":"null","ID":2216,"CONTACT_ID":1472,"SUBJECT":"Test Bodenbach","STATUS":"closed","TIMESTRING":"2012-11-08 10:34:30.000"}
     {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
     {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"processing","TIMESTRING":"2019-04-16 23:52:08.000"}
     {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8945,"CONTACT_ID":1472,"SUBJECT":"sync-test","STATUS":"waiting","TIMESTRING":"2019-04-17 00:10:38.000"}
     {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"new","TIMESTRING":"2019-04-17 00:11:23.000"}
     {"ROWTIME":1555511270573,"ROWKEY":"null","ID":8952,"CONTACT_ID":1472,"SUBJECT":"another sync ticket","STATUS":"close-request","TIMESTRING":"2019-04-17 00:12:04.000"}
    
  3. Register the stream

     CREATE STREAM TICKETS (ID INT, 
                            CONTACT_ID INT, 
                            SUBJECT VARCHAR, 
                            STATUS VARCHAR, 
                            TIMESTRING VARCHAR) 
             WITH (KAFKA_TOPIC='tickets', 
                   VALUE_FORMAT='JSON');
    
  4. Query the data

     ksql> SET 'auto.offset.reset' = 'earliest';
     ksql> SELECT * FROM TICKETS;
     1555502643806 | null | 2216 | 1472 | Test Bodenbach | closed | 2012-11-08 10:34:30.000
     1555502643806 | null | 8945 | 1472 | sync-test | waiting | 2019-04-16 23:07:01.000
     1555502643806 | null | 8945 | 1472 | sync-test | processing | 2019-04-16 23:52:08.000
     1555502643806 | null | 8945 | 1472 | sync-test | waiting | 2019-04-17 00:10:38.000
     1555502643806 | null | 8952 | 1472 | another sync ticket | new | 2019-04-17 00:11:23.000
     1555502643806 | null | 8952 | 1472 | another sync ticket | close-request | 2019-04-17 00:12:04.000
    
  5. At this point we can use CASE to pivot the aggregates:

     SELECT CONTACT_ID, 
           SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
           SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
           SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
           SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST,
           SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
       FROM TICKETS 
       GROUP BY CONTACT_ID;
    
       1472 | 1 | 1 | 2 | 1 | 1
    

    But you’ll notice that the answer isn’t as expected. This is because we’re counting all six input events, rather than the current state of the three tickets.

    Let’s look at a single ticket, ID 8945. It has three status events (waiting -> processing -> waiting), each of which gets counted in the aggregate. We can validate this with a simple predicate:

     SELECT CONTACT_ID, 
           SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
           SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
           SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
           SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST,
           SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
       FROM TICKETS 
       WHERE ID=8945
       GROUP BY CONTACT_ID;
    
     1472 | 0 | 1 | 2 | 0 | 0
    
  6. What we actually want is the current state of each ticket. A KSQL table needs the Kafka message key to be set, but the source messages have none (note the null ROWKEY in step 2), so first repartition the stream by ticket ID, and then declare a table over the repartitioned topic:

     CREATE STREAM TICKETS_BY_ID AS SELECT * FROM TICKETS PARTITION BY ID;
    
     CREATE TABLE TICKETS_TABLE (ID INT, 
                           CONTACT_ID INT, 
                           SUBJECT VARCHAR, 
                           STATUS VARCHAR, 
                           TIMESTRING VARCHAR) 
             WITH (KAFKA_TOPIC='TICKETS_BY_ID', 
             VALUE_FORMAT='JSON',
             KEY='ID');
    
  7. Compare event stream vs current state

    • Event stream (KSQL Stream)

        ksql> SELECT ID, TIMESTRING, STATUS FROM TICKETS;
        2216 | 2012-11-08 10:34:30.000 | closed
        8945 | 2019-04-16 23:07:01.000 | waiting
        8945 | 2019-04-16 23:52:08.000 | processing
        8945 | 2019-04-17 00:10:38.000 | waiting
        8952 | 2019-04-17 00:11:23.000 | new
        8952 | 2019-04-17 00:12:04.000 | close-request
      
    • Current state (KSQL Table)

        ksql> SELECT ID, TIMESTRING, STATUS FROM TICKETS_TABLE;
        2216 | 2012-11-08 10:34:30.000 | closed
        8945 | 2019-04-17 00:10:38.000 | waiting
        8952 | 2019-04-17 00:12:04.000 | close-request
      
  8. We want an aggregate of the table: the same SUM(CASE…)…GROUP BY trick that we ran above, but based on the current state of each ticket rather than on each event:

       SELECT CONTACT_ID, 
           SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW, 
           SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING, 
           SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING, 
            SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST,
           SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
       FROM TICKETS_TABLE 
       GROUP BY CONTACT_ID;
    

    This gives us what we want (see the sketch after this walkthrough for persisting this aggregate as a table in its own right):

       1472 | 0 | 0 | 1 | 1 | 1
    
  9. Let’s feed another ticket’s events into the topic and observe how the table’s state changes. A table re-emits a row whenever the state for a key changes; you can also cancel the SELECT and rerun it to see just the current state.

    Sample data to try it for yourself:

     {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"new","TIMESTRING":"2019-04-16 23:07:01.000"}
     {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"processing","TIMESTRING":"2019-04-16 23:07:01.000"}
     {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
     {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"processing","TIMESTRING":"2019-04-16 23:07:01.000"}
     {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"waiting","TIMESTRING":"2019-04-16 23:07:01.000"}
     {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"closed","TIMESTRING":"2019-04-16 23:07:01.000"}
     {"ID":8946,"CONTACT_ID":42,"SUBJECT":"","STATUS":"close-request","TIMESTRING":"2019-04-16 23:07:01.000"}
    
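To persist the pivoted aggregate from step 8 as a table in its own right, rather than re-running a transient SELECT each time, you can wrap the same query in a CREATE TABLE … AS SELECT. Here’s a sketch; the name CUSTOMER_TICKETS_BY_STATUS is my own choice, not anything from the original question:

-- Continuously maintain the pivoted counts, backed by a Kafka topic
CREATE TABLE CUSTOMER_TICKETS_BY_STATUS AS
    SELECT CONTACT_ID,
           SUM(CASE WHEN STATUS='new' THEN 1 ELSE 0 END) AS TICKETS_NEW,
           SUM(CASE WHEN STATUS='processing' THEN 1 ELSE 0 END) AS TICKETS_PROCESSING,
           SUM(CASE WHEN STATUS='waiting' THEN 1 ELSE 0 END) AS TICKETS_WAITING,
           SUM(CASE WHEN STATUS='close-request' THEN 1 ELSE 0 END) AS TICKETS_CLOSEREQUEST,
           SUM(CASE WHEN STATUS='closed' THEN 1 ELSE 0 END) AS TICKETS_CLOSED
      FROM TICKETS_TABLE
     GROUP BY CONTACT_ID;

Every change to a ticket’s state will now update the counts in the table’s underlying topic, which downstream applications can consume directly.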

If you want to try this out further, you can generate a stream of additional dummy data using this Mockaroo API call, piped through awk to slow it down to one message every two seconds so that you can see the effect on the generated aggregates as each message arrives:

# Fetch batches of generated ticket events from Mockaroo and produce them
# to the tickets topic, one message every two seconds, forever
while [ 1 -eq 1 ]
  do curl -s "https://api.mockaroo.com/api/f2d6c8a0?count=1000&key=ff7856d0" | \
      awk '{print $0;system("sleep 2");}' | \
      kafkacat -b localhost -t tickets -P
  done
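
With that running, leave the aggregate query from step 8 running in another KSQL session and you can watch the counts change as each message arrives; or, if you created the hypothetical CUSTOMER_TICKETS_BY_STATUS table sketched above, query that instead:

ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM CUSTOMER_TICKETS_BY_STATUS;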

Robin Moffatt

Robin Moffatt is a Principal DevEx Engineer at LakeFS. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
