Redis Stream
Redis is the open source, in-memory data store used by millions of developers. A Redis stream is a data structure that acts like an append-only log in Redis. We provide a Pub/Sub implementation based on redis/go-redis.
You can find a fully functional example with Redis Stream in the Watermill examples.
Installation# go get github.com/ThreeDotsLabs/watermill-redisstream
Characteristics#
| Feature | Implements | Note |
| --- | --- | --- |
| ConsumerGroups | yes | |
| ExactlyOnceDelivery | no | |
| GuaranteedOrder | no | |
| Persistent | yes | |
| FanOut | yes | use XREAD to fan out messages when there is no consumer group |
Configuration# // ...
// PublisherConfig holds the settings passed to NewPublisher.
type PublisherConfig struct {
	// Client is the go-redis client used by the publisher.
	// It is required: Validate returns an error when it is nil.
	Client redis.UniversalClient

	// Marshaller turns messages into the values sent to Redis.
	// setDefaults falls back to DefaultMarshallerUnmarshaller when it is nil.
	Marshaller Marshaller

	// Maxlens holds a maximum stream length per topic, keyed by topic name.
	// Validate replaces every negative entry with DefaultMaxlen.
	Maxlens map[string]int64

	// DefaultMaxlen is the value Validate substitutes for negative Maxlens entries.
	// NOTE(review): presumably also applied to topics missing from Maxlens — confirm in Publish.
	DefaultMaxlen int64
}
// setDefaults fills in optional fields that were left unset.
// Only Marshaller has a default here: DefaultMarshallerUnmarshaller.
func (c *PublisherConfig) setDefaults() {
	if c.Marshaller != nil {
		// An explicitly configured marshaller is kept untouched.
		return
	}
	c.Marshaller = DefaultMarshallerUnmarshaller{}
}
// Validate checks that the configuration is usable and normalises Maxlens.
//
// It returns an error when Client is nil; in that case Maxlens is left
// untouched. Otherwise every negative entry in Maxlens is overwritten in
// place with DefaultMaxlen and nil is returned.
func (c *PublisherConfig) Validate() error {
	if c.Client == nil {
		return errors.New("redis client is empty")
	}

	for name, limit := range c.Maxlens {
		if limit >= 0 {
			// Zero and positive limits are kept as configured; per the
			// original note, a zero maxlen indicates unlimited stream length.
			continue
		}
		c.Maxlens[name] = c.DefaultMaxlen
	}

	return nil
}
// Publish publishes message to redis stream
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go
// ...
// SubscriberConfig holds the settings passed to NewSubscriber.
// Zero-valued optional fields are replaced by setDefaults.
type SubscriberConfig struct {
	// Client is the go-redis client used by the subscriber.
	// It is required: Validate returns an error when it is nil.
	Client redis.UniversalClient

	// Unmarshaller turns stream values back into messages.
	// Defaults to DefaultMarshallerUnmarshaller when nil.
	Unmarshaller Unmarshaller

	// Consumer is the Redis stream consumer ID, paired with ConsumerGroup.
	// Defaults to watermill.NewShortUUID() when empty.
	Consumer string

	// ConsumerGroup is the consumer group name.
	// When empty, fan-out mode is used.
	ConsumerGroup string

	// NackResendSleep is how long to wait after a Nack before the message
	// is redelivered. Defaults to NoSleep when zero.
	NackResendSleep time.Duration

	// BlockTime is how long to block while waiting for the next Redis
	// stream message. Defaults to DefaultBlockTime when zero.
	BlockTime time.Duration

	// ClaimInterval is the interval at which idle pending messages are
	// claimed. Defaults to DefaultClaimInterval when zero.
	ClaimInterval time.Duration

	// ClaimBatchSize is the maximum number of pending messages claimed in
	// each claim interval. Defaults to DefaultClaimBatchSize when zero.
	ClaimBatchSize int64

	// MaxIdleTime is how long a pending message must have been idle before
	// it is treated as claimable. Defaults to DefaultMaxIdleTime when zero.
	MaxIdleTime time.Duration

	// CheckConsumersInterval is the interval at which consumer status is
	// checked. Defaults to DefaultCheckConsumersInterval when zero.
	CheckConsumersInterval time.Duration

	// ConsumerTimeout is the timeout after which an idle consumer with no
	// pending messages is removed from the consumer group.
	// Defaults to DefaultConsumerTimeout when zero.
	ConsumerTimeout time.Duration

	// OldestId is the message ID from which the consumer group starts
	// consuming.
	// With "0", the consumer group consumes from the very first message.
	// With "$", the consumer group consumes from the latest message.
	// Defaults to "0" when empty.
	OldestId string

	// FanOutOldestId is the message ID from which consumption starts in
	// fan-out mode, i.e. when the consumer group is not set.
	// With "0", the consumer consumes from the very first message.
	// With "$", the consumer consumes from the latest message.
	// Defaults to "$" when empty.
	FanOutOldestId string

	// ShouldClaimPendingMessage, if set, is called to decide whether a
	// pending message that has been idle for more than MaxIdleTime should
	// actually be claimed. If it is not set, every pending message that has
	// been idle for more than MaxIdleTime is claimed.
	//
	// This is useful, for example, for tasks whose processing time varies a
	// lot: a short MaxIdleTime is not an option, yet dead consumers should
	// still be spotted quickly, so a long MaxIdleTime is not an option
	// either. If there is another way of checking consumers' health, it can
	// be leveraged in this callback.
	ShouldClaimPendingMessage func(redis.XPendingExt) bool

	// ShouldStopOnReadErrors, if set, is called with a read error to decide
	// whether the read method should return and the subscriber be closed,
	// or whether the error should only be logged and reading continue.
	ShouldStopOnReadErrors func(error) bool
}
// setDefaults replaces every zero-valued optional field of the config with
// its package default. Fields that were set explicitly are left untouched.
func (sc *SubscriberConfig) setDefaults() {
	// durationOr stores def in *field only when *field is still zero.
	durationOr := func(field *time.Duration, def time.Duration) {
		if *field == 0 {
			*field = def
		}
	}
	// stringOr stores def in *field only when *field is still empty.
	stringOr := func(field *string, def string) {
		if *field == "" {
			*field = def
		}
	}

	if sc.Unmarshaller == nil {
		sc.Unmarshaller = DefaultMarshallerUnmarshaller{}
	}
	// Kept as an explicit check so that a UUID is only generated when needed.
	if sc.Consumer == "" {
		sc.Consumer = watermill.NewShortUUID()
	}

	durationOr(&sc.NackResendSleep, NoSleep)
	durationOr(&sc.BlockTime, DefaultBlockTime)
	durationOr(&sc.ClaimInterval, DefaultClaimInterval)
	if sc.ClaimBatchSize == 0 {
		sc.ClaimBatchSize = DefaultClaimBatchSize
	}
	durationOr(&sc.MaxIdleTime, DefaultMaxIdleTime)
	durationOr(&sc.CheckConsumersInterval, DefaultCheckConsumersInterval)
	durationOr(&sc.ConsumerTimeout, DefaultConsumerTimeout)

	// A consumer group consumes from scratch by default.
	stringOr(&sc.OldestId, "0")
	// The fan-out start ID defaults to "$".
	stringOr(&sc.FanOutOldestId, "$")
}
// Validate reports whether the configuration is usable.
// The only requirement checked is a non-nil Client; otherwise it returns nil.
func (sc *SubscriberConfig) Validate() error {
	if sc.Client != nil {
		return nil
	}
	return errors.New("redis client is empty")
}
func ( s * Subscriber ) Subscribe ( ctx context . Context , topic string ) ( <- chan * message . Message , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go
Passing redis.UniversalClient#
You need to configure and pass your own go-redis client via the Client redis.UniversalClient config field in NewSubscriber and NewPublisher. The client can be either redis.Client or redis.ClusterClient.
Publisher# // ...
// NewPublisher creates a new redis stream Publisher.
func NewPublisher ( config PublisherConfig , logger watermill . LoggerAdapter ) ( * Publisher , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go
Example:
// ...
// Create the go-redis client that is handed to the publisher.
pubClient := redis . NewClient ( & redis . Options {
Addr : "redis:6379" ,
DB : 0 ,
})
// Build the publisher. Client is mandatory: PublisherConfig.Validate
// returns an error when it is nil.
publisher , err := redisstream . NewPublisher (
redisstream . PublisherConfig {
Client : pubClient ,
// Same value that PublisherConfig.setDefaults picks when Marshaller is nil.
Marshaller : redisstream . DefaultMarshallerUnmarshaller {},
},
// NOTE(review): the two false arguments presumably disable debug and trace output — confirm against watermill.NewStdLogger.
watermill . NewStdLogger ( false , false ),
)
// The example aborts if the publisher cannot be constructed.
if err != nil {
panic ( err )
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/redisstream/main.go
Subscriber# // ...
// NewSubscriber creates a new redis stream Subscriber.
func NewSubscriber ( config SubscriberConfig , logger watermill . LoggerAdapter ) ( * Subscriber , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go
Example:
// ...
// Create the go-redis client that is handed to the subscriber.
subClient := redis . NewClient ( & redis . Options {
Addr : "redis:6379" ,
DB : 0 ,
})
// Build the subscriber. Client is mandatory: SubscriberConfig.Validate
// returns an error when it is nil.
subscriber , err := redisstream . NewSubscriber (
redisstream . SubscriberConfig {
Client : subClient ,
// Same value that SubscriberConfig.setDefaults picks when Unmarshaller is nil.
Unmarshaller : redisstream . DefaultMarshallerUnmarshaller {},
// A non-empty ConsumerGroup; leaving it empty would select fan-out mode.
ConsumerGroup : "test_consumer_group" ,
},
// NOTE(review): the two false arguments presumably disable debug and trace output — confirm against watermill.NewStdLogger.
watermill . NewStdLogger ( false , false ),
)
// The example aborts if the subscriber cannot be constructed.
if err != nil {
panic ( err )
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/redisstream/main.go
Publishing# // ...
// Publish publishes message to redis stream
//
// Publish is blocking and waits for the Redis response.
// When delivery of any message fails, the function is interrupted.
func ( p * Publisher ) Publish ( topic string , msgs ...* message . Message ) error {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/publisher.go
Subscribing# // ...
func ( s * Subscriber ) Subscribe ( ctx context . Context , topic string ) ( <- chan * message . Message , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/subscriber.go
Marshaler# Watermill’s messages cannot be sent to Redis directly - they need to be marshaled. You can implement your own marshaler or use the default implementation. The default implementation uses MessagePack for efficient serialization.
// ...
// UUIDHeaderKey is, judging by its name and value, the key under which a
// message's UUID is carried in the marshalled values map — TODO confirm
// against the DefaultMarshallerUnmarshaller implementation.
const UUIDHeaderKey = "_watermill_message_uuid"
// Marshaller turns a Watermill message into a map of values that can be sent
// to Redis.
type Marshaller interface {
	// Marshal converts msg, published on topic, into a string-keyed map of
	// values, or returns an error when the message cannot be converted.
	Marshal(topic string, msg *message.Message) (map[string]interface{}, error)
}
// Unmarshaller is the counterpart of Marshaller: it rebuilds a Watermill
// message from a string-keyed map of values.
type Unmarshaller interface {
	// Unmarshal converts values into a message, or returns an error when
	// the values cannot be converted.
	Unmarshal(values map[string]interface{}) (msg *message.Message, err error)
}
// MarshallerUnmarshaller combines Marshaller and Unmarshaller, for
// implementations that handle both directions.
type MarshallerUnmarshaller interface {
	Marshaller
	Unmarshaller
}
// DefaultMarshallerUnmarshaller is the implementation used when no
// Marshaller or Unmarshaller is configured (see the setDefaults methods of
// PublisherConfig and SubscriberConfig). Its methods are not shown in this
// excerpt.
type DefaultMarshallerUnmarshaller struct {}
// ...
Full source: github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream/marshaller.go