Message

Message is one of the core parts of Watermill. Messages are emitted by Publishers and received by Subscribers. When a message has been processed, you should send Ack(), or Nack() if processing failed.

Acks and Nacks are processed by Subscribers (in the default implementations, subscribers wait for an Ack or a Nack).

Full source: message/message.go

// ...
type Message struct {
    // UUID is a unique identifier of the message.
    //
    // It is only used by Watermill for debugging.
    // UUID can be empty.
    UUID string

    // Metadata contains the message metadata.
    //
    // Can be used to store data which doesn't require unmarshaling the entire payload.
    // It is something similar to HTTP request's headers.
    Metadata Metadata

    // Payload is the message's payload.
    Payload Payload

    // ack is closed when an acknowledgement is received.
    ack chan struct{}
    // noAck is closed when a negative acknowledgement is received.
    noAck chan struct{}

    ackMutex    sync.Mutex
    ackSentType ackType

    ctx context.Context
}

// ...
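The comparison between Metadata and HTTP headers can be illustrated with a small sketch in which a routing decision is made from metadata alone, without touching the payload. The `Message` and `Metadata` types below are simplified stand-ins written for this example, not Watermill's own definitions, and the `event_type` key, the `routeFor` helper, and the topic names are hypothetical.

```go
package main

import "fmt"

// Metadata is a hypothetical stand-in for the library's metadata type:
// a string-to-string map, comparable to HTTP headers.
type Metadata map[string]string

// Get returns the value stored under key, or "" when the key is absent.
func (m Metadata) Get(key string) string { return m[key] }

// Set stores value under key.
func (m Metadata) Set(key, value string) { m[key] = value }

// Message is a simplified stand-in with only the exported fields shown above.
type Message struct {
	UUID     string
	Metadata Metadata
	Payload  []byte
}

// routeFor picks a destination from metadata alone, so the payload never
// has to be unmarshaled. The "event_type" key and the topic names are
// illustrative assumptions, not Watermill conventions.
func routeFor(msg *Message) string {
	switch msg.Metadata.Get("event_type") {
	case "order_placed":
		return "orders"
	case "":
		return "unrouted"
	default:
		return "misc"
	}
}

func main() {
	msg := &Message{
		UUID:     "example-uuid",
		Metadata: Metadata{},
		Payload:  []byte(`{"order_id": 42}`),
	}
	msg.Metadata.Set("event_type", "order_placed")
	fmt.Println(routeFor(msg))
}
```

Keeping routing hints in metadata means intermediaries can inspect a message cheaply, while only the final consumer pays the cost of decoding the payload.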

Ack

Sending Ack

Full source: message/message.go

// ...
// Ack sends message's acknowledgement.
//
// Ack is not blocking.
// Ack is idempotent.
// Error is returned, if Nack is already sent.
func (m *Message) Ack() error {
// ...

Nack

Full source: message/message.go

// ...
// Nack sends message's negative acknowledgement.
//
// Nack is not blocking.
// Nack is idempotent.
// Error is returned, if Ack is already sent.
func (m *Message) Nack() error {
// ...
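The documented behaviour of Ack and Nack (non-blocking, idempotent, and returning an error when the opposite acknowledgement was already sent) can be sketched with the fields from the struct above. This is a simplified illustration under those assumptions, not Watermill's actual implementation: the constant names, the error values, `NewMessage` without arguments, and the `handle` helper are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ackType records which acknowledgement, if any, has been sent.
type ackType int

const (
	nothingSent ackType = iota
	ackSent
	nackSent
)

// Hypothetical error values; the real library may report conflicts differently.
var (
	errAlreadyAcked  = errors.New("message already acked")
	errAlreadyNacked = errors.New("message already nacked")
)

// Message is a simplified stand-in holding only the acknowledgement state.
type Message struct {
	ack   chan struct{} // closed when Ack is sent
	noAck chan struct{} // closed when Nack is sent

	ackMutex    sync.Mutex
	ackSentType ackType
}

func NewMessage() *Message {
	return &Message{ack: make(chan struct{}), noAck: make(chan struct{})}
}

// Ack closes the ack channel once. Repeated calls are no-ops, and a call
// after Nack returns an error. Closing a channel never blocks.
func (m *Message) Ack() error {
	m.ackMutex.Lock()
	defer m.ackMutex.Unlock()

	switch m.ackSentType {
	case nackSent:
		return errAlreadyNacked
	case ackSent:
		return nil
	}
	m.ackSentType = ackSent
	close(m.ack)
	return nil
}

// Nack mirrors Ack for the negative acknowledgement.
func (m *Message) Nack() error {
	m.ackMutex.Lock()
	defer m.ackMutex.Unlock()

	switch m.ackSentType {
	case ackSent:
		return errAlreadyAcked
	case nackSent:
		return nil
	}
	m.ackSentType = nackSent
	close(m.noAck)
	return nil
}

func (m *Message) Acked() <-chan struct{}  { return m.ack }
func (m *Message) Nacked() <-chan struct{} { return m.noAck }

// handle runs process, then acks on success or nacks on failure.
func handle(msg *Message, process func() error) error {
	if err := process(); err != nil {
		if nackErr := msg.Nack(); nackErr != nil {
			return nackErr
		}
		return err
	}
	return msg.Ack()
}

func main() {
	msg := NewMessage()
	fmt.Println(msg.Ack())  // first Ack closes the channel
	fmt.Println(msg.Ack())  // second Ack is a no-op
	fmt.Println(msg.Nack()) // conflicting Nack reports an error
}
```

Signalling through a closed channel lets any number of listeners observe the acknowledgement, and the mutex guards against closing the same channel twice.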

Receiving Ack/Nack

Full source: docs/content/docs/message/receiving-ack.go

// ...
    select {
    case <-msg.Acked():
        log.Print("ack received")
    case <-msg.Nacked():
        log.Print("nack received")
    }
// ...
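The select above blocks until the consumer responds. In practice a subscriber implementation may also want to give up after a while. The sketch below adds a timeout case to the same pattern. It uses a simplified stand-in `Message` type with only the two channels. The `waitForAck` helper and both error values are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Message is a simplified stand-in exposing only the acknowledgement channels.
type Message struct {
	ack   chan struct{}
	noAck chan struct{}
}

func NewMessage() *Message {
	return &Message{ack: make(chan struct{}), noAck: make(chan struct{})}
}

func (m *Message) Acked() <-chan struct{}  { return m.ack }
func (m *Message) Nacked() <-chan struct{} { return m.noAck }

// Hypothetical error values for this sketch.
var (
	errNacked  = errors.New("message was nacked")
	errTimeout = errors.New("timed out waiting for ack or nack")
)

// waitForAck blocks until the message is acked, nacked, or the timeout
// elapses, whichever happens first.
func waitForAck(msg *Message, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-msg.Acked():
		return nil
	case <-msg.Nacked():
		return errNacked
	case <-timer.C:
		return errTimeout
	}
}

func main() {
	msg := NewMessage()

	// Simulate a consumer that acknowledges shortly after receiving.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(msg.ack)
	}()

	if err := waitForAck(msg, time.Second); err != nil {
		fmt.Println("not acknowledged:", err)
		return
	}
	fmt.Println("ack received")
}
```

What to do on a nack or a timeout (redeliver, drop, or surface an error) depends on the Pub/Sub in use, so this sketch leaves that decision to the caller.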

Context

A message carries a standard library context (context.Context), just like an HTTP request.

Full source: message/message.go

// ...
// Context returns the message's context. To change the context, use
// SetContext.
//
// The returned context is always non-nil; it defaults to the
// background context.
func (m *Message) Context() context.Context {
    if m.ctx != nil {
        return m.ctx
    }
    return context.Background()
}

// SetContext sets provided context to the message.
func (m *Message) SetContext(ctx context.Context) {
    m.ctx = ctx
}
// ...
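As with an HTTP request, the message context can carry request-scoped values and cancellation. The sketch below shows both on a simplified stand-in `Message` type that reproduces only the `Context` and `SetContext` methods. The `correlationKey` type and the helpers `withCorrelationID`, `correlationID`, `withCancel`, and `process` are hypothetical names introduced for this example.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a simplified stand-in carrying only a payload and a context.
type Message struct {
	Payload []byte
	ctx     context.Context
}

// Context returns the message's context, defaulting to the background context.
func (m *Message) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

// SetContext replaces the message's context.
func (m *Message) SetContext(ctx context.Context) { m.ctx = ctx }

// correlationKey is an unexported key type, which avoids collisions with
// context values set by other packages.
type correlationKey struct{}

// withCorrelationID stores id in the message's context.
func withCorrelationID(msg *Message, id string) {
	msg.SetContext(context.WithValue(msg.Context(), correlationKey{}, id))
}

// correlationID returns the stored id, or "" when none was set.
func correlationID(msg *Message) string {
	id, _ := msg.Context().Value(correlationKey{}).(string)
	return id
}

// withCancel makes the message's context cancelable and returns the cancel func.
func withCancel(msg *Message) context.CancelFunc {
	ctx, cancel := context.WithCancel(msg.Context())
	msg.SetContext(ctx)
	return cancel
}

// process refuses to do any work once the message's context is done.
func process(msg *Message) error {
	if err := msg.Context().Err(); err != nil {
		return err
	}
	// Real processing of msg.Payload would go here.
	return nil
}

func main() {
	msg := &Message{Payload: []byte("hello")}
	withCorrelationID(msg, "req-123")

	cancel := withCancel(msg)
	fmt.Println(correlationID(msg), process(msg))

	cancel() // for example, the subscriber is shutting down
	fmt.Println(correlationID(msg), process(msg))
}
```

Because derived contexts keep their parent's values, the correlation ID remains readable after cancellation is layered on top.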