Redis Stream

Redis is the open source, in-memory data store used by millions of developers. A Redis stream is a data structure that acts like an append-only log in Redis. We provide a Pub/Sub implementation based on redis/go-redis.

You can find a fully functional example with Redis Stream in the Watermill examples.

Installation

go get github.com/ThreeDotsLabs/watermill-redisstream

Characteristics

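| Feature             | Implements | Note                                                          |
|---------------------|------------|---------------------------------------------------------------|
| ConsumerGroups      | yes        |                                                               |
| ExactlyOnceDelivery | no         |                                                               |
| GuaranteedOrder     | no         |                                                               |
| Persistent          | yes        |                                                               |
| FanOut              | yes        | use XREAD to fan out messages when there is no consumer group |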

Configuration

// ...
// PublisherConfig holds the settings of a redis stream Publisher.
//
// Client is required: Validate returns an error when it is nil.
// Marshaller is optional: setDefaults falls back to DefaultMarshallerUnmarshaller.
// Maxlens maps a topic to its maxlen value; Validate replaces negative entries
// with DefaultMaxlen.
type PublisherConfig struct {
	Client        redis.UniversalClient
	Marshaller    Marshaller
	Maxlens       map[string]int64
	DefaultMaxlen int64
}

// setDefaults fills in optional fields that were left unset.
// Only Marshaller has a default: DefaultMarshallerUnmarshaller.
func (c *PublisherConfig) setDefaults() {
	if c.Marshaller != nil {
		// An explicitly configured marshaller always wins.
		return
	}
	c.Marshaller = DefaultMarshallerUnmarshaller{}
}

// Validate checks the configuration and normalizes the per-topic maxlen values.
//
// It returns an error when Client is nil. As a side effect, every negative
// entry of Maxlens is overwritten with DefaultMaxlen; zero and positive
// entries are kept unchanged.
func (c *PublisherConfig) Validate() error {
	if c.Client == nil {
		return errors.New("redis client is empty")
	}

	for name, limit := range c.Maxlens {
		if limit >= 0 {
			// Zero is a valid setting: it indicates unlimited stream length.
			continue
		}
		c.Maxlens[name] = c.DefaultMaxlen
	}

	return nil
}

// Publish publishes message to redis stream
// ...

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go

// ...
// SubscriberConfig holds the settings of a redis stream Subscriber.
// Zero-valued optional fields are replaced with defaults by setDefaults.
type SubscriberConfig struct {
	// Redis client used by the subscriber. Required: Validate fails when it is nil.
	Client redis.UniversalClient

	// Converts stream values back into messages.
	// Defaults to DefaultMarshallerUnmarshaller when nil.
	Unmarshaller Unmarshaller

	// Redis stream consumer id, paired with ConsumerGroup.
	// Defaults to a generated short UUID when empty.
	Consumer string
	// When empty, fan-out mode will be used.
	ConsumerGroup string

	// How long after a Nack the message should be redelivered.
	// Defaults to NoSleep when zero.
	NackResendSleep time.Duration

	// How long to block while waiting for the next redis stream message.
	BlockTime time.Duration

	// Interval at which idle pending messages are claimed.
	ClaimInterval time.Duration

	// How many pending messages are claimed at most each claim interval.
	ClaimBatchSize int64

	// How long a pending message must have been idle before it is treated as claimable.
	MaxIdleTime time.Duration

	// Interval at which consumer status is checked.
	CheckConsumersInterval time.Duration

	// After this timeout an idle consumer with no pending messages will be removed from the consumer group.
	ConsumerTimeout time.Duration

	// Start consumption from the specified message ID.
	// When using "0", the consumer group will consume from the very first message.
	// When using "$", the consumer group will consume from the latest message.
	// Defaults to "0" when empty.
	OldestId string

	// If the consumer group is not set, start fan-out consumption from the specified message ID.
	// When using "0", the consumer will consume from the very first message.
	// When using "$", the consumer will consume from the latest message.
	// Defaults to "$" when empty.
	FanOutOldestId string

	// If this is set, it will be called to decide whether a pending message that
	// has been idle for more than MaxIdleTime should actually be claimed.
	// If this is not set, then all pending messages that have been idle for more than MaxIdleTime will be claimed.
	// This can be useful e.g. for tasks where the processing time can be very variable -
	// so we can't just use a short MaxIdleTime; but at the same time dead
	// consumers should be spotted quickly - so we can't just use a long MaxIdleTime either.
	// In such cases, if we have another way for checking consumers' health, then we can
	// leverage that in this callback.
	ShouldClaimPendingMessage func(redis.XPendingExt) bool

	// If this is set, it will be called to decide whether a read error
	// should make the read method return and close the subscriber, or whether
	// the error should just be logged and reading should continue.
	ShouldStopOnReadErrors func(error) bool
}

// setDefaults replaces every zero-valued optional field with its default.
// Fields that already carry a value are left untouched.
func (sc *SubscriberConfig) setDefaults() {
	// durationOr stores fallback in *field only while the field is still zero.
	durationOr := func(field *time.Duration, fallback time.Duration) {
		if *field == 0 {
			*field = fallback
		}
	}
	// stringOr stores fallback in *field only while the field is still empty.
	stringOr := func(field *string, fallback string) {
		if *field == "" {
			*field = fallback
		}
	}

	if sc.Unmarshaller == nil {
		sc.Unmarshaller = DefaultMarshallerUnmarshaller{}
	}
	// Generate an id only when needed, so a configured Consumer is never replaced.
	if sc.Consumer == "" {
		sc.Consumer = watermill.NewShortUUID()
	}

	durationOr(&sc.NackResendSleep, NoSleep)
	durationOr(&sc.BlockTime, DefaultBlockTime)
	durationOr(&sc.ClaimInterval, DefaultClaimInterval)
	if sc.ClaimBatchSize == 0 {
		sc.ClaimBatchSize = DefaultClaimBatchSize
	}
	durationOr(&sc.MaxIdleTime, DefaultMaxIdleTime)
	durationOr(&sc.CheckConsumersInterval, DefaultCheckConsumersInterval)
	durationOr(&sc.ConsumerTimeout, DefaultConsumerTimeout)

	// Consumer groups read from the very first message by default,
	// while fan-out consumers start from the latest one.
	stringOr(&sc.OldestId, "0")
	stringOr(&sc.FanOutOldestId, "$")
}

// Validate reports whether the configuration is usable.
// The only requirement checked is a non-nil Client.
func (sc *SubscriberConfig) Validate() error {
	if sc.Client != nil {
		return nil
	}
	return errors.New("redis client is empty")
}

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go

Passing redis.UniversalClient

You need to configure and pass your own go-redis client via Client redis.UniversalClient in NewSubscriber and NewPublisher. The client can be either redis.Client or redis.ClusterClient.

Publisher

// ...
// NewPublisher creates a new redis stream Publisher.
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go

Example:

// ...
	pubClient := redis.NewClient(&redis.Options{
		Addr: "redis:6379",
		DB:   0,
	})
	publisher, err := redisstream.NewPublisher(
		redisstream.PublisherConfig{
			Client:     pubClient,
			Marshaller: redisstream.DefaultMarshallerUnmarshaller{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/redisstream/main.go

Subscriber

// ...
// NewSubscriber creates a new redis stream Subscriber.
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go

Example:

// ...
	subClient := redis.NewClient(&redis.Options{
		Addr: "redis:6379",
		DB:   0,
	})
	subscriber, err := redisstream.NewSubscriber(
		redisstream.SubscriberConfig{
			Client:        subClient,
			Unmarshaller:  redisstream.DefaultMarshallerUnmarshaller{},
			ConsumerGroup: "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/redisstream/main.go

Publishing

// ...
// Publish publishes message to redis stream
//
// Publish is blocking and waits for redis response.
// When any of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go

Subscribing

// ...
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go

Marshaler

Watermill’s messages cannot be sent directly to Redis - they need to be marshaled first. You can implement your own marshaler or use the default implementation. The default implementation uses MessagePack for efficient serialization.

// ...
// UUIDHeaderKey is a reserved key name used by the marshaller.
// NOTE(review): presumably the key under which the message UUID is stored in
// the marshaled values - confirm against the marshaller implementation.
const UUIDHeaderKey = "_watermill_message_uuid"

// Marshaller converts a Watermill message into a map of values.
type Marshaller interface {
	// Marshal returns the key/value representation of msg for the given topic,
	// or an error when the message cannot be converted.
	Marshal(topic string, msg *message.Message) (map[string]interface{}, error)
}

// Unmarshaller converts a map of values back into a Watermill message.
type Unmarshaller interface {
	// Unmarshal builds a message from values, or returns an error when the
	// values cannot be converted.
	Unmarshal(values map[string]interface{}) (msg *message.Message, err error)
}

// MarshallerUnmarshaller combines Marshaller and Unmarshaller, so a single
// type can be used for both publishing and subscribing.
type MarshallerUnmarshaller interface {
	Marshaller
	Unmarshaller
}

// DefaultMarshallerUnmarshaller is the default marshaler implementation.
// It is the fallback chosen by the publisher and subscriber configs when no
// Marshaller/Unmarshaller is set.
type DefaultMarshallerUnmarshaller struct{}
// ...

Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/marshaller.go


Check our online hands-on training