Message Router

The Magic Glue of Watermill.

Publishers and Subscribers are rather low-level parts of Watermill. In production use, you’d usually want to use a high-level interface and features like correlation, metrics, poison queue, retrying, throttling, etc..

You also might not want to send an Ack when processing was successful. Sometimes, you’d like to send a message after processing of another message finishes.

To handle these requirements, there is a component named Router.

Watermill Router

Configuration

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout determines how long router should work for handlers when closing.
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate returns Router configuration error, if any.
func (c RouterConfig) Validate() error {
	return nil
}
// ...

Handler

At the beginning you need to implement HandlerFunc:

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerFunc is function called when message is received.
//
// msg.Ack() is called automatically when HandlerFunc doesn't return error.
// When HandlerFunc returns error, msg.Nack() is called.
// When msg.Ack() was called in handler and HandlerFunc returns error,
// msg.Nack() will be not sent because Ack was already sent.
//
// HandlerFunc's are executed parallel when multiple messages was received
// (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

Next, you have to add a new handler with Router.AddHandler:

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddHandler adds a new handler.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// publishTopic is a topic to which router will produce messages returned by handlerFunc.
// When handler needs to publish to multiple topics,
// it is recommended to just inject Publisher to Handler or implement middleware
// which will catch messages and publish to topic based on metadata for example.
//
// If handler is added while router is already running, you need to explicitly call RunHandlers().
func (r *Router) AddHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	publishTopic string,
	publisher Publisher,
	handlerFunc HandlerFunc,
) *Handler {
	r.logger.Info("Adding handler", watermill.LogFields{
		"handler_name": handlerName,
		"topic":        subscribeTopic,
	})

	r.handlersLock.Lock()
	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {
		panic(DuplicateHandlerNameError{handlerName})
	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{
		name:   handlerName,
		logger: r.logger,

		subscriber:     subscriber,
		subscribeTopic: subscribeTopic,
		subscriberName: subscriberName,

		publisher:     publisher,
		publishTopic:  publishTopic,
		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,
		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,
		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),
	}

	r.handlersWg.Add(1)
	r.handlers[handlerName] = newHandler

	select {
	case r.handlerAdded <- struct{}{}:
	default:
		// closeWhenAllHandlersStopped is not always waiting for handlerAdded
	}

	return &Handler{
		router:  r,
		handler: newHandler,
	}
}

// AddNoPublisherHandler adds a new handler.
// This handler cannot return messages.
// When message is returned it will occur an error and Nack will be sent.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// subscriber is Subscriber from which messages will be consumed.
//
// If handler is added while router is already running, you need to explicitly call RunHandlers().
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

See an example usage from Getting Started:

Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler returns a handler which can be used to add handler level middleware
	// or to stop handler.
	handler := router.AddHandler(
		"struct_handler",          // handler name, must be unique
		"incoming_messages_topic", // topic from which we will read events
		pubSub,
		"outgoing_messages_topic", // topic to which we will publish events
		pubSub,
		structHandler{}.Handler,
	)

	// Handler level middleware is only executed for a specific handler
	// Such middleware can be added the same way the router level ones
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("executing handler specific middleware for ", message.UUID)

			return h(message)
		}
	})

// ...

No publisher handler

Not every handler will produce new messages. You can add this kind of handler by using Router.AddNoPublisherHandler:

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler adds a new handler.
// This handler cannot return messages.
// When message is returned it will occur an error and Nack will be sent.
//
// handlerName must be unique. For now, it is used only for debugging.
//
// subscribeTopic is a topic from which handler will receive messages.
//
// subscriber is Subscriber from which messages will be consumed.
//
// If handler is added while router is already running, you need to explicitly call RunHandlers().
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

Ack

By default, msg.Ack() is called when HanderFunc doesn’t return an error. If an error is returned, msg.Nack() will be called. Because of this, you don’t have to call msg.Ack() or msg.Nack() after a message is processed (you can if you want, of course).

Producing messages

When returning multiple messages from a handler, be aware that most Publisher implementations don’t support atomic publishing of messages. It may end up producing only some of messages and sending msg.Nack() if the broker or the storage are not available.

If it is an issue, consider publishing just one message with each handler.

Running the Router

To run the Router, you need to call Run().

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run runs all plugins and handlers and starts subscribing to provided topics.
// This call is blocking while the router is running.
//
// When all handlers have stopped (for example, because subscriptions were closed), the router will also stop.
//
// To stop Run() you should call Close() on the router.
//
// ctx will be propagated to all subscribers.
//
// When all handlers are stopped (for example: because of closed connection), Run() will be also stopped.
func (r *Router) Run(ctx context.Context) (err error) {
// ...

Ensuring that the Router is running

It can be useful to know if the router is running. You can use the Running() method for this.

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running is closed when router is running.
// In other words: you can wait till router is running using
//
//	fmt.Println("Starting router")
//	go r.Run(ctx)
//	<- r.Running()
//	fmt.Println("Router is running")
//
// Warning: for historical reasons, this channel is not aware of router closing - the channel will be closed if the router has been running and closed.
func (r *Router) Running() chan struct{} {
// ...

You can also use IsRunning function, that returns bool:

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning returns true when router is running.
//
// Warning: for historical reasons, this method is not aware of router closing.
// If you want to know if the router was closed, use IsClosed.
func (r *Router) IsRunning() bool {
// ...

Closing the Router

To close the Router, you need to call Close().

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() will close all publishers and subscribers, and wait for all handlers to finish.

Close() will wait for a timeout configured in RouterConfig.CloseTimeout. If the timeout is reached, Close() will return an error.

Adding handler after the router has started

You can add a new handler while the router is already running. To do that, you need to call AddNoPublisherHandler or AddHandler and call RunHandlers.

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

Stopping running handler

It is possible to stop just one running handler by calling Stop().

Please keep in mind, that router will be closed when there are no running handlers.

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if handler was stopped with Stopped() function.
func (h *Handler) Stop() {
// ...

Execution models

Subscribers can consume either one message at a time or multiple messages in parallel.

  • Single stream of messages is the simplest approach and it means that until a msg.Ack() is called, the subscriber will not receive any new messages.
  • Multiple message streams are supported only by some subscribers. By subscribing to multiple topic partitions at once, several messages can be consumed in parallel, even previous messages that were not acked (for example, the Kafka subscriber works like this). Router handles this model by running concurrent HandlerFuncs, one for each partition.

See the chosen Pub/Sub documentation for supported execution models.

Middleware

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
//
// It can be attached to the router by using `AddMiddleware` method.
//
// Example:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("executed before handler")
//			producedMessages, err := h(message)
//			fmt.Println("executed after handler")
//
//			return producedMessages, err
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

A full list of standard middlewares can be found in Middlewares.

Plugin

Full source: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin is function which is executed on Router start.
type RouterPlugin func(*Router) error

// ...

A full list of standard plugins can be found in message/router/plugin.

Context

Each message received by handler holds some useful values in the context:

Full source: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx returns the name of the message handler in the router that consumed the message.
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx returns the name of the message publisher type that published the message in the router.
// For example, for Kafka it will be `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx returns the name of the message subscriber type that subscribed to the message in the router.
// For example, for Kafka it will be `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx returns the topic from which message was received in the router.
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx returns the topic to which message will be published by the router.
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...