Publishers and Subscribers


Full source: message/pubsub.go

// ...
type publisher interface {
    // Publish publishes provided messages to the given topic.
    //
    // Publish can be synchronous or asynchronous - it depends on the implementation.
    //
    // Most publisher implementations don't support atomic publishing of messages.
    // This means that if publishing one of the messages fails, the next messages will not be published.
    //
    // Publish must be thread safe.
    Publish(topic string, messages ...*Message) error
}

type Publisher interface {
    publisher

    // Close should flush unsent messages if the publisher is async.
    Close() error
}
// ...

Publishing multiple messages

Most publisher implementations don’t support atomic publishing of messages. This means that if publishing one of the messages fails, the messages after it in the same call won’t be published, while the ones before it may already have been sent.
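The non-atomic behavior can be illustrated with a minimal sketch. The Message struct and memoryPublisher below are simplified, hypothetical stand-ins written for this example and are not Watermill's own types; only the Publish and Close signatures follow the interface shown above. The publisher rejects a message with an empty payload to simulate a broker error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message is a simplified stand-in for the library's message type (assumption).
type Message struct {
	UUID    string
	Payload []byte
}

// Publisher mirrors the interface shown in the excerpt above.
type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

// memoryPublisher is a hypothetical in-memory publisher used only for this sketch.
type memoryPublisher struct {
	mu        sync.Mutex // Publish must be thread safe
	published []string   // UUIDs of messages that were accepted
}

var _ Publisher = (*memoryPublisher)(nil)

func (p *memoryPublisher) Publish(topic string, messages ...*Message) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, msg := range messages {
		if len(msg.Payload) == 0 {
			// Returning here leaves every later message in this call unpublished,
			// while earlier ones stay published: the call is not atomic.
			return errors.New("cannot publish message " + msg.UUID + ": empty payload")
		}
		p.published = append(p.published, msg.UUID)
	}
	return nil
}

func (p *memoryPublisher) Close() error { return nil }

// publishBatch publishes three messages, the second of which is invalid,
// and returns the UUIDs that were actually published along with the error.
func publishBatch() ([]string, error) {
	pub := &memoryPublisher{}
	err := pub.Publish("example.topic",
		&Message{UUID: "1", Payload: []byte("a")},
		&Message{UUID: "2"}, // empty payload: rejected
		&Message{UUID: "3", Payload: []byte("c")},
	)
	return pub.published, err
}

func main() {
	published, err := publishBatch()
	fmt.Println("published:", published, "error:", err)
}
```

A caller that needs all-or-nothing semantics would have to handle the partial result itself, for example by retrying only the messages that were not sent.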

Async publish

Publish can be synchronous or asynchronous - it depends on the implementation.


Close should flush unsent messages if the publisher is asynchronous. Remember to close the publisher when you are done with it; otherwise you may lose some of the messages.
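Why Close matters for an asynchronous publisher can be shown with a rough sketch. The asyncPublisher below is a hypothetical implementation written for this example (it is not one of Watermill's publishers): Publish only enqueues messages, a background goroutine "sends" them, and Close waits until the queue has been drained.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message is a simplified stand-in for the library's message type (assumption).
type Message struct {
	UUID string
}

// asyncPublisher is a hypothetical asynchronous publisher for illustration.
type asyncPublisher struct {
	queue chan *Message
	done  chan struct{} // closed once the background sender has drained the queue

	closeMu sync.RWMutex
	closed  bool

	sentMu sync.Mutex
	sent   []string // UUIDs that reached the pretend broker
}

func newAsyncPublisher(buffer int) *asyncPublisher {
	p := &asyncPublisher{
		queue: make(chan *Message, buffer),
		done:  make(chan struct{}),
	}
	go p.run()
	return p
}

// run sends queued messages until the queue is closed and empty.
func (p *asyncPublisher) run() {
	defer close(p.done)
	for msg := range p.queue {
		p.sentMu.Lock()
		p.sent = append(p.sent, msg.UUID)
		p.sentMu.Unlock()
	}
}

// Publish enqueues messages and returns before they are necessarily sent.
func (p *asyncPublisher) Publish(topic string, messages ...*Message) error {
	p.closeMu.RLock()
	defer p.closeMu.RUnlock()
	if p.closed {
		return errors.New("publisher is closed")
	}
	for _, msg := range messages {
		p.queue <- msg
	}
	return nil
}

// Close stops accepting messages and blocks until everything queued is sent.
func (p *asyncPublisher) Close() error {
	p.closeMu.Lock()
	defer p.closeMu.Unlock()
	if p.closed {
		return nil
	}
	p.closed = true
	close(p.queue)
	<-p.done // flush: wait for the background sender to finish
	return nil
}

func (p *asyncPublisher) sentUUIDs() []string {
	p.sentMu.Lock()
	defer p.sentMu.Unlock()
	return append([]string(nil), p.sent...)
}

// flushOnClose publishes three messages and returns what was sent after Close.
func flushOnClose() ([]string, error) {
	pub := newAsyncPublisher(1)
	for _, id := range []string{"1", "2", "3"} {
		if err := pub.Publish("example.topic", &Message{UUID: id}); err != nil {
			return nil, err
		}
	}
	if err := pub.Close(); err != nil {
		return nil, err
	}
	return pub.sentUUIDs(), nil
}

func main() {
	sent, err := flushOnClose()
	fmt.Println("sent after Close:", sent, "error:", err)
}
```

If the process exited without calling Close in this sketch, messages still sitting in the queue would never reach the pretend broker.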


Full source: message/pubsub.go

// ...
type subscriber interface {
    // Subscribe returns an output channel with messages from the provided topic.
    // The channel is closed when Close() is called on the subscriber.
    //
    // To receive the next message, `Ack()` must be called on the received message.
    // If message processing failed and the message should be redelivered, `Nack()` should be called.
    //
    // When the provided ctx is canceled, the subscriber will close the subscription and the output channel.
    // The provided ctx is set on all produced messages.
    // When Nack or Ack is called on the message, the context of the message is canceled.
    Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

type Subscriber interface {
    subscriber

    // Close closes all subscriptions with their output channels and flushes offsets etc. when needed.
    Close() error
}
// ...

Ack/Nack mechanism

It is the Subscriber’s responsibility to handle an Ack and a Nack from a message. A proper implementation should wait for an Ack or a Nack before consuming the next message.

Important note for Subscriber implementations: the Ack/offset must be sent to the message storage/broker only after the Watermill message has been Acked. Otherwise messages may be lost if the process dies before they have been processed.
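The ordering described above can be sketched as follows. Everything here is a simplified assumption made for illustration: Message is a stand-in with Ack, Nack, Acked and Nacked methods modeled on the comments in the excerpt, and logSubscriber is a hypothetical subscriber reading from an in-memory log. It delivers one message, waits for an Ack or a Nack, and advances the committed offset only after an Ack.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a simplified stand-in for the library's message type (assumption).
type Message struct {
	UUID   string
	once   sync.Once
	acked  chan struct{}
	nacked chan struct{}
}

func NewMessage(uuid string) *Message {
	return &Message{UUID: uuid, acked: make(chan struct{}), nacked: make(chan struct{})}
}

// Ack and Nack are mutually exclusive; only the first call has an effect.
func (m *Message) Ack()                    { m.once.Do(func() { close(m.acked) }) }
func (m *Message) Nack()                   { m.once.Do(func() { close(m.nacked) }) }
func (m *Message) Acked() <-chan struct{}  { return m.acked }
func (m *Message) Nacked() <-chan struct{} { return m.nacked }

// logSubscriber is a hypothetical subscriber over an in-memory "broker" log.
type logSubscriber struct {
	log       []string // message UUIDs in broker order
	committed int      // offset acknowledged to the pretend broker
}

func (s *logSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) {
	out := make(chan *Message)
	go func() {
		defer close(out)
		for s.committed < len(s.log) {
			msg := NewMessage(s.log[s.committed])
			select {
			case out <- msg:
			case <-ctx.Done():
				return
			}
			// Wait for the consumer's decision before touching the next message.
			select {
			case <-msg.Acked():
				s.committed++ // commit to the broker only after the Ack
			case <-msg.Nacked():
				// Offset unchanged: the same message is delivered again.
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, nil
}

// consume Nacks the first delivery of message "2" and Acks everything else.
func consume() (deliveries []string, committed int, err error) {
	sub := &logSubscriber{log: []string{"1", "2", "3"}}
	messages, err := sub.Subscribe(context.Background(), "example.topic")
	if err != nil {
		return nil, 0, err
	}
	nackedOnce := false
	for msg := range messages {
		deliveries = append(deliveries, msg.UUID)
		if msg.UUID == "2" && !nackedOnce {
			nackedOnce = true
			msg.Nack()
			continue
		}
		msg.Ack()
	}
	return deliveries, sub.committed, nil
}

func main() {
	deliveries, committed, err := consume()
	fmt.Println("deliveries:", deliveries, "committed offset:", committed, "error:", err)
}
```

Because the offset is committed only after the Ack, a crash between delivery and Ack in this sketch would cause the message to be delivered again rather than lost.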


Close closes all subscriptions with their output channels and flushes offsets, etc. when needed.

At-least-once delivery

Watermill is built with at-least-once delivery semantics. This means that if an error occurs while processing a message and an Ack cannot be sent, the message will be redelivered.

You need to keep it in mind and build your application to be idempotent or implement a deduplication mechanism.

Unfortunately, it’s not possible to create a universal middleware for deduplication, so we encourage you to build your own.
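As one possible starting point, an application-specific deduplication wrapper might look like the sketch below. It assumes that a redelivered message carries the same UUID as the original and that an in-memory set is acceptable; both are assumptions of this example, and the Message, HandlerFunc and deduplicator names are hypothetical rather than part of Watermill. A real application would more likely key on a business identifier and keep the set in durable storage.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a simplified stand-in for the library's message type (assumption).
type Message struct {
	UUID    string
	Payload []byte
}

// HandlerFunc is a hypothetical handler signature used only in this sketch.
type HandlerFunc func(msg *Message) error

// deduplicator remembers the UUIDs of messages that were handled successfully.
type deduplicator struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDeduplicator() *deduplicator {
	return &deduplicator{seen: make(map[string]struct{})}
}

// wrap returns a handler that skips messages whose UUID was already handled.
// The lock is held while the inner handler runs, which keeps the sketch simple
// but serializes message handling.
func (d *deduplicator) wrap(next HandlerFunc) HandlerFunc {
	return func(msg *Message) error {
		d.mu.Lock()
		defer d.mu.Unlock()
		if _, duplicate := d.seen[msg.UUID]; duplicate {
			return nil // already processed: treat the redelivery as a no-op
		}
		if err := next(msg); err != nil {
			return err // not marked as seen, so a redelivery is processed again
		}
		d.seen[msg.UUID] = struct{}{}
		return nil
	}
}

// handleWithDuplicates feeds messages "a", "b", "a" through the wrapper and
// returns how many times the inner handler actually ran.
func handleWithDuplicates() (int, error) {
	calls := 0
	handler := newDeduplicator().wrap(func(msg *Message) error {
		calls++
		return nil
	})
	for _, id := range []string{"a", "b", "a"} {
		if err := handler(&Message{UUID: id}); err != nil {
			return calls, err
		}
	}
	return calls, nil
}

func main() {
	calls, err := handleWithDuplicates()
	fmt.Println("handler calls:", calls, "error:", err)
}
```

Marking a message as seen only after the inner handler succeeds keeps failed messages eligible for reprocessing on redelivery.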

Universal tests

Every Pub/Sub is similar in most aspects. To avoid implementing separate tests for every Pub/Sub, we’ve created a test suite which should be passed by any Pub/Sub implementation.

These tests can be found in message/infrastructure/test_pubsub.go.

Built-in implementations

To check available Pub/Sub implementations, see Pub/Sub’s implementations.

Implementing custom Pub/Sub

See Implementing custom Pub/Sub for instructions on how to introduce support for a new Pub/Sub.

We also welcome pull requests with new Pub/Sub implementations.

You can also request a new Pub/Sub implementation by submitting a new issue.

Keep going!

Now that you know how a Pub/Sub works, we recommend learning about the Message Router component.