Message Router

Magic glue of Watermill.

Publishers and subscribers are rather low-level parts of Watermill. In production use, we usually want something higher level that provides features like correlation, metrics, a poison queue, retrying, throttling, etc.

We also don’t want to send Ack manually when processing was successful. Sometimes, we also want to send a message after processing another.

To handle these requirements, we created a component named Router.

Configuration

Full source: message/router.go

// ...
type RouterConfig struct {
    // CloseTimeout determines how long router should work for handlers when closing.
    CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
    if c.CloseTimeout == 0 {
        c.CloseTimeout = time.Second * 30
    }
}

func (c RouterConfig) Validate() error {
    return nil
}
// ...

Handler

First, we need to implement HandlerFunc:

Full source: message/router.go

// ...
// HandlerFunc is function called when message is received.
//
// msg.Ack() is called automatically when HandlerFunc doesn't return error.
// When HandlerFunc returns error, msg.Nack() is called.
// When msg.Ack() was called in handler and HandlerFunc returns error,
// msg.Nack() will not be sent because Ack was already sent.
//
// HandlerFunc's are executed in parallel when multiple messages were received
// (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...
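
To make the signature above concrete, here is a minimal, self-contained sketch of a handler. The Message and HandlerFunc types are simplified local stand-ins that only mirror the shapes shown above (they are not Watermill's real types), and upperCaseHandler is a hypothetical name chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Message and HandlerFunc are simplified local stand-ins (assumptions),
// mirroring only the shapes shown in the excerpt above.
type Message struct {
	UUID    string
	Payload []byte
}

type HandlerFunc func(msg *Message) ([]*Message, error)

// upperCaseHandler is a hypothetical handler: it rejects empty payloads and
// otherwise produces one message with the payload upper-cased.
func upperCaseHandler(msg *Message) ([]*Message, error) {
	if len(msg.Payload) == 0 {
		// Returning an error is what leads the router to Nack the message.
		return nil, errors.New("empty payload")
	}
	out := &Message{
		UUID:    msg.UUID + "-out",
		Payload: []byte(strings.ToUpper(string(msg.Payload))),
	}
	// Returning a nil error is what leads the router to Ack the message.
	return []*Message{out}, nil
}

func main() {
	var h HandlerFunc = upperCaseHandler
	produced, err := h(&Message{UUID: "1", Payload: []byte("hello")})
	fmt.Println(string(produced[0].Payload), err)
}
```

The handler itself never touches Ack or Nack; it only reports success or failure through its return values.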

Next we need to add a new handler with Router.AddHandler:

Full source: message/router.go

// ...
// AddHandler adds a new handler.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// publishTopic is a topic to which router will produce messages returned by handlerFunc.
// When handler needs to publish to multiple topics,
// it is recommended to just inject Publisher to Handler or implement middleware
// which will catch messages and publish to topic based on metadata for example.
//
// pubSub is PubSub from which messages will be consumed and to which created messages will be published.
// If you have separated Publisher and Subscriber object,
// you can create PubSub object by calling message.NewPubSub(publisher, subscriber).
func (r *Router) AddHandler(
    handlerName string,
    subscribeTopic string,
    publishTopic string,
    pubSub PubSub,
    handlerFunc HandlerFunc,
) error {
// ...

An example usage from Getting Started:

Full source: docs/content/docs/getting-started/router/main.go

// ...
    if err := router.AddHandler(
        "struct_handler",  // handler name, must be unique
        "example.topic_1", // topic from which we will read events
        "example.topic_2", // topic to which we will publish event
        pubSub,
        structHandler{}.Handler,
    ); err != nil {
        panic(err)
    }
// ...

No publisher handler

Not every handler needs to publish messages. You can add this kind of handler by using Router.AddNoPublisherHandler:

Full source: message/router.go

// ...
// AddNoPublisherHandler adds a new handler.
// This handler cannot return messages.
// When a message is returned, an error will occur and Nack will be sent.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// subscriber is Subscriber from which messages will be consumed.
func (r *Router) AddNoPublisherHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    handlerFunc HandlerFunc,
) error {
// ...

Ack

You don’t need to call msg.Ack() or msg.Nack() after a message is processed (but you can, of course). msg.Ack() is called when HandlerFunc doesn’t return an error. If an error is returned, msg.Nack() is called.
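
The rule above can be sketched with a few lines of plain Go. This is only an illustration under stated assumptions: Message is a local stand-in that records its acknowledgement state, and process is a hypothetical helper that imitates what the router does after calling a handler. It is not the router's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in (assumption) that only records
// whether it was acked or nacked. The first decision wins.
type Message struct {
	state string // "", "acked" or "nacked"
}

func (m *Message) Ack() {
	if m.state == "" {
		m.state = "acked"
	}
}

func (m *Message) Nack() {
	if m.state == "" {
		m.state = "nacked"
	}
}

type HandlerFunc func(msg *Message) ([]*Message, error)

// process is a hypothetical helper imitating the rule described above:
// Nack when the handler returns an error, Ack otherwise.
func process(h HandlerFunc, msg *Message) string {
	if _, err := h(msg); err != nil {
		msg.Nack()
	} else {
		msg.Ack()
	}
	return msg.state
}

func okHandler(msg *Message) ([]*Message, error) { return nil, nil }

func failingHandler(msg *Message) ([]*Message, error) {
	return nil, errors.New("processing failed")
}

// ackThenFail acks manually and then fails; the later Nack has no effect.
func ackThenFail(msg *Message) ([]*Message, error) {
	msg.Ack()
	return nil, errors.New("failed after manual Ack")
}

func main() {
	fmt.Println(process(okHandler, &Message{}))      // acked
	fmt.Println(process(failingHandler, &Message{})) // nacked
	fmt.Println(process(ackThenFail, &Message{}))    // acked
}
```

The third case corresponds to the HandlerFunc comment: once Ack was sent inside the handler, a returned error no longer results in a Nack.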

Producing messages

When returning multiple messages in the router, you should be aware that most Publisher implementations don’t support atomic publishing of messages.

This may lead to only some of the messages being produced and msg.Nack() being sent when the broker or storage is not available.

If this is a problem, you should consider publishing at most one message per handler.
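
The partial-publish scenario can be illustrated with a small sketch. flakyPublisher is a hypothetical publisher invented for this example; it accepts a fixed number of messages and then starts failing, which stands in for a broker becoming unavailable in the middle of a batch.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in (assumption), not Watermill's type.
type Message struct{ Payload string }

// flakyPublisher is a hypothetical publisher that becomes unavailable
// after it has accepted failAfter messages.
type flakyPublisher struct {
	failAfter int
	published []string
}

// Publish sends messages one by one, so a failure in the middle of the
// batch leaves the earlier messages already published.
func (p *flakyPublisher) Publish(topic string, msgs ...*Message) error {
	for _, m := range msgs {
		if len(p.published) >= p.failAfter {
			return errors.New("broker unavailable")
		}
		p.published = append(p.published, m.Payload)
	}
	return nil
}

func main() {
	pub := &flakyPublisher{failAfter: 2}
	err := pub.Publish("example.topic_2", &Message{"a"}, &Message{"b"}, &Message{"c"})
	// Two of three messages went out, yet the call as a whole failed.
	fmt.Println(pub.published, err)
}
```

If the incoming message is then nacked and redelivered, the handler may produce the first two messages again, which is why a single produced message per handler is the safer choice when duplicates matter.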

Running router

To run the router, you need to call Run().

Full source: message/router.go

// ...
// Run runs all plugins and handlers and starts subscribing to provided topics.
// This call is blocking while the router is running.
//
// To stop Run() you should call Close() on the router.
func (r *Router) Run() (err error) {
// ...

Ensuring that router is running

Sometimes, you want to do something after the router has started. You can use the Running() method for this.

Full source: message/router.go

// ...
// Running is closed when router is running.
// In other words: you can wait till router is running using
//        fmt.Println("Starting router")
//        go r.Run()
//        <- r.Running()
//        fmt.Println("Router is running")
func (r *Router) Running() chan struct{} {
// ...
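
The signalling pattern behind Running() is a channel that gets closed once the router is up, so every waiter is released at the same time. The sketch below shows that pattern with a minimal stand-in Router; the fields and the NewRouter constructor are assumptions made for this example and do not reflect the real router's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// Router is a minimal stand-in (assumption) that only demonstrates
// the closed-channel signalling pattern.
type Router struct {
	running   chan struct{}
	closing   chan struct{}
	closeOnce sync.Once
}

func NewRouter() *Router {
	return &Router{
		running: make(chan struct{}),
		closing: make(chan struct{}),
	}
}

// Run signals readiness by closing the running channel,
// then blocks until Close is called.
func (r *Router) Run() error {
	close(r.running)
	<-r.closing
	return nil
}

// Running returns a channel that is closed once Run has started.
func (r *Router) Running() chan struct{} { return r.running }

// Close unblocks Run. It is safe to call more than once.
func (r *Router) Close() error {
	r.closeOnce.Do(func() { close(r.closing) })
	return nil
}

func main() {
	r := NewRouter()
	done := make(chan error, 1)
	go func() { done <- r.Run() }()

	<-r.Running() // unblocks once Run has started
	fmt.Println("Router is running")

	r.Close()
	fmt.Println("Run returned:", <-done)
}
```

Because Run() blocks, it is started in its own goroutine and its result is collected through a channel after Close().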

Execution model

Some Consumers may support only a single stream of messages, which means that until msg.Ack() is sent you will not receive more messages.

However, some Consumers can, for example, subscribe to multiple partitions in parallel, and multiple messages will be sent even if the previous one was not Acked (the Kafka Consumer, for example). The router can handle this case and run multiple HandlerFunc calls in parallel.
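
A rough sketch of the parallel case, assuming messages arrive without waiting for earlier Acks: each message is handled in its own goroutine. handleAll is a hypothetical helper and the types are local stand-ins; the real router's scheduling may differ.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message and HandlerFunc are simplified stand-ins (assumptions).
type Message struct{ Payload string }

type HandlerFunc func(msg *Message) ([]*Message, error)

// handleAll is a hypothetical helper: it runs the handler for every incoming
// message in its own goroutine, waits for all of them, and returns how many
// calls finished without an error.
func handleAll(h HandlerFunc, incoming []*Message) int {
	var (
		wg sync.WaitGroup
		mu sync.Mutex
		ok int
	)
	for _, msg := range incoming {
		wg.Add(1)
		go func(m *Message) {
			defer wg.Done()
			if _, err := h(m); err == nil {
				mu.Lock()
				ok++
				mu.Unlock()
			}
		}(msg)
	}
	wg.Wait()
	return ok
}

func main() {
	h := func(m *Message) ([]*Message, error) {
		if m.Payload == "" {
			return nil, errors.New("empty payload")
		}
		return nil, nil
	}
	incoming := []*Message{{Payload: "a"}, {Payload: ""}, {Payload: "c"}}
	fmt.Println("handled without error:", handleAll(h, incoming))
}
```

Since handlers may run concurrently, any state shared between calls (the counter here) needs its own synchronization.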

Middleware

Full source: message/router.go

// ...
// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
//
// It can be attached to the router by using `AddMiddleware` method.
//
// Example:
//        func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//            return func(message *message.Message) ([]*message.Message, error) {
//                fmt.Println("executed before handler")
//                producedMessages, err := h(message)
//                fmt.Println("executed after handler")
//
//                return producedMessages, err
//            }
//        }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

A full list of standard middlewares can be found in message/router/middleware.
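
As an illustration of the decorator shape described above, here is a self-contained sketch of a middleware that retries a failing handler. The retry, chain, and failNTimes functions are hypothetical names written for this example with local stand-in types; this is not the library's own retry middleware.

```go
package main

import (
	"errors"
	"fmt"
)

// Message, HandlerFunc and HandlerMiddleware are local stand-ins
// (assumptions) mirroring the shapes shown in the excerpts above.
type Message struct{ Payload string }

type HandlerFunc func(msg *Message) ([]*Message, error)

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// retry is a hypothetical middleware that calls the wrapped handler
// up to attempts times and stops at the first success.
func retry(attempts int) HandlerMiddleware {
	return func(h HandlerFunc) HandlerFunc {
		return func(msg *Message) ([]*Message, error) {
			var err error
			for i := 0; i < attempts; i++ {
				var produced []*Message
				if produced, err = h(msg); err == nil {
					return produced, nil
				}
			}
			return nil, err
		}
	}
}

// chain wraps h so that the first middleware in the list is the outermost one.
func chain(h HandlerFunc, mws ...HandlerMiddleware) HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// failNTimes returns a hypothetical handler that fails its first n calls.
func failNTimes(n int) HandlerFunc {
	calls := 0
	return func(msg *Message) ([]*Message, error) {
		calls++
		if calls <= n {
			return nil, errors.New("temporary failure")
		}
		return []*Message{{Payload: "done"}}, nil
	}
}

func main() {
	h := chain(failNTimes(2), retry(5))
	produced, err := h(&Message{Payload: "in"})
	fmt.Println(produced[0].Payload, err)
}
```

Because a middleware has the same input and output type, any number of them can be stacked around one handler without the handler knowing about it.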

Plugin

Full source: message/router.go

// ...
// RouterPlugin is function which is executed on Router start.
type RouterPlugin func(*Router) error

// ...

A full list of standard plugins can be found in message/router/plugin.