Skipping bad records with the Kafka Connect JDBC sink connector
The Kafka Connect framework provides generic error handling and dead-letter queue capabilities, but these cover only problems with [de]serialisation and Single Message Transforms. For errors that a connector hits when actually pulling data from, or pushing data to, the source/target system, it's down to the connector itself to implement the handling. For example, the Elasticsearch sink connector provides a configuration option (`behavior.on.malformed.documents`) that can be set so that a single bad record won't halt the pipeline. Others, such as the JDBC sink connector, don't provide this yet. That means that if you hit this problem, you need to unblock it yourself. One way is to manually move the consumer's offset past the bad message.
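To make the distinction concrete, here's a rough sketch of the two levels of configuration side by side. The `errors.*` properties are the framework-level settings; the DLQ topic name is just an example, and `behavior.on.malformed.documents` applies only to the Elasticsearch sink:

```properties
# Framework-level error handling: covers converter ([de]serialisation) and
# SMT failures only, routing bad records to a dead-letter queue topic.
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq_my_sink
errors.deadletterqueue.context.headers.enable=true

# Connector-specific handling (Elasticsearch sink only): log and skip malformed
# documents instead of failing the task. The JDBC sink has no equivalent setting.
behavior.on.malformed.documents=warn
```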
TL;DR: You can use `kafka-consumer-groups --reset-offsets --to-offset <x>` to manually move the connector past a bad message.
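Spelled out in full, that looks something like the sketch below. The group name follows the `connect-<connector name>` convention that Kafka Connect uses for sink connectors; the broker address, connector name, topic, partition, and offset are all placeholders for your own values, and the consumer group must be inactive (stop or delete the connector first) for the reset to take effect:

```bash
# Shift the sink connector's consumer group past the bad record.
# Use --dry-run instead of --execute to preview the change first.
kafka-consumer-groups --bootstrap-server localhost:9092 \
                      --group connect-sink_postgres_foo \
                      --topic foo_topic:0 \
                      --reset-offsets --to-offset 42 \
                      --execute
```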
Resetting a Consumer Group in Kafka
I’ve been using Replicator as a powerful way to copy data from my Kafka rig at home onto my laptop’s Kafka environment. It means that when I’m on the road I can continue to work with the same set of data and develop pipelines etc. With a VPN back home I can even keep them in sync directly if I want to.
I hit a problem the other day where Replicator was running, but I had no data in my target topics on my laptop. After a bit of head-scratching I realised that my local Kafka environment had been rebuilt (I use Docker Compose, so complete rebuilds from scratch are easy), hence no data in the topic. But even after restarting the Replicator Kafka Connect worker, I still had no data loaded into the empty topics. What was going on?

Well, Replicator acts as a consumer from the source Kafka cluster (on my home server), and as far as that Kafka cluster was concerned, Replicator had already read the messages. It thought that because, even though I'd rebuilt everything on my laptop, Replicator was using the same connector name as before, and the connector name is used as the consumer group name, which is how the source Kafka cluster keeps track of the offsets. So my "new" Kafka environment was going back to the source, which viewed it as the existing "old" one that had already received the messages.
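You can see this for yourself by asking the source cluster where it thinks the consumer group has got to. This is just a sketch of what I mean; `home-server:9092` and the group name `replicator-source` are placeholders for your own broker and connector name:

```bash
# Shows CURRENT-OFFSET, LOG-END-OFFSET, and LAG per topic/partition for the group.
# If the group has already "read" everything, lag is 0 and nothing new gets copied.
kafka-consumer-groups --bootstrap-server home-server:9092 \
                      --describe --group replicator-source
```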