Implementing a new Pub/Sub
On this page
The Pub/Sub interface
To add support for a custom Pub/Sub, you have to implement both message.Publisher
and message.Subscriber
interfaces.
// ...
type Publisher interface {
// Publish publishes provided messages to the given topic.
//
// Publish can be synchronous or asynchronous - it depends on the implementation.
//
// Most publisher implementations don't support atomic publishing of messages.
// This means that if publishing one of the messages fails, the next messages will not be published.
//
// Publish does not work with a single Context.
// Use the Context() method of each message instead.
//
// Publish must be thread safe.
Publish(topic string, messages ...*Message) error
// Close should flush unsent messages if publisher is async.
Close() error
}
// Subscriber is the consuming part of the Pub/Sub.
type Subscriber interface {
// Subscribe returns an output channel with messages from the provided topic.
// The channel is closed after Close() is called on the subscriber.
//
// To receive the next message, `Ack()` must be called on the received message.
// If message processing fails and the message should be redelivered `Nack()` should be called instead.
//
// When the provided ctx is canceled, the subscriber closes the subscription and the output channel.
// The provided ctx is passed to all produced messages.
// When Nack or Ack is called on the message, the context of the message is canceled.
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
// Close closes all subscriptions with their output channels and flushes offsets etc. when needed.
Close() error
}
// SubscribeInitializer is used to initialize subscribers.
type SubscribeInitializer interface {
// ...
Full source: github.com/ThreeDotsLabs/watermill/message/pubsub.go
Testing
Watermill provides a set of test scenarios that any Pub/Sub implementation can use. Each test suite needs to declare what features it supports and how to construct a new Pub/Sub. These scenarios check both basic usage and more uncommon use cases. Stress tests are also included.
TODO list
Here are a few things you shouldn’t forget about:
- Logging (good messages and proper levels).
- Replaceable and configurable messages marshaller.
Close()
implementation for the publisher and subscriber that is:- idempotent
- working correctly even when the publisher or the subscriber is blocked (for example, waiting for an Ack).
- working correctly when the subscriber output channel is blocked (because nothing is listening on it).
Ack()
andNack()
support for consumed messages.- Redelivery on
Nack()
for a consumed message. - Use Universal Pub/Sub tests . For debugging tips, you should check tests troubleshooting guide .
- Performance optimizations.
- GoDocs, Markdown docs and Getting Started examples .
We will also be thankful for submitting a pull requests with the new Pub/Sub implementation.
If anything is not clear, feel free to use any of our support channels to reach us, we will be glad to help.