Loading CSV data into Confluent Cloud using the FilePulse connector
The FilePulse connector from Florian Hussonnois is a really useful connector for Kafka Connect which enables you to ingest flat files including CSV, JSON, XML, etc. into Kafka. You can read more about it in its overview here. Other connectors for ingesting CSV data include kafka-connect-spooldir (which I wrote about previously) and kafka-connect-fs.
Here I’ll show how to use it to stream CSV data into a topic in Confluent Cloud. You can apply the same config pattern to any other secured Kafka cluster.
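To make that concrete, here's a minimal sketch of registering a FilePulse connector on a local, self-managed Connect worker via the Kafka Connect REST API. The directory path, topic, and connector name are placeholders, and the FilePulse property names shown are illustrative only - check the connector's documentation for the exact names in your version.

```python
# A sketch, not a drop-in config: register a FilePulse source connector on a
# self-managed Connect worker via the Kafka Connect REST API.
import json
import requests

connector = {
    "name": "source-csv-filepulse",  # hypothetical connector name
    "config": {
        "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "tasks.max": "1",
        # Where to find the CSV files and how to read them (illustrative
        # property names and values -- they vary between FilePulse versions)
        "fs.scan.directory.path": "/data/csv",
        "file.filter.regex.pattern": ".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "topic": "orders_csv",  # hypothetical target topic in Confluent Cloud
    },
}

# The worker itself holds the Confluent Cloud connection details (see the
# worker config sketch further down this page); this REST call is local.
resp = requests.post(
    "http://localhost:8083/connectors",  # default Connect REST listener
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()
print(resp.json())
```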
Using ksqlDB to process data ingested from ActiveMQ with Kafka Connect
The ActiveMQ source connector creates a Struct holding the value of the message from ActiveMQ (as well as its key). This is as you would expect. However, you can run into challenges working with the data if the ActiveMQ data of interest within the payload is itself complex. Things like converters and schemas can get really funky, really quick.
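As a hedged illustration of the kind of wrangling involved, the sketch below posts two statements to ksqlDB's REST API: one declaring a stream over the topic written by the ActiveMQ connector, and one extracting a field from a JSON payload assumed to be carried in the message text. The topic name, column name, and JSON path are all assumptions for illustration - inspect your own data first.

```python
# A sketch assuming the ActiveMQ message body is JSON carried in a text field;
# topic, column, and JSON path names are illustrative only.
import json
import requests

statements = (
    "CREATE STREAM ACTIVEMQ_RAW (MSG_TEXT VARCHAR) "
    "  WITH (KAFKA_TOPIC='activemq_orders', VALUE_FORMAT='JSON'); "
    "CREATE STREAM ORDERS AS "
    "  SELECT EXTRACTJSONFIELD(MSG_TEXT, '$.order_id') AS ORDER_ID "
    "  FROM ACTIVEMQ_RAW EMIT CHANGES;"
)

resp = requests.post(
    "http://localhost:8088/ksql",  # default ksqlDB server REST endpoint
    headers={"Content-Type": "application/vnd.ksql.v1+json"},
    data=json.dumps({"ksql": statements, "streamsProperties": {}}),
)
resp.raise_for_status()
print(json.dumps(resp.json(), indent=2))
```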
Kafka Connect JDBC Sink deep-dive: Working with Primary Keys
The Kafka Connect JDBC Sink can be used to stream data from a Kafka topic to a database such as Oracle, Postgres, MySQL, DB2, etc.
It supports many permutations of configuration around how primary keys are handled. The documentation details these options; this article aims to illustrate and expand on them.
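For example, here's a sketch of one common permutation: taking the primary key from a field of the message value (pk.mode=record_value) and upserting into MySQL. Connection details, topic, and field names are placeholders.

```python
# A sketch of one permutation: primary key taken from a field of the message
# value (pk.mode=record_value). Connection details and names are placeholders.
import json
import requests

connector = {
    "name": "sink-jdbc-orders",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "connection.user": "connect_user",
        "connection.password": "connect_password",
        "topics": "orders",
        "insert.mode": "upsert",    # upsert needs a primary key to match on
        "pk.mode": "record_value",  # PK comes from the message value...
        "pk.fields": "ORDER_ID",    # ...specifically this field
        "auto.create": "true",
    },
}

resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()
```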
Kafka Connect - SQLSyntaxErrorException: BLOB/TEXT column … used in key specification without a key length
I got the error SQLSyntaxErrorException: BLOB/TEXT column 'MESSAGE_KEY' used in key specification without a key length
with the Kafka Connect JDBC Sink connector (v10.0.2) and MySQL (8.0.23).
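One way to end up here is a config along these lines (a hedged reconstruction with placeholder names): a STRING message key used as the primary key, combined with auto.create, which asks MySQL to create the key column as TEXT and then index it - something MySQL refuses without a key length.

```python
# A hedged reconstruction of a config that can trigger the error: a STRING
# message key used as the primary key, with auto.create asking MySQL to build
# the key column as TEXT -- which MySQL rejects as a key without a length.
import json

config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",  # placeholder
    "connection.user": "connect_user",                 # placeholder
    "connection.password": "connect_password",         # placeholder
    "topics": "test01",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "insert.mode": "upsert",
    "pk.mode": "record_key",     # use the message key as the primary key
    "pk.fields": "MESSAGE_KEY",  # column name to give the (primitive) key
    "auto.create": "true",       # auto-created STRING columns map to TEXT on MySQL
}

# One workaround is to create the target table yourself (with, say, a
# VARCHAR(36) primary key) before starting the connector, so that
# auto.create never generates the TEXT column.
print(json.dumps({"name": "sink-jdbc-mysql-keyed", "config": config}, indent=2))
```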
Running a self-managed Kafka Connect worker for Confluent Cloud
Confluent Cloud is not only a fully managed Apache Kafka service; it also provides important additional pieces for building applications and pipelines, including managed connectors, Schema Registry, and ksqlDB. Managed connectors are run for you (hence, managed!) within Confluent Cloud - you just specify the technology with which you want to integrate, in or out of Kafka, and Confluent Cloud does the rest.
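If you go the self-managed worker route instead, the settings the worker needs in order to reach Confluent Cloud boil down to standard Kafka client security config. Here's a minimal sketch that writes out a worker properties fragment; the bootstrap server and API key/secret are placeholders for your own cluster's values.

```python
# A minimal sketch of the security settings a self-managed Connect worker needs
# to talk to Confluent Cloud; broker address and API key/secret are placeholders.
BOOTSTRAP = "pkc-xxxxx.europe-west1.gcp.confluent.cloud:9092"  # placeholder
API_KEY = "CCLOUD_API_KEY"        # placeholder
API_SECRET = "CCLOUD_API_SECRET"  # placeholder

SASL_JAAS = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{API_KEY}" password="{API_SECRET}";'
)

worker_props = {
    "bootstrap.servers": BOOTSTRAP,
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": SASL_JAAS,
    # The embedded producer and consumer clients need the same settings,
    # prefixed accordingly
    "producer.security.protocol": "SASL_SSL",
    "producer.sasl.mechanism": "PLAIN",
    "producer.sasl.jaas.config": SASL_JAAS,
    "consumer.security.protocol": "SASL_SSL",
    "consumer.sasl.mechanism": "PLAIN",
    "consumer.sasl.jaas.config": SASL_JAAS,
}

# Write the fragment out to merge into your worker config
# (e.g. connect-distributed.properties)
with open("connect-ccloud.properties", "w") as f:
    for key, value in worker_props.items():
        f.write(f"{key}={value}\n")
```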
Kafka Connect - Deep Dive into Single Message Transforms
KIP-66 was added in Apache Kafka 0.10.2 and brought new functionality called Single Message Transforms (SMT). Using SMT you can modify the data and its characteristics as it passes through the Kafka Connect pipeline, without needing additional stream processors. For things like manipulating fields, changing topic names, conditionally dropping messages, and more, SMT are a perfect solution. If you get into things like aggregation, joining streams, and lookups, then SMT may not be the best option for you and you should head over to Kafka Streams or ksqlDB instead.
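As a quick taste, the sketch below builds a connector config that chains in one of the stock transforms, RegexRouter, to prefix the target topic name. The connector shown (the FileStreamSource example connector) and the prefix are just placeholders to give the SMT something to hang off.

```python
# A sketch of chaining a stock SMT onto a connector: RegexRouter rewrites the
# topic name as records pass through. The connector and prefix are placeholders;
# the transform class and its regex/replacement settings ship with Apache Kafka.
import json

config = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/tmp/input.txt",
    "topic": "raw-input",
    # Prefix every target topic name with "prod-"
    "transforms": "addPrefix",
    "transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addPrefix.regex": ".*",
    "transforms.addPrefix.replacement": "prod-$0",
}

# Submit to the Connect REST API (POST /connectors) in the usual way
print(json.dumps({"name": "source-with-smt", "config": config}, indent=2))
```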
🎄 Twelve Days of SMT 🎄 - Day 11: Predicate and Filter
Apache Kafka 2.6 included KIP-585, which adds support for defining predicates against which transforms are conditionally executed, as well as a Filter Single Message Transform to drop messages - which in combination means that you can conditionally drop messages.
As part of Apache Kafka, Kafka Connect ships with pre-built Single Message Transforms and Predicates, but you can also write your own. The API for each is documented: Transformation / Predicate. The predicates that ship with Apache Kafka are listed below, with a config sketch after the list:
- RecordIsTombstone - Matches if the value part of the message is null (denoting a tombstone message)
- HasHeaderKey - Matches if a header exists with the given name
- TopicNameMatches - Matches based on topic name