rmoff's random ramblings
about talks

Learning Golang (some rough notes) - S02E03 - Kafka Go Consumer (Channel-based)

Published Jul 14, 2020 by in Go, Golang, Kafka, Kafka Consumer API at https://rmoff.net/2020/07/14/learning-golang-some-rough-notes-s02e03-kafka-go-consumer-channel-based/

Having written my first Kafka producer in Go, and even added error handling to it, the next step was to write a consumer. It follows closely the pattern of Producer code I finished up with previously, using the channel-based approach for the Consumer:

  • Create Consumer

  • Subscribe to topic

  • Read messages from the channel as the consumer receives them

  • When we’ve read all messages, exit

I’ve used the channel-based consumer because it fitted the most neatly with my existing code that I was adapting to work as a consumer, and the general concept of consuming from a channel also felt quite idiomatic. However, if you consult the client GitHub repo you’ll see that the channel-based consumer is marked as deprecated, and there is a note in the code as to why this is. I’ll take a look in the next article at using the function-based consumer instead :)

The main thing here is that we use the .Events() channel for which there’s a Go Routine, and so this pattern to wait until we’ve finished with it:

// For signalling termination from main to go-routine
termChan := make(chan bool, 1)
// For signalling that termination is done from go-routine to main
doneChan := make(chan bool)

go func() {
    doTerm := false
    for !doTerm {
        select {

            // channels that we're listening to

        case <-termChan:
            doTerm = true
        }
    }

    close(doneChan)
}()

// …

// We're ready to finish
termChan <- true
// wait for go-routine to terminate
<-doneChan
// Now we can exit
p.Close()

To know when to finish, we listen for PartitionEOF events, which we need to enable when creating the consumer

"enable.partition.eof":     true

When we receive one we’ll assume there’s just the single partition (BIG assumption) and set the doTerm to true to break out of the for loop in the Go routine which then closes the doneChan and the program can exit

case kafka.PartitionEOF:
    // We've finished reading messages on this partition so let's wrap up
    pe := ev.(kafka.PartitionEOF)
    fmt.Printf("🌆 Got to the end of partition %v on topic %v at offset %v\n",
        pe.Partition,
        string(*pe.Topic),
        pe.Offset)
    termChan <- true

The full code looks like this:

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	topic := "ratings"

	// --
	// Create Consumer instance
	// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewConsumer

	// Store the config
	cm := kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092",
		"go.events.channel.enable": true,
		"group.id":                 "rmoff_learning_go",
		"enable.partition.eof":     true}

	// Variable p holds the new Consumer instance.
	c, e := kafka.NewConsumer(&cm)

	// Check for errors in creating the Consumer
	if e != nil {
		if ke, ok := e.(kafka.Error); ok == true {
			switch ec := ke.Code(); ec {
			case kafka.ErrInvalidArg:
				fmt.Printf("😢 Can't create the Consumer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
			default:
				fmt.Printf("😢 Can't create the Consumer (Kafka error code %d)\n\tError: %v\n", ec, e)
			}
		} else {
			// It's not a kafka.Error
			fmt.Printf("😢 Oh noes, there's a generic error creating the Consumer! %v", e.Error())
		}

	} else {

		// For signalling termination from main to go-routine
		termChan := make(chan bool, 1)
		// For signalling that termination is done from go-routine to main
		doneChan := make(chan bool)

		// Subscribe to the topic
		if e := c.Subscribe(topic, nil); e != nil {
			fmt.Printf("☠️ Uh oh, there was an error subscribing to the topic :\n\t%v\n", e)
			termChan <- true
		}

		// Handle the events that we get
		go func() {
			doTerm := false
			for !doTerm {
				// The `select` blocks until one of the `case` conditions
				// are met - therefore we run it in a Go Routine.
				select {
				case ev := <-c.Events():
					// Look at the type of Event we've received
					switch ev.(type) {

					case *kafka.Message:
						// It's a message
						km := ev.(*kafka.Message)
						fmt.Printf("✅ Message '%v' received from topic '%v' (partition %d at offset %d)\n",
							string(km.Value),
							string(*km.TopicPartition.Topic),
							km.TopicPartition.Partition,
							km.TopicPartition.Offset)
					case kafka.PartitionEOF:
						// We've finished reading messages on this partition so let's wrap up
						// n.b. this is a BIG assumption that we are only consuming from one partition
						pe := ev.(kafka.PartitionEOF)
						fmt.Printf("🌆 Got to the end of partition %v on topic %v at offset %v\n",
							pe.Partition,
							string(*pe.Topic),
							pe.Offset)
						termChan <- true
					case kafka.OffsetsCommitted:
						continue
					case kafka.Error:
						// It's an error
						em := ev.(kafka.Error)
						fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)
					default:
						// It's not anything we were expecting
						fmt.Printf("Got an event that's not a Message, Error, or PartitionEOF 👻\n\t%v\n", ev)

					}
				case <-termChan:
					doTerm = true

				}
			}
			close(doneChan)
		}()

		// We'll wait for the Go routine to exit, which will happen once we've read all the messages on the topic
		<-doneChan
		// Now we can exit
		c.Close()

	}

}
consumer01

I run it using a Docker Compose which also runs a data generator in Kafka Connect populating a topic for the consumer to read from. When I shut down Kafka Connect the data generator stops, the consumer reads to the end of the topic, and exits:

…
✅ Message 'Struct{ip=233.245.174.233,userid=13,remote_user=-,time=23811,_time=23811,request=GET /index.html HTTP/1.1,status=407,bytes=4006,referrer=-,agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36}' received from topic 'ratings' (partition 0 at offset 2381)
✅ Message 'Struct{ip=122.145.8.244,userid=9,remote_user=-,time=23821,_time=23821,request=GET /images/track.png HTTP/1.1,status=302,bytes=4006,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}' received from topic 'ratings' (partition 0 at offset 2382)
✅ Message 'Struct{ip=111.145.8.144,userid=38,remote_user=-,time=23831,_time=23831,request=GET /site/user_status.html HTTP/1.1,status=406,bytes=4096,referrer=-,agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36}' received from topic 'ratings' (partition 0 at offset 2383)
✅ Message 'Struct{ip=222.245.174.248,userid=36,remote_user=-,time=23841,_time=23841,request=GET /site/user_status.html HTTP/1.1,status=200,bytes=4096,referrer=-,agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36}' received from topic 'ratings' (partition 0 at offset 2384)
🌆 Got to the end of partition 0 on topic ratings at offset 2385

📺 More Episodes… 🔗

  • Kafka and Go

    • S02E00 - Kafka and Go

    • S02E01 - My First Kafka Go Producer

    • S02E02 - Adding error handling to the Producer

    • S02E03 - Kafka Go Consumer (Channel-based)

    • S02E04 - Kafka Go Consumer (Function-based)

    • S02E05 - Kafka Go AdminClient

    • S02E06 - Putting the Producer in a function and handling errors in a Go routine

    • S02E07 - Splitting Go code into separate source files and building a binary executable

    • S02E08 - Checking Kafka advertised.listeners with Go

    • S02E09 - Processing chunked responses before EOF is reached

  • Learning Go

    • S01E00 - Background

    • S01E01 - Pointers

    • S01E02 - Slices

    • S01E03 - Maps

    • S01E04 - Function Closures

    • S01E05 - Interfaces

    • S01E06 - Errors

    • S01E07 - Readers

    • S01E08 - Images

    • S01E09 - Concurrency (Channels, Goroutines)

    • S01E10 - Concurrency (Web Crawler)


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025