Message

Message is one of core parts of Watermill. Messages are emitted by Publishers and received by Subscribers . When a message is processed, you should send an Ack() or a Nack() when the processing failed.

Acks and Nacks are processed by Subscribers (in default implementations, the subscribers are waiting for an Ack or a Nack).

// ...
type Message struct {
	// UUID is a unique identifier of the message.
	//
	// It is only used by Watermill for debugging.
	// UUID can be empty.
	UUID string

	// Metadata contains the message metadata.
	//
	// Can be used to store data which doesn't require unmarshalling the entire payload.
	// It is something similar to HTTP request's headers.
	//
	// Metadata is marshaled and will be saved to the PubSub.
	Metadata Metadata

	// Payload is the message's payload.
	Payload Payload

	// ack is closed when acknowledge is received.
	ack chan struct{}
	// noAck is closed when negative acknowledge is received.
	noAck chan struct{}

	ackMutex    sync.Mutex
	ackSentType ackType

	ctx context.Context
}

// ...

Full source: github.com/ThreeDotsLabs/watermill/message/message.go

Ack

Sending Ack

// ...
// Ack sends message's acknowledgement.
//
// Ack is not blocking.
// Ack is idempotent.
// False is returned, if Nack is already sent.
func (m *Message) Ack() bool {
// ...

Full source: github.com/ThreeDotsLabs/watermill/message/message.go

Nack

// ...
// Nack sends message's negative acknowledgement.
//
// Nack is not blocking.
// Nack is idempotent.
// False is returned, if Ack is already sent.
func (m *Message) Nack() bool {
// ...

Full source: github.com/ThreeDotsLabs/watermill/message/message.go

Receiving Ack/Nack

// ...
	select {
	case <-msg.Acked():
		log.Print("ack received")
	case <-msg.Nacked():
		log.Print("nack received")
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/docs/content/docs/message/receiving-ack.go

Context

Message contains the standard library context, just like an HTTP request.

// ...
// Context returns the message's context. To change the context, use
// SetContext.
//
// The returned context is always non-nil; it defaults to the
// background context.
func (m *Message) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

// SetContext sets provided context to the message.
func (m *Message) SetContext(ctx context.Context) {
	m.ctx = ctx
}
// ...

Full source: github.com/ThreeDotsLabs/watermill/message/message.go


Check our online hands-on training