I’ve been using Replicator as a powerful way to copy data from my Kafka rig at home onto my laptop’s Kafka environment. It means that when I’m on the road I can continue to work with the same set of data and develop pipelines etc. With a VPN back home I can even keep them in sync directly if I want to.
I hit a problem the other day where Replicator was running, but I had no data in my target topics on my laptop. After a bit of head-scratching I realised that my local Kafka environment had been rebuilt (I use Docker Compose so complete rebuilds to start from scratch are easy), hence no data in the topic. But, even after restarting the Replicator Kafka Connect worker, I still had no data loaded into the empty topics. What was going on? Well Replicator acts as a consumer from the source Kafka cluster (on my home server), and so far as that Kafka cluster was concerned, Replicator had already read the messages. It thought that because even though I’d rebuilt everything on my laptop, Replicator was using the same connector name as before, and the connector name is used as the Consumer group name - which is how the source Kafka cluster keeps track of the offsets. So my "new" Kafka environment was going back to the source, which viewed it as the existing "old" one, which had already received the messages.
Two options from here:
Hacky but effective : chose a new Replicator connector name and restart. New name = new consumer group = offsets reset.
Proper way : reset the offsets for the consumer group on the source side.
Here’s how to do the latter:
Shut down the consumers - in this case, Replicator on my laptop:
$ docker-compose stop replicator Stopping replicator ... done
Confirm the consumer group name on the source cluster:
$ docker exec -it kafka kafka-consumer-groups \ --bootstrap-server kafka.moffatt.me:9092 \ --list | \ grep replicator replicator-source-CIF_FULL_DAILY replicator-source replicator-source-networkrail_TRAIN_MVT
Here there are three; I was interested in just the first one,
Inspect the consumer group:
$ docker exec -it kafka kafka-consumer-groups \ --bootstrap-server kafka.moffatt.me:9092 \ --describe \ --group replicator-source-CIF_FULL_DAILY Consumer group 'replicator-source-CIF_FULL_DAILY' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID replicator-source-CIF_FULL_DAILY __consumer_timestamps 49 421 421 0 - - - replicator-source-CIF_FULL_DAILY __consumer_timestamps 3 39 39 0 - - - replicator-source-CIF_FULL_DAILY CIF_FULL_DAILY 0 4718955 4718955 0 - - - replicator-source-CIF_FULL_DAILY __consumer_timestamps 6 16 16 0 - - - replicator-source-CIF_FULL_DAILY __consumer_timestamps 42 422 422 0 - - - replicator-source-CIF_FULL_DAILY __consumer_timestamps 40 69 69 0 - - - replicator-source-CIF_FULL_DAILY __consumer_timestamps 41 2 2 0 - - -
Reset the offsets for the consumer group. In this case I reset them to the earliest, so that Replicator would consume all of the topic’s data. Other reset options are available.
$ docker exec -it kafka kafka-consumer-groups \ --bootstrap-server kafka.moffatt.me:9092 \ --group replicator-source-CIF_FULL_DAILY \ --reset-offsets \ --all-topics \ --to-earliest \ --execute GROUP TOPIC PARTITION NEW-OFFSET replicator-source-CIF_FULL_DAILY __consumer_timestamps 49 0 replicator-source-CIF_FULL_DAILY __consumer_timestamps 3 0 replicator-source-CIF_FULL_DAILY CIF_FULL_DAILY 0 3847807 replicator-source-CIF_FULL_DAILY __consumer_timestamps 6 0 replicator-source-CIF_FULL_DAILY __consumer_timestamps 42 0 replicator-source-CIF_FULL_DAILY __consumer_timestamps 40 0 replicator-source-CIF_FULL_DAILY __consumer_timestamps 41 0
Restart the consumer(s), in this case, Replicator on my laptop:
$ docker-compose start replicator