Learning Golang (some rough notes) - S02E04 - Kafka Go Consumer (Function-based)

Published in Go, Golang, Kafka, Kafka Consumer API at https://rmoff.net/2020/07/14/learning-golang-some-rough-notes-s02e04-kafka-go-consumer-function-based/

Last time I looked at creating my first Apache Kafka consumer in Go, which used the now-deprecated channel-based consumer. Whilst idiomatic for Go, it has some issues which mean that the function-based consumer is recommended for use instead. So let’s go and use it!

Instead of reading from the Events() channel of the consumer, we read events using the Poll() function with a timeout. The way we handle events (a switch based on their type) is the same:

switch ev.(type) {

    case *kafka.Message:
        // It's a message

    case kafka.PartitionEOF:
        // We've finished reading messages on this partition so let's wrap up
}

We also remove the goroutine and its slightly more complex execution logic in which channels were used to indicate when to terminate processing, and instead just use a for loop:

doTerm := false
for !doTerm {
    // do polling until we're done
}

Just like in the previous example, when we receive a PartitionEOF we exit (since we make the BIG assumption that we’re only consuming from one partition).

The full code looks like this:

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	topic := "ratings"

	// --
	// Create Consumer instance
	// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewConsumer

	// Store the config
	cm := kafka.ConfigMap{
		"bootstrap.servers":    "localhost:9092",
		"group.id":             "rmoff_learning_go_foo",
		"enable.partition.eof": true}

	// Variable c holds the new Consumer instance.
	c, e := kafka.NewConsumer(&cm)

	// Check for errors in creating the Consumer
	if e != nil {
		if ke, ok := e.(kafka.Error); ok {
			switch ec := ke.Code(); ec {
			case kafka.ErrInvalidArg:
				fmt.Printf("😢 Can't create the Consumer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
			default:
				fmt.Printf("😢 Can't create the Consumer (Kafka error code %d)\n\tError: %v\n", ec, e)
			}
		} else {
			// It's not a kafka.Error
			fmt.Printf("😢 Oh noes, there's a generic error creating the Consumer! %v", e.Error())
		}

	} else {

		// Subscribe to the topic
		if e := c.Subscribe(topic, nil); e != nil {
			fmt.Printf("☠️ Uh oh, there was an error subscribing to the topic :\n\t%v\n", e)

		} else {

			doTerm := false
			for !doTerm {
				if ev := c.Poll(1000); ev == nil {
					// the Poll timed out and we got nothin'
					fmt.Printf("……\n")
					continue
				} else {
					// The poll pulled an event, let's now
					// look at the type of Event we've received
					switch ev.(type) {

					case *kafka.Message:
						// It's a message
						km := ev.(*kafka.Message)
						fmt.Printf("✅ Message '%v' received from topic '%v' (partition %d at offset %d)\n",
							string(km.Value),
							string(*km.TopicPartition.Topic),
							km.TopicPartition.Partition,
							km.TopicPartition.Offset)

					case kafka.PartitionEOF:
						// We've finished reading messages on this partition so let's wrap up
						// n.b. this is a BIG assumption that we are only consuming from one partition
						pe := ev.(kafka.PartitionEOF)
						fmt.Printf("🌆 Got to the end of partition %v on topic %v at offset %v\n",
							pe.Partition,
							string(*pe.Topic),
							pe.Offset)
						doTerm = true

					case kafka.OffsetsCommitted:
						continue

					case kafka.Error:
						// It's an error
						em := ev.(kafka.Error)
						fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)

					default:
						// It's not anything we were expecting
						fmt.Printf("Got an event that's not a Message, Error, or PartitionEOF 👻\n\t%v\n", ev)

					}

				}
			}
			fmt.Printf("👋 … and we're done. Closing the consumer and exiting.\n")
			// Now we can exit
			c.Close()

		}
	}

}

I run it using a Docker Compose stack which also runs a data generator in Kafka Connect, populating a topic for the consumer to read from. When I shut down Kafka Connect the data generator stops, the consumer reads to the end of the topic, and exits:

……
……
……
✅ Message 'Struct{ip=122.249.79.233,userid=20,remote_user=-,time=81,_time=81,request=GET /site/login.html HTTP/1.1,status=405,bytes=1289,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}' received from topic 'ratings' (partition 0 at offset 2522)
✅ Message 'Struct{ip=222.245.174.248,userid=14,remote_user=-,time=91,_time=91,request=GET /index.html HTTP/1.1,status=404,bytes=278,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}' received from topic 'ratings' (partition 0 at offset 2523)
🌆 Got to the end of partition 0 on topic ratings at offset 2524
👋 … and we're done. Closing the consumer and exiting.