rmoff's random ramblings
about talks

Resetting a Consumer Group in Kafka

Published Aug 9, 2019 by in Kafka Connect, Kafka, Consumer, Consumer Group, Replicator at https://rmoff.net/2019/08/09/resetting-a-consumer-group-in-kafka/

I’ve been using Replicator as a powerful way to copy data from my Kafka rig at home onto my laptop’s Kafka environment. It means that when I’m on the road I can continue to work with the same set of data and develop pipelines etc. With a VPN back home I can even keep them in sync directly if I want to.

I hit a problem the other day where Replicator was running, but I had no data in my target topics on my laptop. After a bit of head-scratching I realised that my local Kafka environment had been rebuilt (I use Docker Compose so complete rebuilds to start from scratch are easy), hence no data in the topic. But, even after restarting the Replicator Kafka Connect worker, I still had no data loaded into the empty topics. What was going on? Well Replicator acts as a consumer from the source Kafka cluster (on my home server), and so far as that Kafka cluster was concerned, Replicator had already read the messages. It thought that because even though I’d rebuilt everything on my laptop, Replicator was using the same connector name as before, and the connector name is used as the Consumer group name - which is how the source Kafka cluster keeps track of the offsets. So my "new" Kafka environment was going back to the source, which viewed it as the existing "old" one, which had already received the messages.

Two options from here:

  • Hacky but effective : chose a new Replicator connector name and restart. New name = new consumer group = offsets reset.

  • Proper way : reset the offsets for the consumer group on the source side.

Here’s how to do the latter:

  1. Shut down the consumers - in this case, Replicator on my laptop:

    $ docker-compose stop replicator
    Stopping replicator ... done
  2. Confirm the consumer group name on the source cluster:

    $ docker exec -it kafka kafka-consumer-groups \
                        --bootstrap-server kafka.moffatt.me:9092 \
                        --list | \
                        grep replicator
    replicator-source-CIF_FULL_DAILY
    replicator-source
    replicator-source-networkrail_TRAIN_MVT

    Here there are three; I was interested in just the first one, replicator-source-CIF_FULL_DAILY

  3. Inspect the consumer group:

    $ docker exec -it kafka kafka-consumer-groups \
                        --bootstrap-server kafka.moffatt.me:9092 \
                        --describe \
                        --group replicator-source-CIF_FULL_DAILY
    
    Consumer group 'replicator-source-CIF_FULL_DAILY' has no active members.
    
    GROUP                            TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    replicator-source-CIF_FULL_DAILY __consumer_timestamps 49         421             421             0               -               -               -
    replicator-source-CIF_FULL_DAILY __consumer_timestamps 3          39              39              0               -               -               -
    replicator-source-CIF_FULL_DAILY CIF_FULL_DAILY        0          4718955         4718955         0               -               -               -
    replicator-source-CIF_FULL_DAILY __consumer_timestamps 6          16              16              0               -               -               -
    replicator-source-CIF_FULL_DAILY __consumer_timestamps 42         422             422             0               -               -               -
    replicator-source-CIF_FULL_DAILY __consumer_timestamps 40         69              69              0               -               -               -
    replicator-source-CIF_FULL_DAILY __consumer_timestamps 41         2               2               0               -               -               -
  4. Reset the offsets for the consumer group. In this case I reset them to the earliest, so that Replicator would consume all of the topic’s data. Other reset options are available.

    $ docker exec -it kafka kafka-consumer-groups \
                        --bootstrap-server kafka.moffatt.me:9092 \
                        --group replicator-source-CIF_FULL_DAILY \
                        --reset-offsets \
                        --all-topics \
                        --to-earliest \
                        --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET
    replicator-source-CIF_FULL_DAILY __consumer_timestamps          49         0
    replicator-source-CIF_FULL_DAILY __consumer_timestamps          3          0
    replicator-source-CIF_FULL_DAILY CIF_FULL_DAILY                 0          3847807
    replicator-source-CIF_FULL_DAILY __consumer_timestamps          6          0
    replicator-source-CIF_FULL_DAILY __consumer_timestamps          42         0
    replicator-source-CIF_FULL_DAILY __consumer_timestamps          40         0
    replicator-source-CIF_FULL_DAILY __consumer_timestamps          41         0
  5. Restart the consumer(s), in this case, Replicator on my laptop:

    $ docker-compose start replicator

Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025