Message Router

The Magic Glue of Watermill.

Publishers and Subscribers are rather low-level parts of Watermill. In production use, you’d usually want to use a high-level interface and features like correlation, metrics, poison queue, retrying, throttling, etc..

You also might not want to send an Ack when processing was successful. Sometimes, you’d like to send a message after processing of another message finishes.

To handle these requirements, there is a component named Router.

Watermill Router

Configuration

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
    // CloseTimeout determines how long router should work for handlers when closing.
   CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
    if c.CloseTimeout == 0 {
        c.CloseTimeout = time.Second * 30
    }
}

func (c RouterConfig) Validate() error {
    return nil
}
// ...

Handler

At the beginning you need to implement HandlerFunc:

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerFunc is function called when message is received.
//
// msg.Ack() is called automatically when HandlerFunc doesn't return error.
// When HandlerFunc returns error, msg.Nack() is called.
// When msg.Ack() was called in handler and HandlerFunc returns error,
// msg.Nack() will be not sent because Ack was already sent.
//
// HandlerFunc's are executed parallel when multiple messages was received
// (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

Next, you have to add a new handler with Router.AddHandler:

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddHandler adds a new handler.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// publishTopic is a topic to which router will produce messages returned by handlerFunc.
// When handler needs to publish to multiple topics,
// it is recommended to just inject Publisher to Handler or implement middleware
// which will catch messages and publish to topic based on metadata for example.
func (r *Router) AddHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    publishTopic string,
    publisher Publisher,
    handlerFunc HandlerFunc,
) *Handler {
    r.logger.Info("Adding handler", watermill.LogFields{
        "handler_name": handlerName,
        "topic":        subscribeTopic,
    })

    if _, ok := r.handlers[handlerName]; ok {
        panic(DuplicateHandlerNameError{handlerName})
    }

    publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

    newHandler := &handler{
        name:   handlerName,
        logger: r.logger,

        subscriber:     subscriber,
        subscribeTopic: subscribeTopic,
        subscriberName: subscriberName,

        publisher:     publisher,
        publishTopic:  publishTopic,
        publisherName: publisherName,

        handlerFunc:       handlerFunc,
        runningHandlersWg: r.runningHandlersWg,
        messagesCh:        nil,
        closeCh:           r.closeCh,
    }

    r.handlers[handlerName] = newHandler

    return &Handler{
        router:  r,
        handler: newHandler,
    }
}

// AddNoPublisherHandler adds a new handler.
// This handler cannot return messages.
// When message is returned it will occur an error and Nack will be sent.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// subscriber is Subscriber from which messages will be consumed.
func (r *Router) AddNoPublisherHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    handlerFunc NoPublishHandlerFunc,
) {
// ...

See an example usage from Getting Started:

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
   handler := router.AddHandler(
        "struct_handler",          // handler name, must be unique
       "incoming_messages_topic", // topic from which we will read events
       pubSub,
        "outgoing_messages_topic", // topic to which we will publish events
       pubSub,
        structHandler{}.Handler,
    )
// ...

No publisher handler

Not every handler will produce new messages. You can add this kind of handler by using Router.AddNoPublisherHandler:

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler adds a new handler.
// This handler cannot return messages.
// When message is returned it will occur an error and Nack will be sent.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// subscriber is Subscriber from which messages will be consumed.
func (r *Router) AddNoPublisherHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    handlerFunc NoPublishHandlerFunc,
) {
// ...

Ack

By default, msg.Ack() is called when HanderFunc doesn’t return an error. If an error is returned, msg.Nack() will be called. Because of this, you don’t have to call msg.Ack() or msg.Nack() after a message is processed (you can if you want, of course).

Producing messages

When returning multiple messages from a handler, be aware that most Publisher implementations don’t support atomic publishing of messages. It may end up producing only some of messages and sending msg.Nack() if the broker or the storage are not available.

If it is an issue, consider publishing just one message with each handler.

Running the Router

To run the Router, you need to call Run().

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run runs all plugins and handlers and starts subscribing to provided topics.
// This call is blocking while the router is running.
//
// When all handlers have stopped (for example, because subscriptions were closed), the router will also stop.
//
// To stop Run() you should call Close() on the router.
//
// ctx will be propagated to all subscribers.
//
// When all handlers are stopped (for example: because of closed connection), Run() will be also stopped.
func (r *Router) Run(ctx context.Context) (err error) {
// ...

Ensuring that the Router is running

It can be useful to know if the router is running. You can use the Running() method for this.

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running is closed when router is running.
// In other words: you can wait till router is running using
//        fmt.Println("Starting router")
//        go r.Run(ctx)
//        <- r.Running()
//        fmt.Println("Router is running")
func (r *Router) Running() chan struct{} {
// ...

Execution models

Subscribers can consume either one message at a time or multiple messages in parallel.

  • Single stream of messages is the simplest approach and it means that until a msg.Ack() is called, the subscriber will not receive any new messages.
  • Multiple message streams are supported only by some subscribers. By subscribing to multiple topic partitions at once, several messages can be consumed in parallel, even previous messages that were not acked (for example, the Kafka subscriber works like this). Router handles this model by running concurrent HandlerFuncs, one for each partition.

See the chosen Pub/Sub documentation for supported execution models.

Middleware

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
//
// It can be attached to the router by using `AddMiddleware` method.
//
// Example:
//        func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//            return func(message *message.Message) ([]*message.Message, error) {
//                fmt.Println("executed before handler")
//                producedMessages, err := h(message)
//                fmt.Println("executed after handler")
//
//                return producedMessages, err
//            }
//        }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

A full list of standard middlewares can be found in Middlewares.

Plugin

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin is function which is executed on Router start.
type RouterPlugin func(*Router) error

// ...

A full list of standard plugins can be found in message/router/plugin.

Context

Each message received by handler holds some useful values in the context:

Full source: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx returns the name of the message handler in the router that consumed the message.
func HandlerNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx returns the name of the message publisher type that published the message in the router.
// For example, for Kafka it will be `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx returns the name of the message subscriber type that subscribed to the message in the router.
// For example, for Kafka it will be `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx returns the topic from which message was received in the router.
func SubscribeTopicFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx returns the topic to which message will be published by the router.
func PublishTopicFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, publishTopicKey)
}
// ...