In this blog post I'm going to explore how, as a data engineer in the field today, I might go about putting together a rudimentary data pipeline. I'll take some operational data and wrangle it into a form that makes it easily pliable for analytics work.
After a somewhat fevered and nightmarish period during which people walked around declaring that "Schema on Read" was the future, that "Data is the new oil", and "Look at the size of my big data", IT is doing what it always does and circling back to a more sensible approach to things.
As they say:
What's old is new
This is good news for me, because I am old and what I knew then is 'new' now ;)
Overview
The Data
This uses Environment Agency flood and river level data from the real-time data API (Beta).
If you've been following this blog closely you'll have seen some of my noodlings around with this data already. I wrote about exploring it with DuckDB and Rill, using it as an excuse to try out the new DuckDB UI, as well as loading it into Kafka and figuring out what working with it in Flink SQL would look like.
At the heart of the data are readings, providing information about measures such as rainfall and river levels. These are reported from a variety of stations around the UK.
The data is available on a public REST API (try it out here to see the current river level at one of the stations in Sheffield).
The Plan
I'm going to make heavy use of DuckDB for this project. It can read data from REST APIs, it can process data, and it can store data. What's more, it can be queried with various visualisation tools including Rill Data, Superset, and Metabase, as we'll see later.
Once loaded, the dimension data will be brute-force rebuilt based on the latest values. For those who like this kind of thing (and who doesn't?), this is in effect Slowly-Changing Dimension (SCD) Type 1. With SCD Type 1, no history is retained. This means that if a measure or station is removed, associated readings previously recorded will not have a match on the corresponding dimension table. If it's updated, historical readings will be shown with the dimension data as-is now, not as-was then.
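To make that trade-off concrete, here's a quick orphan check you could run after a dimension rebuild. This is just a sketch; it uses the readings and measures tables that get built further down the post:
-- With SCD Type 1, readings whose measure has disappeared upstream simply
-- stop matching the dimension; this counts how many readings are orphaned.
SELECT COUNT(*) AS orphaned_readings
FROM readings r
LEFT JOIN measures m ON r.measure = m.notation
WHERE m.notation IS NULL;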
The readings fact data we'll collect in a fact table that will mostly be appended to with each incremental load.
It's not entirely that simple though:
- Some stations may lag in reporting their data, so we might pull duplicates (i.e. the reading for the last time period when it did report)
- Some stations may batch their reporting, so we need to handle polling back over a period of time and dealing with the resulting duplicates for readings that have already been reported
In addition, historical data is available and we want to be able to include that too.
Once we've got our fact and dimension data sorted, we'll join it into a denormalised table that we can build analytics against.
The Environment
I'm running this all locally on my Mac. The first step is to install DuckDB and a few other useful tools:
brew install duckdb httpie jq
Then fire up DuckDB with its notebook-like UI (you don't have to use the UI; you can use whatever interface you want):
duckdb env-agency.duckdb -ui
Extract (with just a little bit of transform)
The basic ingest looks like this:
CREATE OR REPLACE TABLE readings_stg AS
SELECT * FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest')
DuckDB automagically determines the schema of the table. We're going to do one bit of processing at this stage too.
By default all the API calls return a payload made up of metadata and then an array of the actual data. I decided to explode out the array at this point of ingest just to make things a bit easier.
At this point I'm throwing away the @context and meta elements; you may decide you want to keep them.
CREATE OR REPLACE TABLE readings_stg AS
WITH src AS (SELECT *
FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest'))
SELECT u.* FROM (
SELECT UNNEST(items) AS u FROM src);
CREATE OR REPLACE TABLE measures_stg AS
WITH src AS (SELECT *
FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/measures'))
SELECT u.* FROM (
SELECT UNNEST(items) AS u FROM src);
CREATE OR REPLACE TABLE stations_stg AS
WITH src AS (SELECT *
FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/stations'))
SELECT u.* FROM (
SELECT UNNEST(items) AS u FROM src);
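Incidentally, if you did decide to keep the meta element I mentioned above, one option is to carry it through as an extra column alongside the unnested rows. A sketch only, assuming read_json detects meta as a struct:
CREATE OR REPLACE TABLE readings_stg AS
WITH src AS (SELECT *
             FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest'))
SELECT meta, u.* FROM (
    SELECT meta, UNNEST(items) AS u FROM src);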
Let's see what data we've pulled in:
SELECT 'readings_stg' AS table_name, COUNT(*) as row_ct, min(dateTime) as min_dateTime, max(dateTime) as max_dateTime FROM readings_stg
UNION ALL
SELECT 'measures_stg' AS table_name, COUNT(*) as row_ct,NULL,NULL from measures_stg
UNION ALL
SELECT 'stations_stg' AS table_name, COUNT(*) ,NULL,NULL from stations_stg;
table_name     row_ct   min_dateTime          max_dateTime
readings_stg   5272     2025-02-21 13:45:00   2025-03-20 13:25:10
measures_stg   6638
stations_stg   5400
The latest dateTime value looks right (it's 2025-03-20 13:42:45 as I write this), but why is the minimum nearly a month ago?
This is where the DuckDB UI's data viz comes in useful:
What this shows us is that almost all the data is for the latest timestamp, with just a handful of readings for other dates.
We can prove this out with a SQL query too:
SELECT dateTime, COUNT(*) AS reading_count
FROM readings_stg
GROUP BY dateTime
ORDER BY 2 desc, 1 desc;
Transform
Keys
The staging tables have no keys defined, because YOLO right? Well, no. Staging is where we bring in the source data, warts and all. A station shouldn't have more than one instance, but who says that's the case?
Rather than failing the ingest because of a logical data error, it's our job to work with what we've got. That means coding defensively and ensuring that whilst we'll accept anything into the staging area, we don't blindly propagate crap through the rest of the pipeline.
One of the ways to enforce this is with constraints, of which primary keys are an example.
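For example, before trusting a column as a key it's worth checking that the staging data actually honours it. A quick sketch, using the notation column that we'll adopt as a primary key shortly:
-- Any rows returned here mean the assumed key isn't unique in the source,
-- and the PK constraint we add later would fail.
SELECT notation, COUNT(*) AS row_ct
FROM stations_stg
GROUP BY notation
HAVING COUNT(*) > 1;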
Readings → Measures
Unchanged, the data in readings relates to measures on the readings.measure column:
http://environment.data.gov.uk/flood-monitoring/id/measures/5312TH-level-stage-i-15_min-mASD
On measures the @id column matches this:
http://environment.data.gov.uk/flood-monitoring/id/measures/5312TH-level-stage-i-15_min-mASD
But this is duplicated in the notation column, minus the http://environment.data.gov.uk/flood-monitoring/id/measures/ URL prefix:
5312TH-level-stage-i-15_min-mASD
We'll pre-process the readings.measure column to strip this prefix to make the join easier (and simpler to debug, since you're not wading through columns of long text).
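To see what that stripping does on its own (just a standalone SELECT, not part of the pipeline itself):
SELECT REGEXP_REPLACE(
         'http://environment.data.gov.uk/flood-monitoring/id/measures/5312TH-level-stage-i-15_min-mASD',
         'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
         '') AS measure;
-- 5312TH-level-stage-i-15_min-mASD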
Measures → Stations
The station for which a reading was taken is found via the measure, since measures are unique to a station.
On measures the station column is the foreign key:
http://environment.data.gov.uk/flood-monitoring/id/stations/SP50_72
Again, the URL prefix (http://environment.data.gov.uk/flood-monitoring/id/stations/) is repeated, and we'll strip that out.
One thing that caught me out here is that the station (minus the URL prefix) and the stationReference are almost always the same.
Almost always.
I spent a bunch of time chasing down duplicates after the subsequent join to the fact table resulted in a fan-out, because stationReference isn't unique.
SELECT stationReference, station
FROM measures
WHERE station!=stationReference
ORDER BY stationReference;
stationReference   station
4063TH             4063TH-southern
4063TH             4063TH-thames
E22300             E22300-anglian
E22300             E22300-southern
E22300             E22300-southern
[…]
26 rows out of 6612… enough to cause plenty of trouble when I made assumptions about the data I was eyeballing and missed the 0.4% exceptions…
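A sanity check along these lines would have caught the fan-out much earlier. A sketch, using the readings, measures, and stations tables as they're built later in the post:
-- If the dimension keys really are unique, the joined row count should
-- exactly match the fact row count; anything higher means a fan-out.
SELECT
    (SELECT COUNT(*) FROM readings) AS fact_rows,
    (SELECT COUNT(*)
     FROM readings r
     LEFT JOIN measures m ON r.measure = m.notation
     LEFT JOIN stations s ON m.station = s.notation) AS joined_rows;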
It does state it clearly in the API doc; station is the foreign key, not stationReference.
RTFM, always ;)
Dimension tables
Building the dimension tables is simple enough, if a bit crude.
With a CREATE OR REPLACE we tell DuckDB to go ahead and create the table, and if it exists already, nuke it and create a fresh version.
The transformation weβll do is pretty light.
Measures
We're going to drop a couple of fields:
- @id: we don't need it
- latestReading: holds fact data that we're getting from elsewhere, so no point duplicating it here
We'll also transform the foreign key to strip the URL prefix, making it easier to work with.
CREATE OR REPLACE TABLE measures AS
SELECT *
EXCLUDE ("@id", latestReading)
REPLACE(
REGEXP_REPLACE(station,
'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
'') AS station
)
FROM measures_stg;
This is using a couple of my favourite recent discoveries in DuckDB: the EXCLUDE and REPLACE clauses.
With EXCLUDE we're taking advantage of SELECT * to bring in all columns from the source table (which saves typing, but also means new columns added to the source will propagate automagically), with the exception of the ones that we don't want.
The REPLACE clause is a really elegant way of providing a different version of the same column.
Since we want to retain the station column but just trim the prefix, this is a great way to do it without moving its position in the column list.
The other option would have been to EXCLUDE it too, and then add it on to the column list.
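For completeness, a sketch of that alternative: EXCLUDE the column and re-add a transformed version explicitly (note it now ends up at the end of the column list):
CREATE OR REPLACE TABLE measures AS
SELECT * EXCLUDE ("@id", latestReading, station),
       REGEXP_REPLACE(station,
           'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
           '') AS station
FROM measures_stg;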
With the table created, let's define the primary key as discussed above:
ALTER TABLE measures
ADD CONSTRAINT measures_pk PRIMARY KEY (notation);
Stations
Very similar to the process above:
CREATE OR REPLACE TABLE stations AS
SELECT * EXCLUDE (measures)
FROM stations_stg;
ALTER TABLE stations
ADD CONSTRAINT stations_pk PRIMARY KEY (notation);
One thing that you might also do is move the primary key (notation) to be the first column in the table.
This is a habit I picked up years ago; I don't know if it's still common practice.
To do it you'd EXCLUDE the field and manually prefix it to the star expression:
CREATE OR REPLACE TABLE stations AS
SELECT notation, * EXCLUDE (measures, notation)
FROM stations_stg;
ALTER TABLE stations ADD CONSTRAINT stations_pk PRIMARY KEY (notation);
(If you do this, you'd logically want to do the same for the other tables' PKs too.)
Fact table
This is where things get fun :)
Because we're going to be adding to the table with new data rather than replacing it, we can't just CREATE OR REPLACE it each time.
Therefore we'll run the CREATE as a one-off:
CREATE TABLE IF NOT EXISTS readings AS
SELECT * EXCLUDE "@id" FROM readings_stg WHERE FALSE;
A few notes:
- IF NOT EXISTS makes sure we don't overwrite the table. We'd get a similar effect if we just put CREATE TABLE; the only difference is that the latter would fail if the table already exists, whilst IF NOT EXISTS causes it to exit silently.
- We're going to EXCLUDE the @id column because we don't need it
- This will only create the table using the schema projected by the SELECT; the WHERE FALSE means no rows will be selected. This is so that we decouple the table creation from its population.
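If you want to double-check that the empty table picked up the schema projected by the SELECT, a quick look confirms it:
-- Lists the readings_stg columns minus @id; the table itself holds zero rows
DESCRIBE readings;
SELECT COUNT(*) FROM readings;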
Now we'll add a primary key.
The key here is the time of the reading (dateTime) plus the measure (measure).
ALTER TABLE readings
ADD CONSTRAINT readings_pk PRIMARY KEY (dateTime, measure);
Populating the fact table
Our logic here is: "Add data if it's new, don't throw an error if it already exists". Our primary key for this is the time of the reading and the measure. If we receive a duplicate we're going to ignore it.
We're making a design choice here; in theory we could decide that a duplicate reading represents an update to the original (re-stating a fact that could have been wrong previously) and handle it as an UPSERT (i.e. INSERT if new, UPDATE if existing).
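For reference, a sketch of what that UPSERT variant might look like (not what we're doing here, and assuming the same (dateTime, measure) key):
-- Treat a duplicate (dateTime, measure) as a re-statement and overwrite the value
INSERT INTO readings
SELECT *
    EXCLUDE "@id"
    REPLACE(
        REGEXP_REPLACE(measure,
            'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
            '') AS measure)
FROM readings_stg
ON CONFLICT (dateTime, measure) DO UPDATE SET value = EXCLUDED.value;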
DuckDB has some very nice syntax available around the INSERT INTO … SELECT FROM pattern. To achieve what we want we use the self-documenting statement INSERT OR IGNORE. This is a condensed version of the more verbose INSERT INTO … SELECT FROM … ON CONFLICT DO NOTHING syntax.
INSERT OR IGNORE INTO readings
SELECT *
EXCLUDE "@id"
REPLACE(
REGEXP_REPLACE(measure,
'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
'') AS measure)
FROM readings_stg
We're using the same EXCLUDE and REPLACE expressions as we did above; remove the @id column, and strip the URL prefix from the foreign key measure.
The first time we run this we can see the number of INSERTS:
changes: 5272
Then we re-run it:
changes: 0
Since nothing changed in the staging table, this makes sense. Let's re-run the staging load from earlier to pull the latest data into readings_stg, and then run the INSERT OR IGNORE again:
changes: 4031
Joining the data
Similar to the fact table above, we're going to be incrementally loading this final, denormalised table. I'm taking a slightly roundabout tack to do this here.
First, I've defined a view which is the result of the join:
CREATE OR REPLACE VIEW vw_readings_enriched AS
SELECT "r_\0": COLUMNS(r.*),
"m_\0": COLUMNS(m.*),
"s_\0": COLUMNS(s.*)
FROM
readings r
LEFT JOIN measures m ON r.measure = m.notation
LEFT JOIN stations s ON m.station = s.notation
See my earlier blog post if you're not familiar with the COLUMNS syntax.
From the view I create the table's schema (but don't populate anything yet):
CREATE TABLE IF NOT EXISTS readings_enriched AS
SELECT * FROM vw_readings_enriched LIMIT 0;
ALTER TABLE readings_enriched
ADD CONSTRAINT readings_enriched_pk PRIMARY KEY (r_dateTime, r_measure);
And now populate it in the same way as we did for the readings table:
INSERT OR IGNORE INTO readings_enriched
SELECT * FROM vw_readings_enriched;
Query the joined data
Now that we've got our joined data we can start to query and analyse it. Here are the five most recent readings for all water level measurements on the River Wharfe:
SELECT r_dateTime
, s_label
, r_value
FROM readings_enriched
WHERE s_rivername= 'River Wharfe' and m_parameterName = 'Water Level'
ORDER BY r_dateTime desc LIMIT 5 ;
┌─────────────────────┬──────────────────────────────────────┬─────────┐
│     r_dateTime      │               s_label                │ r_value │
│      timestamp      │                 json                 │ double  │
├─────────────────────┼──────────────────────────────────────┼─────────┤
│ 2025-03-19 15:00:00 │ "Kettlewell"                         │   0.171 │
│ 2025-03-19 15:00:00 │ "Cock Beck Sluices"                  │   3.598 │
│ 2025-03-19 15:00:00 │ "Nun Appleton Fleet Pumping Station" │   2.379 │
│ 2025-03-19 15:00:00 │ "Tadcaster"                          │   0.227 │
│ 2025-03-19 15:00:00 │ "Netherside Hall"                    │   0.319 │
└─────────────────────┴──────────────────────────────────────┴─────────┘
Historical data
The readings API includes the option for specifying a date range.
However, there is a hard limit of 10,000 rows, and since a single time period's readings for all stations is about 5,000 rows, this doesn't look like a viable option if we want to backfill data for all stations.
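To give a feel for it, a date-range pull looks something like this (parameter names as per the API reference; the dates and limit here are purely illustrative):
SELECT COUNT(*) AS reading_ct
FROM (
    SELECT UNNEST(items) AS u
    FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?startdate=2025-03-17&enddate=2025-03-17&_limit=10000')
);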
Historic readings are available, although in CSV format rather than the JSON we're used to. Nothing like real-world data engineering problems to keep us on our toes :)
$ http https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-18.csv |head -n2
dateTime,measure,value
2025-03-18T00:00:00Z,http://environment.data.gov.uk/flood-monitoring/id/measures/531166-level-downstage-i-15_min-mAOD,49.362
Fortunately, DuckDB has us covered and takes it in its stride:
SELECT * FROM 'https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-18.csv' LIMIT 1;
┌─────────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────┬─────────┐
│      dateTime       │                                              measure                                             │  value  │
│      timestamp      │                                              varchar                                             │ varchar │
├─────────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┼─────────┤
│ 2025-03-18 00:00:00 │ http://environment.data.gov.uk/flood-monitoring/id/measures/531166-level-downstage-i-15_min-mAOD │ 49.362  │
└─────────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────┴─────────┘
…or almost in its stride; once I ran it on a full file I got this:
CSV Error on Line: 388909
Original Line:
2025-03-17T22:30:00Z,http://environment.data.gov.uk/flood-monitoring/id/measures/690552-level-stage-i-15_min-m,0.770|0.688
Error when converting column "value". Could not convert string "0.770|0.688" to 'DOUBLE'
Column value is being converted as type DOUBLE
This type was auto-detected from the CSV file.
[…]
Bravo for such a verbose and useful error message. Not just "there's an error" or "could not convert": it tells you where, shows you the line, and makes it super-easy to understand the problem and what to do.
What to do? Brush it under the carpet and pretend it didnβt happen!
In other words, ignore_errors=true:
CREATE OR REPLACE TABLE readings_historical AS
SELECT *
FROM read_csv('https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-18.csv',
ignore_errors=true)
This loads all 476k rows of data for 18th March into a new table. Now we'll add the previous days too, and head out to the shell to do it:
❯ duckdb env-agency.duckdb -c "INSERT INTO readings_historical SELECT * FROM read_csv('https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-16.csv', ignore_errors=true);"
100% ▕████████████████████████████████████████████████████████████▏
Run Time (s): real 16.405 user 1.090767 sys 0.516826
Even more concise is the COPY option:
duckdb env-agency.duckdb -c "COPY readings_historical FROM 'https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-03-14.csv' (IGNORE_ERRORS);"
Run Time (s): real 3.275 user 1.718801 sys 0.247875
Why am I doing this from the shell? So that I can then do this:
start_date="2025-01-01"
end_date="2025-03-13"
current_date=$start_date
while [[ "$current_date" < "$end_date" || "$current_date" == "$end_date" ]]; do
echo "Processing $current_date..."
duckdb env-agency.duckdb -c \
"COPY readings_historical
FROM 'https://environment.data.gov.uk/flood-monitoring/archive/readings-$current_date.csv'
(IGNORE_ERRORS);"
# Note: `date -d` is GNU date syntax; on stock macOS use coreutils (gdate) or something like: date -j -f %Y-%m-%d "$current_date" -v+1d +%Y-%m-%d
current_date=$(date -d "$current_date + 1 day" +%Y-%m-%d)
done
The readings_historical table now holds a nice big chunk of data (not Big Data, just a big chunk of normal-sized data):
Now to merge this into the main table:
INSERT OR IGNORE INTO readings
SELECT *
REPLACE(
REGEXP_REPLACE(measure,
'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
'') AS measure)
FROM readings_historical;
Run Time (s): real 0.003 user 0.002708 sys 0.000571
Conversion Error:
Could not convert string '0.772|0.692' to DOUBLE
Here's the problem with taking the easy route. By letting DuckDB guess at the data types for the CSV data, we've ended up with dodgy data being ingested. How much dodgy data? 0.01%…
SELECT COUNT(*) FROM readings_historical WHERE TRY_CAST(value AS DOUBLE) IS NULL;
┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│         3202 │
└──────────────┘
It took a few minutes to load the historical data, so instead of ditching the table let's just deal with what we've got. First up, what is the dodgy data?
SELECT value
FROM readings_historical
WHERE TRY_CAST(value AS DOUBLE) IS NULL
USING SAMPLE 0.5%;
┌─────────────┐
│    value    │
│   varchar   │
├─────────────┤
│ 2.415|2.473 │
│ 1.496|1.489 │
│ 1.730|1.732 │
│ 1.419|1.413 │
│ 1.587|1.586 │
│ 1.097|1.101 │
│ 1.032|1.033 │
│ 0.866|0.874 │
│ 0.864|0.862 │
│ 0.861|0.862 │
│ 0.386|0.387 │
│ 1.118|1.062 │
├─────────────┤
│   12 rows   │
└─────────────┘
It looks like they all follow this pattern of two valid-looking values separated by a pipe (|).
We can double-check this:
SELECT * FROM readings_historical
WHERE value NOT LIKE '%|%'
AND TRY_CAST(value AS DOUBLE) IS NULL;
┌───────────┬─────────┬─────────┐
│ dateTime  │ measure │  value  │
│ timestamp │ varchar │ varchar │
├───────────┴─────────┴─────────┤
│            0 rows             │
└───────────────────────────────┘
We'll make an executive decision to take the first value in these pairs, using REPLACE to override the value column, splitting the string and keeping the first part.
INSERT OR IGNORE INTO readings
SELECT *
REPLACE(
REGEXP_REPLACE(measure,
'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
'') AS measure,
SPLIT_PART(value, '|', 1) AS value)
FROM readings_historical
WHERE value LIKE '%|%';
Now we can load the rest of the data:
INSERT OR IGNORE INTO readings
SELECT *
REPLACE(
REGEXP_REPLACE(measure,
'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
'') AS measure)
FROM readings_historical
WHERE value NOT LIKE '%|%';
100% ▕████████████████████████████████████████████████████████████▏
Run Time (s): real 218.189 user 213.713807 sys 3.481680
changes: 37031770 total_changes: 37034972
The data's loaded:
SELECT COUNT(*) as row_ct,
min(dateTime) as min_dateTime,
max(dateTime) as max_dateTime
FROM readings;
┌──────────┬─────────────────────┬─────────────────────┐
│  row_ct  │    min_dateTime     │    max_dateTime     │
│  int64   │      timestamp      │      timestamp      │
├──────────┼─────────────────────┼─────────────────────┤
│ 37044275 │ 2025-01-01 00:00:00 │ 2025-03-20 15:25:10 │
└──────────┴─────────────────────┴─────────────────────┘
Now to load it into the denormalised table; for this it's the same query as when we're just loading incremental changes:
INSERT OR IGNORE INTO readings_enriched
SELECT * FROM vw_readings_enriched;
Let's "Operationalise" it
Let's have a look at a very rough way of running the update for this pipeline automatically.
We'll create a SQL script to update the dimension data:
-- Load the staging data from the REST API
CREATE OR REPLACE TABLE measures_stg AS
WITH src AS (SELECT *
FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/measures'))
SELECT u.* FROM (
SELECT UNNEST(items) AS u FROM src);
CREATE OR REPLACE TABLE stations_stg AS
WITH src AS (SELECT *
FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/stations'))
SELECT u.* FROM (
SELECT UNNEST(items) AS u FROM src);
-- Rebuild dimension tables
CREATE OR REPLACE TABLE measures AS
SELECT *
EXCLUDE ("@id", latestReading)
REPLACE(
REGEXP_REPLACE(station,
'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
'') AS station
)
FROM measures_stg;
ALTER TABLE measures
ADD CONSTRAINT measures_pk PRIMARY KEY (notation);
CREATE OR REPLACE TABLE stations AS
SELECT * EXCLUDE (measures)
FROM stations_stg;
ALTER TABLE stations
ADD CONSTRAINT stations_pk PRIMARY KEY (notation);
and a second script to load the fact data:
-- Load the staging data from the REST API
CREATE OR REPLACE TABLE readings_stg AS
WITH src AS (SELECT *
FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest'))
SELECT u.* FROM (
SELECT UNNEST(items) AS u FROM src);
-- Merge into the fact table
INSERT OR IGNORE INTO readings
SELECT *
EXCLUDE "@id"
REPLACE(
REGEXP_REPLACE(measure,
'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
'') AS measure)
FROM readings_stg;
-- Merge into the denormalised table
INSERT OR IGNORE INTO readings_enriched
SELECT * FROM vw_readings_enriched;
Now to schedule it.
An entire industry has been built around workflow scheduling tools; I'm going to stick to the humble cron.
It's simple, it's quick, and LLMs have now learnt how to write the syntax ;)
Well, the syntax to invoke DuckDB is a bit different from what Claude thought, but the fiddly */15 stuff it nailed.
Here's the crontab I set up (crontab -e):
# Run the pipeline every 15 minutes
*/15 * * * * cd ~/env-agency/ && /opt/homebrew/bin/duckdb env-agency.duckdb -f dimensions.sql && /opt/homebrew/bin/duckdb env-agency.duckdb -f fact.sql
Every fifteen minutes this pulls down the latest data, rebuilds the dimension tables, and adds the new data to the fact table.
Analysing the data
Let's finish off by looking at how we can analyse the data.
Metabase
Metabase is fairly quick to get up and running. The complication is that to query DuckDB you need a driver that Motherduck have created, which I couldn't get to work under Docker, hence running Metabase locally:
# You need Java 21
sdk install java 21.0.6-tem
# Download Metabase & Metaduck
mkdir metabase
curl https://downloads.metabase.com/v0.53.7/metabase.jar -O
mkdir plugins && pushd plugins && curl -L https://github.com/motherduckdb/metabase_duckdb_driver/releases/download/0.2.12-b/duckdb.metabase-driver.jar -O && popd
# Run metabase
java --add-opens java.base/java.nio=ALL-UNNAMED -jar metabase.jar
This launches Metabase on http://localhost:3000, and after an annoying on-boarding survey, it's remarkably quick to get something created. Adding the database is simple enough:
Once you've done that Metabase automagically (I'm surprised it doesn't say "AI" when it does it) offers a summary of the data in the table:
It's not a bad starter for ten; the count of rows is useful from a data-completeness point of view.
We'd need to do some work to define the value metric and perhaps some geographic hierarchies, but there's definitely lots of potential.
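As a starting point, the kind of thing a metric definition might encode can be expressed straight in SQL; a hypothetical "daily mean water level per measure", for example:
SELECT CAST(r_dateTime AS DATE) AS reading_date,
       r_measure,
       AVG(r_value) AS avg_value
FROM readings_enriched
WHERE m_parameterName = 'Water Level'
GROUP BY ALL
ORDER BY reading_date, r_measure
LIMIT 10;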
You can also poke around in the data itself with a tabular slice & dice approach:
Rill
I used Rill previously and liked it.
Getting it up and running is easy:
# Installβ¦
curl https://rill.sh | sh
# β¦and go!
rill start rill-env-agency
Rill supports DuckDB out of the box, so adding our data source is easy:
Who can resist a bit of AI magic?
As with Metabase, it's a pretty good starting point for customising into what you want to analyse.
With a bit of playing around you can create a nice comparison between January and February's readings for stations on a given river:
I couldn't figure out how to plot a time series of values for a series of data, but as my children would say to me, that's probably a skill issue on my part…
Superset
This is a bit heavier to install than Metabase, and definitely more so than Rill. After an aborted attempt to install it locally I went the Docker route even though it meant a bit of fiddling to get the DuckDB dependency installed.
Follow the steps in the Quickstart to clone the repo, and then modify the command for the superset service to install the DuckDB dependencies before launching Superset itself:
command: ["sh", "-c", "pip install duckdb duckdb-engine && /app/docker/docker-bootstrap.sh app-gunicorn"]
Now bring up Superset:
docker compose -f docker-compose-image-tag.yml up
You'll find Superset at http://localhost:8088. Note that it does take a few minutes to boot up, so don't be impatient if it doesn't seem to be working straight away.
After quite a lot of fiddling around to get this far, I realised that my DuckDB file is on my host machine, not in the Docker container. I couldn't just mount it as a volume, as there are already volumes mounted using a YAML alias syntax that I wasn't sure how to add to:
volumes:
  *superset-volumes
Instead I just did a bit of a hack and copied the file onto the container:
docker compose cp ~/env-agency/env-agency.duckdb superset:/tmp/
Finally, within Superset, I could add the database (Settings → Manage Databases).
Somewhat confusingly, if you select "DuckDB" as the type, you're asked for "DuckDB Credentials" and a Motherduck access token; click the small Connect this database with a SQLAlchemy URI string instead link (or just select "Other" database type in the first place).
Specify the local path to your DuckDB file, for example:
duckdb:////tmp/env-agency.duckdb
Next, create a Dataset: select the database you just defined, and the readings_enriched table:
After all that, fortunately, Superset has a rich set of functionality, particularly when it comes to charting. I did hit frequent time-outs when experimenting, but managed to create a nice time-series chart fairly easily:
Wrapping up
We've cobbled together a pipeline that extracts data from a set of REST APIs, applies some light cleansing and transformation, and loads it into a DuckDB table from where it can be queried and analysed.
With cron we've automated the refresh of this data every fifteen minutes.
The total bill of materials is approximately:
- 1 x DuckDB
- 14 SQL statements (16 if you include historical backfill)
- 1 ropey cron job
Would this pass muster in a real deployment? You tell me :)
My guess is that I'd not want to be on the hook to support it, but it'd do the job until it didn't. That is, as a prototype with sound modelling to expand on later it's probably good enough.
But I'd love to hear from folk who are building this stuff for real day in, day out. What did I overlook here? Is doing it in DuckDB daft? Let me know on Bluesky or LinkedIn.
You can find the full set of SQL files to run this here.