io.Writer/io.Reader

Pub/Sub implemented as Go stdlib's most loved interfaces.

This is an experimental Pub/Sub implementation that leverages the standard library’s io.Writer and io.Reader interfaces as sources of Publisher and Subscriber, respectively.

Note that these aren’t full-fledged Pub/Subs like Kafka or RabbitMQ, but given how ubiquitous implementations of Writer and Reader are, they may come in handy.

Installation

go get github.com/ThreeDotsLabs/watermill-io

Characteristics

This is a very bare-bones implementation for now, so no extra features are supported. However, it is still sufficient for applications like a CLI producer/consumer.

| Feature             | Implements | Note |
|---------------------|------------|------|
| ConsumerGroups      | no         |      |
| ExactlyOnceDelivery | no         |      |
| GuaranteedOrder     | no         |      |
| Persistent          | no         |      |

Configuration

The publisher configuration is relatively simple.

Full source: github.com/ThreeDotsLabs/watermill-io/pkg/io/publisher.go

// ...
type PublisherConfig struct {
	// MarshalFunc transforms the Watermill messages into raw bytes for transport.
	// Its behavior may be dependent on the topic.
	MarshalFunc MarshalMessageFunc
}

func (p PublisherConfig) validate() error {
	if p.MarshalFunc == nil {
		return errors.New("marshal func is empty")
	}

	return nil
}

// Publisher writes the messages to the underlying io.Writer.
// ...

The subscriber may work in one of two modes: it either performs buffered reads of a constant size from the io.Reader, or it splits the byte stream into messages using a delimiter byte.

Reading continues even when a read comes up empty, but empty reads are not sent out as messages. The time to wait after an empty read is configured through the PollInterval parameter. As soon as non-empty input is read, it is packaged as a message and sent out.

Full source: github.com/ThreeDotsLabs/watermill-io/pkg/io/subscriber.go

// ...
type SubscriberConfig struct {
	// BufferSize configures how many bytes will be read at a time from the Subscriber's Reader.
	// Each message will be treated as having at most BufferSize bytes.
	// If 0, Subscriber works in delimiter mode - it scans for messages delimited by the MessageDelimiter byte.
	BufferSize int
	// MessageDelimiter is the byte that is expected to separate messages if BufferSize is equal to 0.
	MessageDelimiter byte

	// PollInterval is the time between polling for new messages if the last read was empty. Defaults to time.Second.
	PollInterval time.Duration

	// UnmarshalFunc transforms the raw bytes into a Watermill message. Its behavior may be dependent on the topic.
	UnmarshalFunc UnmarshalMessageFunc
}

func (c SubscriberConfig) validate() error {
	if c.BufferSize != 0 && c.MessageDelimiter != 0 {
		return errors.New("choose either BufferSize or MessageDelimiter")
	}

	if c.BufferSize < 0 {
		return errors.New("buffer size must be non-negative")
	}

	if c.UnmarshalFunc == nil {
		return errors.New("unmarshal func is empty")
	}

	return nil
}

func (c *SubscriberConfig) setDefaults() {
	if c.BufferSize == 0 && c.MessageDelimiter == 0 {
		c.MessageDelimiter = '\n'
	}

	if c.PollInterval == 0 {
		c.PollInterval = time.Second
	}
}

// Subscriber reads bytes from its underlying io.Reader and interprets them as Watermill messages.
// ...

Continuous reading may be used, for example, to emulate the behaviour of the tail -f command, as in this snippet:

Full source: github.com/ThreeDotsLabs/watermill/docs/content/docs/snippets/tail-log-file/main.go

// ...
// this will `tail -f` a log file and publish an alert if a line fulfils some criterion

func main() {
	// if an alert is raised, the offending line will be published on this publisher;
	// in a real program this would be set to an actual publisher
	var alertPublisher message.Publisher

	if len(os.Args) < 2 {
		panic(
			fmt.Errorf("usage: %s /path/to/file.log", os.Args[0]),
		)
	}
	logFile, err := os.OpenFile(os.Args[1], os.O_RDONLY, 0444)
	if err != nil {
		panic(err)
	}

	sub, err := io.NewSubscriber(logFile, io.SubscriberConfig{
		UnmarshalFunc: io.PayloadUnmarshalFunc,
	}, watermill.NewStdLogger(true, false))
	if err != nil {
		panic(err)
	}

	// for io.Subscriber, topic does not matter
	lines, err := sub.Subscribe(context.Background(), "")
	if err != nil {
		panic(err)
	}

	for line := range lines {
		if criterion(string(line.Payload)) {
			_ = alertPublisher.Publish("alerts", line)
		}
	}
}

func criterion(line string) bool {
	// decide whether an action needs to be taken
	return false
}
// ...

Marshaling/Unmarshaling

The MarshalFunc is an important part of io.Publisher, because it fully controls the format in which the underlying io.Writer will receive the messages.

Correspondingly, the UnmarshalFunc determines how the bytes read from the io.Reader will be interpreted as Watermill messages.

Full source: github.com/ThreeDotsLabs/watermill-io/pkg/io/marshal.go

// ...
// MarshalMessageFunc packages the message into a byte slice.
// The topic argument is there because some writers (e.g. loggers) might want to present the topic as part of their output.
type MarshalMessageFunc func(topic string, msg *message.Message) ([]byte, error)

// PayloadMarshalFunc dumps the message's payload, discarding the remaining fields of the message.
// ...

Full source: github.com/ThreeDotsLabs/watermill-io/pkg/io/marshal.go

// ...
// UnmarshalMessageFunc restores the message from a byte slice.
// The topic argument is there to keep symmetry with MarshalMessageFunc, as some unmarshalers might restore the topic as well.
type UnmarshalMessageFunc func(topic string, b []byte) (*message.Message, error)

// PayloadUnmarshalFunc puts the whole byte slice into the message's Payload.
// ...

The package comes with some predefined marshal and unmarshal functions, but you might want to write your own marshaler/unmarshaler to work with the specific implementation of io.Writer/io.Reader that you are working with.
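
As an illustration of a custom pair, the sketch below encodes each message as one JSON object per line, so that the UUID and metadata survive the round trip and a newline delimiter can split the stream. The Message type, jsonMarshal, and jsonUnmarshal are hypothetical and exist only in this example; only the function shapes follow the MarshalMessageFunc and UnmarshalMessageFunc types shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a hypothetical stand-in for a Watermill message.
type Message struct {
	UUID     string            `json:"uuid"`
	Metadata map[string]string `json:"metadata,omitempty"`
	Payload  []byte            `json:"payload"` // encoded as base64 by encoding/json
}

// jsonMarshal writes the whole message as a single JSON line.
// The topic is ignored in this sketch.
func jsonMarshal(_ string, msg *Message) ([]byte, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	return append(b, '\n'), nil
}

// jsonUnmarshal restores a message from one JSON line.
func jsonUnmarshal(_ string, b []byte) (*Message, error) {
	var msg Message
	if err := json.Unmarshal(b, &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

func main() {
	in := &Message{
		UUID:     "42",
		Metadata: map[string]string{"source": "cli"},
		Payload:  []byte("line one\nline two"),
	}

	b, err := jsonMarshal("", in)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(b))

	out, err := jsonUnmarshal("", b)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %v %q\n", out.UUID, out.Metadata, out.Payload)
}
```

Because encoding/json base64-encodes byte slices, a payload that itself contains newlines still occupies a single line, which keeps delimiter-based reading unambiguous.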

Topic

For the Publisher/Subscriber implementation itself, the topic has no meaning. It is difficult to interpret the meaning of topic in the general context of io.Writer and io.Reader interfaces.

However, the topic is passed as a parameter to the marshal/unmarshal functions, so adaptations to a particular Writer/Reader implementation may take it into account.
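
For instance, a log-like writer might prefix every line with the topic. The following sketch shows one way a marshal/unmarshal pair could use the topic argument; Message, topicPrefixMarshal, and topicPrefixUnmarshal are hypothetical names for this example, and the "topic: payload" line format is an assumption, not something the package defines.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for a Watermill message.
type Message struct {
	Payload []byte
}

// topicPrefixMarshal renders "<topic>: <payload>\n".
func topicPrefixMarshal(topic string, msg *Message) ([]byte, error) {
	return []byte(fmt.Sprintf("%s: %s\n", topic, msg.Payload)), nil
}

// topicPrefixUnmarshal strips the "<topic>: " prefix and the trailing newline,
// and rejects lines that were written for a different topic.
func topicPrefixUnmarshal(topic string, b []byte) (*Message, error) {
	line := bytes.TrimSuffix(b, []byte("\n"))
	prefix := []byte(topic + ": ")
	if !bytes.HasPrefix(line, prefix) {
		return nil, errors.New("line does not belong to topic " + topic)
	}
	return &Message{Payload: bytes.TrimPrefix(line, prefix)}, nil
}

func main() {
	b, err := topicPrefixMarshal("alerts", &Message{Payload: []byte("disk full")})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(b))

	msg, err := topicPrefixUnmarshal("alerts", b)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", msg.Payload)
}
```

This simple format is ambiguous if a payload contains a newline, so it fits single-line payloads only.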