io.Writer/io.Reader
This is an experimental Pub/Sub implementation that leverages the standard library’s io.Writer and io.Reader interfaces as sources of Publisher and Subscriber, respectively.
Note that these aren’t full-fledged Pub/Subs like Kafka, RabbitMQ, or the like, but given the ubiquity of Writer and Reader implementations, they may come in handy for uses like:
- Writing messages to file or stdout
- Subscribing for data on a file or stdin and packaging it as messages
- Interfacing with third-party libraries that implement io.Writer or io.Reader, like github.com/colinmarc/hdfs or github.com/mholt/archiver.
Installation
go get github.com/ThreeDotsLabs/watermill-io
Characteristics
This is a very bare-bones implementation for now, so no extra features are supported. However, it is still sufficient for applications like a CLI producer/consumer.
Feature | Implements | Note |
---|---|---|
ConsumerGroups | no | |
ExactlyOnceDelivery | no | |
GuaranteedOrder | no | |
Persistent | no | |
Configuration
The publisher configuration is relatively simple.
Full source: github.com/ThreeDotsLabs/watermill-io/pkg/io/publisher.go
// ...
type PublisherConfig struct {
	// MarshalFunc transforms the Watermill messages into raw bytes for transport.
	// Its behavior may be dependent on the topic.
	MarshalFunc MarshalMessageFunc
}

func (p PublisherConfig) validate() error {
	if p.MarshalFunc == nil {
		return errors.New("marshal func is empty")
	}
	return nil
}

// Publisher writes the messages to the underlying io.Writer.
// ...
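To make the role of MarshalFunc concrete, here is a minimal sketch of the publishing step: marshal each message, then write the bytes to an io.Writer. It does not use the library. Message, MarshalFunc, payloadLine, publish and publishToString are hypothetical stand-ins invented for illustration, and the real Publisher may behave differently in its details.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// Message is a hypothetical stand-in for a Watermill message.
type Message struct {
	UUID    string
	Payload []byte
}

// MarshalFunc mirrors the shape of MarshalMessageFunc, using the stand-in Message.
type MarshalFunc func(topic string, msg *Message) ([]byte, error)

// payloadLine is an illustrative marshaler: the payload followed by a newline.
// The topic is ignored.
func payloadLine(_ string, msg *Message) ([]byte, error) {
	return append(append([]byte(nil), msg.Payload...), '\n'), nil
}

// publish marshals each message and writes the resulting bytes to w.
func publish(w io.Writer, marshal MarshalFunc, topic string, msgs ...*Message) error {
	if marshal == nil {
		return errors.New("marshal func is empty")
	}
	for _, msg := range msgs {
		b, err := marshal(topic, msg)
		if err != nil {
			return fmt.Errorf("marshal message %s: %w", msg.UUID, err)
		}
		if _, err := w.Write(b); err != nil {
			return fmt.Errorf("write message %s: %w", msg.UUID, err)
		}
	}
	return nil
}

// publishToString runs publish against an in-memory writer and returns what was written.
func publishToString(marshal MarshalFunc, topic string, msgs ...*Message) (string, error) {
	var sb strings.Builder
	err := publish(&sb, marshal, topic, msgs...)
	return sb.String(), err
}

func main() {
	out, err := publishToString(payloadLine, "any-topic",
		&Message{UUID: "1", Payload: []byte("hello")},
		&Message{UUID: "2", Payload: []byte("world")},
	)
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

Any io.Writer can take the place of the in-memory builder, for example os.Stdout or an open file.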
The subscriber works in one of two modes: it either performs buffered reads of constant size from the io.Reader, or splits the byte stream into messages using a delimiter byte.
Reading continues even when a read comes up empty; empty reads are simply not sent out as messages. The time to wait after an empty read is configured through the PollInterval parameter. As soon as non-empty input is read, it is packaged as a message and sent out.
Full source: github.com/ThreeDotsLabs/watermill-io/pkg/io/subscriber.go
// ...
type SubscriberConfig struct {
	// BufferSize configures how many bytes will be read at a time from the Subscriber's Reader.
	// Each message will be treated as having at most BufferSize bytes.
	// If 0, Subscriber works in delimiter mode - it scans for messages delimited by the MessageDelimiter byte.
	BufferSize int
	// MessageDelimiter is the byte that is expected to separate messages if BufferSize is equal to 0.
	MessageDelimiter byte

	// PollInterval is the time between polling for new messages if the last read was empty. Defaults to time.Second.
	PollInterval time.Duration

	// UnmarshalFunc transforms the raw bytes into a Watermill message. Its behavior may be dependent on the topic.
	UnmarshalFunc UnmarshalMessageFunc
}

func (c SubscriberConfig) validate() error {
	if c.BufferSize != 0 && c.MessageDelimiter != 0 {
		return errors.New("choose either BufferSize or MessageDelimiter")
	}

	if c.BufferSize < 0 {
		return errors.New("buffer size must be non-negative")
	}

	if c.UnmarshalFunc == nil {
		return errors.New("unmarshal func is empty")
	}

	return nil
}

func (c *SubscriberConfig) setDefaults() {
	if c.BufferSize == 0 && c.MessageDelimiter == 0 {
		c.MessageDelimiter = '\n'
	}

	if c.PollInterval == 0 {
		c.PollInterval = time.Second
	}
}

// Subscriber reads bytes from its underlying io.Reader and interprets them as Watermill messages.
// ...
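The difference between the two modes can be illustrated with the standard library alone. The sketch below is not the library's implementation: readChunks, readDelimited, chunksOf and linesOf are hypothetical helpers. Whether the real Subscriber keeps or strips the delimiter is not stated here; this sketch keeps it, because that is what bufio.Reader.ReadBytes returns.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readChunks sketches buffered mode: every non-empty read of at most
// bufferSize bytes becomes one message.
func readChunks(r io.Reader, bufferSize int) ([]string, error) {
	if bufferSize <= 0 {
		return nil, fmt.Errorf("buffer size must be positive, got %d", bufferSize)
	}
	var msgs []string
	buf := make([]byte, bufferSize)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			msgs = append(msgs, string(buf[:n]))
		}
		if err == io.EOF {
			return msgs, nil
		}
		if err != nil {
			return msgs, err
		}
	}
}

// readDelimited sketches delimiter mode: the stream is split after every
// occurrence of delim, and the delimiter stays attached to the message.
func readDelimited(r io.Reader, delim byte) ([]string, error) {
	var msgs []string
	br := bufio.NewReader(r)
	for {
		chunk, err := br.ReadBytes(delim)
		if len(chunk) > 0 {
			msgs = append(msgs, string(chunk))
		}
		if err == io.EOF {
			return msgs, nil
		}
		if err != nil {
			return msgs, err
		}
	}
}

// chunksOf and linesOf run the readers above over an in-memory input.
func chunksOf(input string, bufferSize int) ([]string, error) {
	return readChunks(strings.NewReader(input), bufferSize)
}

func linesOf(input string) ([]string, error) {
	return readDelimited(strings.NewReader(input), '\n')
}

func main() {
	chunks, _ := chunksOf("abcdefgh", 3)
	fmt.Printf("%q\n", chunks) // ["abc" "def" "gh"]

	lines, _ := linesOf("a\nbb\nccc")
	fmt.Printf("%q\n", lines) // ["a\n" "bb\n" "ccc"]
}
```

Buffered mode suits binary or fixed-width data, while delimiter mode is the natural choice for line-oriented input such as logs.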
The continuous reading may be used, for example, to emulate the behaviour of the tail -f command, like in this snippet:
Full source: github.com/ThreeDotsLabs/watermill/docs/content/docs/snippets/tail-log-file/main.go
// ...
// this will `tail -f` a log file and publish an alert if a line fulfils some criterion
func main() {
	// if an alert is raised, the offending line will be published on this publisher;
	// in a real program, this would be set to an actual publisher
	var alertPublisher message.Publisher

	if len(os.Args) < 2 {
		panic(
			fmt.Errorf("usage: %s /path/to/file.log", os.Args[0]),
		)
	}
	logFile, err := os.OpenFile(os.Args[1], os.O_RDONLY, 0444)
	if err != nil {
		panic(err)
	}

	sub, err := io.NewSubscriber(logFile, io.SubscriberConfig{
		UnmarshalFunc: io.PayloadUnmarshalFunc,
	}, watermill.NewStdLogger(true, false))
	if err != nil {
		panic(err)
	}

	// for io.Subscriber, topic does not matter
	lines, err := sub.Subscribe(context.Background(), "")
	if err != nil {
		panic(err)
	}

	for line := range lines {
		if criterion(string(line.Payload)) {
			_ = alertPublisher.Publish("alerts", line)
		}
	}
}

func criterion(line string) bool {
	// decide whether an action needs to be taken
	return false
}
// ...
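The polling idea behind this tail-like behaviour can be sketched with the standard library. This is an illustration under assumptions, not the Subscriber's actual code: pollLines, firstLines and scriptedReader are hypothetical names, and scriptedReader only simulates a file that grows between reads.

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"time"
)

// scriptedReader simulates a growing file: each Read returns the next step,
// and an empty step (or an exhausted script) reports io.EOF.
type scriptedReader struct{ steps []string }

func (s *scriptedReader) Read(p []byte) (int, error) {
	if len(s.steps) == 0 {
		return 0, io.EOF
	}
	step := s.steps[0]
	s.steps = s.steps[1:]
	if step == "" {
		return 0, io.EOF
	}
	return copy(p, step), nil
}

// pollLines emits every complete line read from r. When the reader has
// nothing more to offer, it waits pollInterval and tries again until ctx is done.
func pollLines(ctx context.Context, r io.Reader, pollInterval time.Duration, emit func(line string)) error {
	br := bufio.NewReader(r)
	var pending []byte // bytes of a line that has not been terminated yet
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		chunk, err := br.ReadBytes('\n')
		pending = append(pending, chunk...)
		switch {
		case err == nil:
			emit(string(pending))
			pending = pending[:0]
			continue
		case err != io.EOF:
			return fmt.Errorf("read: %w", err)
		}
		// Empty or partial read: wait before polling again, unless cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}
	}
}

// firstLines collects the first want lines (want must be positive) and then stops polling.
func firstLines(r io.Reader, want int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var lines []string
	_ = pollLines(ctx, r, 5*time.Millisecond, func(line string) {
		lines = append(lines, line)
		if len(lines) == want {
			cancel()
		}
	})
	return lines
}

func main() {
	r := &scriptedReader{steps: []string{"", "hel", "", "lo\nwor", "", "ld\n"}}
	for _, line := range firstLines(r, 2) {
		fmt.Printf("%q\n", line)
	}
}
```

A partial line is kept in pending across empty reads, so a line that is written in several steps is still emitted as a single message.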
Marshaling/Unmarshaling
The MarshalFunc is an important part of io.Publisher, because it fully controls the format in which the underlying io.Writer will receive the messages.
Correspondingly, the UnmarshalFunc determines how the bytes read from the io.Reader will be interpreted as Watermill messages.
Full source: github.com/ThreeDotsLabs/watermill-io/pkg/io/marshal.go
// ...
// MarshalMessageFunc packages the message into a byte slice.
// The topic argument is there because some writers (i.e. loggers) might want to present the topic as part of their output.
type MarshalMessageFunc func(topic string, msg *message.Message) ([]byte, error)
// PayloadMarshalFunc dumps the message's payload, discarding the remaining fields of the message.
// ...
Full source: github.com/ThreeDotsLabs/watermill-io/pkg/io/marshal.go
// ...
// UnmarshalMessageFunc restores the message from a byte slice.
// The topic argument is there to keep symmetry with MarshalMessageFunc, as some unmarshalers might restore the topic as well.
type UnmarshalMessageFunc func(topic string, b []byte) (*message.Message, error)
// PayloadUnmarshalFunc puts the whole byte slice into the message's Payload.
// ...
The package comes with some predefined marshal and unmarshal functions, but you might want to write your own marshaler/unmarshaler to suit the specific implementation of io.Writer/io.Reader that you are working with.
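As an illustration of a custom pair, the sketch below encodes each message as one line of JSON, which plays well with delimiter mode and a '\n' delimiter. It is only a sketch: Message is a hypothetical stand-in with just a UUID and a payload, and marshalJSONLine and unmarshalJSONLine are invented names. With the library, the functions would take and return *message.Message, as in the signatures above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a hypothetical stand-in for a Watermill message.
type Message struct {
	UUID    string `json:"uuid"`
	Payload []byte `json:"payload"`
}

// marshalJSONLine encodes the message as a single line of JSON.
// encoding/json writes []byte as base64, so a payload containing
// newlines cannot break the one-message-per-line framing.
func marshalJSONLine(_ string, msg *Message) ([]byte, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return nil, fmt.Errorf("marshal message %s: %w", msg.UUID, err)
	}
	return append(b, '\n'), nil
}

// unmarshalJSONLine restores a message from one line produced by marshalJSONLine.
// The trailing newline is tolerated because JSON permits trailing whitespace.
func unmarshalJSONLine(_ string, b []byte) (*Message, error) {
	msg := &Message{}
	if err := json.Unmarshal(b, msg); err != nil {
		return nil, fmt.Errorf("unmarshal message: %w", err)
	}
	return msg, nil
}

func main() {
	raw, err := marshalJSONLine("", &Message{UUID: "42", Payload: []byte("multi\nline payload")})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(raw))

	msg, err := unmarshalJSONLine("", raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %q\n", msg.UUID, msg.Payload)
}
```

The design choice here is to trade readability of the payload for safe framing. A plain-text format is easier to inspect, but needs a rule for payloads that contain the delimiter.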
Topic
For the Publisher/Subscriber implementation itself, the topic has no meaning: it is difficult to give a topic a general interpretation in the context of the io.Writer and io.Reader interfaces.
However, the topic is passed as a parameter to the marshal/unmarshal functions, so adaptations to a particular Writer/Reader implementation may take it into account.
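One way a marshal/unmarshal pair might use the topic is to write it in front of the payload and restore it on the reading side. The sketch below assumes a tab-separated line format and stores the restored topic under a "topic" metadata key. The format, the key, the stand-in Message type and both function names are hypothetical choices made for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// Message is a hypothetical stand-in for a Watermill message.
type Message struct {
	Metadata map[string]string
	Payload  []byte
}

// marshalWithTopic renders the message as "<topic>\t<payload>\n".
// Payloads containing newlines would break this framing; the sketch does not handle them.
func marshalWithTopic(topic string, msg *Message) ([]byte, error) {
	if strings.ContainsAny(topic, "\t\n") {
		return nil, fmt.Errorf("topic %q must not contain tabs or newlines", topic)
	}
	return []byte(fmt.Sprintf("%s\t%s\n", topic, msg.Payload)), nil
}

// unmarshalWithTopic parses a line produced by marshalWithTopic. The topic
// argument is ignored, because the topic is recovered from the line itself.
func unmarshalWithTopic(_ string, b []byte) (*Message, error) {
	line := bytes.TrimSuffix(b, []byte("\n"))
	topic, payload, found := bytes.Cut(line, []byte("\t"))
	if !found {
		return nil, fmt.Errorf("no topic separator in %q", line)
	}
	return &Message{
		Metadata: map[string]string{"topic": string(topic)},
		Payload:  append([]byte(nil), payload...),
	}, nil
}

func main() {
	raw, err := marshalWithTopic("alerts", &Message{Payload: []byte("disk full")})
	if err != nil {
		panic(err)
	}
	msg, err := unmarshalWithTopic("", raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("topic=%s payload=%s\n", msg.Metadata["topic"], msg.Payload)
}
```

This lets a single Writer carry messages for several topics, at the cost of every reader having to understand the chosen line format.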