Message Router

The Magic Glue of Watermill.

Publishers and Subscribers are rather low-level parts of Watermill. In production use, you’d usually want to use a high-level interface and features like correlation, metrics, poison queue, retrying, throttling, etc.

You also might not want to send an Ack manually every time processing succeeds. Sometimes, you’d like to publish a message after the processing of another message finishes.

To handle these requirements, there is a component named Router.

Configuration

Full source: message/router.go

// ...
type RouterConfig struct {
    // CloseTimeout determines how long router should work for handlers when closing.
    CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
    if c.CloseTimeout == 0 {
        c.CloseTimeout = time.Second * 30
    }
}

func (c RouterConfig) Validate() error {
    return nil
}
// ...

Handler

First, you need to implement a HandlerFunc:

Full source: message/router.go

// ...
// HandlerFunc is a function called when a message is received.
//
// msg.Ack() is called automatically when HandlerFunc doesn't return an error.
// When HandlerFunc returns an error, msg.Nack() is called.
// When msg.Ack() was called in the handler and HandlerFunc returns an error,
// msg.Nack() will not be sent because Ack was already sent.
//
// HandlerFuncs are executed in parallel when multiple messages are received
// (because msg.Ack() was sent in HandlerFunc or the Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...
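
As an illustration of the signature above, here is a minimal, stand-alone sketch of a handler. It does not import Watermill: the Message type is a simplified local stand-in, and upperCaseHandler is a hypothetical name chosen for this example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Message is a simplified local stand-in for Watermill's message type
// (an assumption of this sketch, not the library's definition).
type Message struct {
	UUID    string
	Payload []byte
}

// HandlerFunc mirrors the shape of the signature quoted above.
type HandlerFunc func(msg *Message) ([]*Message, error)

// upperCaseHandler is a hypothetical handler. It rejects empty payloads
// and otherwise returns one new message with the payload upper-cased.
func upperCaseHandler(msg *Message) ([]*Message, error) {
	if len(msg.Payload) == 0 {
		return nil, errors.New("empty payload")
	}
	out := &Message{
		UUID:    msg.UUID + "-out",
		Payload: []byte(strings.ToUpper(string(msg.Payload))),
	}
	return []*Message{out}, nil
}

func main() {
	var h HandlerFunc = upperCaseHandler
	produced, err := h(&Message{UUID: "1", Payload: []byte("hello")})
	if err != nil {
		fmt.Println("handler failed:", err)
		return
	}
	fmt.Println(string(produced[0].Payload))
}
```

Returning an error instead of messages is what would lead to a Nack, as described in the comment above.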

Next, you have to add a new handler with Router.AddHandler:

Full source: message/router.go

// ...
// AddHandler adds a new handler.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// publishTopic is a topic to which router will produce messages returned by handlerFunc.
// When handler needs to publish to multiple topics,
// it is recommended to just inject Publisher to Handler or implement middleware
// which will catch messages and publish to topic based on metadata for example.
//
// subscriber is the Subscriber from which messages will be consumed.
//
// publisher is the Publisher to which messages returned by handlerFunc will be published.
// If a single object implements both Subscriber and Publisher (a PubSub),
// you can pass it as both arguments.
func (r *Router) AddHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    publishTopic string,
    publisher Publisher,
    handlerFunc HandlerFunc,
) {
// ...

See an example usage from Getting Started:

Full source: docs/content/docs/getting-started/router/main.go

// ...
    router.AddHandler(
        "struct_handler",  // handler name, must be unique
        "example.topic_1", // topic from which we will read events
        pubSub,
        "example.topic_2", // topic to which we will publish event
        pubSub,
        structHandler{}.Handler,
    )
// ...
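
To make the data flow of such a handler concrete, the following is a rough, synchronous sketch of what the wiring amounts to: read from one topic, call the handler, publish what it returns to another topic. Everything here (memoryPubSub, route, exclaim) is a hypothetical local stand-in and says nothing about how the Router is implemented internally.

```go
package main

import "fmt"

// Message is a simplified local stand-in (assumption).
type Message struct{ Payload string }

// HandlerFunc mirrors the shape of the handler signature.
type HandlerFunc func(msg *Message) ([]*Message, error)

// memoryPubSub is a hypothetical in-memory store of messages per topic.
type memoryPubSub struct{ topics map[string][]*Message }

func (p *memoryPubSub) Publish(topic string, msgs ...*Message) {
	p.topics[topic] = append(p.topics[topic], msgs...)
}

// route drains subscribeTopic, passes each message to h, and publishes
// the messages returned by h to publishTopic.
func route(ps *memoryPubSub, subscribeTopic, publishTopic string, h HandlerFunc) error {
	pending := ps.topics[subscribeTopic]
	ps.topics[subscribeTopic] = nil
	for _, msg := range pending {
		produced, err := h(msg)
		if err != nil {
			return err
		}
		ps.Publish(publishTopic, produced...)
	}
	return nil
}

// exclaim is a hypothetical handler that appends "!" to the payload.
func exclaim(msg *Message) ([]*Message, error) {
	return []*Message{{Payload: msg.Payload + "!"}}, nil
}

func main() {
	ps := &memoryPubSub{topics: map[string][]*Message{}}
	ps.Publish("example.topic_1", &Message{Payload: "a"}, &Message{Payload: "b"})
	if err := route(ps, "example.topic_1", "example.topic_2", exclaim); err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range ps.topics["example.topic_2"] {
		fmt.Println(m.Payload)
	}
}
```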

No publisher handler

Not every handler needs to publish messages. You can add this kind of handler by using Router.AddNoPublisherHandler:

Full source: message/router.go

// ...
// AddNoPublisherHandler adds a new handler.
// This handler cannot return messages.
// Returning a message from this handler causes an error, and a Nack is sent.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// subscriber is Subscriber from which messages will be consumed.
func (r *Router) AddNoPublisherHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    handlerFunc HandlerFunc,
) {
// ...
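
The rule that such a handler must not return messages can be pictured as a small wrapper. This is only a sketch with local stand-in types; noPublisher, silentHandler, and chattyHandler are hypothetical names, and the real check inside Watermill may look different.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified local stand-in (assumption).
type Message struct{ Payload string }

// HandlerFunc mirrors the shape of the handler signature.
type HandlerFunc func(msg *Message) ([]*Message, error)

// noPublisher wraps h so that returning any message becomes an error.
// In the router, that error would in turn lead to a Nack.
func noPublisher(h HandlerFunc) HandlerFunc {
	return func(msg *Message) ([]*Message, error) {
		produced, err := h(msg)
		if err != nil {
			return nil, err
		}
		if len(produced) > 0 {
			return nil, errors.New("handler without publisher returned messages")
		}
		return nil, nil
	}
}

// silentHandler only consumes the message.
func silentHandler(msg *Message) ([]*Message, error) { return nil, nil }

// chattyHandler wrongly returns a message.
func chattyHandler(msg *Message) ([]*Message, error) {
	return []*Message{{Payload: "unexpected"}}, nil
}

func main() {
	_, err := noPublisher(silentHandler)(&Message{Payload: "x"})
	fmt.Println("silent handler error:", err)
	_, err = noPublisher(chattyHandler)(&Message{Payload: "x"})
	fmt.Println("chatty handler error:", err)
}
```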

Ack

You don’t have to call msg.Ack() or msg.Nack() after a message is processed (you can if you want, of course). msg.Ack() is called when HandlerFunc doesn’t return an error. If an error is returned, msg.Nack() will be called.
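
The acknowledgement rules can be sketched as follows. The Message type here is a local stand-in that remembers only the first acknowledgement it receives, which is how this sketch models "Nack is not sent because Ack was already sent". The process helper and the three handlers are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified local stand-in (assumption). It keeps the
// first acknowledgement and ignores any later one.
type Message struct {
	state string // "", "ack" or "nack"
}

func (m *Message) Ack() {
	if m.state == "" {
		m.state = "ack"
	}
}

func (m *Message) Nack() {
	if m.state == "" {
		m.state = "nack"
	}
}

// HandlerFunc mirrors the shape of the handler signature.
type HandlerFunc func(msg *Message) ([]*Message, error)

// process is a hypothetical helper showing the behaviour described
// above: Ack when the handler succeeds, Nack when it returns an error.
func process(h HandlerFunc, msg *Message) string {
	if _, err := h(msg); err != nil {
		msg.Nack()
	} else {
		msg.Ack()
	}
	return msg.state
}

func okHandler(msg *Message) ([]*Message, error) { return nil, nil }

func failingHandler(msg *Message) ([]*Message, error) {
	return nil, errors.New("processing failed")
}

// ackThenFail acknowledges manually and then reports an error.
func ackThenFail(msg *Message) ([]*Message, error) {
	msg.Ack()
	return nil, errors.New("failed after ack")
}

func main() {
	fmt.Println("ok handler:", process(okHandler, &Message{}))
	fmt.Println("failing handler:", process(failingHandler, &Message{}))
	fmt.Println("ack then fail:", process(ackThenFail, &Message{}))
}
```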

Producing messages

When returning multiple messages from a handler, you should be aware that most Publisher implementations don’t support atomic publishing of messages.

If the broker or the storage becomes unavailable, this may result in only some of the messages being published before msg.Nack() is sent.

If this is an issue, consider publishing at most one message per handler.
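
The partial-publish problem can be reproduced with a small stand-alone sketch. flakyPublisher is a hypothetical in-memory publisher that starts failing after a fixed number of messages. It is not one of Watermill's Publisher implementations.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified local stand-in (assumption).
type Message struct{ Payload string }

// flakyPublisher is a hypothetical publisher that accepts failAfter
// messages and then behaves as if the broker became unavailable.
type flakyPublisher struct {
	failAfter int
	published []string
}

// Publish stores messages one by one, so an error in the middle of a
// batch leaves the earlier messages published (no atomicity).
func (p *flakyPublisher) Publish(topic string, msgs ...*Message) error {
	for _, m := range msgs {
		if len(p.published) >= p.failAfter {
			return errors.New("broker unavailable")
		}
		p.published = append(p.published, m.Payload)
	}
	return nil
}

func main() {
	pub := &flakyPublisher{failAfter: 2}
	err := pub.Publish("example.topic_2",
		&Message{Payload: "a"}, &Message{Payload: "b"}, &Message{Payload: "c"})
	// The error would cause a Nack, yet "a" and "b" are already out.
	fmt.Println("published:", pub.published, "error:", err)
}
```

If the original message is redelivered after the Nack, the messages that were already published may be produced again, so downstream consumers should be prepared for duplicates.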

Running the Router

To run the Router, you need to call Run().

Full source: message/router.go

// ...
// Run runs all plugins and handlers and starts subscribing to provided topics.
// This call is blocking while the router is running.
//
// When all handlers have stopped (for example, because subscriptions were closed), the router will also stop.
//
// To stop Run() you should call Close() on the router.
func (r *Router) Run() (err error) {
// ...

Ensuring that the Router is running

You might sometimes want to do something after the router starts. You can use the Running() method for this.

Full source: message/router.go

// ...
// Running is closed when router is running.
// In other words: you can wait till router is running using
//        fmt.Println("Starting router")
//        go r.Run()
//        <- r.Running()
//        fmt.Println("Router is running")
func (r *Router) Running() chan struct{} {
// ...
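
The blocking Run() call and the Running() channel follow a common Go pattern: a channel that is closed once startup has finished. The sketch below shows that pattern with a hypothetical miniRouter type. It is a stand-in for illustration, not the Router's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// miniRouter is a hypothetical stand-in for the Router.
type miniRouter struct {
	running   chan struct{}
	closed    chan struct{}
	closeOnce sync.Once
}

func newMiniRouter() *miniRouter {
	return &miniRouter{
		running: make(chan struct{}),
		closed:  make(chan struct{}),
	}
}

// Run signals that the router is running and then blocks until Close.
// In this sketch it must be called only once.
func (r *miniRouter) Run() error {
	close(r.running)
	<-r.closed
	return nil
}

// Running returns a channel that is closed once Run has started.
func (r *miniRouter) Running() chan struct{} { return r.running }

// Close unblocks Run. It is safe to call more than once.
func (r *miniRouter) Close() error {
	r.closeOnce.Do(func() { close(r.closed) })
	return nil
}

func main() {
	r := newMiniRouter()
	done := make(chan error, 1)
	go func() { done <- r.Run() }()

	<-r.Running() // wait until the router is running
	fmt.Println("router is running")

	_ = r.Close()
	fmt.Println("Run returned:", <-done)
}
```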

Execution model

Some Consumers may support only a single stream of messages - this means that until a msg.Ack() is sent, you will not receive any more messages.

However, some Consumers support multiple streams. For example, when subscribing to multiple partitions in parallel, several messages can be delivered even if earlier ones have not been Acked yet (e.g. the Kafka Consumer works like this). The router can handle this case and run multiple HandlerFuncs in parallel.
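
Parallel execution of handlers can be pictured with one goroutine per received message. The sketch below uses hypothetical helpers (handleInParallel, rendezvous) and plain strings instead of messages. The rendezvous handler only returns once n handlers are running at the same time, so the program would hang if the handlers were run one after another.

```go
package main

import (
	"fmt"
	"sync"
)

// handleInParallel is a hypothetical helper: it starts one goroutine per
// received message and returns the number of messages once all handlers
// have finished.
func handleInParallel(msgs <-chan string, handler func(string)) int {
	var wg sync.WaitGroup
	count := 0
	for m := range msgs {
		count++
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			handler(m)
		}(m)
	}
	wg.Wait()
	return count
}

// rendezvous returns a handler that blocks until n handler calls are in
// progress concurrently.
func rendezvous(n int) func(string) {
	var barrier sync.WaitGroup
	barrier.Add(n)
	return func(string) {
		barrier.Done()
		barrier.Wait()
	}
}

func main() {
	msgs := make(chan string, 2)
	msgs <- "partition-0"
	msgs <- "partition-1"
	close(msgs)
	fmt.Println("handled:", handleInParallel(msgs, rendezvous(2)))
}
```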

Middleware

Full source: message/router.go

// ...
// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
//
// It can be attached to the router by using `AddMiddleware` method.
//
// Example:
//        func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//            return func(message *message.Message) ([]*message.Message, error) {
//                fmt.Println("executed before handler")
//                producedMessages, err := h(message)
//                fmt.Println("executed after handler")
//
//                return producedMessages, err
//            }
//        }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

A full list of the standard middlewares can be found in message/router/middleware.
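
To show how decorators of this shape compose, here is a stand-alone sketch with local stand-in types. The recorder and chain helpers are hypothetical, and the ordering chosen in chain (first middleware outermost) is an assumption of this sketch rather than a statement about AddMiddleware.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a simplified local stand-in (assumption).
type Message struct{ Payload string }

// HandlerFunc and HandlerMiddleware mirror the shapes quoted above.
type HandlerFunc func(msg *Message) ([]*Message, error)
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// recorder returns a hypothetical middleware that notes in trace when it
// runs before and after the wrapped handler.
func recorder(name string, trace *[]string) HandlerMiddleware {
	return func(h HandlerFunc) HandlerFunc {
		return func(msg *Message) ([]*Message, error) {
			*trace = append(*trace, name+" before")
			produced, err := h(msg)
			*trace = append(*trace, name+" after")
			return produced, err
		}
	}
}

// chain wraps h so that the first middleware in the list is the
// outermost one.
func chain(h HandlerFunc, mws ...HandlerMiddleware) HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

func main() {
	var trace []string
	handler := func(msg *Message) ([]*Message, error) {
		trace = append(trace, "handler")
		return nil, nil
	}
	wrapped := chain(handler, recorder("outer", &trace), recorder("inner", &trace))
	_, _ = wrapped(&Message{Payload: "x"})
	fmt.Println(strings.Join(trace, " > "))
}
```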

Plugin

Full source: message/router.go

// ...
// RouterPlugin is a function which is executed on Router start.
type RouterPlugin func(*Router) error

// ...

A full list of the standard plugins can be found in message/router/plugin.
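
A plugin of this shape is simply a function that receives the router when it starts. The following sketch uses a hypothetical local Router type with AddPlugin and Start methods to show the idea. It is not Watermill's implementation, and the names are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Router is a hypothetical local stand-in for the real Router.
type Router struct {
	plugins []RouterPlugin
	started bool
	notes   []string
}

// RouterPlugin mirrors the shape of the type quoted above.
type RouterPlugin func(*Router) error

// AddPlugin registers plugins to be executed on start.
func (r *Router) AddPlugin(p ...RouterPlugin) {
	r.plugins = append(r.plugins, p...)
}

// Start runs every plugin in order and aborts on the first error.
func (r *Router) Start() error {
	for _, p := range r.plugins {
		if err := p(r); err != nil {
			return fmt.Errorf("plugin failed: %w", err)
		}
	}
	r.started = true
	return nil
}

// notePlugin is a hypothetical plugin that records that it ran.
func notePlugin(r *Router) error {
	r.notes = append(r.notes, "note plugin executed")
	return nil
}

// failingPlugin is a hypothetical plugin that always fails.
func failingPlugin(r *Router) error {
	return errors.New("cannot initialise")
}

func main() {
	r := &Router{}
	r.AddPlugin(notePlugin)
	fmt.Println("start error:", r.Start(), "notes:", r.notes)

	broken := &Router{}
	broken.AddPlugin(failingPlugin)
	fmt.Println("start error:", broken.Start())
}
```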