rmoff's random ramblings
about talks

Exploring KSQL Stream-Stream Joins

Published Mar 28, 2019 by in KsqlDB, Stream Processing at https://rmoff.net/2019/03/28/exploring-ksql-stream-stream-joins/

Introduction

What can you use stream-stream joins for? Can you use them to join between a stream of orders and stream of related shipments to do useful things? What’s not supported in KSQL, where are the cracks?

Test data

via Mockaroo:

curl -s "https://api.mockaroo.com/api/86f89de0?count=500&key=ff7856d0" | \
  kafkacat -P -b localhost -t orders

curl -s "https://api.mockaroo.com/api/b410d0b0?count=500&key=ff7856d0" | \
  kafkacat -P -b localhost -t shipments

Model the streams in KSQL

Note setting the timestamp to reflect the time of the event (ORDER_TS and SHIPMENT_TS respectively)

CREATE STREAM ORDERS (ORDER_ID INT, 
                      CUSTOMER_ID INT, 
                      ORDER_TS VARCHAR, 
                      ORDER_TOTAL_USD DOUBLE, 
                      MAKE VARCHAR) 
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON', 
        TIMESTAMP='ORDER_TS', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');

CREATE STREAM SHIPMENTS (SHIPMENT_ID VARCHAR, 
                         ORDER_ID INT, 
                         SHIPMENT_PROVIDER VARCHAR, 
                         SHIPMENT_TS VARCHAR) 
  WITH (KAFKA_TOPIC='shipments', VALUE_FORMAT='JSON', 
  TIMESTAMP='SHIPMENT_TS', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');

SET 'auto.offset.reset' = 'earliest';

Simple aggregations (no joins yet)

  • How many orders have been placed per hour? What was the maximum and average order value?

    CREATE TABLE ORDERS_AGG AS 
      SELECT WINDOWSTART() AS WINDOW_START_TS, 
             MAKE, 
             COUNT(*) AS ORDER_COUNT, 
             MAX(ORDER_TOTAL_USD) AS MAX_ORDER_VALUE_USD,
             SUM(ORDER_TOTAL_USD) AS TOTAL_ORDER_VALUE_USD,
             SUM(ORDER_TOTAL_USD)/COUNT(*) AS AVG_ORDER_VALUE_USD
        FROM ORDERS 
               WINDOW TUMBLING (SIZE 1 HOUR) 
    GROUP BY MAKE;
    SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss'), 
           MAKE, 
           ORDER_COUNT,
           MAX_ORDER_VALUE_USD 
      FROM ORDERS_AGG;
    
    2019-03-24 16:00:00 | Ford | 8 | 193828.28
    2019-03-24 12:00:00 | Ford | 7 | 171296.8
    2019-03-24 09:00:00 | Ford | 4 | 181811.96
    2019-03-24 04:00:00 | Audi | 9 | 184161.1
    2019-03-24 05:00:00 | BMW | 1 | 130305.35
    2019-03-24 19:00:00 | BMW | 1 | 200194.86
  • How many shipments per hour?

    CREATE TABLE SHIPMENTS_AGG AS 
      SELECT WINDOWSTART() AS WINDOW_START_TS, 
             SHIPMENT_PROVIDER, 
             COUNT(*) AS SHIPMENT_COUNT
        FROM SHIPMENTS 
               WINDOW TUMBLING (SIZE 1 HOUR) 
    GROUP BY SHIPMENT_PROVIDER;
    SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss'), 
           SHIPMENT_PROVIDER, 
           SHIPMENT_COUNT 
      FROM SHIPMENTS_AGG;
    
    2019-03-26 01:00:00 | ups | 2
    2019-03-26 04:00:00 | dhl | 1
    2019-03-26 13:00:00 | dhl | 6
    2019-03-26 11:00:00 | ups | 3
    2019-03-26 16:00:00 | dhl | 1

Match orders to shipments

Notes about the SQL:

  1. The timestamp of the generated message is set to that of the order. By default KSQL will set the timestamp to that of the message that triggers the join, which here could be an order or shipment.

  2. The data is written as Avro, rather than the source JSON

  3. A column is created using CASE that denotes if the order has shipped, which can be useful for subsequent filtering or aggregations

  4. The time difference between order and shipment is calculated and stored as a minutes value

  5. The join is a LEFT OUTER, meaning that records must exist on the left of the join (ORDERS) and if no matching record on the right (SHIPMENTS) is found the value will be returned as NULL.

  6. Because this is a stream-stream join it needs to specify a time window across which to evaluate the condition. By setting it to (0 SECONDS, 7 DAYS) any shipment up to seven days after the order will be matched. No shipments occurring before the order will be matched.

CREATE STREAM ORDER_SHIPMENTS WITH (TIMESTAMP='ORDER_TS', VALUE_FORMAT='AVRO') AS 
  SELECT O.ROWTIME AS ORDER_TS, 
         S.ROWTIME AS SHIPMENT_TS, 
         O.ORDER_ID AS ORDER_ID, 
         O.MAKE AS MAKE,
         O.ORDER_TOTAL_USD AS ORDER_TOTAL_USD,
         S.SHIPMENT_ID AS SHIPMENT_ID, 
         S.SHIPMENT_PROVIDER AS SHIPMENT_PROVIDER, 
         (S.ROWTIME - O.ROWTIME)/1000/60 AS LEADTIME_MINUTES, 
         CASE WHEN S.SHIPMENT_ID IS NULL THEN 0 ELSE 1 END AS SHIPPED_IND
    FROM ORDERS O 
          LEFT OUTER JOIN SHIPMENTS S 
            WITHIN (0 SECONDS, 7 DAYS) 
            ON O.ORDER_ID=S.ORDER_ID;

From this new stream we can answer questions such as what was the lead time? That is, the time difference between the order being placed and it shipping.

SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'), 
       TIMESTAMPTOSTRING(SHIPMENT_TS,'yyyy-MM-dd HH:mm:ss'), 
       ORDER_ID, 
       MAKE, 
       SHIPMENT_PROVIDER, 
       LEADTIME_MINUTES 
  FROM ORDER_SHIPMENTS 
WHERE  ORDER_ID=329
  AND  SHIPPED_IND=1;

2019-03-24 09:41:13 | 2019-03-26 05:42:37 | 329 | BMW | ups | 2641

(2641 minutes = 44 hours, 1 minute)

What we can’t answer yet from this stream is "Has an order shipped?". It may seem obvious to simply query it for where no match to a shipment was made (e.g. WHERE SHIPMENT_ID IS NULL) or use the derived SHIPPED_IND column (which uses the same logic), but this won’t work. Why? Because we are working with a stream of events. Consider the order above, ID 329:

SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
       ORDER_ID,
       MAKE,
       SHIPPED_IND,
       SHIPMENT_ID 
  FROM ORDER_SHIPMENTS
WHERE  ORDER_ID=329;

2019-03-24 09:41:13 | 329 | BMW | 0 | null
2019-03-24 09:41:13 | 329 | BMW | 1 | ship-wv85258

There are two events:

  1. The order is placed. No shipment matches

  2. The shipment is made. It matches to the order and a new event is written.

So if we simply query the stream for a "not shipped" condition we’ll just pull back the initial Order record:

SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
       ORDER_ID,
       MAKE,
       SHIPPED_IND,
       SHIPMENT_ID
  FROM ORDER_SHIPMENTS
 WHERE ORDER_ID=329 
   AND SHIPPED_IND=0;

2019-03-24 09:41:13 | 329 | BMW | 0 | null

Stream/Table duality in action

Let’s take the stream of events relating to a given key (ORDER_ID) and treat it as a table—what’s the current state for the given key? Here we register a KSQL table on top of the Kafka topic to which the previous join query is writing.

CREATE TABLE ORDER_SHIPMENTS_T 
  WITH (KAFKA_TOPIC='ORDER_SHIPMENTS', 
        VALUE_FORMAT='AVRO',
        KEY='ORDER_ID'); 

Now we can run the same query as above, but this time we get the current state for any order:

SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
       ORDER_ID,
       MAKE,
       SHIPPED_IND,
       SHIPMENT_ID 
  FROM ORDER_SHIPMENTS_T
WHERE  ORDER_ID=329;

2019-03-24 09:41:13 | 329 | BMW | 1 | ship-wv85258

Just one row returned; the current state of the record. Since the table maintains the current state it means that we can accurately query it for orders that have not shipped:

SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
       ORDER_ID,
       MAKE,
       SHIPPED_IND,
       SHIPMENT_ID
  FROM ORDER_SHIPMENTS_T
 WHERE SHIPPED_IND=0;

2019-03-24 16:27:56 | 7 | Audi | 0 | null
2019-03-24 05:54:43 | 39 | Audi | 0 | null
2019-03-24 23:20:37 | 53 | Audi | 0 | null

Build a windowed aggregate on a table? Not yet.

Note
non-windowed aggregates are possible; just not windowed aggregates.

Taking the above state provided by the KSQL table, we’d like to know things like :

  • By hour, how many orders are there outstanding (that have not shipped)?

However, doing an aggregation on a table is not currently possible in KSQL:

ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss'),
>        MAKE,
>        COUNT(*) AS ORDERS_PLACED,
>        SUM(SHIPPED_IND) AS SHIPPED_ORDERS,
>        COUNT(*) - SUM(SHIPPED_IND) AS OUTSTANDING_ORDERS
>  FROM ORDER_SHIPMENTS_T
>        WINDOW TUMBLING (SIZE 1 HOUR) 
> GROUP BY MAKE;
Windowing not supported for table aggregations.
  • https://github.com/confluentinc/ksql/issues/778

But what about the order count? It would also be useful to have it in the same query as the count of shipments, rather than individual queries against ORDERS_AGG and SHIPMENTS_AGG we saw earlier. Can’t we just get that from the ORDER_SHIPMENTS stream?

Let’s try the aggregate just for ORDER_ID=329, so that we know there is only a single order:

SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss'), 
        MAKE, 
        COUNT(*) AS ORDERS_PLACED, 
        SUM(SHIPPED_IND) AS SHIPPED_ORDERS
  FROM ORDER_SHIPMENTS
        WINDOW TUMBLING (SIZE 1 HOUR)
 WHERE ORDER_ID=329
 GROUP BY MAKE;

2019-03-24 09:00:00 | BMW | 2 | 1

Why’s ORDERS_PLACED==2? Because it’s a COUNT(*) of all matching rows, which is indeed 2:

SELECT TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'),
       ORDER_ID,
       MAKE,
       SHIPPED_IND,
       SHIPMENT_ID 
  FROM ORDER_SHIPMENTS
WHERE  ORDER_ID=329;

2019-03-24 09:41:13 | 329 | BMW | 0 | null
2019-03-24 09:41:13 | 329 | BMW | 1 | ship-wv85258

Useful aggregations that we can do

Even though we can’t do an aggregate against the state, we can still do some very useful stream processing against the stream of events themselves.

CREATE TABLE SHIPPED_ORDERS_AGG AS 
  SELECT WINDOWSTART() AS WINDOW_START_TS, 
         MAKE, 
         COUNT(*) AS SHIPPED_ORDERS, 
         MAX(LEADTIME_MINUTES) AS MAX_LEADTIME_MINUTES, 
         SUM(LEADTIME_MINUTES) / COUNT(*) AS AVG_LEADTIME_MINUTES 
    FROM ORDER_SHIPMENTS 
           WINDOW TUMBLING (SIZE 1 HOUR) 
  WHERE SHIPPED_IND=1 
GROUP BY MAKE;
  • By hour, how many orders have shipped

  • By hour, what was the max leadtime?

  • By hour, what was the average leadtime?

SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss'), 
       MAKE, 
       SHIPPED_ORDERS, 
       MAX_LEADTIME_MINUTES, 
       AVG_LEADTIME_MINUTES 
  FROM SHIPPED_ORDERS_AGG;

2019-03-24 09:00:00 | Audi | 8 | 3397 | 2939
2019-03-24 20:00:00 | Audi | 9 | 3022 | 2266
2019-03-24 22:00:00 | BMW  | 1 | 2317 | 2317
2019-03-24 21:00:00 | BMW  | 3 | 2534 | 2121

ALERT! We breached our SLA 😫

As well as creating aggregates, we can set thresholds and monitor for any orders that breach an SLA in terms of the leadtime.

CREATE STREAM ORDERS_BREACHED_LEADTIME_SLA AS
  SELECT ORDER_ID,
         SHIPMENT_ID, 
         MAKE, 
         SHIPMENT_PROVIDER, 
         ORDER_TS,
         SHIPMENT_TS,
         LEADTIME_MINUTES
    FROM ORDER_SHIPMENTS
  WHERE  LEADTIME_MINUTES > 4100;

Now we have a KSQL stream (and thus Kafka topic) which lists an orders that took longer than the defined threshold to ship. This topic can be used for driving both dashboards and applications directly that need to respond to this breach.

SELECT ORDER_ID,
        SHIPMENT_ID, 
        MAKE, 
        SHIPMENT_PROVIDER, 
        TIMESTAMPTOSTRING(ORDER_TS,'yyyy-MM-dd HH:mm:ss'), 
        TIMESTAMPTOSTRING(SHIPMENT_TS,'yyyy-MM-dd HH:mm:ss'), 
        LEADTIME_MINUTES
  FROM ORDERS_BREACHED_LEADTIME_SLA;

14 | ship-og22112 | Audi | hermes | 2019-03-24 03:15:52 | 2019-03-26 23:46:19 | 4110
315 | ship-yp90671 | Ford | dhl | 2019-03-24 00:11:42 | 2019-03-26 21:03:43 | 4132

Try it out?

Use this Docker Compose.


Robin Moffatt

Robin Moffatt is a Principal DevEx Engineer at LakeFS. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2023