Pub/Sub's implementations

Golang channel, Kafka, HTTP, Google Cloud Pub/Sub and more!.

NamePublisherSubscriberStatus
Golang Channelxxprod-ready
Kafkaxxprod-ready
HTTPxprod-ready
Google Cloud Pub/Subxxbeta
NATS Streamingxxbeta
MySQL Binlogxidea

All built-in implementations can be found in message/infrastructure.

Golang Channel

Full source: message/infrastructure/gochannel/pubsub.go

// ...
// GoChannel is the simplest Pub/Sub implementation.
// It is based on Golang's channels which are sent within the process.
//
// GoChannel has no global state,
// that means that you need to use the same instance for Publishing and Subscribing!
type GoChannel struct {
// ...

Characteristics

FeatureImplementsNote
ConsumerGroupsno
ExactlyOnceDeliveryyes
GuaranteedOrderyes
Persistentno

Configuration

You can inject configuration via the constructor.

Full source: message/infrastructure/gochannel/pubsub.go

// ...
func NewGoChannel(buffer int64, logger watermill.LoggerAdapter, sendTimeout time.Duration) message.PubSub {
    return &GoChannel{
        sendTimeout: sendTimeout,
        buffer:      buffer,

        subscribers:     make(map[string][]*subscriber),
        subscribersLock: &sync.RWMutex{},
        logger:          logger,
// ...

Publishing

Full source: message/infrastructure/gochannel/pubsub.go

// ...
// Publish in GoChannel is blocking until all consumers consume and acknowledge the message.
// Sending message to one subscriber has timeout equal to GoChannel.sendTimeout configured via constructor.
//
// Messages are not persisted. If there are no subscribers and message is produced it will be gone.
func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
// ...

Subscribing

Full source: message/infrastructure/gochannel/pubsub.go

// ...
// Subscribe returns channel to which all published messages are sent.
// Messages are not persisted. If there are no subscribers and message is produced it will be gone.
//
// There are no consumer groups support etc. Every consumer will receive every produced message.
func (g *GoChannel) Subscribe(topic string) (chan *message.Message, error) {
// ...

Marshaler

No marshaling is needed when sending messages within the process.

Kafka

Kafka is one of the most popular Pub/Subs. We are providing Pub/Sub implementation based on Shopify’s Sarama.

Characteristics

FeatureImplementsNote
ConsumerGroupsyes
ExactlyOnceDeliverynoin theory can be achieved with Transactions, currently no support for any Golang client
GuaranteedOrderyesrequire paritition key usage
Persistentyes

Configuration

Full source: message/infrastructure/kafka/subscriber.go

// ...
type SubscriberConfig struct {
    // Kafka brokers list.
   Brokers []string

    // Kafka consumer group.
   // When empty, all messages from all partitions will be returned.
   ConsumerGroup string

    // How long after Nack message should be redelivered.
   NackResendSleep time.Duration

    // How long about unsuccessful reconnecting next reconnect will occur.
   ReconnectRetrySleep time.Duration
}

// NoSleep can be set to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
const NoSleep time.Duration = -1

func (c *SubscriberConfig) setDefaults() {
    if c.NackResendSleep == 0 {
        c.NackResendSleep = time.Millisecond * 100
    }
    if c.ReconnectRetrySleep == 0 {
        c.ReconnectRetrySleep = time.Second
    }
}

func (c SubscriberConfig) Validate() error {
    if len(c.Brokers) == 0 {
        return errors.New("missing brokers")
    }

    return nil
}

// Subscribe subscribers for messages in Kafka.
// ...
Passing custom Sarama config

You can pass custom config parameters via overwriteSaramaConfig *sarama.Config in NewSubscriber and NewPublisher. When nil is passed, default config is used (DefaultSaramaSubscriberConfig).

Full source: message/infrastructure/kafka/config.go

// ...
// DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.
//
// Custom config can be passed to NewSubscriber and NewPublisher.
//
//        saramaConfig := DefaultSaramaSubscriberConfig()
//        saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//        subscriber, err := NewSubscriber(watermillConfig, saramaConfig, unmarshaler, logger)
//        // ...
//
func DefaultSaramaSubscriberConfig() *sarama.Config {
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0
    config.Consumer.Return.Errors = true
    config.ClientID = "watermill"

    return config
}
// ...

Connecting

Publisher

Full source: message/infrastructure/kafka/publisher.go

// ...
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
    brokers []string,
    marshaler Marshaler,
    overwriteSaramaConfig *sarama.Config,
    logger watermill.LoggerAdapter,
) (message.Publisher, error) {
// ...

Example:

Full source: docs/content/docs/getting-started/kafka/main.go

// ...
   saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
    // equivalent of auto.offset.reset: earliest
   saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:       []string{"kafka:9092"},
            ConsumerGroup: "test_consumer_group",
        },
        saramaSubscriberConfig,
        kafka.DefaultMarshaler{},
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...
Subscriber

Full source: message/infrastructure/kafka/subscriber.go

// ...
// NewSubscriber creates a new Kafka Subscriber.
func NewSubscriber(
    config SubscriberConfig,
    overwriteSaramaConfig *sarama.Config,
    unmarshaler Unmarshaler,
    logger watermill.LoggerAdapter,
) (message.Subscriber, error) {
// ...

Example:

Full source: docs/content/docs/getting-started/kafka/main.go

// ...
   publisher, err := kafka.NewPublisher(
        []string{"kafka:9092"},
        kafka.DefaultMarshaler{},
        nil, // no custom sarama config
       watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Publishing

Full source: message/infrastructure/kafka/publisher.go

// ...
// Publish publishes message to Kafka.
//
// Publish is blocking and wait for ack from Kafka.
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...

Subscribing

Full source: message/infrastructure/kafka/subscriber.go

// ...
// Subscribe subscribers for messages in Kafka.
//
// There are multiple subscribers spawned
func (s *Subscriber) Subscribe(topic string) (chan *message.Message, error) {
// ...

Marshaler

Watermill’s messages cannot be directly sent to Kafka - they need to be marshaled. You can implement your marshaler or use default implementation.

Full source: message/infrastructure/kafka/marshaler.go

// ...
// Marshaler marshals Watermill's message to Kafka message.
type Marshaler interface {
    Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}

// Unmarshaler unmarshals Kafka's message to Watermill's message.
type Unmarshaler interface {
    Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
    Marshaler
    Unmarshaler
}

type DefaultMarshaler struct{}

func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...

Partitioning

Our Publisher has support for the partitioning mechanism.

It can be done with special Marshaler implementation:

Full source: message/infrastructure/kafka/marshaler.go

// ...
type kafkaJsonWithPartitioning struct {
    DefaultMarshaler

    generatePartitionKey GeneratePartitionKey
}

func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
    return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey}
}

func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...

When using, you need to pass your function to generate partition key. It’s a good idea to pass this partition key with metadata to not unmarshal entire message.


marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
    return msg.Metadata.Get("partition"), nil
})

HTTP

At this moment only HTTP subscriber is available. There is an issue for a HTTP publisher.

HTTP subscriber allows us to send messages received by HTTP request (for example - webhooks). You can then post them to any Publisher. Here is an example with sending HTTP messages to Kafka.

When implemented, HTTP publisher can be used as webhooks sender.

Characteristics

FeatureImplementsNote
ConsumerGroupsno
ExactlyOnceDeliveryno
GuaranteedOrderyes
Persistentno

Configuration

The configuration of HTTP subscriber is done via the constructor.

Full source: message/infrastructure/http/subscriber.go

// ...
// NewSubscriber creates new Subscriber.
//
// addr is TCP address to listen on
//
// unmarshalMessageFunc is function which converts HTTP request to Watermill's message.
//
// logger is Watermill's logger.
func NewSubscriber(addr string, unmarshalMessageFunc UnmarshalMessageFunc, logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...

You can also use NewSubscriberWithRouter constructor to pass your own chi.Router (see chi). This may be helpful if you’d like to add your own HTTP handlers (e.g. a health check endpoint).

Full source: message/infrastructure/http/subscriber.go

// ...
// NewSubscriberWithRouter creates new Subscriber with provided router.
//
// addr is TCP address to listen on
//
// unmarshalMessageFunc is function which converts HTTP request to Watermill's message.
//
// logger is Watermill's logger.
func NewSubscriberWithRouter(
    addr string,
    router chi.Router,
    unmarshalMessageFunc UnmarshalMessageFunc,
    logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...

Running

To run HTTP subscriber you need to run StartHTTPServer(). It needs to be run after Subscribe().

When using with the router, you should wait for the router to start.


<-r.Running()
httpSubscriber.StartHTTPServer()

Subscribing

Full source: message/infrastructure/http/subscriber.go

// ...
// Subscribe adds HTTP handler which will listen in provided url for messages.
//
// Subscribe needs to be called before `StartHTTPServer`.
//
// When request is sent, it will wait for the `Ack`. When Ack is received 200 HTTP status wil be sent.
// When Nack is sent, 500 HTTP status will be sent.
func (s *Subscriber) Subscribe(url string) (chan *message.Message, error) {
// ...

Google Cloud Pub/Sub

Cloud Pub/Sub brings the flexibility and reliability of enterprise message-oriented middleware to the cloud.

At the same time, Cloud Pub/Sub is a scalable, durable event ingestion and delivery system that serves as a foundation for modern stream analytics pipelines. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independently written applications.

Cloud Pub/Sub delivers low-latency, durable messaging that helps developers quickly integrate systems hosted on the Google Cloud Platform and externally.

Documentation: https://cloud.google.com/pubsub/docs/

Characteristics

FeatureImplementsNote
ConsumerGroupsyesmultiple subscribers within the same Subscription name
ExactlyOnceDeliveryno
GuaranteedOrderno
Persistentyes*maximum retention time is 7 days

Configuration

Full source: message/infrastructure/googlecloud/publisher.go

// ...
type PublisherConfig struct {
    // ProjectID is the Google Cloud Engine project ID.
   ProjectID string

    // If false (default), `Publisher` tries to create a topic if there is none with the requested name.
   // Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`.
   DoNotCreateTopicIfMissing bool

    // Settings for cloud.google.com/go/pubsub client library.
   PublishSettings *pubsub.PublishSettings
    ClientOptions   []option.ClientOption

    Marshaler Marshaler
}

func (c *PublisherConfig) setDefaults() {
    if c.Marshaler == nil {
        c.Marshaler = DefaultMarshalerUnmarshaler{}
    }
}

func NewPublisher(ctx context.Context, config PublisherConfig) (*Publisher, error) {
// ...

Full source: message/infrastructure/googlecloud/subscriber.go

// ...
type SubscriberConfig struct {
    // GenerateSubscriptionName generates subscription name for a given topic.
   // The subscription connects the topic to a subscriber application that receives and processes
   // messages published to the topic.
   //
   // By default, subscriptions expire after 31 days of inactivity.
   //
   // A topic can have multiple subscriptions, but a given subscription belongs to a single topic.
   GenerateSubscriptionName SubscriptionNameFn

    // ProjectID is the Google Cloud Engine project ID.
   ProjectID string

    // If false (default), `Subscriber` tries to create a subscription if there is none with the requested name.
   // Otherwise, trying to use non-existent subscription results in `ErrSubscriptionDoesNotExist`.
   DoNotCreateSubscriptionIfMissing bool

    // If false (default), `Subscriber` tries to create a topic if there is none with the requested name
   // and it is trying to create a new subscription with this topic name.
   // Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
   DoNotCreateTopicIfMissing bool

    // Settings for cloud.google.com/go/pubsub client library.
   ReceiveSettings    pubsub.ReceiveSettings
    SubscriptionConfig pubsub.SubscriptionConfig
    ClientOptions      []option.ClientOption

    // Unmarshaler transforms the client library format into watermill/message.Message.
   // Use a custom unmarshaler if needed, otherwise the default Unmarshaler should cover most use cases.
   Unmarshaler Unmarshaler
}

type SubscriptionNameFn func(topic string) string

// TopicSubscriptionName uses the topic name as the subscription name.
func TopicSubscriptionName(topic string) string {
    return topic
}

// TopicSubscriptionNameWithSuffix uses the topic name with a chosen suffix as the subscription name.
func TopicSubscriptionNameWithSuffix(suffix string) SubscriptionNameFn {
    return func(topic string) string {
        return topic + suffix
    }
}

func (c *SubscriberConfig) setDefaults() {
    if c.GenerateSubscriptionName == nil {
        c.GenerateSubscriptionName = TopicSubscriptionName
    }
    if c.Unmarshaler == nil {
        c.Unmarshaler = DefaultMarshalerUnmarshaler{}
    }
}

func NewSubscriber(
// ...
Subscription name

To receive messages published to a topic, you must create a subscription to that topic. Only messages published to the topic after the subscription is created are available to subscriber applications.

The subscription connects the topic to a subscriber application that receives and processes messages published to the topic.

A topic can have multiple subscriptions, but a given subscription belongs to a single topic.

In Watermill, the subscription is created automatically during calling Subscribe(). Subscription name is generated by function passed to SubscriberConfig.GenerateSubscriptionName. By default, it is just the topic name (TopicSubscriptionName).

When you want to consume messages from a topic with multiple subscribers, you should use TopicSubscriptionNameWithSuffix or your custom function to generate the subscription name.

Connecting

Watermill will connect to the instance of Google Cloud Pub/Sub indicated by the environment variables. For production setup, set the GOOGLE_APPLICATION_CREDENTIALS env, as described in the official Google Cloud Pub/Sub docs. Note that you won’t need to install the Cloud SDK, as Watermill will take care of the administrative tasks (creating topics/subscriptions) with the default settings and proper permissions.

For development, you can use a Docker image with the emulator and the PUBSUB_EMULATOR_HOST env (check out the Getting Started guide).

Full source: docs/content/docs/getting-started/googlecloud/main.go

// ...
   publisher, err := googlecloud.NewPublisher(context.Background(), googlecloud.PublisherConfig{
        ProjectID: "test-project",
    })
    if err != nil {
        panic(err)
// ...

Full source: docs/content/docs/getting-started/googlecloud/main.go

// ...
   subscriber, err := googlecloud.NewSubscriber(
        context.Background(),
        googlecloud.SubscriberConfig{
            // custom function to generate Subscription Name,
           // there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
           GenerateSubscriptionName: func(topic string) string {
                return "test-sub_" + topic
            },
            ProjectID: "test-project",
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
// ...

Publishing

Full source: message/infrastructure/googlecloud/publisher.go

// ...
// Publish publishes a set of messages on a Google Cloud Pub/Sub topic.
// It blocks until all the messages are successfully published or an error occurred.
//
// To receive messages published to a topic, you must create a subscription to that topic.
// Only messages published to the topic after the subscription is created are available to subscriber applications.
//
// See https://cloud.google.com/pubsub/docs/publisher to find out more about how Google Cloud Pub/Sub Publishers work.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
// ...

Subscribing

Full source: message/infrastructure/googlecloud/subscriber.go

// ...
// Subscribe consumes Google Cloud Pub/Sub and outputs them as Waterfall Message objects on the returned channel.
//
// In Google Cloud Pub/Sub, it is impossible to subscribe directly to a topic. Instead, a *subscription* is used.
// Each subscription has one topic, but there may be multiple subscriptions to one topic (with different names).
//
// The `topic` argument is transformed into subscription name with the configured `GenerateSubscriptionName` function.
// By default, if the subscription or topic don't exist, the are created. This behavior may be changed in the config.
//
// Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.
//
// See https://cloud.google.com/pubsub/docs/subscriber to find out more about how Google Cloud Pub/Sub Subscriptions work.
func (s *Subscriber) Subscribe(topic string) (chan *message.Message, error) {
// ...

Marshaler

Watermill’s messages cannot be directly sent to Kafka - they need to be marshaled. You can implement your marshaler or use default implementation.

Full source: message/infrastructure/googlecloud/marshaler.go

// ...
// Marshaler transforms a Waterfall Message into the Google Cloud client library Message.
type Marshaler interface {
    Marshal(topic string, msg *message.Message) (*pubsub.Message, error)
}

// Unmarshaler transforms a Google Cloud client library Message into the Waterfall Message.
type Unmarshaler interface {
    Unmarshal(*pubsub.Message) (*message.Message, error)
}

// UUIDHeaderKey is the key of the Pub/Sub attribute that carries Waterfall UUID.
const UUIDHeaderKey = "_watermill_message_uuid"

// DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way:
// All Google Cloud Pub/Sub attributes are equivalent to Waterfall Message metadata.
// Waterfall Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key.
type DefaultMarshalerUnmarshaler struct{}
// ...

NATS Streaming

NATS Streaming is a data streaming system powered by NATS, and written in the Go programming language. The executable name for the NATS Streaming server is nats-streaming-server. NATS Streaming embeds, extends, and interoperates seamlessly with the core NATS platform.

Characteristics

FeatureImplementsNote
ConsumerGroupsyesyou need to set DurableName and QueueGroup name
ExactlyOnceDeliveryno
GuaranteedOrdernowith the redelivery feature, order can’t be guaranteed
PersistentyesDurableName is required

Configuration

Full source: message/infrastructure/nats/publisher.go

// ...
type StreamingPublisherConfig struct {
    // ClusterID is the NATS Streaming cluster ID.
   ClusterID string

    // ClientID is the NATS Streaming client ID to connect with.
   // ClientID can contain only alphanumeric and `-` or `_` characters.
   ClientID string

    // StanOptions are custom options for a connection.
   StanOptions []stan.Option

    // Marshaler is marshaler used to marshal messages to stan format.
   Marshaler Marshaler
}

func (c StreamingPublisherConfig) Validate() error {
    if c.Marshaler == nil {
        return errors.New("StreamingPublisherConfig.Marshaler is missing")
    }

    return nil
}

type StreamingPublisher struct {
// ...

Full source: message/infrastructure/nats/subscriber.go

// ...
type StreamingSubscriberConfig struct {
    // ClusterID is the NATS Streaming cluster ID.
   ClusterID string

    // ClientID is the NATS Streaming client ID to connect with.
   // ClientID can contain only alphanumeric and `-` or `_` characters.
   //
   // Using DurableName causes the NATS Streaming server to track
   // the last acknowledged message for that ClientID + DurableName.
   ClientID string

    // QueueGroup is the NATS Streaming queue group.
   //
   // All subscriptions with the same queue name (regardless of the connection they originate from)
   // will form a queue group. Each message will be delivered to only one subscriber per queue group,
   // using queuing semantics.
   //
   // It is recommended to set it with DurableName.
   // For non durable queue subscribers, when the last member leaves the group,
   // that group is removed. A durable queue group (DurableName) allows you to have all members leave
   // but still maintain state. When a member re-joins, it starts at the last position in that group.
   //
   // When QueueGroup is empty, subscribe without QueueGroup will be used.
   QueueGroup string

    // DurableName is the NATS streaming durable name.
   //
   // Subscriptions may also specify a “durable name” which will survive client restarts.
   // Durable subscriptions cause the server to track the last acknowledged message
   // sequence number for a client and durable name. When the client restarts/resubscribes,
   // and uses the same client ID and durable name, the server will resume delivery beginning
   // with the earliest unacknowledged message for this durable subscription.
   //
   // Doing this causes the NATS Streaming server to track
   // the last acknowledged message for that ClientID + DurableName.
   DurableName string

    // SubscribersCount determines wow much concurrent subscribers should be started.
   SubscribersCount int

    // CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
   // When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
   CloseTimeout time.Duration

    // How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
   // It is mapped to stan.AckWait option.
   AckWaitTimeout time.Duration

    // StanOptions are custom []stan.Option passed to the connection.
   // It is also used to provide connection parameters, for example:
   //       stan.NatsURL("nats://localhost:4222")
   StanOptions []stan.Option

    // StanSubscriptionOptions are custom []stan.SubscriptionOption passed to subscription.
   StanSubscriptionOptions []stan.SubscriptionOption

    // Unmarshaler is an unmarshaler used to unmarshaling messages from NATS format to Watermill format.
   Unmarshaler Unmarshaler
}

func (c *StreamingSubscriberConfig) setDefaults() {
    if c.SubscribersCount <= 0 {
        c.SubscribersCount = 1
    }
    if c.CloseTimeout <= 0 {
        c.CloseTimeout = time.Second * 30
    }
    if c.AckWaitTimeout <= 0 {
        c.AckWaitTimeout = time.Second * 30
    }

    c.StanSubscriptionOptions = append(
        c.StanSubscriptionOptions,
        stan.SetManualAckMode(), // manual AckMode is required to support acking/nacking by client
       stan.AckWait(c.AckWaitTimeout),
    )

    if c.DurableName != "" {
        c.StanSubscriptionOptions = append(c.StanSubscriptionOptions, stan.DurableName(c.DurableName))
    }
}

func (c *StreamingSubscriberConfig) Validate() error {
    if c.Unmarshaler == nil {
        return errors.New("StreamingSubscriberConfig.Unmarshaler is missing")
    }

    if c.QueueGroup == "" && c.SubscribersCount > 1 {
        return errors.New(
            "to set StreamingSubscriberConfig.SubscribersCount " +
                "you need to also set StreamingSubscriberConfig.QueueGroup, " +
                "in other case you will receive duplicated messages",
        )
    }

    return nil
}

type StreamingSubscriber struct {
// ...

Connecting

By default NATS client will try to connect to localhost:4222. If you are using different hostname or port you should pass custom stan.Option: stan.NatsURL("nats://your-nats-hostname:4222") to StreamingSubscriberConfig and StreamingPublisherConfig.

Full source: message/infrastructure/nats/publisher.go

// ...
// NewStreamingPublisher creates a new StreamingPublisher.
//
// When using custom NATS hostname, you should pass it by options StreamingPublisherConfig.StanOptions:
//        // ...
//        StanOptions: []stan.Option{
//            stan.NatsURL("nats://your-nats-hostname:4222"),
//        }
//        // ...
func NewStreamingPublisher(config StreamingPublisherConfig, logger watermill.LoggerAdapter) (*StreamingPublisher, error) {
// ...

Example:

Full source: docs/content/docs/getting-started/nats-streaming/main.go

// ...
   publisher, err := nats.NewStreamingPublisher(
        nats.StreamingPublisherConfig{
            ClusterID: "test-cluster",
            ClientID:  "example-publisher",
            StanOptions: []stan.Option{
                stan.NatsURL("nats://nats-streaming:4222"),
            },
            Marshaler: nats.GobMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
// ...

Full source: message/infrastructure/nats/subscriber.go

// ...
// NewStreamingSubscriber creates a new StreamingSubscriber.
//
// When using custom NATS hostname, you should pass it by options StreamingSubscriberConfig.StanOptions:
//        // ...
//        StanOptions: []stan.Option{
//            stan.NatsURL("nats://your-nats-hostname:4222"),
//        }
//        // ...
func NewStreamingSubscriber(config StreamingSubscriberConfig, logger watermill.LoggerAdapter) (*StreamingSubscriber, error) {
// ...

Example:

Full source: docs/content/docs/getting-started/nats-streaming/main.go

// ...
   subscriber, err := nats.NewStreamingSubscriber(
        nats.StreamingSubscriberConfig{
            ClusterID:        "test-cluster",
            ClientID:         "example-subscriber",
            QueueGroup:       "example",
            DurableName:      "my-durable",
            SubscribersCount: 4, // how much goroutines should consume messages
           CloseTimeout:     time.Minute,
            AckWaitTimeout:   time.Second * 30,
            StanOptions: []stan.Option{
                stan.NatsURL("nats://nats-streaming:4222"),
            },
            Unmarshaler: nats.GobMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
// ...

Publishing

Full source: message/infrastructure/nats/publisher.go

// ...
// Publish publishes message to NATS.
//
// Publish will not return until an ack has been received from NATS Streaming.
// When one of messages delivery fails - function is interrupted.
func (p StreamingPublisher) Publish(topic string, messages ...*message.Message) error {
// ...

Subscribing

Full source: message/infrastructure/nats/subscriber.go

// ...
// Subscribe subscribes messages from NATS Streaming.
//
// Subscribe will spawn SubscribersCount goroutines making subscribe.
func (s *StreamingSubscriber) Subscribe(topic string) (chan *message.Message, error) {
// ...

Marshaler

NATS doesn’t implement any mechanism like metadata or headers of the message. For that reason we need to marshal entire message to the []byte.

The default implementation is based on Golang’s gob.

Full source: message/infrastructure/nats/marshaler.go

// ...
type Marshaler interface {
    Marshal(topic string, msg *message.Message) ([]byte, error)
}

type Unmarshaler interface {
    Unmarshal(*stan.Msg) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
    Marshaler
    Unmarshaler
}

// GobMarshaler is marshaller which is using Gob to marshal Watermill messages.
type GobMarshaler struct{}
// ...

When you have your own format of the messages, you can implement your own Marshaler, which will serialize messages in your format.

When needed, you can bypass both UUID and Metadata and send just a message.Payload, but some standard middlewares may be not working.

Implementing your own Pub/Sub

There aren’t your Pub/Sub implementation? Please check Implementing custom Pub/Sub.