$ kafkacat -b broker:29092 -t mytestopic -C -e -q| wc -l
Why is kcat showing the wrong topics?
Much as I love kcat (🤫 it’ll always be kafkacat to me…), this morning I nearly fell out with it 👇
😖 I thought I was going stir crazy, after listing topics on a broker and seeing topics from a different broker.
😵 WTF 😵
Quick profiling of data in Apache Kafka using kafkacat and visidata
ksqlDB is a fantastically powerful tool for processing and analysing streams of data in Apache Kafka. But sometimes, you just want a quick way to profile the data in a topic in Kafka. I wrote about this previously with a convoluted (but effective) set of bash commands pipelined together to perform a GROUP BY on data. Then someone introduced me to visidata, which makes it all a lot quicker!
Loading delimited data into Kafka - quick & dirty (but effective)
Whilst Apache Kafka is an event streaming platform designed for, well, streams of events, it’s perfectly valid to use it as a store of data which perhaps changes only occasionally (or even never). I’m thinking here of reference data (lookup data) that’s used to enrich regular streams of events.
You might well get your reference data from a database where it resides and do so effectively using CDC - but sometimes it comes down to those pesky CSV files that we all know and love/hate. Simple, awful, but effective. I wrote previously about loading CSV data into Kafka from files that are updated frequently, but here I want to look at CSV files that are not changing. Kafka Connect simplifies getting data in to (and out of) Kafka but even Kafka Connect becomes a bit of an overhead when you just have a single file that you want to load into a topic and then never deal with again. I spent this afternoon wrangling with a couple of CSV-ish files, and building on my previous article about neat tricks you can do in bash with data, I have some more to share with you here :)
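As a rough sketch of the single-file case described above: skip the header row and pipe the remaining lines straight into kafkacat in producer mode, one message per line. The broker, topic, and file names are hypothetical placeholders, and this assumes the file has exactly one header line and no quoted fields spanning multiple lines.

```shell
#!/usr/bin/env bash
# Sketch: load a static CSV file into a topic, one message per data row.
# Broker, topic and file name below are hypothetical placeholders.

load_csv() {
  local broker="$1" topic="$2" file="$3"
  # tail -n +2 skips the header row; kafkacat -P produces one message
  # for each line it reads from stdin.
  tail -n +2 "$file" | kafkacat -b "$broker" -t "$topic" -P
}

# Usage (hypothetical names):
#   load_csv localhost:9092 reference_data lookup.csv
```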
Performing a GROUP BY on data in bash
One of the fun things about working with data over the years is learning how to use the tools of the day—but also learning to fall back on the tools that are always there for you - and one of those is bash and its wonderful library of shell tools.
There’s an even better way than I’ve described here, and it’s called visidata. I’ve written about it more over here.
I’ve been playing around with a new data source recently, and needed to understand more about its structure. Within a single stream there were multiple message types.
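The core of a GROUP BY in the shell can be sketched with sort and uniq. This is a minimal illustration, not necessarily the exact pipeline from the post: the sample values are made up, and in practice the input would be one field extracted from each message.

```shell
#!/usr/bin/env bash
# Count how many times each value occurs on stdin (one value per line),
# roughly the shell equivalent of SELECT value, COUNT(*) ... GROUP BY value.
group_by_count() {
  # sort brings identical values together, uniq -c counts each run,
  # and the final sort -rn puts the most frequent values first.
  sort | uniq -c | sort -rn
}

# Made-up sample data standing in for a "message type" field.
printf '%s\n' order payment order refund order payment | group_by_count
```

Each output line is a count followed by the value it belongs to, which is usually enough to see which message types dominate a stream.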
Ingesting XML data into Kafka - Option 1: The Dirty Hack
What would a blog post on rmoff.net be if it didn’t include the dirty hack option? 😁
The secret to dirty hacks is that they are often rather effective and when needs must, they can suffice. If you’re prototyping and need to JFDI, a dirty hack is just fine. If you’re looking for code to run in Production, then a dirty hack probably is not fine.
Setting key value when piping from jq to kafkacat
One of my favourite hacks for getting data into Kafka is using kafkacat and stdin, often from jq. You can see this in action with Wi-Fi data, IoT data, and data from a REST endpoint. This is fine for getting values into a Kafka message - but Kafka messages are key/value, and being able to specify a key can often be important.
Here’s a way to do that, using a separator and some jq magic. Note that at the moment kafkacat only supports single byte separator characters, so you need to choose carefully. If you pick a separator that also appears in your data, it’s possibly going to have unintended consequences.
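A minimal sketch of the separator approach, assuming each input object carries its key in a field called id (a hypothetical name) and that a tab never appears in the key. jq emits key, separator, and the whole object as JSON text on one line, and kafkacat's -K flag splits each line back into key and value.

```shell
#!/usr/bin/env bash
# Sketch: turn JSON objects on stdin into "key<sep>value" lines, then let
# kafkacat split each line on that separator. The .id field and the
# broker/topic names are hypothetical.

to_keyed_lines() {
  local sep="$1"
  # Build [key, whole object as JSON text] and join with the separator,
  # producing one raw line per input object.
  jq -rc --arg sep "$sep" '[(.id | tostring), tostring] | join($sep)'
}

produce_keyed() {
  local broker="$1" topic="$2" sep="$3"
  # -K tells kafkacat which single-byte delimiter separates key from value.
  to_keyed_lines "$sep" | kafkacat -b "$broker" -t "$topic" -P -K "$sep"
}

# Usage (hypothetical names), with a tab as the separator:
#   echo '{"id":"a1","temp":21}' | produce_keyed localhost:9092 mytopic "$(printf '\t')"
```

A tab is a reasonable choice here because compact JSON text escapes tabs inside strings, so it cannot appear unescaped in the value.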
Counting the number of messages in a Kafka topic
There’s ways, and then there’s ways, to count the number of records/events/messages in a Kafka topic. Most of them are potentially inaccurate, or inefficient, or both. Here’s one that falls into the potentially inefficient category, using kafkacat to read all the messages and pipe to wc, which with the -l flag will tell you how many lines there are, and since each message is a line, how many messages you have in the Kafka topic:
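That pipeline can be wrapped in a small function, sketched here with the flags spelled out. The function name is my own; the broker and topic in the usage line are the ones from the example command on this page.

```shell
#!/usr/bin/env bash
# Sketch: count messages by consuming the whole topic and counting lines.
count_messages() {
  local broker="$1" topic="$2"
  # -C consume, -e exit once the end of the topic is reached,
  # -q suppress informational output so only message payloads are printed.
  # tr strips the padding that some wc implementations add.
  kafkacat -b "$broker" -t "$topic" -C -e -q | wc -l | tr -d ' '
}

# Usage:
#   count_messages broker:29092 mytestopic
```

One caveat with this approach: a message whose value contains a newline is counted more than once, which is part of why it belongs in the quick-and-dirty category.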
Why JSON isn’t the same as JSON Schema in Kafka Connect converters and ksqlDB (Viewing Kafka messages bytes as hex)
I’ve been playing around with the new SerDes (serialisers/deserialisers) that shipped with Confluent Platform 5.5 - Protobuf, and JSON Schema (these were added to the existing support for Avro). The serialisers (and associated Kafka Connect converters) take a payload and serialise it into bytes for sending to Kafka, and I was interested in what those bytes look like. For that I used my favourite Kafka swiss-army knife: kafkacat.
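To look at the raw bytes, the message payload can be piped into a hex dump. Below is a small sketch using od; the helper name and the sample payload are made up for illustration. The sample mimics the Schema Registry wire format as I understand it (a zero magic byte and a four-byte schema ID ahead of the serialised data), which is what distinguishes it from plain JSON text.

```shell
#!/usr/bin/env bash
# Print the first N bytes of stdin as space-separated hex pairs.
first_bytes_hex() {
  head -c "$1" | od -An -tx1 | tr -s ' \n' ' ' | sed 's/^ //; s/ $//'
}

# Made-up payload: five leading bytes followed by JSON text.
printf '\000\000\000\000\001{"a":1}' | first_bytes_hex 5

# Against a real topic (hypothetical broker and topic names), -c1 reads one message:
#   kafkacat -b localhost:9092 -t mytopic -C -c1 -q | first_bytes_hex 5
```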
How to install kafkacat on Fedora
A quick and dirty way to monitor data arriving on Kafka
I’ve been poking around recently with capturing Wi-Fi packet data and streaming it into Apache Kafka, from where I’m processing and analysing it. Kafka itself is rock-solid - because I’m using ☁️Confluent Cloud and someone else worries about provisioning it, scaling it, and keeping it running for me. But whilst Kafka works just great, my side of the setup (tshark running on a Raspberry Pi) is less than stable. For whatever reason it sometimes stalls and I have to restart the Raspberry Pi and restart the capture process.
Streaming Wi-Fi trace data from Raspberry Pi to Apache Kafka with Confluent Cloud
Primitive Keys in ksqlDB
ksqlDB 0.7 will add support for message keys as primitive data types beyond just STRING (which is all we’ve had to date). That means that Kafka messages are going to be much easier to work with, and require less wrangling to get into the form in which you need them. Take an example of a database table that you’ve ingested into a Kafka topic, and want to join to a stream of events. Previously you’d have had to take the Kafka topic into which the table had been ingested and run a ksqlDB processor to re-key the messages such that ksqlDB could join on them. Friends, I am here to tell you that this is no longer needed!
Notes on getting data into InfluxDB from Kafka with Kafka Connect
When a message from your source Kafka topic is written to InfluxDB the InfluxDB values are set thus:
Timestamp is taken from the Kafka message timestamp (which is either set by your producer, or the time at which it was received by the broker)
Tag(s) are taken from the tags field in the message. This field must be a map type - see below
Value fields are taken from the rest of the message, and must be numeric or boolean
Measurement name can be specified as a field of the message, or hardcoded in the connector config.
Monitoring Sonos with ksqlDB, InfluxDB, and Grafana
Using Kafka Connect and Debezium with Confluent Cloud
This is based on using Confluent Cloud to provide your managed Kafka and Schema Registry. All that you run yourself is the Kafka Connect worker.
Optionally, you can use this Docker Compose to run the worker and a sample MySQL database.
Skipping bad records with the Kafka Connect JDBC sink connector
The Kafka Connect framework provides generic error handling and dead-letter queue capabilities which are available for problems with [de]serialisation and Single Message Transforms. When it comes to errors that a connector may encounter doing the actual pull or put of data from the source/target system, it’s down to the connector itself to implement logic around that. For example, the Elasticsearch sink connector provides configuration (behavior.on.malformed.documents) that can be set so that a single bad record won’t halt the pipeline. Others, such as the JDBC Sink connector, don’t provide this yet. That means that if you hit this problem, you need to manually unblock it yourself. One way is to manually move the consumer’s offset past the bad message.
TL;DR: You can use kafka-consumer-groups --reset-offsets --to-offset <x> to manually move the connector past a bad message
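Spelled out, the reset might look something like the sketch below. It assumes the sink connector consumes with the usual group name connect-<connector name>, and that you already know the topic, partition, and offset of the bad message. The KCG variable is this sketch's own convention for pointing at the tool (some installs name it kafka-consumer-groups.sh); all values in the usage line are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: move a sink connector's consumer group past one bad message.
# KCG is a convention of this sketch, not a Kafka setting.
KCG="${KCG:-kafka-consumer-groups}"

skip_bad_record() {
  local bootstrap="$1" connector="$2" topic="$3" partition="$4" bad_offset="$5"
  # Sink connectors consume with the group "connect-<connector name>".
  # Resetting to bad_offset + 1 resumes at the first message after the bad one.
  "$KCG" --bootstrap-server "$bootstrap" \
         --group "connect-$connector" \
         --topic "$topic:$partition" \
         --reset-offsets --to-offset "$((bad_offset + 1))" \
         --execute
}

# Usage (hypothetical values): skip the record at offset 42 on partition 0
#   skip_bad_record localhost:9092 my-jdbc-sink orders 0 42
```

The consumer group has to be inactive for the reset to be accepted, so the connector needs to be stopped before running this and restarted afterwards.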
Copying data between Kafka clusters with Kafkacat
kafkacat gives you Kafka super powers 😎
I’ve written before about kafkacat and what a great tool it is for doing lots of useful things as a developer with Kafka. I used it too in a recent demo that I built in which data needed manipulating in a way that I couldn’t easily do elsewhere. Today I want to share a very simple but powerful use for kafkacat as both a consumer and producer: copying data from one Kafka cluster to another. In this instance it’s getting data from Confluent Cloud down to a local cluster.
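The idea can be sketched as one kafkacat consuming from the source cluster and piping into a second kafkacat producing to the target. Broker and topic names are hypothetical, and the tab separator is my own choice; it assumes keys and values contain no tabs or newlines. A secured source such as Confluent Cloud will also need authentication settings passed with -X, which are omitted here.

```shell
#!/usr/bin/env bash
# Sketch: copy a topic from one cluster to another, keeping message keys.
copy_topic() {
  local src_broker="$1" dst_broker="$2" topic="$3"
  local sep
  sep="$(printf '\t')"
  # Consumer: read from the beginning, exit at the end of the topic (-e),
  # and print each message as key<TAB>value (-K).
  # Producer: split each line on the same separator and write it to the target.
  kafkacat -b "$src_broker" -t "$topic" -C -o beginning -e -q -K "$sep" |
    kafkacat -b "$dst_broker" -t "$topic" -P -K "$sep"
}

# Usage (hypothetical names):
#   copy_topic source-broker:9092 localhost:9092 my_topic
```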
Reset Kafka Connect Source Connector Offsets
Manually delete a connector from Kafka Connect
Kafka Connect has a REST API through which all config should be done, including removing connectors that have been created. Sometimes though, you might have reason to want to do this manually, and since Kafka Connect running in distributed mode uses Kafka as its persistent data store, you can achieve this by writing to the topic yourself.
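One way this could look, as a sketch under some loud assumptions: that the connector's config is stored in the worker's config topic under the key connector-<name>, and that writing a message with that key and a NULL value (a tombstone) removes it. The topic and connector names are hypothetical; check your worker's config.storage.topic setting and the keys actually present in that topic before trying anything like this.

```shell
#!/usr/bin/env bash
# Sketch: write a tombstone for a connector's config key.
# Assumes keys in the config topic look like "connector-<name>".
delete_connector() {
  local broker="$1" config_topic="$2" connector="$3"
  # "key:" with nothing after the separator gives an empty value;
  # -Z makes kafkacat send that empty value as NULL, and -K: sets ":" as
  # the key/value separator.
  printf 'connector-%s:\n' "$connector" |
    kafkacat -b "$broker" -t "$config_topic" -P -Z -K:
}

# Usage (hypothetical names):
#   delete_connector localhost:9092 connect-configs file-sink
```

This bypasses the REST API entirely, so it is very much a last resort rather than a routine way to remove connectors.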