Getting started

What is Watermill?

Watermill is a Go library for working with messages the easy way.

You can use it to build message-driven and event-driven applications with Pub/Subs like Kafka, RabbitMQ, PostgreSQL, and many more.

Watermill comes with batteries included. It gives you tools used by every message-driven application.

Why use Watermill?

When you run an HTTP server, you don’t deal directly with TCP sockets, parsing HTTP requests, or managing connections. Instead, you use a high-level library like net/http that handles all that complexity for you.

It’s what Watermill aims to be for messages. It provides all you need to build an application based on events or other asynchronous patterns.

There are many different message queues, each with different features, client libraries, and APIs. Watermill hides all that complexity behind an API that is easy to use and understand.

Watermill is NOT a framework. It’s a lightweight library that’s easy to plug in or remove from your project.

Install

go get -u github.com/ThreeDotsLabs/watermill

Learn in practice

Docs too boring? Prefer learning by doing?

Try the free hands-on training where you’ll solve exercises to learn how to use Watermill in your projects.

It’ll guide you through the basics and a few advanced concepts like message ordering and the Outbox pattern.

One-Minute Background

The idea behind event-driven applications is always the same: one part publishes messages, and another part subscribes to them.

Watermill supports this behavior for multiple publishers and subscribers .

Three APIs

Watermill comes with three APIs for working with messages. They build on top of each other, each step providing a higher-level API.

In this guide, we’re going to start from the bottom and move up. It’s good to know the fundamentals, even if you’re going to use the high-level APIs.

Watermill components pyramid

Publisher & Subscriber

Most Pub/Sub libraries come with complex features.

Watermill hides this complexity behind two interfaces: the Publisher and Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}

Creating Messages

The core part of Watermill is the Message . It is what http.Request is for the net/http package. Most Watermill features work with this struct.

Watermill doesn’t enforce any message format. NewMessage expects a slice of bytes as the payload. You can use strings, JSON, protobuf, Avro, gob, or anything else that serializes to []byte.

The message UUID is optional but recommended for debugging.

msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

Publishing Messages

Publish expects a topic and one or more Messages to be published.

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Subscribing for Messages

Subscribe expects a topic name and returns a channel of incoming messages.

What topic exactly means depends on the Pub/Sub implementation. Usually, it needs to match the topic name used by the publisher.

Messages need to be acknowledged after processing by calling the Ack() method.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

See detailed examples below for supported PubSubs.

Router

Publishers and subscribers are the low-level parts of Watermill. For most cases, you want to use a high-level API: the Router component.

Router configuration

Start with configuring the router and adding plugins and middlewares.

A middleware is a function executed for each incoming message. You can use one of the existing ones for things like correlation, metrics, poison queue, retrying, throttling, etc. . You can also create your own.

// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
	panic(err)
}

// SignalsHandler will gracefully shutdown Router when SIGTERM is received.
// You can also close the router by just calling `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)

// Router level middleware are executed for every message sent to the router
router.AddMiddleware(
	// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
	middleware.CorrelationID,

	// The handler function is retried if it returns an error.
	// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.
	middleware.Retry{
		MaxRetries:      3,
		InitialInterval: time.Millisecond * 100,
		Logger:          logger,
	}.Middleware,

	// Recoverer handles panics from handlers.
	// In this case, it passes them as errors to the Retry middleware.
	middleware.Recoverer,
)
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

Handlers

Set up handlers that the router uses. Each handler independently handles incoming messages.

A handler listens to messages from the given subscriber and topic. Any messages returned from the handler function will be published to the given publisher and topic.

// ...
// AddHandler returns a handler which can be used to add handler level middleware
// or to stop handler.
handler := router.AddHandler(
	"struct_handler",          // handler name, must be unique
	"incoming_messages_topic", // topic from which we will read events
	pubSub,
	"outgoing_messages_topic", // topic to which we will publish events
	pubSub,
	structHandler{}.Handler,
)
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

Note: the example above uses one pubSub argument for both the subscriber and publisher. It’s because we use the GoChannel implementation, which is a simple in-memory Pub/Sub.

Alternatively, if you don’t plan to publish messages from within the handler, you can use the simpler AddConsumerHandler method.

// ...
router.AddConsumerHandler(
	"print_incoming_messages",
	"incoming_messages_topic",
	pubSub,
	printMessages,
)
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

You can use two types of handler functions:

  1. a function func(msg *message.Message) ([]*message.Message, error)
  2. a struct method func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

Use the first one if your handler is a function without any dependencies. The second option is useful when your handler requires dependencies such as a database handle or a logger.

// ...
func printMessages(msg *message.Message) error {
	fmt.Printf(
		"\n> Received message: %s\n> %s\n> metadata: %v\n\n",
		msg.UUID, string(msg.Payload), msg.Metadata,
	)
	return nil
}

type structHandler struct {
	// we can add some dependencies here
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
	log.Println("structHandler received message", msg.UUID)

	msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))
	return message.Messages{msg}, nil
}

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

Finally, run the router.

// ...
if err := router.Run(ctx); err != nil {
	panic(err)
}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

The complete example’s source can be found at /_examples/basic/3-router/main.go .

Logging

To see Watermill’s logs, pass any logger that implements the LoggerAdapter . For experimental development, you can use NewStdLogger.

Watermill provides ready-to-use slog adapter. You can create it with watermill.NewSlogLogger . You can also map Watermill’s log levels to slog levels with watermill.NewSlogLoggerWithLevelMapping .

What’s next?

See the CQRS component for the generic high-level API.

For more details, see documentation topics .

The Outbox Pattern is a key pattern to know in event-driven applications.

We recommend checking the examples below to see how Watermill works in practice. You can also try the free hands-on training to learn how to use Watermill in practice.

Examples

Check out the examples that will show you how to start using Watermill.

The recommended entry point is Your first Watermill application . It contains the entire environment in docker-compose.yml, including Go and Kafka, which you can run with one command.

After that, you can see the Realtime feed example. It uses more middlewares and contains two handlers.

For a different subscriber implementation (HTTP), see the receiving-webhooks example. It is a straightforward application that saves webhooks to Kafka.

You can find the complete list of examples in the README .

Support

If anything is not clear, feel free to use any of our support channels ; we will be glad to help.


Check our online hands-on training