Message
Message is one of core parts of Watermill. Messages are emitted by Publishers
and received by Subscribers
.
When a message is processed, you should send an Ack()
or a Nack()
when the processing failed.
Acks
and Nacks
are processed by Subscribers (in default implementations, the subscribers are waiting for an Ack
or a Nack
).
// ...
type Message struct {
// UUID is a unique identifier of the message.
//
// It is only used by Watermill for debugging.
// UUID can be empty.
UUID string
// Metadata contains the message metadata.
//
// Can be used to store data which doesn't require unmarshalling the entire payload.
// It is something similar to HTTP request's headers.
//
// Metadata is marshaled and will be saved to the PubSub.
Metadata Metadata
// Payload is the message's payload.
Payload Payload
// ack is closed when acknowledge is received.
ack chan struct{}
// noAck is closed when negative acknowledge is received.
noAck chan struct{}
ackMutex sync.Mutex
ackSentType ackType
ctx context.Context
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/message/message.go
Ack
Sending Ack
// ...
// Ack sends message's acknowledgement.
//
// Ack is not blocking.
// Ack is idempotent.
// False is returned, if Nack is already sent.
func (m *Message) Ack() bool {
// ...
Full source: github.com/ThreeDotsLabs/watermill/message/message.go
Nack
// ...
// Nack sends message's negative acknowledgement.
//
// Nack is not blocking.
// Nack is idempotent.
// False is returned, if Ack is already sent.
func (m *Message) Nack() bool {
// ...
Full source: github.com/ThreeDotsLabs/watermill/message/message.go
Receiving Ack/Nack
// ...
select {
case <-msg.Acked():
log.Print("ack received")
case <-msg.Nacked():
log.Print("nack received")
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/docs/content/docs/message/receiving-ack.go
Context
Message contains the standard library context, just like an HTTP request.
This context is used to propagate cancellation and deadlines when publishing and processing messages.
You can set the context using the SetContext
method or the NewMessageWithContext
constructor.
// ...
// NewMessageWithContext creates a new Message with given uuid, payload, and context.
func NewMessageWithContext(ctx context.Context, uuid string, payload Payload) *Message {
msg := NewMessage(uuid, payload)
msg.SetContext(ctx)
return msg
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/message/message.go
// ...
// Context returns the message's context. To change the context, use
// SetContext.
//
// The returned context is always non-nil; it defaults to the
// background context.
func (m *Message) Context() context.Context {
if m.ctx != nil {
return m.ctx
}
return context.Background()
}
// SetContext sets provided context to the message.
func (m *Message) SetContext(ctx context.Context) {
m.ctx = ctx
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/message/message.go