Introducing xkafka - Kafka, but Simpler (for Go)
I've spent a fair bit of time writing Kafka consumers and producers in Go. If you've used confluent-kafka-go, you know the drill.
Your consumer probably looks something like this:
```go
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{/*...*/})
err = consumer.SubscribeTopics([]string{/*...*/}, nil)

// some way to cancel and stop the consumer
run := true

for run {
    msg, err := consumer.ReadMessage(time.Second)
    if err != nil {
        if !err.(kafka.Error).IsTimeout() {
            // handle error from consumer/broker
        }
        continue
    }

    // process message

    // manually commit the offset, if needed
}

consumer.Close()
```
There's a lot that goes into the processing loop:
- read messages
- handle Kafka and application errors
- retry transient errors
- metrics, logging, tracing, etc.
- secondary dead letter queues
- and, of course, wiring all this together
A surprising amount of code isn't really about your application logic. If you're building something that consumes more than one kind of message, this quickly gets verbose. Most of the code is just scaffolding.
What if using Kafka in Go could feel more like writing a simple HTTP service?
HTTP-like Kafka
xkafka is a Go library that provides HTTP-like abstractions for Kafka. It tries to make working with Kafka feel a bit more like writing a simple HTTP service, with a lot less boilerplate and plumbing.
Here are the core abstractions:
- Message: Like an HTTP request. It has the topic, partition, offset, key, value, headers, and so on. It also allows callbacks to track message processing.
- Handler: Like an HTTP handler. It's where your business logic lives.
- Middleware: Just like HTTP middleware, but for Kafka. You can add logging, metrics, retries, etc., without cluttering your core logic.
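To make the analogy concrete, here's a rough sketch of a handler and a middleware, using the same signatures that appear in the examples later in this post (the logging is just illustrative):

```go
// a handler, analogous to http.HandlerFunc
handler := xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
    // business logic goes here
    msg.AckSuccess()
    return nil
})

// a middleware that wraps any handler, analogous to HTTP middleware
logging := xkafka.MiddlewareFunc(func(next xkafka.Handler) xkafka.Handler {
    return xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
        log.Printf("message from topic %s", msg.Topic) // illustrative logging
        return next.Handle(ctx, msg)
    })
})
```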
Publishing Messages
First, let's get simple things out of the way. Here's what publishing a message looks like with xkafka:
```go
producer, err := xkafka.NewProducer(
    "producer-id",
    xkafka.Brokers{"localhost:9092"},
    xkafka.ConfigMap{
        "socket.keepalive.enable": true,
    },
)

producer.Use(/* add middlewares */)

msg := &xkafka.Message{
    Topic: "test",
    Key:   []byte("key"),
    Value: []byte("value"),
}

err = producer.Publish(ctx, msg)
```
That's it. If you want higher throughput, or want to handle delivery events out of band, you can publish asynchronously:
```go
producer, err := xkafka.NewProducer(
    // ...
    // configure a callback to handle delivery events
    xkafka.DeliveryCallback(func(msg *xkafka.Message) {
        // ...
    }),
)

// ...create a message

// or, configure a callback on the message itself
msg.AddCallback(func(msg *xkafka.Message) {
    // ...
})

// start the producer. this will start a background goroutine
// that will handle message delivery events.
go producer.Run(ctx)

// publish a message. this will return immediately.
err = producer.AsyncPublish(ctx, msg)
```
Consuming Messages
Let's talk about the other side of Kafka: consuming messages. In my experience, this is where most of the complexity (and most of the headaches) shows up. There are so many ways to configure and process messages in a consumer, and the tradeoffs between throughput, durability, and delivery guarantees can get confusing.
xkafka distills the most common patterns into a few simple abstractions and sensible defaults, while still giving you the flexibility to tune things for your needs.
Streaming vs. Batch
There are two main ways to consume messages:
- Streaming (with `xkafka.Consumer`): You process messages one at a time, as soon as they arrive. This is great for low-throughput systems, or when you want to keep memory usage low and have strong processing guarantees.
- Batch (with `xkafka.BatchConsumer`): You process messages in batches, either by size or by time window. This is useful for high-throughput systems, or when you want to buffer spikes and avoid hammering downstream systems or databases with every single message.
Both approaches preserve message order. With batches, you can control the size and frequency of those batches:
```go
consumer, err := xkafka.NewBatchConsumer(
    // ...
    xkafka.BatchSize(100),               // batch size
    xkafka.BatchTimeout(15*time.Second), // time window
)
```
Sequential or Async
After reading a message or batch, `xkafka.Concurrency(N)` determines how messages or batches are processed:
- Sequential: Default. One message or batch at a time. The next one isn't read until you're done with the current one.
- Asynchronous: N > 1. Multiple messages or batches are processed in parallel.
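A sketch of the async configuration, using the option as named above (the value is arbitrary):

```go
consumer, err := xkafka.NewConsumer(
    // ...
    xkafka.Concurrency(10), // process up to 10 messages or batches in parallel
)
```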
Offsets
One thing that always tripped me up with Kafka consumers is offset management. By default, the client moves the offset forward as soon as it delivers a message to your application, not when you finish processing it. That means if your downstream is temporarily down, or your app crashes mid-processing, you can lose messages.
To solve this, I've seen developers add a separate database or queue to guarantee message processing. That adds another system to maintain and another point of failure, and it's unnecessary.
xkafka simply sets `enable.auto.offset.store=false` and only stores the offset after the handler finishes processing the message or batch. So if something goes wrong, you'll just re-process the last message, not lose it. For batches, it tracks the highest offset per topic and partition in the batch.
This means you don't need a separate database or queue just to keep track of what you've processed. Kafka handles it for you.
Note: If you track Kafka consumer lag, remember that a growing lag is not inherently a bad thing. Instead of optimizing for zero lag by offloading messages to another queue, focus on improving the throughput of your downstream systems.
At-Most-Once Guarantee
By default, xkafka relies on Kafka's `enable.auto.commit=true` and `auto.commit.interval.ms` to commit offsets periodically in the background.
By enabling `xkafka.ManualCommit(true)` in sequential mode, you can achieve at-most-once processing guarantees for each message or batch: xkafka ensures that the offset is committed before reading the next message.
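A sketch of that configuration, using the option as named above:

```go
consumer, err := xkafka.NewConsumer(
    // ...
    // sequential processing is the default (Concurrency = 1)
    xkafka.ManualCommit(true), // commit the offset before reading the next message
)
```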
At-Least-Once Guarantee
If you combine `xkafka.ManualCommit(true)` with `xkafka.Concurrency(N > 1)`, you can process messages or batches in parallel, while xkafka ensures offsets are committed synchronously, in order. This way, you get at-least-once processing guarantees.
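As a sketch, combining the two options from this section:

```go
consumer, err := xkafka.NewConsumer(
    // ...
    xkafka.Concurrency(10),    // process in parallel
    xkafka.ManualCommit(true), // commit offsets synchronously, in order
)
```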
Error Handling
One of the tricky parts of Kafka is handling broker errors, application errors, transient errors, and retries. xkafka allows you to handle errors at three levels:
Handler Level
The simplest way is to handle application errors in your handler implementation itself.
```go
handler := func(ctx context.Context, msg *xkafka.Message) error {
    err := processMessage(ctx, msg)
    if err != nil {
        // log and/or trigger alert
        // optionally, move message to a dead letter topic or queue
        msg.AckSkip()
        return nil
    }

    msg.AckSuccess()
    return nil
}
```
Middleware Level
Middleware is a great way to reuse application-specific error handling logic across handlers and consumers.
```go
handler := xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
    // ...
    if err != nil {
        // propagate error to middlewares
        msg.AckFail(err)
        return err
    }

    // ack the message
    msg.AckSuccess()
    return nil
})
```
You can use a combination of retry and custom error handling middlewares to implement different retry strategies.
```go
consumer.Use(
    RetryMiddleware(/*...*/),
    xkafka.MiddlewareFunc(func(next xkafka.Handler) xkafka.Handler {
        return xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error {
            err := next.Handle(ctx, m)
            if errors.Is(err, app.SomeError) {
                // handle application error
            }

            // differentiate between transient, retryable errors
            // and permanent failures
            return err
        })
    }),
)
```
Global Level
`xkafka.ErrorHandler` is a mandatory option when creating a producer or consumer. Kafka broker and library errors are only visible to the `xkafka.ErrorHandler`.
```go
consumer, err := xkafka.NewConsumer(
    // ...
    xkafka.ErrorHandler(func(err error) error {
        // returning a non-nil error will stop the consumer
        return err
    }),
)
```
This layered approach forces you to think about error boundaries and how you want to handle errors in your application.