Kafka

Apache Kafka is one of the most popular Pub/Subs. We provide a Pub/Sub implementation based on IBM Sarama.
You can find a fully functional example with Kafka in the Watermill examples.
Installation

go get github.com/ThreeDotsLabs/watermill-kafka/v3
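Characteristics

| Feature             | Implements | Note                                                                                     |
|---------------------|------------|------------------------------------------------------------------------------------------|
| ConsumerGroups      | yes        |                                                                                          |
| ExactlyOnceDelivery | no         | in theory can be achieved with Transactions, currently no support for any Golang client |
| GuaranteedOrder     | yes        | requires partition key usage                                                             |
| Persistent          | yes        |                                                                                          |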
Configuration

// ...
// SubscriberConfig configures a Kafka Subscriber created with NewSubscriber.
type SubscriberConfig struct {
	// Kafka brokers list. Must not be empty (checked by Validate).
	Brokers []string

	// Unmarshaler is used to unmarshal messages from Kafka format into Watermill format.
	// Required (checked by Validate).
	Unmarshaler Unmarshaler

	// OverwriteSaramaConfig holds additional sarama settings.
	// When nil, setDefaults replaces it with DefaultSaramaSubscriberConfig().
	OverwriteSaramaConfig *sarama.Config

	// Kafka consumer group.
	// When empty, all messages from all partitions will be returned.
	ConsumerGroup string

	// NackResendSleep is how long to wait after a Nack before the message is redelivered.
	// Zero means "unset" and is replaced by setDefaults with 100ms; NoSleep may also be used.
	NackResendSleep time.Duration

	// ReconnectRetrySleep is how long to wait after an unsuccessful reconnect
	// attempt before the next one is made.
	// Zero means "unset" and is replaced by setDefaults with 1s; NoSleep may also be used.
	ReconnectRetrySleep time.Duration

	// InitializeTopicDetails holds Sarama topic details.
	// NOTE(review): presumably used when the subscriber initializes/creates the
	// topic; its use is not visible here — confirm against subscriber.go.
	InitializeTopicDetails *sarama.TopicDetail

	// If true then each consumed message will be wrapped with Opentelemetry tracing, provided by otelsarama.
	//
	// Deprecated: pass OTELSaramaTracer to Tracer field instead.
	OTELEnabled bool

	// Tracer is used to trace Kafka messages.
	// If nil, then no tracing will be used.
	Tracer SaramaTracer
}
// NoSleep is a sentinel duration that can be assigned to
// SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
// It is non-zero, so setDefaults keeps it rather than substituting a default.
const NoSleep = time.Duration(-1)
// setDefaults fills in unset (zero-valued) fields of the config in place:
//   - OverwriteSaramaConfig falls back to DefaultSaramaSubscriberConfig(),
//   - NackResendSleep falls back to 100ms,
//   - ReconnectRetrySleep falls back to 1s.
//
// Non-zero values, including NoSleep, are left untouched.
func (c *SubscriberConfig) setDefaults() {
	// orDefault writes fallback into *d only when *d is still the zero value.
	orDefault := func(d *time.Duration, fallback time.Duration) {
		if *d == 0 {
			*d = fallback
		}
	}

	if c.OverwriteSaramaConfig == nil {
		c.OverwriteSaramaConfig = DefaultSaramaSubscriberConfig()
	}
	orDefault(&c.NackResendSleep, 100*time.Millisecond)
	orDefault(&c.ReconnectRetrySleep, time.Second)
}
// Validate returns an error when a required field is missing: Brokers must
// contain at least one entry and Unmarshaler must be set. Brokers is checked
// first. It returns nil when both requirements are met.
func (c SubscriberConfig) Validate() error {
	switch {
	case len(c.Brokers) == 0:
		return errors.New("missing brokers")
	case c.Unmarshaler == nil:
		return errors.New("missing unmarshaler")
	default:
		return nil
	}
}
// DefaultSaramaSubscriberConfig returns the Sarama configuration Watermill uses
// for subscribers when SubscriberConfig.OverwriteSaramaConfig is nil.
//
// Start from it when only a few settings need to change, then pass the result
// through SubscriberConfig.OverwriteSaramaConfig:
//
//	saramaConfig := DefaultSaramaSubscriberConfig()
//	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//	subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//	subscriber, err := NewSubscriber(subscriberConfig, logger)
//	// ...
func DefaultSaramaSubscriberConfig() *sarama.Config {
	cfg := sarama.NewConfig()

	// Client ID reported by Sarama for Watermill's connections.
	cfg.ClientID = "watermill"
	// Kafka version Sarama is told to assume.
	cfg.Version = sarama.V1_0_0_0
	// Ask Sarama to return consumer errors (Consumer.Return.Errors).
	cfg.Consumer.Return.Errors = true

	return cfg
}
// Subscribe subscribes to messages in Kafka.
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
Passing custom Sarama config

You can pass custom config parameters via the `OverwriteSaramaConfig *sarama.Config` field of the config passed to `NewSubscriber` and `NewPublisher`. When it is `nil`, the default config is used (for the subscriber, `DefaultSaramaSubscriberConfig()`).
// ...
// DefaultSaramaSubscriberConfig returns the Sarama configuration Watermill uses
// for subscribers when SubscriberConfig.OverwriteSaramaConfig is nil.
//
// Start from it when only a few settings need to change, then pass the result
// through SubscriberConfig.OverwriteSaramaConfig:
//
//	saramaConfig := DefaultSaramaSubscriberConfig()
//	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//	subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//	subscriber, err := NewSubscriber(subscriberConfig, logger)
//	// ...
func DefaultSaramaSubscriberConfig() *sarama.Config {
	cfg := sarama.NewConfig()

	// Client ID reported by Sarama for Watermill's connections.
	cfg.ClientID = "watermill"
	// Kafka version Sarama is told to assume.
	cfg.Version = sarama.V1_0_0_0
	// Ask Sarama to return consumer errors (Consumer.Return.Errors).
	cfg.Consumer.Return.Errors = true

	return cfg
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
Connecting

Publisher

// ...
// NewPublisher creates a new Kafka Publisher.
func NewPublisher (
config PublisherConfig ,
logger watermill . LoggerAdapter ,
) ( * Publisher , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go
Example (note: this snippet shows creating a Subscriber):
// ...
// Start from Watermill's default Sarama subscriber settings and override only
// what this example needs.
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// equivalent of auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

subscriber, err := kafka.NewSubscriber(
	kafka.SubscriberConfig{
		// Kafka brokers list.
		Brokers: []string{"kafka:9092"},
		// Converts Kafka messages into Watermill messages.
		Unmarshaler: kafka.DefaultMarshaler{},
		// Custom Sarama settings built above.
		OverwriteSaramaConfig: saramaSubscriberConfig,
		// Consumer group shared by all instances of this subscriber.
		ConsumerGroup: "test_consumer_group",
	},
	watermill.NewStdLogger(false, false),
)
if err != nil {
	// Failing fast is acceptable in an example; real code should handle the error.
	panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
Subscriber

// ...
// NewSubscriber creates a new Kafka Subscriber.
func NewSubscriber (
config SubscriberConfig ,
logger watermill . LoggerAdapter ,
) ( * Subscriber , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
Example (note: this snippet shows creating a Publisher):
// ...
publisher, err := kafka.NewPublisher(
	kafka.PublisherConfig{
		// Kafka brokers list.
		Brokers: []string{"kafka:9092"},
		// Converts Watermill messages into Kafka (Sarama) producer messages.
		Marshaler: kafka.DefaultMarshaler{},
	},
	watermill.NewStdLogger(false, false),
)
if err != nil {
	// Failing fast is acceptable in an example; real code should handle the error.
	panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
Publishing

// ...
// Publish publishes message to Kafka.
//
// Publish is blocking and waits for an ack from Kafka.
// If delivery of any of the messages fails, the function stops and returns an error.
func ( p * Publisher ) Publish ( topic string , msgs ...* message . Message ) error {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go
Subscribing

// ...
// Subscribe subscribes to messages in Kafka.
//
// It spawns multiple subscribers.
func ( s * Subscriber ) Subscribe ( ctx context . Context , topic string ) ( <- chan * message . Message , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
Marshaler

Watermill’s messages cannot be sent to Kafka directly — they need to be marshaled first. You can implement your own marshaler or use the default implementation.
// ...
type (
	// Marshaler converts a Watermill message into a Sarama producer message
	// for the given Kafka topic.
	Marshaler interface {
		Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
	}

	// Unmarshaler converts a Sarama consumer message back into a Watermill message.
	Unmarshaler interface {
		Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
	}

	// MarshalerUnmarshaler is satisfied by types that implement both Marshaler
	// and Unmarshaler, such as the value returned by NewWithPartitioningMarshaler.
	MarshalerUnmarshaler interface {
		Marshaler
		Unmarshaler
	}
)
// DefaultMarshaler is the default implementation of both Marshaler and
// Unmarshaler. It has no fields, so the zero value DefaultMarshaler{} is ready to use.
type DefaultMarshaler struct{}
func ( DefaultMarshaler ) Marshal ( topic string , msg * message . Message ) ( * sarama . ProducerMessage , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go
Partitioning

Our Publisher supports Kafka's partitioning mechanism.
It can be used with a special Marshaler implementation:
// ...
// kafkaJsonWithPartitioning is a MarshalerUnmarshaler built on DefaultMarshaler
// that additionally uses a caller-supplied function to derive each message's
// partition key. Construct it with NewWithPartitioningMarshaler.
type kafkaJsonWithPartitioning struct {
	// Embedded so DefaultMarshaler's methods are promoted; Marshal is overridden by this type.
	DefaultMarshaler

	// Computes the partition key for a message; presumably invoked from Marshal.
	generatePartitionKey GeneratePartitionKey
}
// NewWithPartitioningMarshaler returns a MarshalerUnmarshaler that builds on
// DefaultMarshaler (embedded with its zero value) and uses generatePartitionKey
// to compute the partition key of each marshaled message.
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
	var m kafkaJsonWithPartitioning
	m.generatePartitionKey = generatePartitionKey
	return m
}
func ( j kafkaJsonWithPartitioning ) Marshal ( topic string , msg * message . Message ) ( * sarama . ProducerMessage , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go
When using it, you need to pass a function that generates the partition key.
It’s a good idea to pass the partition key in the message metadata, so the entire message does not have to be unmarshaled.
// Take the partition key from the "partition" metadata entry, so the payload
// does not have to be unmarshaled to choose a partition.
marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
	return msg.Metadata.Get("partition"), nil
})