Pub/Sub implementations

Golang channel, Kafka, Google Cloud Pub/Sub, RabbitMQ and more!

Name                 | Publisher | Subscriber | Status
Golang Channel       | x         | x          | prod-ready
Kafka                | x         | x          | prod-ready
HTTP                 | x         | x          | prod-ready
Google Cloud Pub/Sub | x         | x          | prod-ready
NATS Streaming       | x         | x          | prod-ready
RabbitMQ (AMQP)      | x         | x          | prod-ready
io.Writer/io.Reader  | x         | x          | experimental
MySQL Binlog         |           | x          | idea

All built-in implementations can be found in message/infrastructure.

Golang Channel

Full source: message/infrastructure/gochannel/pubsub.go

// ...
// GoChannel is the simplest Pub/Sub implementation.
// It is based on Golang's channels which are sent within the process.
//
// GoChannel has no global state,
// that means that you need to use the same instance for Publishing and Subscribing!
//
// When GoChannel is persistent, message ordering is not guaranteed.
type GoChannel struct {
// ...

Characteristics

Feature             | Implements | Note
ConsumerGroups      | no         |
ExactlyOnceDelivery | yes        |
GuaranteedOrder     | yes        |
Persistent          | no         |

Configuration

You can inject configuration via the constructor.

Full source: message/infrastructure/gochannel/pubsub.go

// ...
func NewGoChannel(config Config, logger watermill.LoggerAdapter) message.PubSub {
    return &GoChannel{
        config: config,

        subscribers:            make(map[string][]*subscriber),
        subscribersByTopicLock: sync.Map{},
        logger: logger.With(watermill.LogFields{
// ...

Publishing

Full source: message/infrastructure/gochannel/pubsub.go

// ...
// Publish in GoChannel does NOT block until all consumers consume.
// Messages are sent in the background.
//
// Messages may be persisted or not, depending on the persistent attribute.
func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
// ...

Subscribing

Full source: message/infrastructure/gochannel/pubsub.go

// ...
// Subscribe returns a channel to which all published messages are sent.
// Messages are not persisted. If a message is published while there are no subscribers, it is lost.
//
// There is no consumer group support. Every consumer will receive every published message.
func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Marshaler

No marshaling is needed when sending messages within the process.
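A minimal usage sketch (assuming the zero-value Config is sufficient; remember to use the same GoChannel instance for publishing and subscribing):

pubSub := gochannel.NewGoChannel(gochannel.Config{}, watermill.NewStdLogger(false, false))

// Subscribe first - without persistence, messages published before subscribing are lost.
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
    panic(err)
}

if err := pubSub.Publish("example.topic", message.NewMessage(watermill.NewUUID(), []byte("hello"))); err != nil {
    panic(err)
}

msg := <-messages
msg.Ack()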

Kafka

Kafka is one of the most popular Pub/Subs. We provide a Pub/Sub implementation based on Shopify’s Sarama.

Characteristics

Feature             | Implements | Note
ConsumerGroups      | yes        |
ExactlyOnceDelivery | no         | in theory can be achieved with Transactions, but currently no Golang client supports them
GuaranteedOrder     | yes        | requires partition key usage
Persistent          | yes        |

Configuration

Full source: message/infrastructure/kafka/subscriber.go

// ...
type SubscriberConfig struct {
    // Kafka brokers list.
    Brokers []string

    // Kafka consumer group.
    // When empty, all messages from all partitions will be returned.
    ConsumerGroup string

    // How long to wait before redelivering a message after a Nack.
    NackResendSleep time.Duration

    // How long to wait before the next reconnect attempt after an unsuccessful reconnect.
    ReconnectRetrySleep time.Duration

    InitializeTopicDetails *sarama.TopicDetail
}

// NoSleep can be set to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
const NoSleep time.Duration = -1

func (c *SubscriberConfig) setDefaults() {
    if c.NackResendSleep == 0 {
        c.NackResendSleep = time.Millisecond * 100
    }
    if c.ReconnectRetrySleep == 0 {
        c.ReconnectRetrySleep = time.Second
    }
}

func (c SubscriberConfig) Validate() error {
    if len(c.Brokers) == 0 {
        return errors.New("missing brokers")
    }

    return nil
}

// Subscribe subscribes for messages in Kafka.
// ...
Passing custom Sarama config

You can pass custom config parameters via the overwriteSaramaConfig *sarama.Config argument of NewSubscriber and NewPublisher. When nil is passed, the default config is used (DefaultSaramaSubscriberConfig).

Full source: message/infrastructure/kafka/config.go

// ...
// DefaultSaramaSubscriberConfig creates default Sarama config used by Watermill.
//
// Custom config can be passed to NewSubscriber and NewPublisher.
//
//        saramaConfig := DefaultSaramaSubscriberConfig()
//        saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//        subscriber, err := NewSubscriber(watermillConfig, saramaConfig, unmarshaler, logger)
//        // ...
//
func DefaultSaramaSubscriberConfig() *sarama.Config {
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0
    config.Consumer.Return.Errors = true
    config.ClientID = "watermill"

    return config
}
// ...

Connecting

Publisher

Full source: message/infrastructure/kafka/publisher.go

// ...
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
    brokers []string,
    marshaler Marshaler,
    overwriteSaramaConfig *sarama.Config,
    logger watermill.LoggerAdapter,
) (message.Publisher, error) {
// ...

Example:

Full source: docs/content/docs/getting-started/kafka/main.go

// ...
    publisher, err := kafka.NewPublisher(
        []string{"kafka:9092"},
        kafka.DefaultMarshaler{},
        nil, // no custom sarama config
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...
Subscriber

Full source: message/infrastructure/kafka/subscriber.go

// ...
// NewSubscriber creates a new Kafka Subscriber.
func NewSubscriber(
    config SubscriberConfig,
    overwriteSaramaConfig *sarama.Config,
    unmarshaler Unmarshaler,
    logger watermill.LoggerAdapter,
) (message.Subscriber, error) {
// ...

Example:

Full source: docs/content/docs/getting-started/kafka/main.go

// ...
    saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
    // equivalent of auto.offset.reset: earliest
    saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:       []string{"kafka:9092"},
            ConsumerGroup: "test_consumer_group",
        },
        saramaSubscriberConfig,
        kafka.DefaultMarshaler{},
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Publishing

Full source: message/infrastructure/kafka/publisher.go

// ...
// Publish publishes message to Kafka.
//
// Publish is blocking and waits for an ack from Kafka.
// When delivery of one of the messages fails, the function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...
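For example, publishing a single message with the publisher created above (a sketch; the topic name and payload are illustrative):

msg := message.NewMessage(watermill.NewUUID(), []byte(`{"order_id": "1"}`))

// Publish blocks until Kafka acknowledges the message.
if err := publisher.Publish("example.topic", msg); err != nil {
    panic(err)
}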

Subscribing

Full source: message/infrastructure/kafka/subscriber.go

// ...
// Subscribe subscribes for messages in Kafka.
//
// There are multiple subscribers spawned
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
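A typical consume loop reads from the returned channel and acks each message after processing (a sketch):

messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
    panic(err)
}

for msg := range messages {
    log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

    // Ack (or Nack) must be called - the next message is not delivered until then.
    msg.Ack()
}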

Marshaler

Watermill’s messages cannot be sent directly to Kafka - they need to be marshaled. You can implement your own marshaler or use the default implementation.

Full source: message/infrastructure/kafka/marshaler.go

// ...
// Marshaler marshals Watermill's message to Kafka message.
type Marshaler interface {
    Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}

// Unmarshaler unmarshals Kafka's message to Watermill's message.
type Unmarshaler interface {
    Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
    Marshaler
    Unmarshaler
}

type DefaultMarshaler struct{}

func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...

Partitioning

Our Publisher supports Kafka’s partitioning mechanism.

It can be enabled with a special Marshaler implementation:

Full source: message/infrastructure/kafka/marshaler.go

// ...
type kafkaJsonWithPartitioning struct {
    DefaultMarshaler

    generatePartitionKey GeneratePartitionKey
}

func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
    return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey}
}

func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...

When using it, you need to pass a function that generates the partition key. It’s a good idea to pass the partition key in the message metadata, so that you don’t have to unmarshal the entire message.


marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
    return msg.Metadata.Get("partition"), nil
})
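On the publishing side this means setting that metadata key before publishing (a sketch, reusing the "partition" key from the example above):

msg := message.NewMessage(watermill.NewUUID(), []byte(`{"user_id": "user-42"}`))

// the value of this metadata entry is used as the Kafka partition key
msg.Metadata.Set("partition", "user-42")

if err := publisher.Publish("example.topic", msg); err != nil {
    panic(err)
}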

HTTP

The HTTP subscriber listens to HTTP requests (for example, webhooks) and outputs them as messages. You can then post them to any Publisher. Here is an example of sending HTTP messages to Kafka.

The HTTP publisher sends HTTP requests as specified in its configuration. Here is an example of transforming Kafka messages into HTTP webhook requests.

Characteristics

Feature             | Implements | Note
ConsumerGroups      | no         |
ExactlyOnceDelivery | yes        |
GuaranteedOrder     | yes        |
Persistent          | no         |

Subscriber configuration

Subscriber configuration is done via the config struct passed to the constructor:

Full source: message/infrastructure/http/subscriber.go

// ...
type SubscriberConfig struct {
    Router               chi.Router
    UnmarshalMessageFunc UnmarshalMessageFunc
}
// ...

You can use the Router field of SubscriberConfig to pass your own chi.Router (see chi). This may be helpful if you’d like to add your own HTTP handlers (e.g. a health check endpoint).

Publisher configuration

Publisher configuration is done via the config struct passed to the constructor:

Full source: message/infrastructure/http/publisher.go

// ...
type PublisherConfig struct {
    MarshalMessageFunc MarshalMessageFunc
    Client             *http.Client
    // if false (default), when server responds with error (>=400) to the webhook request, the response body is logged.
   DoNotLogResponseBodyOnServerError bool
}
// ...

How the message topic and body translate into the URL, method, headers, and payload of the HTTP request is highly configurable through the use of MarshalMessageFunc. Use the provided DefaultMarshalMessageFunc to send POST requests to a specific url:

Full source: message/infrastructure/http/publisher.go

// ...
// MarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
type MarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)

// DefaultMarshalMessageFunc transforms the message into a HTTP POST request.
// It encodes the UUID and Metadata in request headers.
func DefaultMarshalMessageFunc(url string, msg *message.Message) (*http.Request, error) {
    req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(msg.Payload))
    if err != nil {
        return nil, err
    }

    req.Header.Set(HeaderUUID, msg.UUID)

    metadataJson, err := json.Marshal(msg.Metadata)
    if err != nil {
        return nil, errors.Wrap(err, "could not marshal metadata to JSON")
    }
    req.Header.Set(HeaderMetadata, string(metadataJson))
    return req, nil
}

// ...

You can pass your own http.Client to execute the requests or use Golang’s default client.
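If you need a different request format, you can provide your own MarshalMessageFunc. A sketch that adds a Content-Type header on top of a plain POST (here stdHttp is an assumed alias for the standard library net/http, to avoid clashing with Watermill's http package):

var marshalWithContentType http.MarshalMessageFunc = func(url string, msg *message.Message) (*stdHttp.Request, error) {
    req, err := stdHttp.NewRequest(stdHttp.MethodPost, url, bytes.NewBuffer(msg.Payload))
    if err != nil {
        return nil, err
    }

    req.Header.Set(http.HeaderUUID, msg.UUID)
    req.Header.Set("Content-Type", "application/json")

    return req, nil
}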

Running

To run the HTTP subscriber you need to call StartHTTPServer(). It needs to be called after Subscribe().

When using it with the router, you should wait for the router to start before starting the HTTP server:


<-r.Running()
httpSubscriber.StartHTTPServer()

Subscribing

Full source: message/infrastructure/http/subscriber.go

// ...
// Subscribe adds an HTTP handler which will listen on the provided url for messages.
//
// Subscribe needs to be called before `StartHTTPServer`.
//
// When a request is received, it will wait for an `Ack`. When the Ack is received, a 200 HTTP status will be sent.
// When a Nack is sent, a 500 HTTP status will be sent.
func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message.Message, error) {
// ...

Google Cloud Pub/Sub

Cloud Pub/Sub brings the flexibility and reliability of enterprise message-oriented middleware to the cloud.

At the same time, Cloud Pub/Sub is a scalable, durable event ingestion and delivery system that serves as a foundation for modern stream analytics pipelines. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independently written applications.

Cloud Pub/Sub delivers low-latency, durable messaging that helps developers quickly integrate systems hosted on the Google Cloud Platform and externally.

Documentation: https://cloud.google.com/pubsub/docs/

Characteristics

Feature             | Implements | Note
ConsumerGroups      | yes        | multiple subscribers within the same Subscription name
ExactlyOnceDelivery | no         |
GuaranteedOrder     | no         |
Persistent          | yes*       | maximum retention time is 7 days

Configuration

Full source: message/infrastructure/googlecloud/publisher.go

// ...
type PublisherConfig struct {
    // ProjectID is the Google Cloud Engine project ID.
   ProjectID string

    // If false (default), `Publisher` tries to create a topic if there is none with the requested name.
   // Otherwise, trying to publish to a non-existent topic results in `ErrTopicDoesNotExist`.
   DoNotCreateTopicIfMissing bool

    // Settings for cloud.google.com/go/pubsub client library.
   PublishSettings *pubsub.PublishSettings
    ClientOptions   []option.ClientOption

    Marshaler Marshaler
}

func (c *PublisherConfig) setDefaults() {
    if c.Marshaler == nil {
        c.Marshaler = DefaultMarshalerUnmarshaler{}
    }
}

func NewPublisher(ctx context.Context, config PublisherConfig) (*Publisher, error) {
// ...

Full source: message/infrastructure/googlecloud/subscriber.go

// ...
type SubscriberConfig struct {
    // GenerateSubscriptionName generates subscription name for a given topic.
   // The subscription connects the topic to a subscriber application that receives and processes
   // messages published to the topic.
   //
   // By default, subscriptions expire after 31 days of inactivity.
   //
   // A topic can have multiple subscriptions, but a given subscription belongs to a single topic.
   GenerateSubscriptionName SubscriptionNameFn

    // ProjectID is the Google Cloud Engine project ID.
   ProjectID string

    // If false (default), `Subscriber` tries to create a subscription if there is none with the requested name.
   // Otherwise, trying to use non-existent subscription results in `ErrSubscriptionDoesNotExist`.
   DoNotCreateSubscriptionIfMissing bool

    // If false (default), `Subscriber` tries to create a topic if there is none with the requested name
   // and it is trying to create a new subscription with this topic name.
   // Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
   DoNotCreateTopicIfMissing bool

    // Settings for cloud.google.com/go/pubsub client library.
   ReceiveSettings    pubsub.ReceiveSettings
    SubscriptionConfig pubsub.SubscriptionConfig
    ClientOptions      []option.ClientOption

    // Unmarshaler transforms the client library format into watermill/message.Message.
   // Use a custom unmarshaler if needed, otherwise the default Unmarshaler should cover most use cases.
   Unmarshaler Unmarshaler
}

type SubscriptionNameFn func(topic string) string

// TopicSubscriptionName uses the topic name as the subscription name.
func TopicSubscriptionName(topic string) string {
    return topic
}

// TopicSubscriptionNameWithSuffix uses the topic name with a chosen suffix as the subscription name.
func TopicSubscriptionNameWithSuffix(suffix string) SubscriptionNameFn {
    return func(topic string) string {
        return topic + suffix
    }
}

func (c *SubscriberConfig) setDefaults() {
    if c.GenerateSubscriptionName == nil {
        c.GenerateSubscriptionName = TopicSubscriptionName
    }
    if c.Unmarshaler == nil {
        c.Unmarshaler = DefaultMarshalerUnmarshaler{}
    }
}

func NewSubscriber(
// ...
Subscription name

To receive messages published to a topic, you must create a subscription to that topic. Only messages published to the topic after the subscription is created are available to subscriber applications.

The subscription connects the topic to a subscriber application that receives and processes messages published to the topic.

A topic can have multiple subscriptions, but a given subscription belongs to a single topic.

In Watermill, the subscription is created automatically when calling Subscribe(). The subscription name is generated by the function passed to SubscriberConfig.GenerateSubscriptionName. By default, it is just the topic name (TopicSubscriptionName).

When you want to consume messages from a topic with multiple subscribers, you should use TopicSubscriptionNameWithSuffix or your custom function to generate the subscription name.
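For example, each service that should receive all messages of a topic can use its own suffix (a sketch; the suffix and project ID are illustrative):

subscriber, err := googlecloud.NewSubscriber(
    context.Background(),
    googlecloud.SubscriberConfig{
        // the subscription name will be "<topic>_service-a"
        GenerateSubscriptionName: googlecloud.TopicSubscriptionNameWithSuffix("_service-a"),
        ProjectID:                "test-project",
    },
    watermill.NewStdLogger(false, false),
)
if err != nil {
    panic(err)
}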

Connecting

Watermill will connect to the instance of Google Cloud Pub/Sub indicated by the environment variables. For production setup, set the GOOGLE_APPLICATION_CREDENTIALS env, as described in the official Google Cloud Pub/Sub docs. Note that you won’t need to install the Cloud SDK, as Watermill will take care of the administrative tasks (creating topics/subscriptions) with the default settings and proper permissions.

For development, you can use a Docker image with the emulator and the PUBSUB_EMULATOR_HOST env (check out the Getting Started guide).

Full source: docs/content/docs/getting-started/googlecloud/main.go

// ...
   publisher, err := googlecloud.NewPublisher(context.Background(), googlecloud.PublisherConfig{
        ProjectID: "test-project",
    })
    if err != nil {
        panic(err)
    }
// ...

Full source: docs/content/docs/getting-started/googlecloud/main.go

// ...
   subscriber, err := googlecloud.NewSubscriber(
        context.Background(),
        googlecloud.SubscriberConfig{
            // custom function to generate Subscription Name,
           // there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
           GenerateSubscriptionName: func(topic string) string {
                return "test-sub_" + topic
            },
            ProjectID: "test-project",
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Publishing

Full source: message/infrastructure/googlecloud/publisher.go

// ...
// Publish publishes a set of messages on a Google Cloud Pub/Sub topic.
// It blocks until all the messages are successfully published or an error occurred.
//
// To receive messages published to a topic, you must create a subscription to that topic.
// Only messages published to the topic after the subscription is created are available to subscriber applications.
//
// See https://cloud.google.com/pubsub/docs/publisher to find out more about how Google Cloud Pub/Sub Publishers work.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
// ...

Subscribing

Full source: message/infrastructure/googlecloud/subscriber.go

// ...
// Subscribe consumes Google Cloud Pub/Sub messages and outputs them as Watermill Message objects on the returned channel.
//
// In Google Cloud Pub/Sub, it is impossible to subscribe directly to a topic. Instead, a *subscription* is used.
// Each subscription has one topic, but there may be multiple subscriptions to one topic (with different names).
//
// The `topic` argument is transformed into subscription name with the configured `GenerateSubscriptionName` function.
// By default, if the subscription or topic don't exist, they are created. This behavior may be changed in the config.
//
// Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.
//
// See https://cloud.google.com/pubsub/docs/subscriber to find out more about how Google Cloud Pub/Sub Subscriptions work.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Marshaler

Watermill’s messages cannot be sent directly to Google Cloud Pub/Sub - they need to be marshaled. You can implement your own marshaler or use the default implementation.

Full source: message/infrastructure/googlecloud/marshaler.go

// ...
// Marshaler transforms a Watermill Message into the Google Cloud client library Message.
type Marshaler interface {
    Marshal(topic string, msg *message.Message) (*pubsub.Message, error)
}

// Unmarshaler transforms a Google Cloud client library Message into the Watermill Message.
type Unmarshaler interface {
    Unmarshal(*pubsub.Message) (*message.Message, error)
}

// UUIDHeaderKey is the key of the Pub/Sub attribute that carries the Watermill UUID.
const UUIDHeaderKey = "_watermill_message_uuid"

// DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way:
// All Google Cloud Pub/Sub attributes are equivalent to Watermill Message metadata.
// The Watermill Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key.
type DefaultMarshalerUnmarshaler struct{}
// ...

NATS Streaming

NATS Streaming is a data streaming system powered by NATS, and written in the Go programming language. The executable name for the NATS Streaming server is nats-streaming-server. NATS Streaming embeds, extends, and interoperates seamlessly with the core NATS platform.

Characteristics

Feature             | Implements | Note
ConsumerGroups      | yes        | you need to set DurableName and QueueGroup name
ExactlyOnceDelivery | no         |
GuaranteedOrder     | no         | with the redelivery feature, order can’t be guaranteed
Persistent          | yes        | DurableName is required

Configuration

Full source: message/infrastructure/nats/publisher.go

// ...
type StreamingPublisherConfig struct {
    // ClusterID is the NATS Streaming cluster ID.
   ClusterID string

    // ClientID is the NATS Streaming client ID to connect with.
   // ClientID can contain only alphanumeric and `-` or `_` characters.
   ClientID string

    // StanOptions are custom options for a connection.
   StanOptions []stan.Option

    // Marshaler is marshaler used to marshal messages to stan format.
   Marshaler Marshaler
}

type StreamingPublisherPublishConfig struct {
    // Marshaler is marshaler used to marshal messages to stan format.
   Marshaler Marshaler
}

func (c StreamingPublisherConfig) Validate() error {
    if c.Marshaler == nil {
        return errors.New("StreamingPublisherConfig.Marshaler is missing")
    }

    return nil
}

func (c StreamingPublisherConfig) GetStreamingPublisherPublishConfig() StreamingPublisherPublishConfig {
    return StreamingPublisherPublishConfig{
        Marshaler: c.Marshaler,
    }
}

type StreamingPublisher struct {
// ...

Full source: message/infrastructure/nats/subscriber.go

// ...
type StreamingSubscriberConfig struct {
    // ClusterID is the NATS Streaming cluster ID.
   ClusterID string

    // ClientID is the NATS Streaming client ID to connect with.
   // ClientID can contain only alphanumeric and `-` or `_` characters.
   //
   // Using DurableName causes the NATS Streaming server to track
   // the last acknowledged message for that ClientID + DurableName.
   ClientID string

    // QueueGroup is the NATS Streaming queue group.
   //
   // All subscriptions with the same queue name (regardless of the connection they originate from)
   // will form a queue group. Each message will be delivered to only one subscriber per queue group,
   // using queuing semantics.
   //
   // It is recommended to set it with DurableName.
   // For non durable queue subscribers, when the last member leaves the group,
   // that group is removed. A durable queue group (DurableName) allows you to have all members leave
   // but still maintain state. When a member re-joins, it starts at the last position in that group.
   //
   // When QueueGroup is empty, subscribe without QueueGroup will be used.
   QueueGroup string

    // DurableName is the NATS streaming durable name.
   //
   // Subscriptions may also specify a “durable name” which will survive client restarts.
   // Durable subscriptions cause the server to track the last acknowledged message
   // sequence number for a client and durable name. When the client restarts/resubscribes,
   // and uses the same client ID and durable name, the server will resume delivery beginning
   // with the earliest unacknowledged message for this durable subscription.
   //
   // Doing this causes the NATS Streaming server to track
   // the last acknowledged message for that ClientID + DurableName.
   DurableName string

    // SubscribersCount determines how many concurrent subscribers should be started.
   SubscribersCount int

    // CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
   // When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
   CloseTimeout time.Duration

    // How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
   // It is mapped to stan.AckWait option.
   AckWaitTimeout time.Duration

    // StanOptions are custom []stan.Option passed to the connection.
   // It is also used to provide connection parameters, for example:
   //       stan.NatsURL("nats://localhost:4222")
   StanOptions []stan.Option

    // StanSubscriptionOptions are custom []stan.SubscriptionOption passed to subscription.
   StanSubscriptionOptions []stan.SubscriptionOption

    // Unmarshaler is used to unmarshal messages from the NATS format to the Watermill format.
   Unmarshaler Unmarshaler
}

type StreamingSubscriberSubscriptionConfig struct {

    // StanSubscriptionOptions are custom []stan.SubscriptionOption passed to subscription.
   StanSubscriptionOptions []stan.SubscriptionOption

    // Unmarshaler is used to unmarshal messages from the NATS format to the Watermill format.
   Unmarshaler Unmarshaler
    // QueueGroup is the NATS Streaming queue group.
   //
   // All subscriptions with the same queue name (regardless of the connection they originate from)
   // will form a queue group. Each message will be delivered to only one subscriber per queue group,
   // using queuing semantics.
   //
   // It is recommended to set it with DurableName.
   // For non durable queue subscribers, when the last member leaves the group,
   // that group is removed. A durable queue group (DurableName) allows you to have all members leave
   // but still maintain state. When a member re-joins, it starts at the last position in that group.
   //
   // When QueueGroup is empty, subscribe without QueueGroup will be used.
   QueueGroup string

    // DurableName is the NATS streaming durable name.
   //
   // Subscriptions may also specify a “durable name” which will survive client restarts.
   // Durable subscriptions cause the server to track the last acknowledged message
   // sequence number for a client and durable name. When the client restarts/resubscribes,
   // and uses the same client ID and durable name, the server will resume delivery beginning
   // with the earliest unacknowledged message for this durable subscription.
   //
   // Doing this causes the NATS Streaming server to track
   // the last acknowledged message for that ClientID + DurableName.
   DurableName string

    // SubscribersCount determines how many concurrent subscribers should be started.
   SubscribersCount int

    // How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
   // It is mapped to stan.AckWait option.
   AckWaitTimeout time.Duration
    // CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
   // When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
   CloseTimeout time.Duration
}

func (c *StreamingSubscriberConfig) GetStreamingSubscriberSubscriptionConfig() StreamingSubscriberSubscriptionConfig {
    return StreamingSubscriberSubscriptionConfig{
        StanSubscriptionOptions: c.StanSubscriptionOptions,
        Unmarshaler:             c.Unmarshaler,
        QueueGroup:              c.QueueGroup,
        DurableName:             c.DurableName,
        SubscribersCount:        c.SubscribersCount,
        AckWaitTimeout:          c.AckWaitTimeout,
        CloseTimeout:            c.CloseTimeout,
    }
}

func (c *StreamingSubscriberConfig) setDefaults() {
    if c.SubscribersCount <= 0 {
        c.SubscribersCount = 1
    }
    if c.CloseTimeout <= 0 {
        c.CloseTimeout = time.Second * 30
    }
    if c.AckWaitTimeout <= 0 {
        c.AckWaitTimeout = time.Second * 30
    }

    c.StanSubscriptionOptions = append(
        c.StanSubscriptionOptions,
        stan.SetManualAckMode(), // manual AckMode is required to support acking/nacking by client
       stan.AckWait(c.AckWaitTimeout),
    )

    if c.DurableName != "" {
        c.StanSubscriptionOptions = append(c.StanSubscriptionOptions, stan.DurableName(c.DurableName))
    }
}

func (c *StreamingSubscriberSubscriptionConfig) Validate() error {
    if c.Unmarshaler == nil {
        return errors.New("StreamingSubscriberConfig.Unmarshaler is missing")
    }

    if c.QueueGroup == "" && c.SubscribersCount > 1 {
        return errors.New(
            "to set StreamingSubscriberConfig.SubscribersCount " +
                "you need to also set StreamingSubscriberConfig.QueueGroup, " +
                "in other case you will receive duplicated messages",
        )
    }

    return nil
}

type StreamingSubscriber struct {
// ...

Connecting

By default, the NATS client will try to connect to localhost:4222. If you are using a different hostname or port, you should pass a custom stan.Option, stan.NatsURL("nats://your-nats-hostname:4222"), to StreamingSubscriberConfig and StreamingPublisherConfig.

Full source: message/infrastructure/nats/publisher.go

// ...
// NewStreamingPublisher creates a new StreamingPublisher.
//
// When using custom NATS hostname, you should pass it by options StreamingPublisherConfig.StanOptions:
//        // ...
//        StanOptions: []stan.Option{
//            stan.NatsURL("nats://your-nats-hostname:4222"),
//        }
//        // ...
func NewStreamingPublisher(config StreamingPublisherConfig, logger watermill.LoggerAdapter) (*StreamingPublisher, error) {
// ...

Example:

Full source: docs/content/docs/getting-started/nats-streaming/main.go

// ...
   publisher, err := nats.NewStreamingPublisher(
        nats.StreamingPublisherConfig{
            ClusterID: "test-cluster",
            ClientID:  "example-publisher",
            StanOptions: []stan.Option{
                stan.NatsURL("nats://nats-streaming:4222"),
            },
            Marshaler: nats.GobMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Full source: message/infrastructure/nats/subscriber.go

// ...
// NewStreamingSubscriber creates a new StreamingSubscriber.
//
// When using custom NATS hostname, you should pass it by options StreamingSubscriberConfig.StanOptions:
//        // ...
//        StanOptions: []stan.Option{
//            stan.NatsURL("nats://your-nats-hostname:4222"),
//        }
//        // ...
func NewStreamingSubscriber(config StreamingSubscriberConfig, logger watermill.LoggerAdapter) (*StreamingSubscriber, error) {
// ...

Example:

Full source: docs/content/docs/getting-started/nats-streaming/main.go

// ...
   subscriber, err := nats.NewStreamingSubscriber(
        nats.StreamingSubscriberConfig{
            ClusterID:        "test-cluster",
            ClientID:         "example-subscriber",
            QueueGroup:       "example",
            DurableName:      "my-durable",
            SubscribersCount: 4, // how many goroutines should consume messages
           CloseTimeout:     time.Minute,
            AckWaitTimeout:   time.Second * 30,
            StanOptions: []stan.Option{
                stan.NatsURL("nats://nats-streaming:4222"),
            },
            Unmarshaler: nats.GobMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Publishing

Full source: message/infrastructure/nats/publisher.go

// ...
// Publish publishes message to NATS.
//
// Publish will not return until an ack has been received from NATS Streaming.
// When delivery of one of the messages fails, the function is interrupted.
func (p StreamingPublisher) Publish(topic string, messages ...*message.Message) error {
// ...

Subscribing

Full source: message/infrastructure/nats/subscriber.go

// ...
// Subscribe subscribes to messages from NATS Streaming.
//
// Subscribe will spawn SubscribersCount goroutines, each making its own subscription.
func (s *StreamingSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Marshaler

NATS doesn’t implement any mechanism like message metadata or headers. For that reason, we need to marshal the entire message into a []byte.

The default implementation is based on Golang’s gob.

Full source: message/infrastructure/nats/marshaler.go

// ...
type Marshaler interface {
    Marshal(topic string, msg *message.Message) ([]byte, error)
}

type Unmarshaler interface {
    Unmarshal(*stan.Msg) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
    Marshaler
    Unmarshaler
}

// GobMarshaler is a marshaler which uses Gob to marshal Watermill messages.
type GobMarshaler struct{}
// ...

If you have your own message format, you can implement your own Marshaler, which will serialize messages in that format.

When needed, you can bypass both UUID and Metadata and send just the message.Payload, but some standard middlewares may not work.
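For example, a JSON-based marshaler could look like this (a sketch, assuming encoding/json and an envelope struct of our own; stan.Msg.Data carries the raw payload bytes):

type jsonEnvelope struct {
    UUID     string            `json:"uuid"`
    Metadata map[string]string `json:"metadata"`
    Payload  []byte            `json:"payload"`
}

type JSONMarshaler struct{}

func (JSONMarshaler) Marshal(topic string, msg *message.Message) ([]byte, error) {
    return json.Marshal(jsonEnvelope{
        UUID:     msg.UUID,
        Metadata: msg.Metadata,
        Payload:  msg.Payload,
    })
}

func (JSONMarshaler) Unmarshal(stanMsg *stan.Msg) (*message.Message, error) {
    envelope := jsonEnvelope{}
    if err := json.Unmarshal(stanMsg.Data, &envelope); err != nil {
        return nil, err
    }

    msg := message.NewMessage(envelope.UUID, envelope.Payload)
    msg.Metadata = envelope.Metadata

    return msg, nil
}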

RabbitMQ (AMQP)

RabbitMQ is the most widely deployed open source message broker.

We provide a Pub/Sub implementation based on github.com/streadway/amqp.

Full source: message/infrastructure/amqp/doc.go

// AMQP implementation of Watermill's Pub/Sub interface.
//
// Supported features:
// - Reconnect support
// - Fully customizable configuration
// - Qos settings
// - TLS support
// - Publish Transactions support (optional, can be enabled in config)
//
// Nomenclature
//
// Unfortunately, Watermill's nomenclature is not fully compatible with AMQP's nomenclature.
// Depending on the configuration, the topic can be mapped to the exchange name, routing key and queue name.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// If you have trouble finding out what exchange name, routing key and queue name are set to,
// just enable logging with debug level and check it in logs.
package amqp
// ...

Characteristics

Feature             | Implements | Note
ConsumerGroups      | yes*       | there are no literal consumer groups in AMQP, but we can achieve similar behaviour with GenerateQueueNameTopicNameWithSuffix; for more details please check the AMQP “Consumer Groups” section
ExactlyOnceDelivery | no         |
GuaranteedOrder     | yes        | please check https://www.rabbitmq.com/semantics.html#ordering
Persistent          | yes*       | when using NewDurablePubSubConfig or NewDurableQueueConfig

Configuration

Our AMQP implementation ships with some pre-created configurations:

Full source: message/infrastructure/amqp/config.go

// ...
// NewDurablePubSubConfig creates config for durable PubSub.
// generateQueueName is optional, when passing to the publisher.
// Exchange name is set to the topic name and routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html
// with durable added for exchange, queue and amqp.Persistent DeliveryMode.
// Thanks to this, we don't lose messages on broker restart.
func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
    return Config{
        Connection: ConnectionConfig{
            AmqpURI: amqpURI,
        },

        Marshaler: DefaultMarshaler{},

        Exchange: ExchangeConfig{
            GenerateName: func(topic string) string {
                return topic
            },
            Type:    "fanout",
            Durable: true,
        },
        Queue: QueueConfig{
            GenerateName: generateQueueName,
            Durable:      true,
        },
        QueueBind: QueueBindConfig{},
        Publish: PublishConfig{
            GenerateRoutingKey: func(topic string) string {
                return ""
            },
        },
        Consume: ConsumeConfig{
            Qos: QosConfig{
                PrefetchCount: 1,
            },
        },
    }
}

// NewNonDurablePubSubConfig creates config for non durable PubSub.
// generateQueueName is optional, when passing to the publisher.
// Exchange name is set to the topic name and routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
    return Config{
        Connection: ConnectionConfig{
            AmqpURI: amqpURI,
        },

        Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

        Exchange: ExchangeConfig{
            GenerateName: func(topic string) string {
                return topic
            },
            Type: "fanout",
        },
        Queue: QueueConfig{
            GenerateName: generateQueueName,
        },
        QueueBind: QueueBindConfig{},
        Publish: PublishConfig{
            GenerateRoutingKey: func(topic string) string {
                return ""
            },
        },
        Consume: ConsumeConfig{
            Qos: QosConfig{
                PrefetchCount: 1,
            },
        },
    }
}

// NewDurableQueueConfig creates config for durable Queue.
// Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// with durable added for exchange, queue and amqp.Persistent DeliveryMode.
// Thanks to this, we don't lose messages on broker restart.
func NewDurableQueueConfig(amqpURI string) Config {
    return Config{
        Connection: ConnectionConfig{
            AmqpURI: amqpURI,
        },

        Marshaler: DefaultMarshaler{},

        Exchange: ExchangeConfig{
            GenerateName: func(topic string) string {
                return ""
            },
        },
        Queue: QueueConfig{
            GenerateName: GenerateQueueNameTopicName,
            Durable:      true,
        },
        QueueBind: QueueBindConfig{},
        Publish: PublishConfig{
            GenerateRoutingKey: func(topic string) string {
                return topic
            },
        },
        Consume: ConsumeConfig{
            Qos: QosConfig{
                PrefetchCount: 1,
            },
        },
    }
}

// NewNonDurableQueueConfig creates config for non durable Queue.
// Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurableQueueConfig(amqpURI string) Config {
    return Config{
        Connection: ConnectionConfig{
            AmqpURI: amqpURI,
        },

        Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

        Exchange: ExchangeConfig{
            GenerateName: func(topic string) string {
                return ""
            },
        },
        Queue: QueueConfig{
            GenerateName: GenerateQueueNameTopicName,
        },
        QueueBind: QueueBindConfig{},
        Publish: PublishConfig{
            GenerateRoutingKey: func(topic string) string {
                return topic
            },
        },
        Consume: ConsumeConfig{
            Qos: QosConfig{
                PrefetchCount: 1,
            },
        },
    }
}

type Config struct {
// ...

For a detailed description of the configuration, please check message/infrastructure/amqp/pubsub_config.go

TLS Config

TLS config can be passed to Config.TLSConfig.
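For example (a sketch, assuming TLSConfig is a standard *tls.Config; certificate loading is omitted):

amqpConfig := amqp.NewDurableQueueConfig("amqps://my-rabbitmq:5671/")

// pass your certificates / RootCAs pool here as needed
amqpConfig.TLSConfig = &tls.Config{
    MinVersion: tls.VersionTLS12,
}

publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
    panic(err)
}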

Connecting

Full source: docs/content/docs/getting-started/amqp/main.go

// ...
   publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }
// ...

Full source: docs/content/docs/getting-started/amqp/main.go

// ...
   subscriber, err := amqp.NewSubscriber(
        // This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
       // It works as a simple queue.
       //
       // If you want to implement a Pub/Sub style service instead, check
       // https://watermill.io/docs/pub-sub-implementations/#amqp-consumer-groups
       amqpConfig,
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Publishing

Full source: message/infrastructure/amqp/publisher.go

// ...
// Publish publishes messages to AMQP broker.
// Publish is blocking until the broker has received and saved the message.
// Publish is always thread safe.
//
// Watermill's topic in Publish is not mapped to AMQP's topic, but depending on configuration it can be mapped
// to exchange, queue or routing key.
// For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (p *Publisher) Publish(topic string, messages ...*message.Message) (err error) {
// ...

Subscribing

Full source: message/infrastructure/amqp/subscriber.go

// ...
// Subscribe consumes messages from AMQP broker.
//
// Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration it can be mapped
// to exchange, queue or routing key.
// For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Marshaler

The marshaler is responsible for mapping AMQP’s messages to Watermill’s messages.

The marshaler can be changed via the configuration. If you need to customize the amqp.Publishing (for example, to set CorrelationId or ContentType), you can do it with the PostprocessPublishing function.

Full source: message/infrastructure/amqp/marshaler.go

// ...
// Marshaler marshals Watermill's message to amqp.Publishing and unmarshals amqp.Delivery to Watermill's message.
type Marshaler interface {
    Marshal(msg *message.Message) (amqp.Publishing, error)
    Unmarshal(amqpMsg amqp.Delivery) (*message.Message, error)
}

type DefaultMarshaler struct {
    // PostprocessPublishing can be used to make some extra processing with amqp.Publishing,
   // for example add CorrelationId and ContentType:
   //
   //  amqp.DefaultMarshaler{
   //       PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing {
   //           publishing.CorrelationId = "correlation"
   //           publishing.ContentType = "application/json"
   //
   //           return publishing
   //       },
   //   }
   PostprocessPublishing func(amqp.Publishing) amqp.Publishing

    // When true, DeliveryMode will be not set to Persistent.
   //
   // DeliveryMode Transient means higher throughput, but messages will not be
   // restored on broker restart. The delivery mode of publishings is unrelated
   // to the durability of the queues they reside on. Transient messages will
   // not be restored to durable queues, persistent messages will be restored to
   // durable queues and lost on non-durable queues during server restart.
   NotPersistentDeliveryMode bool
}

func (d DefaultMarshaler) Marshal(msg *message.Message) (amqp.Publishing, error) {
    headers := make(amqp.Table, len(msg.Metadata)+1) // metadata + uuid

    for key, value := range msg.Metadata {
        headers[key] = value
    }
    headers[MessageUUIDHeaderKey] = msg.UUID

    publishing := amqp.Publishing{
        Body:    msg.Payload,
        Headers: headers,
    }
    if !d.NotPersistentDeliveryMode {
        publishing.DeliveryMode = amqp.Persistent
    }

    if d.PostprocessPublishing != nil {
        publishing = d.PostprocessPublishing(publishing)
    }

    return publishing, nil
}

func (DefaultMarshaler) Unmarshal(amqpMsg amqp.Delivery) (*message.Message, error) {
// ...

AMQP “Consumer Groups”

AMQP doesn’t provide a mechanism like Kafka’s “consumer groups”. You can still achieve similar behaviour with GenerateQueueNameTopicNameWithSuffix and NewDurablePubSubConfig.

Full source: docs/content/docs/snippets/amqp-consumer-groups/main.go

// ...
func createSubscriber(queueSuffix string) *amqp.Subscriber {
    subscriber, err := amqp.NewSubscriber(
        // This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html
       // to create just a simple queue, you can use NewDurableQueueConfig or create your own config.
       amqp.NewDurablePubSubConfig(
            amqpURI,
            // Rabbit's queue name in this example is based on Watermill's topic passed to Subscribe
           // plus provided suffix.
           //
           // Exchange is Rabbit's "fanout", so when subscribing with suffix other than "test_consumer_group",
           // it will also receive all messages. It will work like separate consumer groups in Kafka.
           amqp.GenerateQueueNameTopicNameWithSuffix(queueSuffix),
        ),
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
    return subscriber
}

func main() {
    subscriber1 := createSubscriber("test_consumer_group_1")
    messages1, err := subscriber1.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }
    go process("subscriber_1", messages1)

    subscriber2 := createSubscriber("test_consumer_group_2")
    messages2, err := subscriber2.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }
    // subscriber2 will receive all messages independently from subscriber1
   go process("subscriber_2", messages2)
// ...

In this example, both subscriber1 and subscriber2 will receive all messages independently.

io.Writer/io.Reader

This is an experimental Pub/Sub implementation that leverages the standard library’s io.Writer and io.Reader interfaces as the output of the Publisher and the input of the Subscriber, respectively.

Note that these aren’t full-fledged Pub/Subs like Kafka, RabbitMQ, or the likes, but given the ubiquity of io.Writer and io.Reader implementations they may come in handy - for example, for a CLI producer/consumer or for tailing a log file (see the snippets below).

Characteristics

This is a very bare-bones implementation for now, so no extra features are supported. However, it is still sufficient for applications like a CLI producer/consumer.

Feature             | Implements | Note
ConsumerGroups      | no         |
ExactlyOnceDelivery | no         |
GuaranteedOrder     | no         |
Persistent          | no         |

Configuration

The publisher configuration is relatively simple.

Full source: message/infrastructure/io/publisher.go

// ...
type PublisherConfig struct {
    // MarshalFunc transforms the Watermill messages into raw bytes for transport.
   // Its behavior may be dependent on the topic.
   MarshalFunc MarshalMessageFunc
}

func (p PublisherConfig) validate() error {
    if p.MarshalFunc == nil {
        return errors.New("marshal func is empty")
    }

    return nil
}

// Publisher writes the messages to the underlying io.Writer.
// ...
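A minimal sketch of a publisher that writes message payloads to stdout (assuming a NewPublisher(writer, config) constructor symmetrical to NewSubscriber, and the provided PayloadMarshalFunc):

publisher, err := io.NewPublisher(os.Stdout, io.PublisherConfig{
    MarshalFunc: io.PayloadMarshalFunc,
})
if err != nil {
    panic(err)
}

// every published payload ends up written to stdout
if err := publisher.Publish("any-topic", message.NewMessage(watermill.NewUUID(), []byte("hello\n"))); err != nil {
    panic(err)
}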

The subscriber may work in two modes – either perform buffered reads of constant size from the io.Reader, or split the byte stream into messages using a delimiter byte.

The reading will continue even if the reads come up empty, but they will not be sent out as messages. The time to wait after an empty read is configured through the PollInterval parameter. As soon as a non-empty input is read, it will be packaged as a message and sent out.

Full source: message/infrastructure/io/subscriber.go

// ...
type SubscriberConfig struct {
    // BufferSize configures how many bytes will be read at a time from the Subscriber's Reader.
   // Each message will be treated as having at most BufferSize bytes.
   // If 0, Subscriber works in delimiter mode - it scans for messages delimited by the MessageDelimiter byte.
   BufferSize int
    // MessageDelimiter is the byte that is expected to separate messages if BufferSize is equal to 0.
   MessageDelimiter byte

    // PollInterval is the time between polling for new messages if the last read was empty. Defaults to time.Second.
   PollInterval time.Duration

    // UnmarshalFunc transforms the raw bytes into a Watermill message. Its behavior may be dependent on the topic.
   UnmarshalFunc UnmarshalMessageFunc

    Logger watermill.LoggerAdapter
}

func (c SubscriberConfig) validate() error {
    if c.BufferSize != 0 && c.MessageDelimiter != 0 {
        return errors.New("choose either BufferSize or MessageDelimiter")
    }

    if c.BufferSize < 0 {
        return errors.New("buffer size must be non-negative")
    }

    if c.UnmarshalFunc == nil {
        return errors.New("unmarshal func is empty")
    }

    return nil
}

func (c *SubscriberConfig) setDefaults() {
    if c.BufferSize == 0 && c.MessageDelimiter == 0 {
        c.MessageDelimiter = '\n'
    }

    if c.PollInterval == 0 {
        c.PollInterval = time.Second
    }

    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}

// Subscriber reads bytes from its underlying io.Reader and interprets them as Watermill messages.
// ...

The continuous reading may be used, for example, to emulate the behaviour of a tail -f command, like in this snippet:

Full source: docs/content/docs/snippets/tail-log-file/main.go

// ...
// this will `tail -f` a log file and publish an alert if a line fulfils some criterion

func main() {
    // if an alert is raised, the offending line will be published on this publisher;
    // in a real application, this would be set to an actual publisher
   var alertPublisher message.Publisher

    if len(os.Args) < 2 {
        panic(
            fmt.Errorf("usage: %s /path/to/file.log", os.Args[0]),
        )
    }
    logFile, err := os.OpenFile(os.Args[1], os.O_RDONLY, 0444)
    if err != nil {
        panic(err)
    }

    sub, err := io.NewSubscriber(logFile, io.SubscriberConfig{
        UnmarshalFunc: io.PayloadUnmarshalFunc,
    })
    if err != nil {
        panic(err)
    }

    // for io.Subscriber, topic does not matter
   lines, err := sub.Subscribe(context.Background(), "")
    if err != nil {
        panic(err)
    }

    for line := range lines {
        if criterion(string(line.Payload)) {
            _ = alertPublisher.Publish("alerts", line)
        }
    }
}

func criterion(line string) bool {
    // decide whether an action needs to be taken
   return false
}
// ...

Marshaling/Unmarshaling

The MarshalFunc is an important part of io.Publisher, because it fully controls the format in which the underlying io.Writer will obtain the messages.

Correspondingly, the UnmarshalFunc regulates how the bytes read by the io.Reader will be interpreted as Watermill messages.

Full source: message/infrastructure/io/marshal.go

// ...
// MarshalMessageFunc packages the message into a byte slice.
// The topic argument is there because some writers (i.e. loggers) might want to present the topic as part of their output.
type MarshalMessageFunc func(topic string, msg *message.Message) ([]byte, error)

// PayloadMarshalFunc dumps the message's payload, discarding the remaining fields of the message.
// ...

Full source: message/infrastructure/io/marshal.go

// ...
// UnmarshalMessageFunc restores the message from a byte slice.
// The topic argument is there to keep symmetry with MarshalMessageFunc, as some unmarshalers might restore the topic as well.
type UnmarshalMessageFunc func(topic string, b []byte) (*message.Message, error)

// PayloadUnmarshalFunc puts the whole byte slice into the message's Payload.
// ...

The package comes with some predefined marshal and unmarshal functions, but you might want to write your own marshaler/unmarshaler to suit the specific io.Writer/io.Reader implementation that you are working with.

Topic

For the Publisher/Subscriber implementation itself, the topic has no meaning. It is difficult to interpret the meaning of topic in the general context of io.Writer and io.Reader interfaces.

However, the topic is passed as a parameter to the marshal/unmarshal functions, so the adaptations to particular Writer/Reader implementation may take it into account.
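For example, a logger-style marshal function might prefix every output line with the topic (a sketch of a custom MarshalMessageFunc):

// a custom MarshalMessageFunc that includes the topic in every output line
marshalWithTopic := func(topic string, msg *message.Message) ([]byte, error) {
    return []byte(fmt.Sprintf("[%s] %s\n", topic, msg.Payload)), nil
}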

Implementing your own Pub/Sub

Can’t find a suitable Pub/Sub implementation? Please check Implementing custom Pub/Sub.