Redis Stream

A fast, open source, in-memory, key-value data store.

Redis is the open source, in-memory data store used by millions of developers. A Redis stream is a data structure that acts like an append-only log. This package provides a Pub/Sub implementation based on redis/go-redis.

Installation

go get github.com/ThreeDotsLabs/watermill-redisstream

Characteristics

Feature              Implements  Note
ConsumerGroups       yes
ExactlyOnceDelivery  no
GuaranteedOrder      no
Persistent           yes
FanOut               yes         use XREAD to fan out messages when there is no consumer group
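
The difference between the ConsumerGroups and FanOut rows can be illustrated with a toy in-memory model. This is only a sketch of the delivery semantics: it does not talk to Redis and is not how the library is implemented. The names stream, cursor, and drain are hypothetical. Consumers in a group are modeled as sharing one cursor, while fan-out readers each keep their own.

```go
package main

import "fmt"

// stream is a toy append-only log standing in for a Redis stream.
type stream struct{ entries []string }

func (s *stream) add(v string) { s.entries = append(s.entries, v) }

// cursor tracks the next unread position in a stream.
type cursor struct {
	s    *stream
	next int
}

// read returns the next entry behind this cursor, if there is one.
func (c *cursor) read() (string, bool) {
	if c.next >= len(c.s.entries) {
		return "", false
	}
	v := c.s.entries[c.next]
	c.next++
	return v, true
}

// drain lets the consumers take turns reading until nothing is left and
// returns what each consumer received.
func drain(consumers []*cursor) [][]string {
	got := make([][]string, len(consumers))
	for progress := true; progress; {
		progress = false
		for i, c := range consumers {
			if v, ok := c.read(); ok {
				got[i] = append(got[i], v)
				progress = true
			}
		}
	}
	return got
}

func main() {
	s := &stream{}
	for _, v := range []string{"a", "b", "c", "d"} {
		s.add(v)
	}

	// Consumer group: both consumers share one cursor, so entries are split.
	group := &cursor{s: s}
	fmt.Println("consumer group:", drain([]*cursor{group, group}))

	// Fan-out: every reader has its own cursor, so each sees every entry.
	fmt.Println("fan-out:", drain([]*cursor{{s: s}, {s: s}}))
}
```

In the group case each entry reaches exactly one consumer; in the fan-out case both readers receive all four entries.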

Configuration

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go

// ...
type PublisherConfig struct {
	Client     redis.UniversalClient
	Marshaller Marshaller
	Maxlens    map[string]int64
}

func (c *PublisherConfig) setDefaults() {
	if c.Marshaller == nil {
		c.Marshaller = DefaultMarshallerUnmarshaller{}
	}
}

func (c *PublisherConfig) Validate() error {
	if c.Client == nil {
		return errors.New("redis client is empty")
	}
	for topic, maxlen := range c.Maxlens {
		if maxlen < 0 {
			// zero maxlen stream indicates unlimited stream length
			c.Maxlens[topic] = 0
		}
	}
	return nil
}

// Publish publishes message to redis stream
// ...
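
To make the Maxlens handling concrete, here is a minimal standalone sketch that mirrors the loop in Validate. The function names and topic names are hypothetical. The limitFor helper assumes that a topic missing from the map behaves like 0, which is simply Go's zero value for a missing key; check the publisher source before relying on that.

```go
package main

import "fmt"

// normalizeMaxlens mirrors the loop in PublisherConfig.Validate:
// negative limits are replaced with 0, which stands for "no limit".
func normalizeMaxlens(maxlens map[string]int64) {
	for topic, maxlen := range maxlens {
		if maxlen < 0 {
			maxlens[topic] = 0
		}
	}
}

// limitFor reports the configured cap for a topic and whether the stream
// is capped at all. Assumption: a missing topic is treated like 0.
func limitFor(maxlens map[string]int64, topic string) (int64, bool) {
	maxlen := maxlens[topic]
	return maxlen, maxlen > 0
}

func main() {
	maxlens := map[string]int64{
		"orders":   1000, // hypothetical topic names
		"payments": -5,
	}
	normalizeMaxlens(maxlens)

	for _, topic := range []string{"orders", "payments", "audit"} {
		n, capped := limitFor(maxlens, topic)
		fmt.Printf("%s: maxlen=%d capped=%t\n", topic, n, capped)
	}
}
```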

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go

// ...
type SubscriberConfig struct {
	Client redis.UniversalClient

	Unmarshaller Unmarshaller

	// Redis stream consumer id, paired with ConsumerGroup
	Consumer string
	// When empty, fan-out mode will be used
	ConsumerGroup string

	// How long after Nack message should be redelivered
	NackResendSleep time.Duration

	// Block to wait next redis stream message
	BlockTime time.Duration

	// Claim idle pending message interval
	ClaimInterval time.Duration

	// How long should we treat a consumer as offline
	MaxIdleTime time.Duration

	// Start consumption from the specified message ID
	// When using "0", the consumer group will consume from the very first message
	// When using "$", the consumer group will consume from the latest message
	OldestId string
}

func (sc *SubscriberConfig) setDefaults() {
	if sc.Unmarshaller == nil {
		sc.Unmarshaller = DefaultMarshallerUnmarshaller{}
	}
	if sc.Consumer == "" {
		sc.Consumer = watermill.NewShortUUID()
	}
	if sc.NackResendSleep == 0 {
		sc.NackResendSleep = NoSleep
	}
	if sc.BlockTime == 0 {
		sc.BlockTime = DefaultBlockTime
	}
	if sc.ClaimInterval == 0 {
		sc.ClaimInterval = DefaultClaimInterval
	}
	if sc.MaxIdleTime == 0 {
		sc.MaxIdleTime = DefaultMaxIdleTime
	}
	// Consume from scratch by default
	if sc.OldestId == "" {
		sc.OldestId = "0"
	}
}

func (sc *SubscriberConfig) Validate() error {
	if sc.Client == nil {
		return errors.New("redis client is empty")
	}
	return nil
}

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
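
The effect of OldestId can be sketched without a Redis server. The example below assumes the usual Redis stream ID format of "<milliseconds>-<sequence>" and models which entries already in a stream a newly created consumer group would receive for a given start ID. parseID and visibleTo are hypothetical helpers, not part of the library.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseID splits a stream ID of the form "<ms>-<seq>" into its two parts.
// A bare number such as "0" is treated as "<ms>-0".
func parseID(id string) (ms, seq uint64, err error) {
	msPart, seqPart, found := strings.Cut(id, "-")
	if ms, err = strconv.ParseUint(msPart, 10, 64); err != nil {
		return 0, 0, err
	}
	if !found {
		return ms, 0, nil
	}
	seq, err = strconv.ParseUint(seqPart, 10, 64)
	return ms, seq, err
}

// visibleTo returns the IDs of existing entries that would be delivered
// when consumption starts at oldestID. "$" means only entries added later.
func visibleTo(ids []string, oldestID string) ([]string, error) {
	if oldestID == "$" {
		return nil, nil
	}
	startMs, startSeq, err := parseID(oldestID)
	if err != nil {
		return nil, err
	}
	var out []string
	for _, id := range ids {
		ms, seq, err := parseID(id)
		if err != nil {
			return nil, err
		}
		// Only entries strictly newer than the start ID are delivered.
		if ms > startMs || (ms == startMs && seq > startSeq) {
			out = append(out, id)
		}
	}
	return out, nil
}

func main() {
	ids := []string{"1700000000000-0", "1700000000000-1", "1700000000500-0"}
	for _, start := range []string{"0", "$", "1700000000000-0"} {
		seen, err := visibleTo(ids, start)
		if err != nil {
			fmt.Println("bad ID:", err)
			continue
		}
		fmt.Printf("OldestId=%q delivers %d existing entries\n", start, len(seen))
	}
}
```

With the default of "0", every existing entry is delivered; with "$", none of the existing entries are.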

Passing redis.UniversalClient

You need to configure your own go-redis client and pass it in the Client field (of type redis.UniversalClient) of the config given to NewSubscriber and NewPublisher. The client can be either a redis.Client or a redis.ClusterClient.

Publisher

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go

// ...
// NewPublisher creates a new redis stream Publisher.
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/redisstream/main.go

// ...
	pubClient := redis.NewClient(&redis.Options{
		Addr: "redis:6379",
		DB:   0,
	})
	publisher, err := redisstream.NewPublisher(
		redisstream.PublisherConfig{
			Client:     pubClient,
			Marshaller: redisstream.DefaultMarshallerUnmarshaller{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
// ...

Subscriber

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go

// ...
// NewSubscriber creates a new redis stream Subscriber
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/redisstream/main.go

// ...
	subClient := redis.NewClient(&redis.Options{
		Addr: "redis:6379",
		DB:   0,
	})
	subscriber, err := redisstream.NewSubscriber(
		redisstream.SubscriberConfig{
			Client:        subClient,
			Unmarshaller:  redisstream.DefaultMarshallerUnmarshaller{},
			ConsumerGroup: "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
// ...

Publishing

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go

// ...
// Publish publishes message to redis stream
//
// Publish is blocking and wait for redis response
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...
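
The "interrupted on first failure" behavior described in the comment has a practical consequence for callers, sketched below with stand-in code. publishAll, failOn, and errSend are hypothetical and only model the control flow; they are not the library's implementation. Messages sent before the failure have already been accepted, so retrying the whole batch may publish them again.

```go
package main

import (
	"errors"
	"fmt"
)

// errSend is a hypothetical failure used for the demonstration.
var errSend = errors.New("connection refused")

// publishAll sends messages one by one and returns at the first error,
// reporting how many messages were sent before the failure.
func publishAll(send func(msg string) error, msgs ...string) (int, error) {
	for i, m := range msgs {
		if err := send(m); err != nil {
			return i, fmt.Errorf("message %d: %w", i, err)
		}
	}
	return len(msgs), nil
}

// failOn returns a send function that fails for exactly one payload.
func failOn(bad string) func(string) error {
	return func(msg string) error {
		if msg == bad {
			return errSend
		}
		return nil
	}
}

func main() {
	sent, err := publishAll(failOn("b"), "a", "b", "c")
	// "a" was sent, "b" failed, and "c" was never attempted.
	fmt.Println("sent:", sent, "err:", err)
}
```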

Subscribing

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go

// ...
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
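
Subscribe returns a channel, so consumption is typically a loop that reads a message, processes it, and acknowledges it. The sketch below models that loop with stand-in types so that it runs without Redis. It assumes the subscriber waits for an Ack before delivering the next message and closes the channel once the context is cancelled. msg, subscribe, and consumeAll are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// msg is a simplified stand-in for Watermill's message.Message.
type msg struct {
	payload string
	acked   chan struct{}
}

// Ack signals that the message was processed.
func (m *msg) Ack() { close(m.acked) }

// subscribe emits the payloads one at a time, waiting for each Ack before
// sending the next, and closes the channel when done or cancelled.
func subscribe(ctx context.Context, payloads []string) <-chan *msg {
	out := make(chan *msg)
	go func() {
		defer close(out)
		for _, p := range payloads {
			m := &msg{payload: p, acked: make(chan struct{})}
			select {
			case out <- m:
			case <-ctx.Done():
				return
			}
			select {
			case <-m.acked:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// consumeAll reads until the channel is closed and acks every message.
func consumeAll(payloads []string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var got []string
	for m := range subscribe(ctx, payloads) {
		got = append(got, m.payload)
		m.Ack()
	}
	return got
}

func main() {
	fmt.Println(consumeAll([]string{"a", "b", "c"}))
}
```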

Marshaler

Watermill’s messages cannot be sent to Redis directly; they need to be marshaled first. You can implement your own marshaler or use the default implementation, which uses MessagePack for efficient serialization.

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/marshaller.go

// ...
const UUIDHeaderKey = "_watermill_message_uuid"

type Marshaller interface {
	Marshal(topic string, msg *message.Message) (map[string]interface{}, error)
}

type Unmarshaller interface {
	Unmarshal(values map[string]interface{}) (msg *message.Message, err error)
}

type MarshallerUnmarshaller interface {
	Marshaller
	Unmarshaller
}

type DefaultMarshallerUnmarshaller struct{}
// ...
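
As a rough illustration of what a custom marshaler could look like, here is a self-contained sketch that stores metadata as JSON. It is not the default implementation (which, as noted above, uses MessagePack). Message is a simplified stand-in for Watermill's message.Message, JSONMarshaller and the "metadata" and "payload" field names are hypothetical, and the code assumes that field values read back from Redis arrive as strings.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Message is a simplified stand-in for Watermill's message.Message.
type Message struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// UUIDHeaderKey has the same value as the constant quoted above.
const UUIDHeaderKey = "_watermill_message_uuid"

// JSONMarshaller is a hypothetical marshaler that encodes metadata as JSON.
type JSONMarshaller struct{}

// Marshal turns a message into the field map stored in the stream entry.
func (JSONMarshaller) Marshal(_ string, msg *Message) (map[string]interface{}, error) {
	md, err := json.Marshal(msg.Metadata)
	if err != nil {
		return nil, err
	}
	return map[string]interface{}{
		UUIDHeaderKey: msg.UUID,
		"metadata":    string(md),
		"payload":     string(msg.Payload),
	}, nil
}

// Unmarshal rebuilds a message from the field map of a stream entry.
func (JSONMarshaller) Unmarshal(values map[string]interface{}) (*Message, error) {
	uuid, ok := values[UUIDHeaderKey].(string)
	if !ok {
		return nil, errors.New("missing or non-string UUID field")
	}
	payload, ok := values["payload"].(string)
	if !ok {
		return nil, errors.New("missing or non-string payload field")
	}
	msg := &Message{UUID: uuid, Payload: []byte(payload)}
	if md, ok := values["metadata"].(string); ok && md != "" {
		if err := json.Unmarshal([]byte(md), &msg.Metadata); err != nil {
			return nil, err
		}
	}
	return msg, nil
}

func main() {
	m := JSONMarshaller{}
	values, err := m.Marshal("example_topic", &Message{
		UUID:     "hypothetical-uuid",
		Metadata: map[string]string{"trace": "abc"},
		Payload:  []byte("hello"),
	})
	if err != nil {
		panic(err)
	}
	msg, err := m.Unmarshal(values)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.UUID, msg.Metadata["trace"], string(msg.Payload))
}
```

A real implementation would use *message.Message in place of the stand-in type so that it satisfies the Marshaller and Unmarshaller interfaces shown above.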