
Learning Golang (some rough notes) - S02E04 - Kafka Go Consumer (Function-based)

Published Jul 14, 2020 by Robin Moffatt in Go, Golang, Kafka, Kafka Consumer API at https://rmoff.net/2020/07/14/learning-golang-some-rough-notes-s02e04-kafka-go-consumer-function-based/

Last time I looked at creating my first Apache Kafka consumer in Go, which used the now-deprecated channel-based consumer. Whilst idiomatic for Go, it has some issues which mean that the function-based consumer is recommended for use instead. So let’s go and use it!

Instead of reading from the Events() channel of the consumer, we read events using the Poll() function with a timeout. The way we handle events (a switch based on their type) is the same:

switch ev.(type) {

    case *kafka.Message:
        // It's a message

    case kafka.PartitionEOF:
        // We've finished reading messages on this partition so let's wrap up
}
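
The polling itself is just a call to Poll() with a timeout in milliseconds. A nil return means the timeout expired without anything to process, so we loop round and poll again (this is exactly what the full listing below does):

if ev := c.Poll(1000); ev == nil {
    // the Poll timed out and we got nothin'
    fmt.Printf("……\n")
    continue
}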

We also remove the goroutine and its slightly more complex execution logic, in which channels were used to indicate when to terminate processing, and instead just use a for loop:

doTerm := false
for !doTerm {
    // do polling until we're done
}

Just like in the previous example, when we receive a PartitionEOF we exit (since we make the BIG assumption that we're only consuming from one partition).
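
If you did want to consume from a topic with more than one partition, one option (not something the code in this post does; just a sketch) is to track which partitions have reached EOF and only terminate once every assigned partition has done so, using the consumer's Assignment() method. The helper name allAtEOF and the eofSeen map here are mine, not the post's:

// allAtEOF reports whether every partition currently assigned to the consumer
// has been seen to reach EOF. eofSeen is a set of partition numbers for which
// we've already received a kafka.PartitionEOF event.
func allAtEOF(c *kafka.Consumer, eofSeen map[int32]bool) bool {
	assigned, err := c.Assignment()
	if err != nil || len(assigned) == 0 {
		return false
	}
	for _, tp := range assigned {
		if !eofSeen[tp.Partition] {
			return false
		}
	}
	return true
}

In the kafka.PartitionEOF case you'd then record eofSeen[pe.Partition] = true and set doTerm = allAtEOF(c, eofSeen) rather than terminating straight away.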

The full code looks like this:

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	topic := "ratings"

	// --
	// Create Consumer instance
	// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewConsumer

	// Store the config
	cm := kafka.ConfigMap{
		"bootstrap.servers":    "localhost:9092",
		"group.id":             "rmoff_learning_go_foo",
		"enable.partition.eof": true}

	// Variable c holds the new Consumer instance.
	c, e := kafka.NewConsumer(&cm)

	// Check for errors in creating the Consumer
	if e != nil {
		if ke, ok := e.(kafka.Error); ok {
			switch ec := ke.Code(); ec {
			case kafka.ErrInvalidArg:
				fmt.Printf("😒 Can't create the Consumer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
			default:
				fmt.Printf("😒 Can't create the Consumer (Kafka error code %d)\n\tError: %v\n", ec, e)
			}
		} else {
			// It's not a kafka.Error
			fmt.Printf("😒 Oh noes, there's a generic error creating the Consumer! %v", e.Error())
		}

	} else {

		// Subscribe to the topic
		if e := c.Subscribe(topic, nil); e != nil {
			fmt.Printf("☠️ Uh oh, there was an error subscribing to the topic :\n\t%v\n", e)

		} else {

			doTerm := false
			for !doTerm {
				if ev := c.Poll(1000); ev == nil {
					// the Poll timed out and we got nothin'
					fmt.Printf("……\n")
					continue
				} else {
					// The poll pulled an event, let's now
					// look at the type of Event we've received
					switch ev.(type) {

					case *kafka.Message:
						// It's a message
						km := ev.(*kafka.Message)
						fmt.Printf("βœ… Message '%v' received from topic '%v' (partition %d at offset %d)\n",
							string(km.Value),
							string(*km.TopicPartition.Topic),
							km.TopicPartition.Partition,
							km.TopicPartition.Offset)

					case kafka.PartitionEOF:
						// We've finished reading messages on this partition so let's wrap up
						// n.b. this is a BIG assumption that we are only consuming from one partition
						pe := ev.(kafka.PartitionEOF)
						fmt.Printf("πŸŒ† Got to the end of partition %v on topic %v at offset %v\n",
							pe.Partition,
							string(*pe.Topic),
							pe.Offset)
						doTerm = true

					case kafka.OffsetsCommitted:
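						// Offsets have been committed in the background; there's
						// nothing we need to do with the event here, so just
						// carry on polling.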
						continue

					case kafka.Error:
						// It's an error
						em := ev.(kafka.Error)
						fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)

					default:
						// It's not anything we were expecting
						fmt.Printf("Got an event that's not a Message, Error, or PartitionEOF πŸ‘»\n\t%v\n", ev)

					}

				}
			}
			fmt.Printf("πŸ‘‹ … and we're done. Closing the consumer and exiting.\n")
			// Now we can exit
			c.Close()

		}
	}

}
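
One small variation worth considering (not what the code above does): rather than calling Close() at the end of the happy path, you could defer it as soon as the consumer has been created successfully, so that it gets closed however main() exits. A minimal sketch:

c, e := kafka.NewConsumer(&cm)
if e != nil {
	// handle the error as above, then bail out
	return
}
// Deferred, so the consumer is closed on any exit path out of main()
defer c.Close()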

I run it using a Docker Compose stack which also runs a data generator in Kafka Connect, populating a topic for the consumer to read from. When I shut down Kafka Connect the data generator stops, the consumer reads to the end of the topic, and exits:

……
……
……
✅ Message 'Struct{ip=122.249.79.233,userid=20,remote_user=-,time=81,_time=81,request=GET /site/login.html HTTP/1.1,status=405,bytes=1289,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}' received from topic 'ratings' (partition 0 at offset 2522)
✅ Message 'Struct{ip=222.245.174.248,userid=14,remote_user=-,time=91,_time=91,request=GET /index.html HTTP/1.1,status=404,bytes=278,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}' received from topic 'ratings' (partition 0 at offset 2523)
🌆 Got to the end of partition 0 on topic ratings at offset 2524
👋 … and we're done. Closing the consumer and exiting.

📺 More Episodes…

  • Kafka and Go

    • S02E00 - Kafka and Go

    • S02E01 - My First Kafka Go Producer

    • S02E02 - Adding error handling to the Producer

    • S02E03 - Kafka Go Consumer (Channel-based)

    • S02E04 - Kafka Go Consumer (Function-based)

    • S02E05 - Kafka Go AdminClient

    • S02E06 - Putting the Producer in a function and handling errors in a Go routine

    • S02E07 - Splitting Go code into separate source files and building a binary executable

    • S02E08 - Checking Kafka advertised.listeners with Go

    • S02E09 - Processing chunked responses before EOF is reached

  • Learning Go

    • S01E00 - Background

    • S01E01 - Pointers

    • S01E02 - Slices

    • S01E03 - Maps

    • S01E04 - Function Closures

    • S01E05 - Interfaces

    • S01E06 - Errors

    • S01E07 - Readers

    • S01E08 - Images

    • S01E09 - Concurrency (Channels, Goroutines)

    • S01E10 - Concurrency (Web Crawler)


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
