Implementing a new Pub/Sub On this page The Pub/Sub interface# To add support for a custom Pub/Sub, you have to implement both message.Publisher
and message.Subscriber
interfaces.
// ...
type Publisher interface {
// Publish publishes provided messages to the given topic.
//
// Publish can be synchronous or asynchronous - it depends on the implementation.
//
// Most publisher implementations don't support atomic publishing of messages.
// This means that if publishing one of the messages fails, the next messages will not be published.
//
// Publish does not work with a single Context.
// Use the Context() method of each message instead.
//
// Publish must be thread safe.
Publish ( topic string , messages ...* Message ) error
// Close should flush unsent messages if publisher is async.
Close () error
}
// Subscriber is the consuming part of the Pub/Sub.
type Subscriber interface {
// Subscribe returns an output channel with messages from the provided topic.
// The channel is closed after Close() is called on the subscriber.
//
// To receive the next message, `Ack()` must be called on the received message.
// If message processing fails and the message should be redelivered `Nack()` should be called instead.
//
// When the provided ctx is canceled, the subscriber closes the subscription and the output channel.
// The provided ctx is passed to all produced messages.
// When Nack or Ack is called on the message, the context of the message is canceled.
Subscribe ( ctx context . Context , topic string ) ( <- chan * Message , error )
// Close closes all subscriptions with their output channels and flushes offsets etc. when needed.
Close () error
}
// SubscribeInitializer is used to initialize subscribers.
type SubscribeInitializer interface {
// ...
Full source: github.com/ThreeDotsLabs/watermill/message/pubsub.go
Testing# Watermill provides a set of test scenarios
that any Pub/Sub implementation can use. Each test suite needs to declare what features it supports and how to construct a new Pub/Sub.
These scenarios check both basic usage and more uncommon use cases. Stress tests are also included.
TODO list# Here are a few things you shouldn’t forget about:
Logging (good messages and proper levels). Replaceable and configurable messages marshaller. Close()
implementation for the publisher and subscriber that is:idempotent working correctly even when the publisher or the subscriber is blocked (for example, waiting for an Ack). working correctly when the subscriber output channel is blocked (because nothing is listening on it). Ack()
and Nack()
support for consumed messages.Redelivery on Nack()
for a consumed message. Use Universal Pub/Sub tests . For debugging tips, you should check tests troubleshooting guide . Performance optimizations. GoDocs, Markdown docs and Getting Started examples . We will also be thankful for submitting a pull requests with the new Pub/Sub implementation.
If anything is not clear, feel free to use any of our support channels to reach us, we will be glad to help.