Having written my first Kafka producer in Go, and even added error handling to it, the next step was to write a consumer. It closely follows the pattern of the Producer code I finished up with previously, and uses the channel-based approach for the Consumer:
- Create Consumer
- Subscribe to topic
- Read messages from the channel as the consumer receives them
- When we’ve read all messages, exit
I’ve used the channel-based consumer because it fitted most neatly with the existing code that I was adapting to work as a consumer, and consuming from a channel also felt quite idiomatic. However, if you consult the client’s GitHub repo you’ll see that the channel-based consumer is marked as deprecated, and there is a note in the code explaining why. I’ll take a look at using the function-based consumer instead in the next article :)
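To show how the two styles differ in control flow, here is a minimal, stdlib-only sketch of a polling loop. `fakeSource`, its `poll` method, and the `"EOF"` sentinel are hypothetical stand-ins, not the confluent-kafka-go API. As far as I know, the real consumer’s function-based API is a `Poll` method that returns a `kafka.Event`, or `nil` on timeout.

```go
package main

import (
	"fmt"
	"time"
)

// fakeSource is a HYPOTHETICAL stand-in for a consumer that gets polled;
// it is not the confluent-kafka-go API. The string "EOF" stands in for
// a kafka.PartitionEOF event.
type fakeSource struct{ queue []string }

// poll returns the next queued event, or ok=false if nothing arrives
// within timeout (simulated here by sleeping).
func (s *fakeSource) poll(timeout time.Duration) (ev string, ok bool) {
	if len(s.queue) == 0 {
		time.Sleep(timeout)
		return "", false
	}
	ev, s.queue = s.queue[0], s.queue[1:]
	return ev, true
}

// consumeByPolling keeps the loop in the calling goroutine, so stopping
// is just a matter of returning: no extra goroutine or signalling channels.
func consumeByPolling(s *fakeSource) []string {
	var received []string
	for {
		ev, ok := s.poll(10 * time.Millisecond)
		switch {
		case !ok:
			continue // timed out with nothing to read; poll again
		case ev == "EOF":
			return received
		default:
			received = append(received, ev)
		}
	}
}

func main() {
	s := &fakeSource{queue: []string{"first", "second", "EOF"}}
	fmt.Println(consumeByPolling(s)) // prints [first second]
}
```

The trade-off is that the caller must keep calling `poll`, but in exchange it decides exactly when to stop reading.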
The main thing here is that we read events from the `.Events()` channel in a Go routine, so we use this pattern to wait until we’ve finished with it:
```go
// For signalling termination from main to go-routine
termChan := make(chan bool, 1)
// For signalling that termination is done from go-routine to main
doneChan := make(chan bool)

go func() {
	doTerm := false
	for !doTerm {
		select {
		// channels that we're listening to
		case <-termChan:
			doTerm = true
		}
	}
	close(doneChan)
}()

// …

// We're ready to finish
termChan <- true
// wait for go-routine to terminate
<-doneChan
// Now we can exit
c.Close()
```
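Here is a self-contained sketch of the same handshake that runs without a Kafka broker. Plain strings stand in for Kafka events, and the string `"EOF"` is a hypothetical stand-in for a `kafka.PartitionEOF` event. `consumeUntilEOF` is an illustrative name, not part of any library.

```go
package main

import "fmt"

// consumeUntilEOF mimics the termChan/doneChan pattern with plain strings.
// "EOF" is a HYPOTHETICAL stand-in for a kafka.PartitionEOF event.
func consumeUntilEOF(events <-chan string) []string {
	// Buffered (capacity 1) because the goroutine below sends on termChan
	// itself; an unbuffered send would block forever, since that same
	// goroutine is the only receiver. A second "EOF" arriving before the
	// first signal is drained would still block.
	termChan := make(chan bool, 1)
	doneChan := make(chan bool)
	var received []string

	go func() {
		doTerm := false
		for !doTerm {
			select {
			case ev := <-events:
				if ev == "EOF" {
					termChan <- true // ask ourselves to stop on a later iteration
				} else {
					received = append(received, ev)
				}
			case <-termChan:
				doTerm = true
			}
		}
		close(doneChan) // tell the caller we have finished
	}()

	<-doneChan // block until the goroutine closes doneChan
	return received
}

func main() {
	events := make(chan string, 3)
	for _, ev := range []string{"first", "second", "EOF"} {
		events <- ev
	}
	fmt.Println(consumeUntilEOF(events)) // prints [first second]
}
```

Reading `received` after `<-doneChan` is safe because closing a channel happens before a receive that observes the close.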
To know when to finish, we listen for `PartitionEOF` events. These aren’t emitted by default, so we need to enable them in the config when creating the consumer:

```go
"enable.partition.eof": true
```
When we receive one, we assume there’s just a single partition (a BIG assumption) and send `true` to `termChan`. On the next pass through the `select`, this sets `doTerm` to `true`, which breaks out of the `for` loop in the Go routine. The Go routine then closes `doneChan`, and the program can exit:
```go
case kafka.PartitionEOF:
	// We've finished reading messages on this partition so let's wrap up
	pe := ev.(kafka.PartitionEOF)
	fmt.Printf("🌆 Got to the end of partition %v on topic %v at offset %v\n",
		pe.Partition,
		string(*pe.Topic),
		pe.Offset)
	termChan <- true
```
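One way to relax the single-partition assumption is to stop only once every assigned partition has reported EOF. This is a minimal sketch that assumes a single topic and a list of partitions known up front. `eofTracker` and its methods are hypothetical helpers, and in a real consumer group the assignment could change on a rebalance.

```go
package main

import "fmt"

// eofTracker is a HYPOTHETICAL helper that records which partitions of a
// single topic have reported end-of-partition.
type eofTracker struct {
	pending map[int32]bool // partitions that have not reached EOF yet
}

func newEOFTracker(assigned []int32) *eofTracker {
	t := &eofTracker{pending: make(map[int32]bool, len(assigned))}
	for _, p := range assigned {
		t.pending[p] = true
	}
	return t
}

// markEOF records an EOF for partition p and reports whether every
// assigned partition has now reached EOF. Repeated EOFs are harmless.
func (t *eofTracker) markEOF(p int32) bool {
	delete(t.pending, p)
	return len(t.pending) == 0
}

func main() {
	t := newEOFTracker([]int32{0, 1, 2})
	for _, p := range []int32{1, 0, 1, 2} {
		fmt.Printf("EOF on partition %d, all done: %v\n", p, t.markEOF(p))
	}
	// Only the last line reports "all done: true".
}
```

In the `kafka.PartitionEOF` case of the event loop, you would then call `markEOF(pe.Partition)` and only send on `termChan` when it returns `true`.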
The full code looks like this:
```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	topic := "ratings"

	// --
	// Create Consumer instance
	// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewConsumer

	// Store the config
	cm := kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092",
		"go.events.channel.enable": true,
		"group.id":                 "rmoff_learning_go",
		"enable.partition.eof":     true}

	// Variable c holds the new Consumer instance.
	c, e := kafka.NewConsumer(&cm)

	// Check for errors in creating the Consumer
	if e != nil {
		if ke, ok := e.(kafka.Error); ok {
			switch ec := ke.Code(); ec {
			case kafka.ErrInvalidArg:
				fmt.Printf("😢 Can't create the Consumer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md\n", ec, e)
			default:
				fmt.Printf("😢 Can't create the Consumer (Kafka error code %d)\n\tError: %v\n", ec, e)
			}
		} else {
			// It's not a kafka.Error
			fmt.Printf("😢 Oh noes, there's a generic error creating the Consumer! %v\n", e.Error())
		}

	} else {

		// For signalling termination from main to go-routine
		termChan := make(chan bool, 1)
		// For signalling that termination is done from go-routine to main
		doneChan := make(chan bool)

		// Subscribe to the topic
		if e := c.Subscribe(topic, nil); e != nil {
			fmt.Printf("☠️ Uh oh, there was an error subscribing to the topic :\n\t%v\n", e)
			termChan <- true
		}

		// Handle the events that we get
		go func() {
			doTerm := false
			for !doTerm {
				// The `select` blocks until one of the `case` conditions
				// is met - therefore we run it in a Go Routine.
				select {
				case ev := <-c.Events():
					// Look at the type of Event we've received
					switch ev.(type) {

					case *kafka.Message:
						// It's a message
						km := ev.(*kafka.Message)
						fmt.Printf("✅ Message '%v' received from topic '%v' (partition %d at offset %d)\n",
							string(km.Value),
							string(*km.TopicPartition.Topic),
							km.TopicPartition.Partition,
							km.TopicPartition.Offset)

					case kafka.PartitionEOF:
						// We've finished reading messages on this partition so let's wrap up
						// n.b. this is a BIG assumption that we are only consuming from one partition
						pe := ev.(kafka.PartitionEOF)
						fmt.Printf("🌆 Got to the end of partition %v on topic %v at offset %v\n",
							pe.Partition,
							string(*pe.Topic),
							pe.Offset)
						termChan <- true

					case kafka.OffsetsCommitted:
						continue

					case kafka.Error:
						// It's an error
						em := ev.(kafka.Error)
						fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)

					default:
						// It's not anything we were expecting
						fmt.Printf("Got an event that's not a Message, Error, or PartitionEOF 👻\n\t%v\n", ev)
					}

				case <-termChan:
					doTerm = true
				}
			}
			close(doneChan)
		}()

		// We'll wait for the Go routine to exit, which will happen once we've read all the messages on the topic
		<-doneChan
		// Now we can exit
		c.Close()
	}
}
```
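The event loop switches on `ev.(type)` and then repeats the type assertion inside each case (`km := ev.(*kafka.Message)`). Go lets a type switch bind the converted value directly, which removes that second assertion. Here is a minimal sketch. `Event`, `Message`, and `PartitionEOF` are hypothetical stand-ins, not the library’s definitions.

```go
package main

import "fmt"

// HYPOTHETICAL stand-ins for kafka.Event and two concrete event types;
// these are not the confluent-kafka-go definitions.
type Event interface{}

type Message struct{ Value []byte }

type PartitionEOF struct{ Partition int32 }

// describe binds the concrete value in the switch itself (e := ev.(type)),
// so inside each case e already has that case's type.
func describe(ev Event) string {
	switch e := ev.(type) {
	case *Message:
		return "message: " + string(e.Value)
	case PartitionEOF:
		return fmt.Sprintf("end of partition %d", e.Partition)
	default:
		return fmt.Sprintf("unexpected event: %v", e)
	}
}

func main() {
	fmt.Println(describe(&Message{Value: []byte("hello")})) // prints message: hello
	fmt.Println(describe(PartitionEOF{Partition: 0}))       // prints end of partition 0
}
```

Applied to the consumer, the `case *kafka.Message:` branch could then use `e.Value` and `e.TopicPartition` directly.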
I run it using a Docker Compose environment that also runs a data generator in Kafka Connect, which populates a topic for the consumer to read from. When I shut down Kafka Connect, the data generator stops, and the consumer reads to the end of the topic and exits:
```
…
✅ Message 'Struct{ip=233.245.174.233,userid=13,remote_user=-,time=23811,_time=23811,request=GET /index.html HTTP/1.1,status=407,bytes=4006,referrer=-,agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36}' received from topic 'ratings' (partition 0 at offset 2381)
✅ Message 'Struct{ip=122.145.8.244,userid=9,remote_user=-,time=23821,_time=23821,request=GET /images/track.png HTTP/1.1,status=302,bytes=4006,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}' received from topic 'ratings' (partition 0 at offset 2382)
✅ Message 'Struct{ip=111.145.8.144,userid=38,remote_user=-,time=23831,_time=23831,request=GET /site/user_status.html HTTP/1.1,status=406,bytes=4096,referrer=-,agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36}' received from topic 'ratings' (partition 0 at offset 2383)
✅ Message 'Struct{ip=222.245.174.248,userid=36,remote_user=-,time=23841,_time=23841,request=GET /site/user_status.html HTTP/1.1,status=200,bytes=4096,referrer=-,agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36}' received from topic 'ratings' (partition 0 at offset 2384)
🌆 Got to the end of partition 0 on topic ratings at offset 2385
```