rmoff's random ramblings
about talks

Exploring ksqlDB window start time

Published Jan 9, 2020 by in KsqlDB, Windowing, Aggregation at https://rmoff.net/2020/01/09/exploring-ksqldb-window-start-time/

Prompted by a question on StackOverflow I had a bit of a dig into how windows behave in ksqlDB, specifically with regards to their start time. This article shows also how to create test data in ksqlDB and create data to be handled with a timestamp in the past.

For a general background to windowing in ksqlDB see the excellent docs.

The nice thing about recent releases of ksqlDB/KSQL is that you can create and populate streams directly with CREATE STREAM and INSERT INTO respectively. Much as I love kafkacat, being able to build a whole example within the ksqlDB CLI is very useful.

Create the stream

This creates the stream as well as the underlying topic. Since the topic doesn’t exist already I’ve specified its name and also PARITIONS - without these ksqlDB won’t create it for me.

CREATE STREAM SOURCE_DATA (OP_TS BIGINT, CUSTOMER VARCHAR, COST INT) 
  WITH (KAFKA_TOPIC='MY_DATA', 
        VALUE_FORMAT='AVRO', 
        PARTITIONS=1, 
        TIMESTAMP='OP_TS');

Note that I’ve created OP_TS to hold the timestamp as an epoch (hence BIGINT) and indicated to ksqlDB that this column is to be used as the timestamp for the records when doing any time-based processing. By default ksqlDB will use the timestamp of the Kafka message.

Populate the stream

Using https://www.epochconverter.com/ for ease I came up with a handful of times within the past year, and inserted messages into the stream for these:

INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1549715863000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1560083863000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1574339863000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1575981463000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1576931863000, 'A',1);
INSERT INTO SOURCE_DATA (OP_TS, CUSTOMER, COST) VALUES (1578573463000, 'A',1);

Query the stream

Here’s how the data looks. Note a few things:

  1. The use of TIMESTAMPTOSTRING to make the milliseconds-since-epoch more readable

  2. The UNIX_TIMESTAMP function is used to do some date maths to show how long ago from now the timestamp is.

  3. ROWTIME and OP_TS match, because that’s what we told ksqlDB with the WITH TIMESTAMP clause in the CREATE STREAM. If we hadn’t, then the ROWTIME would just be the time at which the rows were `INSERT`ed above.

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS ROWTIME_STR,
             TIMESTAMPTOSTRING(OP_TS,'yyyy-MM-dd HH:mm:ss','Europe/London')   AS OP_TS, 
             (UNIX_TIMESTAMP()-OP_TS) / 1000 / 60 / 60 / 24 AS DAYS_DIFF , 
             CUSTOMER, 
             COST 
        FROM SOURCE_DATA 
        EMIT CHANGES;

+--------------------+--------------------+----------+-----------+-----+
|ROWTIME_STR         |OP_TS               |DAYS_DIFF |CUSTOMER   |COST |
+--------------------+--------------------+----------+-----------+-----+
|2019-02-09 12:37:43 |2019-02-09 12:37:43 |334       |A          |1    |
|2019-06-09 13:37:43 |2019-06-09 13:37:43 |214       |A          |1    |
|2019-11-21 12:37:43 |2019-11-21 12:37:43 |49        |A          |1    |
|2019-12-10 12:37:43 |2019-12-10 12:37:43 |30        |A          |1    |
|2019-12-21 12:37:43 |2019-12-21 12:37:43 |19        |A          |1    |
|2020-01-09 12:37:43 |2020-01-09 12:37:43 |0         |A          |1    |

Tumbling window examples

Here’s the output of tumbling windows of various sizes.

  • 7 Days

    ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START_TS, 
                 CUSTOMER, 
                 SUM(COST) 
            FROM SOURCE_DATA 
                    WINDOW TUMBLING (SIZE 7 DAYS) 
            GROUP BY CUSTOMER 
            EMIT CHANGES ;
    
    +----------------------+----------+------------+
    |WINDOW_START_TS       |CUSTOMER  |KSQL_COL_2  |
    +----------------------+----------+------------+
    |2019-02-07 00:00:00   |A         |1           |
    |2019-06-06 01:00:00   |A         |1           |
    |2019-11-21 00:00:00   |A         |1           |
    |2019-12-05 00:00:00   |A         |1           |
    |2019-12-19 00:00:00   |A         |1           |
    |2020-01-09 00:00:00   |A         |1           |

    This is pretty much what we’d expect to see

  • 31 days

    I was hoping for a month, but ksqlDB only supports:

    Caused by: line 1:160: mismatched input 'MONTH' expecting {'DAY', 'HOUR',
            'MINUTE', 'SECOND', 'MILLISECOND', 'DAYS', 'HOURS', 'MINUTES', 'SECONDS',
            'MILLISECONDS'}

    Hence here’s 31 days instead:

    ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START_TS, 
                 CUSTOMER, 
                 SUM(COST) 
            FROM SOURCE_DATA 
                    WINDOW TUMBLING (SIZE 31 DAYS) 
            GROUP BY CUSTOMER 
            EMIT CHANGES ;    
    
    +-----------------------+----------+------------+
    |WINDOW_START_TS        |CUSTOMER  |KSQL_COL_2  |
    +-----------------------+----------+------------+
    |2019-01-22 00:00:00    |A         |1           |
    |2019-05-26 01:00:00    |A         |1           |
    |2019-10-28 00:00:00    |A         |1           |
    |2019-11-28 00:00:00    |A         |2           |
    |2019-12-29 00:00:00    |A         |1           |

    Note that two of the values (for 2019-12-10, 2019-12-21) fall within the same window (starting 2019-11-28)

  • 365 days

    As noted above ksqlDB support DAYS as the largest unit of time, so I’ll have to approximate 1 year ~~ 365 days

    ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START_TS, 
                 CUSTOMER, 
                 SUM(COST) 
            FROM SOURCE_DATA 
                    WINDOW TUMBLING (SIZE 365 DAYS) 
            GROUP BY CUSTOMER 
            EMIT CHANGES ;    
    
    +-----------------------+----------+------------+
    |WINDOW_START_TS        |CUSTOMER  |KSQL_COL_2  |
    +-----------------------+----------+------------+
    |2018-12-20 00:00:00    |A         |4           |
    |2019-12-20 00:00:00    |A         |2           |

    So this is where it gets interesting - looking back on the query output above you can see we only have data within the last year, but for a tumbling window of 365 days we’re getting two values, starting on December 20th of two consecutive years.

Window start date

When calculating an time window within a day the window starts at midnight. For a window greater than a day, it seems that the window date from which the window end date is calculated is based on Unix time (which also ties in with window sizes less than a day starting a midnight).

Let’s double-check this assumption. Unix time starts at 1st January 1970 00:00:00. Taking our window size of 365 days, this is:

days

365

hours

8760

minutes

525600

seconds

31536000

millseconds

31536000000

Now let’s look at the epoch returned by WINDOWSTART():

ksql> SELECT WINDOWSTART() AS WINDOW_START_EPOCH,
             CUSTOMER,
             SUM(COST)
        FROM SOURCE_DATA
                WINDOW TUMBLING (SIZE 365 DAYS)
        GROUP BY CUSTOMER
        EMIT CHANGES ;

+--------------------+---------+-----------+
|WINDOW_START_EPOCH  |CUSTOMER |KSQL_COL_2 |
+--------------------+---------+-----------+
|1545264000000       |A        |4          |
|1576800000000       |A        |2          |

The first of these is 1545264000000. What do we get if we divide this by the number of milliseconds in a 365-day window (31536000000)? We find that it fits exactly: 1545264000000➗31536000000 = 49.

This suggests that if you want to build monthly or yearly aggregates in ksqlDB that start based on the Gregorian calendar markers, ksqlDB will need to add support for MONTH and YEAR as window sizes (tracking in issue #1968).


Robin Moffatt

Robin Moffatt is a Principal DevEx Engineer at LakeFS. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2023