Redis Stream
Redis is the open source, in-memory data store used by millions of developers. Redis stream is a data structure that acts like an append-only log in Redis. We are providing Pub/Sub implementation based on redis/go-redis .
You can find a fully functional example with Redis Stream in the Watermill examples .
Installation
go get github.com/ThreeDotsLabs/watermill-redisstream
Characteristics
Feature | Implements | Note |
---|---|---|
ConsumerGroups | yes | |
ExactlyOnceDelivery | no | |
GuaranteedOrder | no | |
Persistent | yes | |
FanOut | yes | use XREAD to fan out messages when there is no consumer group |
Configuration
// ...
type PublisherConfig struct {
Client redis.UniversalClient
Marshaller Marshaller
Maxlens map[string]int64
DefaultMaxlen int64
}
func (c *PublisherConfig) setDefaults() {
if c.Marshaller == nil {
c.Marshaller = DefaultMarshallerUnmarshaller{}
}
}
func (c *PublisherConfig) Validate() error {
if c.Client == nil {
return errors.New("redis client is empty")
}
for topic, maxlen := range c.Maxlens {
if maxlen < 0 {
// zero maxlen stream indicates unlimited stream length
c.Maxlens[topic] = c.DefaultMaxlen
}
}
return nil
}
// Publish publishes message to redis stream
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go
// ...
type SubscriberConfig struct {
Client redis.UniversalClient
Unmarshaller Unmarshaller
// Redis stream consumer id, paired with ConsumerGroup.
Consumer string
// When empty, fan-out mode will be used.
ConsumerGroup string
// How long after Nack message should be redelivered.
NackResendSleep time.Duration
// Block to wait next redis stream message.
BlockTime time.Duration
// Claim idle pending message interval.
ClaimInterval time.Duration
// How many pending messages are claimed at most each claim interval.
ClaimBatchSize int64
// How long should we treat a pending message as claimable.
MaxIdleTime time.Duration
// Check consumer status interval.
CheckConsumersInterval time.Duration
// After this timeout an idle consumer with no pending messages will be removed from the consumer group.
ConsumerTimeout time.Duration
// Start consumption from the specified message ID.
// When using "0", the consumer group will consume from the very first message.
// When using "$", the consumer group will consume from the latest message.
OldestId string
// If consumer group in not set, for fanout start consumption from the specified message ID.
// When using "0", the consumer will consume from the very first message.
// When using "$", the consumer will consume from the latest message.
FanOutOldestId string
// If this is set, it will be called to decide whether a pending message that
// has been idle for more than MaxIdleTime should actually be claimed.
// If this is not set, then all pending messages that have been idle for more than MaxIdleTime will be claimed.
// This can be useful e.g. for tasks where the processing time can be very variable -
// so we can't just use a short MaxIdleTime; but at the same time dead
// consumers should be spotted quickly - so we can't just use a long MaxIdleTime either.
// In such cases, if we have another way for checking consumers' health, then we can
// leverage that in this callback.
ShouldClaimPendingMessage func(redis.XPendingExt) bool
// If this is set, it will be called to decide whether a reading error
// should return the read method and close the subscriber or just log the error
// and continue.
ShouldStopOnReadErrors func(error) bool
}
func (sc *SubscriberConfig) setDefaults() {
if sc.Unmarshaller == nil {
sc.Unmarshaller = DefaultMarshallerUnmarshaller{}
}
if sc.Consumer == "" {
sc.Consumer = watermill.NewShortUUID()
}
if sc.NackResendSleep == 0 {
sc.NackResendSleep = NoSleep
}
if sc.BlockTime == 0 {
sc.BlockTime = DefaultBlockTime
}
if sc.ClaimInterval == 0 {
sc.ClaimInterval = DefaultClaimInterval
}
if sc.ClaimBatchSize == 0 {
sc.ClaimBatchSize = DefaultClaimBatchSize
}
if sc.MaxIdleTime == 0 {
sc.MaxIdleTime = DefaultMaxIdleTime
}
if sc.CheckConsumersInterval == 0 {
sc.CheckConsumersInterval = DefaultCheckConsumersInterval
}
if sc.ConsumerTimeout == 0 {
sc.ConsumerTimeout = DefaultConsumerTimeout
}
// Consume from scratch by default
if sc.OldestId == "" {
sc.OldestId = "0"
}
if sc.FanOutOldestId == "" {
sc.FanOutOldestId = "$"
}
}
func (sc *SubscriberConfig) Validate() error {
if sc.Client == nil {
return errors.New("redis client is empty")
}
return nil
}
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go
Passing redis.UniversalClient
You need to configure and pass your own go-redis client via Client redis.UniversalClient
in NewSubscriber
and NewPublisher
. The client can be either redis.Client
or redis.ClusterClient
.
Publisher
// ...
// NewPublisher creates a new redis stream Publisher.
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go
Example:
// ...
pubClient := redis.NewClient(&redis.Options{
Addr: "redis:6379",
DB: 0,
})
publisher, err := redisstream.NewPublisher(
redisstream.PublisherConfig{
Client: pubClient,
Marshaller: redisstream.DefaultMarshallerUnmarshaller{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/redisstream/main.go
Subscriber
// ...
// NewSubscriber creates a new redis stream Subscriber.
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go
Example:
// ...
subClient := redis.NewClient(&redis.Options{
Addr: "redis:6379",
DB: 0,
})
subscriber, err := redisstream.NewSubscriber(
redisstream.SubscriberConfig{
Client: subClient,
Unmarshaller: redisstream.DefaultMarshallerUnmarshaller{},
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/redisstream/main.go
Publishing
// ...
// Publish publishes message to redis stream
//
// Publish is blocking and waits for redis response.
// When any of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go
Subscribing
// ...
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go
Marshaler
Watermill’s messages cannot be directly sent to Redis - they need to be marshaled. You can implement your marshaler or use default implementation. The default implementation uses MessagePack for efficient serialization.
// ...
const UUIDHeaderKey = "_watermill_message_uuid"
type Marshaller interface {
Marshal(topic string, msg *message.Message) (map[string]interface{}, error)
}
type Unmarshaller interface {
Unmarshal(values map[string]interface{}) (msg *message.Message, err error)
}
type MarshallerUnmarshaller interface {
Marshaller
Unmarshaller
}
type DefaultMarshallerUnmarshaller struct{}
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/marshaller.go