Kafka
Apache Kafka is one of the most popular Pub/Subs. We provide a Pub/Sub implementation based on IBM Sarama.
You can find a fully functional example with Kafka in the Watermill examples.
Installation
go get github.com/ThreeDotsLabs/watermill-kafka/v3
Characteristics
Feature | Implements | Note |
---|---|---|
ConsumerGroups | yes | |
ExactlyOnceDelivery | no | in theory can be achieved with Transactions, but currently no Golang client supports it |
GuaranteedOrder | yes | requires partition key usage |
Persistent | yes | |
Configuration
// ...
// SubscriberConfig configures a Kafka Subscriber created with NewSubscriber.
type SubscriberConfig struct {
// Brokers is the Kafka brokers list.
// Required: Validate returns an error when it is empty.
Brokers []string
// Unmarshaler is used to unmarshal messages from Kafka format into Watermill format.
// Required: Validate returns an error when it is nil.
Unmarshaler Unmarshaler
// OverwriteSaramaConfig holds additional sarama settings.
// When nil, setDefaults replaces it with DefaultSaramaSubscriberConfig().
OverwriteSaramaConfig *sarama.Config
// ConsumerGroup is the Kafka consumer group.
// When empty, all messages from all partitions will be returned.
ConsumerGroup string
// NackResendSleep is how long to wait after a Nack before the message is redelivered.
// A zero value is replaced with 100ms by setDefaults; NoSleep may be assigned instead.
NackResendSleep time.Duration
// ReconnectRetrySleep is how long to wait after an unsuccessful reconnect
// before the next reconnect attempt.
// A zero value is replaced with 1s by setDefaults; NoSleep may be assigned instead.
ReconnectRetrySleep time.Duration
// InitializeTopicDetails holds sarama topic settings.
// NOTE(review): its use is not visible in this excerpt; presumably the
// subscriber initializes (creates) the topic with these details. Confirm in subscriber.go.
InitializeTopicDetails *sarama.TopicDetail
// If true then each consumed message will be wrapped with OpenTelemetry tracing, provided by otelsarama.
//
// Deprecated: pass OTELSaramaTracer to Tracer field instead.
OTELEnabled bool
// Tracer is used to trace Kafka messages.
// If nil, then no tracing will be used.
Tracer SaramaTracer
}
// NoSleep is a sentinel duration that may be assigned to
// SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
// Because it is non-zero (-1), setDefaults does not overwrite it.
const NoSleep = time.Duration(-1)
// setDefaults fills in zero-valued optional fields of c in place:
//   - a nil OverwriteSaramaConfig becomes DefaultSaramaSubscriberConfig(),
//   - a zero NackResendSleep becomes 100ms,
//   - a zero ReconnectRetrySleep becomes 1s.
//
// Non-zero durations, including NoSleep, are left untouched.
func (c *SubscriberConfig) setDefaults() {
	if c.OverwriteSaramaConfig == nil {
		c.OverwriteSaramaConfig = DefaultSaramaSubscriberConfig()
	}

	// orDefault assigns fallback to *d only when *d is still the zero value.
	orDefault := func(d *time.Duration, fallback time.Duration) {
		if *d == 0 {
			*d = fallback
		}
	}
	orDefault(&c.NackResendSleep, 100*time.Millisecond)
	orDefault(&c.ReconnectRetrySleep, time.Second)
}
// Validate checks that the required fields are set: at least one broker
// and a non-nil Unmarshaler. It returns nil when the config is usable.
func (c SubscriberConfig) Validate() error {
	switch {
	case len(c.Brokers) == 0:
		return errors.New("missing brokers")
	case c.Unmarshaler == nil:
		return errors.New("missing unmarshaler")
	default:
		return nil
	}
}
// DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.
// It starts from sarama.NewConfig(), sets the client ID to "watermill" and
// enables Consumer.Return.Errors.
//
// Custom config can be passed to NewSubscriber and NewPublisher.
//
//	saramaConfig := DefaultSaramaSubscriberConfig()
//	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//	subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//	subscriber, err := NewSubscriber(subscriberConfig, logger)
//	// ...
func DefaultSaramaSubscriberConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.ClientID = "watermill"
	// Have sarama deliver consumer errors on its Errors channel.
	cfg.Consumer.Return.Errors = true
	return cfg
}
// Subscribe subscribes to messages in Kafka.
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
Passing custom Sarama config
You can pass custom config parameters via the OverwriteSaramaConfig *sarama.Config field of the config passed to NewSubscriber and NewPublisher. When nil is passed, the default config (DefaultSaramaSubscriberConfig) is used.
// ...
// DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.
// It starts from sarama.NewConfig(), sets the client ID to "watermill" and
// enables Consumer.Return.Errors.
//
// Custom config can be passed to NewSubscriber and NewPublisher.
//
//	saramaConfig := DefaultSaramaSubscriberConfig()
//	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//	subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//	subscriber, err := NewSubscriber(subscriberConfig, logger)
//	// ...
func DefaultSaramaSubscriberConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.ClientID = "watermill"
	// Have sarama deliver consumer errors on its Errors channel.
	cfg.Consumer.Return.Errors = true
	return cfg
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
Connecting
Publisher
// ...
// NewPublisher creates a new Kafka Publisher.
//
// config supplies the publisher settings (for example Brokers and Marshaler);
// logger receives the publisher's log output. On failure a non-nil error is returned.
func NewPublisher(
config PublisherConfig,
logger watermill.LoggerAdapter,
) (*Publisher, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go
Example:
// ...
// Create a publisher for the broker at kafka:9092 that marshals messages
// with the default marshaler and logs through Watermill's standard logger.
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
// The example simply panics; real code should handle the error.
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
Subscriber
// ...
// NewSubscriber creates a new Kafka Subscriber configured by config.
//
// NOTE(review): the body is elided here; presumably it applies
// SubscriberConfig.setDefaults and SubscriberConfig.Validate to config.
// Confirm in subscriber.go.
func NewSubscriber(
config SubscriberConfig,
logger watermill.LoggerAdapter,
) (*Subscriber, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
Example:
// ...
// Start from Watermill's default Sarama config and customize it.
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// equivalent of auto.offset.reset: earliest
// (the initial offset used when no offset was previously committed)
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
// Brokers and Unmarshaler are required; DefaultMarshaler also implements Unmarshaler.
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
// The example simply panics; real code should handle the error.
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
Publishing
// ...
// Publish publishes msgs to the given Kafka topic.
//
// Publish is blocking and waits for an ack from Kafka.
// When delivery of one of the messages fails, the function is interrupted
// and returns an error.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go
Subscribing
// ...
// Subscribe subscribes to messages from the given Kafka topic and delivers
// them on the returned channel.
//
// Multiple subscribers are spawned.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
Marshaler
Watermill’s messages cannot be sent directly to Kafka - they need to be marshaled first. You can implement your own marshaler or use the default implementation.
// ...
type (
	// Marshaler converts a Watermill message into a Kafka producer message
	// destined for topic.
	Marshaler interface {
		Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
	}

	// Unmarshaler converts a consumed Kafka message back into a Watermill message.
	Unmarshaler interface {
		Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
	}

	// MarshalerUnmarshaler is implemented by types that can convert in both
	// directions, e.g. a single value usable by both a Publisher and a Subscriber.
	MarshalerUnmarshaler interface {
		Marshaler
		Unmarshaler
	}

	// DefaultMarshaler is the stateless, built-in implementation used as both
	// the Publisher's Marshaler and the Subscriber's Unmarshaler.
	DefaultMarshaler struct{}
)
// Marshal converts msg into a sarama.ProducerMessage addressed to topic.
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go
Partitioning
Our Publisher supports the partitioning mechanism.
It can be enabled with a special Marshaler implementation:
// ...
// kafkaJsonWithPartitioning embeds DefaultMarshaler (so Unmarshal comes from it)
// and overrides Marshal to make use of a caller-supplied partition key generator.
type kafkaJsonWithPartitioning struct {
DefaultMarshaler
// generatePartitionKey is provided via NewWithPartitioningMarshaler.
generatePartitionKey GeneratePartitionKey
}
// NewWithPartitioningMarshaler returns a MarshalerUnmarshaler that unmarshals
// like DefaultMarshaler and uses generatePartitionKey when marshaling messages.
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
	var m kafkaJsonWithPartitioning
	m.generatePartitionKey = generatePartitionKey
	return m
}
// Marshal converts msg into a sarama.ProducerMessage addressed to topic.
// NOTE(review): body elided here; presumably it marshals like DefaultMarshaler
// and sets the message key from j.generatePartitionKey(topic, msg). Confirm in marshaler.go.
func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go
When using it, you need to pass a function that generates the partition key. It’s a good idea to pass the partition key in the message metadata, so the entire message doesn't have to be unmarshaled.
// Use the "partition" metadata value as the partition key, so the payload
// doesn't need to be unmarshaled. If the metadata key is absent, Get returns ""
// and all such messages share the same partition key.
marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
return msg.Metadata.Get("partition"), nil
})
Please note that in the example above, if the partition key is missing from the message metadata, an empty string "" will be used as the partition key. This will cause all such messages to be routed to the same partition, which may not be the desired behavior and could lead to uneven load distribution.