Kafka

A distributed streaming platform from Apache.

Apache Kafka is one of the most popular Pub/Sub systems. We provide a Pub/Sub implementation based on Shopify's Sarama client library.

Characteristics

Feature             | Implements | Note
------------------- | ---------- | ----
ConsumerGroups      | yes        |
ExactlyOnceDelivery | no         | In theory it can be achieved with transactions, but currently no Go client supports them.
GuaranteedOrder     | yes        | Requires usage of a partition key.
Persistent          | yes        |
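Why does GuaranteedOrder depend on a partition key? Kafka only orders messages within a single partition, and a keyed message's partition is derived from a hash of its key. The sketch below assumes a hash partitioner in the style of Sarama's default one (32-bit FNV-1a, then modulo the partition count); `partitionFor` is a hypothetical helper for illustration, not part of Watermill or Sarama.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition the way a hash partitioner
// does: hash the key bytes with 32-bit FNV-1a, then take the result modulo the
// partition count. Modeled on, not copied from, Sarama's default partitioner.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p // converting to int32 can make the remainder negative
	}
	return p
}

func main() {
	// Messages sharing a key always land on the same partition, so Kafka
	// keeps them in publish order relative to each other.
	for _, key := range []string{"order-1", "order-2", "order-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 6))
	}
}
```

Messages with different keys may still share a partition; the guarantee is only that equal keys never get split across partitions.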

Configuration

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go

// ...
type SubscriberConfig struct {
    // Kafka brokers list.
    Brokers []string

    // Unmarshaler is used to unmarshal messages from Kafka format into Watermill format.
    Unmarshaler Unmarshaler

    // OverwriteSaramaConfig holds additional sarama settings.
    OverwriteSaramaConfig *sarama.Config

    // Kafka consumer group.
    // When empty, all messages from all partitions will be returned.
    ConsumerGroup string

    // How long after a Nack the message should be redelivered.
    NackResendSleep time.Duration

    // How long after an unsuccessful reconnect the next reconnect attempt will occur.
    ReconnectRetrySleep time.Duration

    InitializeTopicDetails *sarama.TopicDetail
}

// NoSleep can be set to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
const NoSleep time.Duration = -1

func (c *SubscriberConfig) setDefaults() {
    if c.OverwriteSaramaConfig == nil {
        c.OverwriteSaramaConfig = DefaultSaramaSubscriberConfig()
    }
    if c.NackResendSleep == 0 {
        c.NackResendSleep = time.Millisecond * 100
    }
    if c.ReconnectRetrySleep == 0 {
        c.ReconnectRetrySleep = time.Second
    }
}

func (c SubscriberConfig) Validate() error {
    if len(c.Brokers) == 0 {
        return errors.New("missing brokers")
    }
    if c.Unmarshaler == nil {
        return errors.New("missing unmarshaler")
    }

    return nil
}

// DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.
//
// Custom config can be passed to NewSubscriber and NewPublisher.
//
//        saramaConfig := DefaultSaramaSubscriberConfig()
//        saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//        subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//        subscriber, err := NewSubscriber(subscriberConfig, logger)
//        // ...
//
func DefaultSaramaSubscriberConfig() *sarama.Config {
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0
    config.Consumer.Return.Errors = true
    config.ClientID = "watermill"

    return config
}

// Subscribe subscribes for messages in Kafka.
// ...
Passing custom Sarama config

You can pass custom Sarama parameters via the OverwriteSaramaConfig field (*sarama.Config) of SubscriberConfig and PublisherConfig, which are passed to NewSubscriber and NewPublisher. When it is nil, a default config is used (for the subscriber, DefaultSaramaSubscriberConfig).
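Users coming from the Java client often think in terms of `auto.offset.reset`, while Sarama expresses the same choice through `Consumer.Offsets.Initial` (its default is `OffsetNewest`). The sketch below is a hypothetical helper translating the Java setting into an offset value; the constants mirror Sarama's `OffsetNewest` (-1) and `OffsetOldest` (-2), but the helper itself is not part of Sarama or Watermill.

```go
package main

import "fmt"

// Mirrors of Sarama's special offset values (sarama.OffsetNewest / sarama.OffsetOldest).
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// initialOffsetFor is a hypothetical helper translating a Java-style
// auto.offset.reset value into a value for Consumer.Offsets.Initial.
func initialOffsetFor(reset string) (int64, error) {
	switch reset {
	case "earliest":
		return offsetOldest, nil
	case "latest":
		return offsetNewest, nil
	default:
		// "none" and unknown values have no direct Sarama equivalent here.
		return 0, fmt.Errorf("unsupported auto.offset.reset value %q", reset)
	}
}

func main() {
	for _, v := range []string{"earliest", "latest", "none"} {
		off, err := initialOffsetFor(v)
		fmt.Println(v, off, err)
	}
}
```

In real code, the result would be assigned to `Consumer.Offsets.Initial` on a config obtained from DefaultSaramaSubscriberConfig() before setting it as OverwriteSaramaConfig.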

Connecting

Publisher

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go

// ...
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
    config PublisherConfig,
    logger watermill.LoggerAdapter,
) (*Publisher, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
    publisher, err := kafka.NewPublisher(
        kafka.PublisherConfig{
            Brokers:   []string{"kafka:9092"},
            Marshaler: kafka.DefaultMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Subscriber

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go

// ...
// NewSubscriber creates a new Kafka Subscriber.
func NewSubscriber(
    config SubscriberConfig,
    logger watermill.LoggerAdapter,
) (*Subscriber, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
    saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
    // equivalent of auto.offset.reset: earliest
    saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:               []string{"kafka:9092"},
            Unmarshaler:           kafka.DefaultMarshaler{},
            OverwriteSaramaConfig: saramaSubscriberConfig,
            ConsumerGroup:         "test_consumer_group",
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Publishing

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go

// ...
// Publish publishes messages to Kafka.
//
// Publish is blocking and waits for an ack from Kafka.
// When delivery of one of the messages fails, publishing stops and the error is returned.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...

Subscribing

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go

// ...
// Subscribe subscribes for messages in Kafka.
//
// Multiple subscribers are spawned.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
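Messages arriving on the returned channel must be acknowledged; per the NackResendSleep comment in SubscriberConfig, a Nack causes the message to be redelivered after that delay. The sketch below models that resend loop with the standard library only. `deliverUntilAcked` is hypothetical, the handler's boolean result stands in for Ack (true) or Nack (false), and retrying indefinitely is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// deliverUntilAcked hands the payload to the handler and, while the handler
// nacks (returns false), waits nackResendSleep and redelivers. A negative
// sleep (such as NoSleep = -1) redelivers immediately. It returns the number
// of deliveries needed until the message was acked.
func deliverUntilAcked(payload string, nackResendSleep time.Duration, handle func(string) bool) int {
	deliveries := 0
	for {
		deliveries++
		if handle(payload) {
			return deliveries // acked: done with this message
		}
		if nackResendSleep > 0 {
			time.Sleep(nackResendSleep)
		}
	}
}

func main() {
	attempts := 0
	n := deliverUntilAcked("order-created", 10*time.Millisecond, func(p string) bool {
		attempts++
		return attempts >= 3 // nack twice, then ack
	})
	fmt.Println("deliveries:", n)
}
```

Since redelivery happens before the next message is handed out, a handler that keeps nacking blocks progress on that stream; NackResendSleep mainly keeps such a loop from spinning.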

Marshaler

Watermill's messages cannot be sent to Kafka directly; they need to be marshaled first. You can implement your own marshaler or use the default implementation.

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go

// ...
// Marshaler marshals Watermill's message to Kafka message.
type Marshaler interface {
    Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}

// Unmarshaler unmarshals Kafka's message to Watermill's message.
type Unmarshaler interface {
    Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
    Marshaler
    Unmarshaler
}

type DefaultMarshaler struct{}

func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...
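To make the Marshaler/Unmarshaler contract concrete, here is a stdlib-only sketch of a round trip: the payload becomes the Kafka record value, and the message UUID and metadata travel as record headers. The types (`wmMessage`, `kafkaHeader`, `kafkaRecord`) and the header name `_watermill_message_uuid` are stand-ins chosen for illustration; check marshaler.go for exactly what DefaultMarshaler does.

```go
package main

import "fmt"

// Hypothetical stand-ins for message.Message and Sarama's record types.
type wmMessage struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

type kafkaHeader struct{ Key, Value []byte }

type kafkaRecord struct {
	Topic   string
	Value   []byte
	Headers []kafkaHeader
}

// uuidHeader is an assumed header name used to carry the message UUID.
const uuidHeader = "_watermill_message_uuid"

// marshal stores the UUID and every metadata entry as headers, payload as value.
func marshal(topic string, msg wmMessage) kafkaRecord {
	headers := []kafkaHeader{{Key: []byte(uuidHeader), Value: []byte(msg.UUID)}}
	for k, v := range msg.Metadata {
		headers = append(headers, kafkaHeader{Key: []byte(k), Value: []byte(v)})
	}
	return kafkaRecord{Topic: topic, Value: msg.Payload, Headers: headers}
}

// unmarshal reverses marshal, separating the UUID header from metadata.
func unmarshal(rec kafkaRecord) wmMessage {
	msg := wmMessage{Metadata: map[string]string{}, Payload: rec.Value}
	for _, h := range rec.Headers {
		if string(h.Key) == uuidHeader {
			msg.UUID = string(h.Value)
			continue
		}
		msg.Metadata[string(h.Key)] = string(h.Value)
	}
	return msg
}

func main() {
	in := wmMessage{UUID: "3f1c", Metadata: map[string]string{"partition": "user-7"}, Payload: []byte(`{"id":7}`)}
	out := unmarshal(marshal("users", in))
	fmt.Println(out.UUID, out.Metadata["partition"], string(out.Payload))
}
```

A custom marshaler only has to keep the two directions symmetric; this is why a type implementing both (MarshalerUnmarshaler) is convenient.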

Partitioning

Our Publisher supports Kafka's partitioning mechanism.

Partitioning is enabled with a special Marshaler implementation:

Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go

// ...
type kafkaJsonWithPartitioning struct {
    DefaultMarshaler

    generatePartitionKey GeneratePartitionKey
}

func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
    return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey}
}

func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...

When using it, you need to pass a function that generates the partition key. It's a good idea to carry the partition key in the message metadata, so the entire message doesn't have to be unmarshaled to compute it.

marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
    return msg.Metadata.Get("partition"), nil
})
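A sketch of what the partitioning marshaler adds on top of the default one: it calls the user-supplied key function and stores the result as the Kafka record key, which the producer's partitioner then uses to pick a partition. The key function reads the key from metadata, as recommended above, so the payload is never decoded. `message`, `record`, `keyFunc`, `marshalWithKey` and `keyFromMetadata` are hypothetical; only the shape of `keyFunc` matches GeneratePartitionKey.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for Watermill's message.Message.
type message struct {
	Metadata map[string]string
	Payload  []byte
}

// record is a hypothetical stand-in for a Kafka producer record.
type record struct {
	Topic string
	Key   []byte // records with equal keys are routed to the same partition
	Value []byte
}

// keyFunc has the same shape as watermill-kafka's GeneratePartitionKey.
type keyFunc func(topic string, msg *message) (string, error)

// marshalWithKey builds a record and sets its key from the key function.
func marshalWithKey(gen keyFunc, topic string, msg *message) (record, error) {
	key, err := gen(topic, msg)
	if err != nil {
		return record{}, fmt.Errorf("cannot generate partition key: %w", err)
	}
	return record{Topic: topic, Key: []byte(key), Value: msg.Payload}, nil
}

// keyFromMetadata reads the key from metadata, so the payload is never decoded.
func keyFromMetadata(topic string, msg *message) (string, error) {
	key := msg.Metadata["partition"]
	if key == "" {
		return "", errors.New("missing partition metadata")
	}
	return key, nil
}

func main() {
	msg := &message{Metadata: map[string]string{"partition": "user-7"}, Payload: []byte(`{"id":7}`)}
	rec, err := marshalWithKey(keyFromMetadata, "users", msg)
	fmt.Println(string(rec.Key), err)
}
```

Rejecting a missing key is a choice made in this sketch, not a library requirement; without it, every message lacking the metadata entry would share the empty key.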