Kafka

Apache Kafka is one of the most popular Pub/Subs. We are providing Pub/Sub implementation based on IBM Sarama .

You can find a fully functional example with Kafka in the Watermill examples .

Installation

go get github.com/ThreeDotsLabs/watermill-kafka/v3

Characteristics

FeatureImplementsNote
ConsumerGroupsyes
ExactlyOnceDeliverynoin theory can be achieved with Transactions , currently no support for any Golang client
GuaranteedOrderyesrequire partition key usage
Persistentyes

Configuration

// ...
type SubscriberConfig struct {
	// Kafka brokers list.
	Brokers []string

	// Unmarshaler is used to unmarshal messages from Kafka format into Watermill format.
	Unmarshaler Unmarshaler

	// OverwriteSaramaConfig holds additional sarama settings.
	OverwriteSaramaConfig *sarama.Config

	// Kafka consumer group.
	// When empty, all messages from all partitions will be returned.
	ConsumerGroup string

	// How long after Nack message should be redelivered.
	NackResendSleep time.Duration

	// How long about unsuccessful reconnecting next reconnect will occur.
	ReconnectRetrySleep time.Duration

	InitializeTopicDetails *sarama.TopicDetail

	// If true then each consumed message will be wrapped with Opentelemetry tracing, provided by otelsarama.
	//
	// Deprecated: pass OTELSaramaTracer to Tracer field instead.
	OTELEnabled bool

	// Tracer is used to trace Kafka messages.
	// If nil, then no tracing will be used.
	Tracer SaramaTracer
}

// NoSleep can be set to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
const NoSleep time.Duration = -1

func (c *SubscriberConfig) setDefaults() {
	if c.OverwriteSaramaConfig == nil {
		c.OverwriteSaramaConfig = DefaultSaramaSubscriberConfig()
	}
	if c.NackResendSleep == 0 {
		c.NackResendSleep = time.Millisecond * 100
	}
	if c.ReconnectRetrySleep == 0 {
		c.ReconnectRetrySleep = time.Second
	}
}

func (c SubscriberConfig) Validate() error {
	if len(c.Brokers) == 0 {
		return errors.New("missing brokers")
	}
	if c.Unmarshaler == nil {
		return errors.New("missing unmarshaler")
	}

	return nil
}

// DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.
//
// Custom config can be passed to NewSubscriber and NewPublisher.
//
//	saramaConfig := DefaultSaramaSubscriberConfig()
//	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//	subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//	subscriber, err := NewSubscriber(subscriberConfig, logger)
//	// ...
func DefaultSaramaSubscriberConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0
	config.Consumer.Return.Errors = true
	config.ClientID = "watermill"

	return config
}

// Subscribe subscribers for messages in Kafka.
// ...

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go

Passing custom Sarama config

You can pass custom config parameters via overwriteSaramaConfig *sarama.Config in NewSubscriber and NewPublisher. When nil is passed, default config is used (DefaultSaramaSubscriberConfig).

// ...
// DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.
//
// Custom config can be passed to NewSubscriber and NewPublisher.
//
//	saramaConfig := DefaultSaramaSubscriberConfig()
//	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//	subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//	subscriber, err := NewSubscriber(subscriberConfig, logger)
//	// ...
func DefaultSaramaSubscriberConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0
	config.Consumer.Return.Errors = true
	config.ClientID = "watermill"

	return config
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go

Connecting

Publisher

// ...
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
	config PublisherConfig,
	logger watermill.LoggerAdapter,
) (*Publisher, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go

Example:

// ...
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// equivalent of auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

Subscriber

// ...
// NewSubscriber creates a new Kafka Subscriber.
func NewSubscriber(
	config SubscriberConfig,
	logger watermill.LoggerAdapter,
) (*Subscriber, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go

Example:

// ...
	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

Publishing

// ...
// Publish publishes message to Kafka.
//
// Publish is blocking and wait for ack from Kafka.
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go

Subscribing

// ...
// Subscribe subscribers for messages in Kafka.
//
// There are multiple subscribers spawned
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go

Marshaler

Watermill’s messages cannot be directly sent to Kafka - they need to be marshaled. You can implement your marshaler or use default implementation.

// ...
// Marshaler marshals Watermill's message to Kafka message.
type Marshaler interface {
	Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}

// Unmarshaler unmarshals Kafka's message to Watermill's message.
type Unmarshaler interface {
	Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

type DefaultMarshaler struct{}

func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go

Partitioning

Our Publisher has support for the partitioning mechanism.

It can be done with special Marshaler implementation:

// ...
type kafkaJsonWithPartitioning struct {
	DefaultMarshaler

	generatePartitionKey GeneratePartitionKey
}

func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
	return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey}
}

func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go

When using, you need to pass your function to generate partition key. It’s a good idea to pass this partition key with metadata to not unmarshal entire message.

marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
    return msg.Metadata.Get("partition"), nil
})

Check our online hands-on training