Requeuing After Error

When a message fails to process (a nack is sent), it usually blocks other messages on the same topic (within the same consumer group or partition).

Depending on your setup, it may be useful to requeue the failed message back to the tail of the queue.

Consider this if:

  • You don’t care about the order of messages.
  • Your system isn’t resilient to blocked messages.

Requeuer

The Requeuer component is a wrapper on the Router that moves messages from one topic to another.

// ...
type Config struct {
	// Subscriber is the subscriber to consume messages from. Required.
	Subscriber message.Subscriber

	// SubscribeTopic is the topic related to the Subscriber to consume messages from. Required.
	SubscribeTopic string

	// Publisher is the publisher to publish requeued messages to. Required.
	Publisher message.Publisher

	// GeneratePublishTopic is the topic related to the Publisher to publish the requeued message to.
	// For example, it could be a constant, or taken from the message's metadata.
	// Required.
	GeneratePublishTopic func(params GeneratePublishTopicParams) (string, error)

	// Delay is the duration to wait before requeuing the message. Optional.
	// The default is no delay.
	//
	// This can be useful to avoid requeuing messages too quickly, for example, to avoid
	// requeuing a message that failed to process due to a temporary issue.
	//
	// Avoid setting this to a very high value, as it will block the message processing.
	Delay time.Duration

	// Router is the custom router to run the requeue handler on. Optional.
	Router *message.Router
}
// ...

Full source: github.com/ThreeDotsLabs/watermill/components/requeuer/requeuer.go

A trivial usage can look like this. It requeues messages from one topic to the same topic after a delay.

Using the delay this way is not recommended, as it blocks the entire requeue process for the given time.

req, err := requeuer.NewRequeuer(requeuer.Config{
    Subscriber:     sub,
    SubscribeTopic: "topic",
    Publisher:      pub,
    GeneratePublishTopic: func(params requeuer.GeneratePublishTopicParams) (string, error) {
        return "topic", nil
    },
    Delay: time.Millisecond * 200,
}, logger)
if err != nil {
	return err
}

err := req.Run(context.Background())
if err != nil {
    return err
}

A better way to use the Requeuer is to combine it with the Poison middleware. The middleware moves messages to a separate “poison” topic. Then, the requeuer moves them back to the original topic based on the metadata.

You combine this with a Pub/Sub that supports delayed messages. See the full example based on PostgreSQL .


Check our online hands-on training