rmoff's random ramblings

Learning Golang (some rough notes) - S02E05 - Kafka Go AdminClient

Published Jul 15, 2020 by Robin Moffatt in Go, Golang, Kafka, Kafka AdminClient API at https://rmoff.net/2020/07/15/learning-golang-some-rough-notes-s02e05-kafka-go-adminclient/

Having ticked off the basics with an Apache Kafka producer and consumer in Go, let’s now check out the AdminClient. This is useful for inspecting metadata about the cluster, creating topics, and stuff like that.

Contexts

To use some of the functions that the AdminClient provides I had to read up on Context, which I’d not encountered on my brief journey with Go so far. The tl;dr is that a context provides a clean way for functions to time out or cancel their operation across function calls. Or to put it another way:

Package context defines the Context type, which carries deadlines, cancellation signals, and other request-scoped values across API boundaries and between processes.

This is what it looks like in operation. You define the context (in this case with a timeout):

import (
	"context"
	"time"
// …
)

// …
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Note that you can also do it like this…

ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)

…but per the code comment in the example given it’s good practice to make sure cancel is called as soon as you’re done with the context, because calling it releases the resources associated with the context.
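To see what a context actually does without needing a Kafka broker, here’s a minimal sketch using only the standard library. The names waitFor, runWithTimeout and cancelEarly are ones I’ve made up for illustration; they’re not part of the AdminClient or any other library.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitFor is a hypothetical stand-in for any call that accepts a context.
// It returns nil if the work finishes first, or ctx.Err() if the context ends first.
func waitFor(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWithTimeout runs waitFor under a context that expires after timeout.
func runWithTimeout(timeout, work time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // release the context's resources as soon as we return
	return waitFor(ctx, work)
}

// cancelEarly shows that calling cancel ends the context before any deadline.
func cancelEarly(work time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel straight away
	return waitFor(ctx, work)
}

func main() {
	fmt.Println(runWithTimeout(500*time.Millisecond, 10*time.Millisecond)) // <nil>
	fmt.Println(runWithTimeout(10*time.Millisecond, 500*time.Millisecond)) // context deadline exceeded
	fmt.Println(cancelEarly(500 * time.Millisecond))                       // context canceled
}
```

The function doing the work is the one that has to watch ctx.Done(); the context itself doesn’t interrupt anything, it just signals.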

Another thing to note is the nice way to specify time periods in Go. Instead of having to check the API documentation each time as to whether you’re specifying seconds, microseconds, etc, and then doing the necessary maths on the time period that you want to specify, you can instead just use human-friendly notation such as:

  • 5*time.Microsecond

  • 30*time.Second
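Here’s a small sketch of how these values behave. A time.Duration is an int64 count of nanoseconds under the covers, so it prints in a readable form, but an API that takes a plain integer (say, a timeout in milliseconds) still needs an explicit conversion. The toMillis helper is my own name for illustration, not something from the standard library.

```go
package main

import (
	"fmt"
	"time"
)

// toMillis converts a Duration to whole milliseconds, for APIs that
// take a plain int timeout rather than a time.Duration.
func toMillis(d time.Duration) int {
	return int(d / time.Millisecond)
}

func main() {
	fmt.Println(5 * time.Microsecond) // 5µs
	fmt.Println(30 * time.Second)     // 30s
	fmt.Println(90 * time.Second)     // 1m30s

	// The underlying value is a count of nanoseconds
	fmt.Println(int64(30 * time.Second)) // 30000000000

	// Converting for an API that wants milliseconds
	fmt.Println(toMillis(5 * time.Second)) // 5000
}
```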

So, having defined the context, we pass it when invoking a function call that requires it, such as ClusterID() in the AdminClient:

c, e := a.ClusterID(ctx)

More completely, the code looks something like this:

// Get the ClusterID
if c, e := a.ClusterID(ctx); e != nil {
    fmt.Printf("😢 Error getting ClusterID\n\tError: %v\n", e)
} else {
    fmt.Printf("✔️ ClusterID: %v\n", c)
}

If the context times out then an error is returned:

😢 Error getting ClusterID
	Error: context.deadlineExceededError context deadline exceeded
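If you want to treat a timeout differently from other failures, one option is to test the error with errors.Is. This is a sketch under the assumption that the error you get back is, or wraps, context.DeadlineExceeded; fetchSomething is a hypothetical stand-in for a call like ClusterID(ctx), and describe and outcome are names I’ve invented for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchSomething is a hypothetical stand-in for a context-aware call.
// It waits for work to elapse, or gives up when the context ends.
func fetchSomething(ctx context.Context, work time.Duration) (string, error) {
	select {
	case <-time.After(work):
		return "some-result", nil
	case <-ctx.Done():
		// Wrapping adds detail; errors.Is still sees through the wrapping.
		return "", fmt.Errorf("fetching something: %w", ctx.Err())
	}
}

// describe turns an error into a short label.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, context.DeadlineExceeded):
		return "timed out"
	case errors.Is(err, context.Canceled):
		return "cancelled"
	default:
		return "failed"
	}
}

// outcome runs fetchSomething under the given timeout and labels the result.
func outcome(timeout, work time.Duration) string {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	_, err := fetchSomething(ctx, work)
	return describe(err)
}

func main() {
	fmt.Println(outcome(time.Second, 10*time.Millisecond)) // ok
	fmt.Println(outcome(10*time.Millisecond, time.Second)) // timed out
}
```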

Note that if you want to use the context in successive calls, the timeout does not reset on each use. So if you have something like this:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Call the first thing
doSomething(ctx)

// Call the second thing
doSomethingElse(ctx)

The five-second timeout covers both calls: however long the first function takes is deducted from what’s left for the second. If you want to reset it in between then you’d do this:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Call the first thing
doSomething(ctx)

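// Start the context timer again
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
// The earlier defer only covers the first cancel function, so defer the new one too
defer cancel()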

// Call the second thing
doSomethingElse(ctx)
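The reason the timer doesn’t reset is that a context carries an absolute deadline, fixed when it’s created, rather than a countdown that restarts on each use. Here’s a rough sketch that shows this with ctx.Deadline(); the helper names are my own, and the short sleep just stands in for the first call taking some time.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// deadlineOf returns the absolute deadline carried by ctx.
func deadlineOf(ctx context.Context) time.Time {
	d, _ := ctx.Deadline()
	return d
}

// sharedDeadlineUnchanged reports whether a reused context still has
// the same deadline after some time has passed.
func sharedDeadlineUnchanged(timeout, pause time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	before := deadlineOf(ctx)
	time.Sleep(pause) // stands in for the first call
	return deadlineOf(ctx).Equal(before)
}

// freshDeadlineIsLater reports whether a context created after the pause
// has a later deadline than the one created before it.
func freshDeadlineIsLater(timeout, pause time.Duration) bool {
	first, cancelFirst := context.WithTimeout(context.Background(), timeout)
	defer cancelFirst()
	time.Sleep(pause) // stands in for the first call
	second, cancelSecond := context.WithTimeout(context.Background(), timeout)
	defer cancelSecond() // each WithTimeout returns its own cancel function
	return deadlineOf(second).After(deadlineOf(first))
}

func main() {
	fmt.Println(sharedDeadlineUnchanged(5*time.Second, 20*time.Millisecond)) // true
	fmt.Println(freshDeadlineIsLater(5*time.Second, 20*time.Millisecond))    // true
}
```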

AdminClient

The docs comprehensively list the functions available from the AdminClient. Here’s a simple example that uses some of them to list information about the cluster:

package main

import (
	"context"
	"fmt"
	"time"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	// --
	// Create AdminClient instance
	// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewAdminClient

	// Store the config
	cm := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092"}

	// Variable a holds the new AdminClient instance.
	a, e := kafka.NewAdminClient(&cm)

	// Check for errors in creating the AdminClient
	if e != nil {
		if ke, ok := e.(kafka.Error); ok {
			switch ec := ke.Code(); ec {
			case kafka.ErrInvalidArg:
				fmt.Printf("😢 Can't create the AdminClient because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
			default:
				fmt.Printf("😢 Can't create the AdminClient (Kafka error code %d)\n\tError: %v\n", ec, e)
			}
		} else {
			// It's not a kafka.Error
			fmt.Printf("😢 Oh noes, there's a generic error creating the AdminClient! %v", e.Error())
		}

	} else {
		// Make sure we close it when we're done.
		// Only defer this once we know creation succeeded, since a is nil on error.
		defer a.Close()

		fmt.Println("✔️ Created AdminClient")

		// Create a context for use when calling some of these functions
		// This lets you set a variable timeout on invoking these calls
		// If the timeout passes then an error is returned.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// Get the ClusterID
		if c, e := a.ClusterID(ctx); e != nil {
			fmt.Printf("😢 Error getting ClusterID\n\tError: %v\n", e)
		} else {
			fmt.Printf("✔️ ClusterID: %v\n", c)
		}

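		// Start the context timer again (otherwise it carries on from the original deadline)
		ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
		// The earlier defer only covers the first cancel function, so defer the new one too
		defer cancel()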

		// Get the ControllerID
		if c, e := a.ControllerID(ctx); e != nil {
			fmt.Printf("😢 Error getting ControllerID\n\tError: %v\n", e)
		} else {
			fmt.Printf("✔️ ControllerID: %v\n", c)
		}

		// Get some metadata
		// The timeout argument here is a plain int of milliseconds, not a time.Duration
		if md, e := a.GetMetadata(nil, false, int(5*time.Second/time.Millisecond)); e != nil {
			fmt.Printf("😢 Error getting cluster Metadata\n\tError: %v\n", e)
		} else {
			// Print the originating broker info
			fmt.Printf("✔️ Metadata [Originating broker]\n")
			b := md.OriginatingBroker
			fmt.Printf("\t[ID %d] %v\n", b.ID, b.Host)

			// Print the brokers
			fmt.Printf("✔️ Metadata [brokers]\n")
			for _, b := range md.Brokers {
				fmt.Printf("\t[ID %d] %v:%d\n", b.ID, b.Host, b.Port)
			}
			// Print the topics
			fmt.Printf("✔️ Metadata [topics]\n")
			for _, t := range md.Topics {
				fmt.Printf("\t(%v partitions)\t%v\n", len(t.Partitions), t.Topic)
			}
		}
		fmt.Printf("\n\n👋 … and we're done.\n")
	}
}

The output looks like this:

✔️ Created AdminClient
✔️ ClusterID: hukPYvRVTF2nU8efMXUq6g
✔️ ControllerID: 1
✔️ Metadata [Originating broker]
	[ID 1] localhost:9092/1
✔️ Metadata [brokers]
	[ID 1] localhost:9092
✔️ Metadata [topics]
	(5 partitions)	_kafka-connect-01-status
	(1 partitions)	ratings
	(1 partitions)	__confluent.support.metrics
	(25 partitions)	_kafka-connect-01-offsets
	(1 partitions)	_kafka-connect-01-configs
	(50 partitions)	__consumer_offsets


👋 … and we're done.

📺 More Episodes…

  • Kafka and Go

    • S02E00 - Kafka and Go

    • S02E01 - My First Kafka Go Producer

    • S02E02 - Adding error handling to the Producer

    • S02E03 - Kafka Go Consumer (Channel-based)

    • S02E04 - Kafka Go Consumer (Function-based)

    • S02E05 - Kafka Go AdminClient

    • S02E06 - Putting the Producer in a function and handling errors in a Go routine

    • S02E07 - Splitting Go code into separate source files and building a binary executable

    • S02E08 - Checking Kafka advertised.listeners with Go

    • S02E09 - Processing chunked responses before EOF is reached

  • Learning Go

    • S01E00 - Background

    • S01E01 - Pointers

    • S01E02 - Slices

    • S01E03 - Maps

    • S01E04 - Function Closures

    • S01E05 - Interfaces

    • S01E06 - Errors

    • S01E07 - Readers

    • S01E08 - Images

    • S01E09 - Concurrency (Channels, Goroutines)

    • S01E10 - Concurrency (Web Crawler)


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
