NATS JetStream

NATS JetStream is a data streaming system built on NATS and written in the Go programming language.

As of v2.0.2, this middleware includes a beta implementation in pkg/jetstream, based on the nats.go JetStream package. That implementation is considered experimental while it tracks the upstream client, though we target a stable Watermill API by v2.1. For production use, we recommend the pub/sub implementations in pkg/nats with JetStream enabled.

You can find a fully functional example with NATS JetStream in the Watermill examples.

Installation

go get github.com/ThreeDotsLabs/watermill-nats/v2

Characteristics

| Feature | Implements | Note |
| --- | --- | --- |
| ConsumerGroups | yes | you need to set QueueGroupPrefix name or provide an optional calculator |
| ExactlyOnceDelivery | yes | you need to ensure ‘AckAsync’ has default false value and set ‘TrackMsgId’ to true on the Jetstream configuration |
| GuaranteedOrder | no | with the redelivery feature, order can’t be guaranteed |
| Persistent | yes | |

Configuration

Configuration is done through PublisherConfig and SubscriberConfig types. These share a common JetStreamConfig. To use the experimental nats-core support, set Disabled=true.

// ...
// JetStreamConfig contains configuration settings specific to running in JetStream mode
type JetStreamConfig struct {
	// Disabled controls whether JetStream semantics should be used
	Disabled bool

	// AutoProvision indicates the application should create the configured stream if missing on the broker
	AutoProvision bool

	// ConnectOptions contains JetStream-specific options to be used when establishing context
	ConnectOptions []nats.JSOpt

	// SubscribeOptions contains options to be used when establishing subscriptions
	SubscribeOptions []nats.SubOpt

	// PublishOptions contains options to be sent on every publish operation
	PublishOptions []nats.PubOpt

	// TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication (needed for exactly once processing)
	TrackMsgId bool

	// AckAsync enables asynchronous acknowledgement
	AckAsync bool

	// DurablePrefix is the prefix used to derive the durable name from the topic.
	//
	// By default the prefix will be used on its own to form the durable name.  This only allows use
	// of a single subscription per configuration.  For more flexibility provide a DurableCalculator
	// that will receive durable prefix + topic.
	//
	// Subscriptions may also specify a “durable name” which will survive client restarts.
	// Durable subscriptions cause the server to track the last acknowledged message
	// sequence number for a client and durable name. When the client restarts/resubscribes,
	// and uses the same client ID and durable name, the server will resume delivery beginning
	// with the earliest unacknowledged message for this durable subscription.
	//
	// Doing this causes the JetStream server to track
	// the last acknowledged message for that ClientID + Durable.
	DurablePrefix string

	// DurableCalculator is a custom function used to derive a durable name from a topic + durable prefix
	DurableCalculator DurableCalculator
}

type DurableCalculator = func(string, string) string
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/jetstream.go
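
The DurablePrefix comment notes that, without a DurableCalculator, the prefix alone becomes the durable name, which limits a configuration to a single subscription. A minimal sketch of a per-topic calculator is shown here. The name perTopicDurable and the underscore-joining scheme are assumptions made for illustration, and the DurableCalculator alias is redeclared locally only so the snippet compiles without the library. The argument order (prefix first, then topic) follows the "durable prefix + topic" wording in the comment. NATS does not accept '.', '*', '>', whitespace, or path separators in durable names, so the sketch replaces them.

```go
package main

import (
	"fmt"
	"strings"
)

// DurableCalculator mirrors the alias shown in the excerpt above; it is
// redeclared here only so that this sketch compiles on its own.
type DurableCalculator = func(string, string) string

// sanitizer replaces characters that NATS rejects in durable names.
var sanitizer = strings.NewReplacer(
	".", "_", "*", "_", ">", "_", " ", "_", "/", "_", "\\", "_",
)

// perTopicDurable (hypothetical name) joins the prefix and the sanitized
// topic, so every topic gets its own durable name.
func perTopicDurable(prefix, topic string) string {
	clean := sanitizer.Replace(topic)
	if prefix == "" {
		return clean
	}
	return prefix + "_" + clean
}

func main() {
	var calc DurableCalculator = perTopicDurable
	fmt.Println(calc("billing", "orders.created"))
	fmt.Println(calc("billing", "orders.cancelled"))
}
```

A function like this would be assigned to JetStreamConfig.DurableCalculator together with a DurablePrefix.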

PublisherConfig:

// ...
type PublisherConfig struct {
	// URL is the NATS URL.
	URL string

	// NatsOptions are custom options for a connection.
	NatsOptions []nats.Option

	// Marshaler is the marshaler used to convert messages between watermill and wire formats
	Marshaler Marshaler

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator

	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}

// PublisherPublishConfig is the configuration subset needed for an individual publish call
type PublisherPublishConfig struct {
	// Marshaler is the marshaler used to convert messages between watermill and wire formats
	Marshaler Marshaler

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator

	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}

func (c *PublisherConfig) setDefaults() {
	if c.Marshaler == nil {
		c.Marshaler = &NATSMarshaler{}
	}
	if c.SubjectCalculator == nil {
		c.SubjectCalculator = DefaultSubjectCalculator
	}
}

// Validate ensures configuration is valid before use
func (c PublisherConfig) Validate() error {
	if c.Marshaler == nil {
		return errors.New("PublisherConfig.Marshaler is missing")
	}

	if c.SubjectCalculator == nil {
		return errors.New("PublisherConfig.SubjectCalculator is missing")
	}
	return nil
}

// GetPublisherPublishConfig gets the configuration subset needed for individual publish calls once a connection has been established
func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig {
	return PublisherPublishConfig{
		Marshaler:         c.Marshaler,
		SubjectCalculator: c.SubjectCalculator,
		JetStream:         c.JetStream,
	}
}

// Publisher provides the nats implementation for watermill publish operations
type Publisher struct {
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/publisher.go
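
The SubjectCalculator field decides which subject and queue group a topic maps to. The sketch below illustrates the idea with local stand-in types. The shapes of SubjectDetail and SubjectCalculator are assumptions modeled on the comment "defaults to topic as Primary and queueGroupPrefix as QueueGroup", so check pkg/nats for the real definitions. The helper perTopicQueueGroup is hypothetical.

```go
package main

import "fmt"

// SubjectDetail is a local stand-in (assumed shape) for the value a
// subject calculator returns.
type SubjectDetail struct {
	Primary    string
	QueueGroup string
}

// SubjectCalculator is a local stand-in (assumed signature).
type SubjectCalculator func(queueGroupPrefix, topic string) *SubjectDetail

// defaultLike reproduces the documented default: the topic becomes the
// primary subject and the prefix becomes the queue group as-is.
func defaultLike(queueGroupPrefix, topic string) *SubjectDetail {
	return &SubjectDetail{Primary: topic, QueueGroup: queueGroupPrefix}
}

// perTopicQueueGroup (hypothetical) derives a separate queue group for each
// topic. An empty prefix keeps the queue group empty.
func perTopicQueueGroup(queueGroupPrefix, topic string) *SubjectDetail {
	group := ""
	if queueGroupPrefix != "" {
		group = queueGroupPrefix + "_" + topic
	}
	return &SubjectDetail{Primary: topic, QueueGroup: group}
}

func main() {
	calculators := []SubjectCalculator{defaultLike, perTopicQueueGroup}
	for _, calc := range calculators {
		d := calc("workers", "orders")
		fmt.Printf("primary=%s queueGroup=%s\n", d.Primary, d.QueueGroup)
	}
}
```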

SubscriberConfig:

// ...
type SubscriberConfig struct {
	// URL is the URL to the broker
	URL string

	// QueueGroupPrefix is the prefix used by SubjectCalculator to derive queue group from the topic.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// For JetStream, it is recommended to set it together with DurablePrefix.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurablePrefix) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	QueueGroupPrefix string

	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int

	// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
	CloseTimeout time.Duration

	// How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
	AckWaitTimeout time.Duration

	// SubscribeTimeout determines how long subscriber will wait for a successful subscription
	SubscribeTimeout time.Duration

	// NatsOptions are custom []nats.Option passed to the connection.
	// It is also used to provide connection parameters, for example:
	// 		nats.URL("nats://localhost:4222")
	NatsOptions []nats.Option

	// Unmarshaler is the unmarshaler used to convert messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator

	// NakDelay sets duration after which the NACKed message will be resent.
	// By default, it's NACKed without delay.
	NakDelay Delay

	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}

// SubscriberSubscriptionConfig is the configuration subset needed for an individual subscribe call
type SubscriberSubscriptionConfig struct {
	// Unmarshaler is the unmarshaler used to convert messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler

	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int

	// How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
	AckWaitTimeout time.Duration

	// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
	CloseTimeout time.Duration

	// SubscribeTimeout determines how long subscriber will wait for a successful subscription
	SubscribeTimeout time.Duration

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator

	// NakDelay sets duration after which the NACKed message will be resent.
	// By default, it's NACKed without delay.
	NakDelay Delay

	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig

	// QueueGroupPrefix is the prefix used by SubjectCalculator to derive queue group from the topic.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// For JetStream, it is recommended to set it together with DurablePrefix.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurablePrefix) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	QueueGroupPrefix string
}

// GetSubscriberSubscriptionConfig gets the configuration subset needed for individual subscribe calls once a connection has been established
func (c *SubscriberConfig) GetSubscriberSubscriptionConfig() SubscriberSubscriptionConfig {
	return SubscriberSubscriptionConfig{
		Unmarshaler:       c.Unmarshaler,
		SubscribersCount:  c.SubscribersCount,
		AckWaitTimeout:    c.AckWaitTimeout,
		CloseTimeout:      c.CloseTimeout,
		SubscribeTimeout:  c.SubscribeTimeout,
		SubjectCalculator: c.SubjectCalculator,
		NakDelay:          c.NakDelay,
		JetStream:         c.JetStream,
		QueueGroupPrefix:  c.QueueGroupPrefix,
	}
}

func (c *SubscriberSubscriptionConfig) setDefaults() {
	if c.SubjectCalculator == nil {
		c.SubjectCalculator = DefaultSubjectCalculator
	}

	if c.SubscribersCount <= 0 {
		c.SubscribersCount = 1
	}
	if c.CloseTimeout <= 0 {
		c.CloseTimeout = time.Second * 30
	}
	if c.AckWaitTimeout <= 0 {
		c.AckWaitTimeout = time.Second * 30
	}
	if c.SubscribeTimeout <= 0 {
		c.SubscribeTimeout = time.Second * 30
	}

	if c.Unmarshaler == nil {
		c.Unmarshaler = &NATSMarshaler{}
	}
}

// Validate ensures configuration is valid before use
func (c *SubscriberSubscriptionConfig) Validate() error {
	if c.Unmarshaler == nil {
		return errors.New("SubscriberConfig.Unmarshaler is missing")
	}

	//TODO: how best to validate this with dynamic queue group
	/*
		if c.QueueGroup == "" && c.SubscribersCount > 1 {
			return errors.New(
				"to set SubscriberConfig.SubscribersCount " +
					"you need to also set SubscriberConfig.QueueGroupPrefix, " +
					"in other case you will receive duplicated messages",
			)
		}
	*/

	if c.SubjectCalculator == nil {
		return errors.New("SubscriberSubscriptionConfig.SubjectCalculator is required.")
	}

	return nil
}

// Subscriber provides the nats implementation for watermill subscribe operations
type Subscriber struct {
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/subscriber.go
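
The commented-out check in Validate describes a real pitfall: with SubscribersCount above 1 and no queue group, every subscriber receives every message. Since the library leaves this unvalidated for now, a caller may want a guard of its own. The following is a sketch under stated assumptions: checkFanOut and subscriberSettings are hypothetical names, the struct copies just two fields from SubscriberConfig, and the check only makes sense when the default SubjectCalculator is in use (a custom calculator may derive the queue group dynamically).

```go
package main

import (
	"errors"
	"fmt"
)

// subscriberSettings (hypothetical) holds the two SubscriberConfig fields
// that matter for this check.
type subscriberSettings struct {
	QueueGroupPrefix string
	SubscribersCount int
}

// errDuplicates reports a configuration that would deliver every message
// to every concurrent subscriber.
var errDuplicates = errors.New(
	"SubscribersCount > 1 requires QueueGroupPrefix, otherwise messages are duplicated",
)

// checkFanOut rejects multiple concurrent subscribers without a queue group.
func checkFanOut(s subscriberSettings) error {
	if s.SubscribersCount > 1 && s.QueueGroupPrefix == "" {
		return errDuplicates
	}
	return nil
}

func main() {
	fmt.Println(checkFanOut(subscriberSettings{SubscribersCount: 4}))
	fmt.Println(checkFanOut(subscriberSettings{QueueGroupPrefix: "workers", SubscribersCount: 4}))
}
```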

Connecting

By default, the NATS client tries to connect to localhost:4222. If you use a different hostname or port, specify it with the URL property of SubscriberConfig and PublisherConfig.

// ...
// NewPublisher creates a new Publisher.
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/publisher.go

Example:

// ...
	publisher, err := nats.NewPublisher(
		nats.PublisherConfig{
			URL:         natsURL,
			NatsOptions: options,
			Marshaler:   marshaler,
			JetStream:   jsConfig,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-jetstream/main.go

// ...
// NewSubscriber creates a new Subscriber.
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/subscriber.go

Example:

// ...
	subscriber, err := nats.NewSubscriber(
		nats.SubscriberConfig{
			URL:            natsURL,
			CloseTimeout:   30 * time.Second,
			AckWaitTimeout: 30 * time.Second,
			NatsOptions:    options,
			Unmarshaler:    marshaler,
			JetStream:      jsConfig,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-jetstream/main.go

You can also use NewSubscriberWithNatsConn and NewPublisherWithNatsConn to use a custom *nats.Conn.

Publishing

// ...
// Publish publishes message to NATS.
//
// Publish will not return until an ack has been received from JetStream.
// When delivery of one of the messages fails, the function is interrupted.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/publisher.go
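
The doc comment on Publish implies sequential, fail-fast behavior: each message waits for its ack, and the first failure stops the call. The sketch below models only that semantic and is not the library's code. publishAll, flakySend, and errNoAck are hypothetical stand-ins, with a plain function call taking the place of the JetStream publish and ack.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoAck simulates a publish that was never acknowledged.
var errNoAck = errors.New("no ack from broker")

// flakySend (hypothetical) stands in for a single publish that blocks until
// the broker acks. It fails for the payload "b".
func flakySend(payload string) error {
	if payload == "b" {
		return errNoAck
	}
	return nil
}

// publishAll sends payloads one at a time and stops at the first error.
// It returns how many payloads were sent successfully before stopping.
func publishAll(send func(string) error, payloads ...string) (int, error) {
	for i, p := range payloads {
		if err := send(p); err != nil {
			return i, fmt.Errorf("publishing message %d: %w", i, err)
		}
	}
	return len(payloads), nil
}

func main() {
	sent, err := publishAll(flakySend, "a", "b", "c")
	fmt.Println("sent:", sent, "err:", err)
}
```

Under this model, an error does not mean that nothing was published: messages before the failing one were already accepted, so retrying the whole batch can create duplicates unless TrackMsgId is enabled.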

Subscribing

// ...
// Subscribe subscribes to messages from JetStream.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/subscriber.go
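
Subscribe returns a channel, and every message read from it should be either acked or nacked, because a message that receives neither within AckWaitTimeout is redelivered. A minimal sketch of such a consumer loop follows. The msg type is a local stand-in for *message.Message with just enough behavior for the illustration, and consume and rejectEmpty are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// msg is a local stand-in for a Watermill message. It only records whether
// it was acked or nacked.
type msg struct {
	Payload string
	acked   bool
	nacked  bool
}

func (m *msg) Ack()  { m.acked = true }
func (m *msg) Nack() { m.nacked = true }

var errEmpty = errors.New("empty payload")

// rejectEmpty (hypothetical handler) fails for messages without a payload.
func rejectEmpty(m *msg) error {
	if m.Payload == "" {
		return errEmpty
	}
	return nil
}

// consume reads until the channel is closed. It acks messages the handler
// accepts and nacks the rest, so the broker can redeliver them.
func consume(msgs <-chan *msg, handle func(*msg) error) (acked, nacked int) {
	for m := range msgs {
		if err := handle(m); err != nil {
			m.Nack()
			nacked++
			continue
		}
		m.Ack()
		acked++
	}
	return acked, nacked
}

func main() {
	ch := make(chan *msg, 2)
	ch <- &msg{Payload: "order-1"}
	ch <- &msg{}
	close(ch)
	acked, nacked := consume(ch, rejectEmpty)
	fmt.Println("acked:", acked, "nacked:", nacked)
}
```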

Marshaler

NATS provides a header-passing mechanism that lets the Watermill message details travel as metadata. The default marshaler uses it, so only the binary payload is sent in the message body. The header _watermill_message_uuid is reserved.

Other built-in marshalers are based on Go’s gob and json packages.

// ...
type Marshaler interface {
	// Marshal transforms a watermill message into NATS wire format.
	Marshal(topic string, msg *message.Message) (*nats.Msg, error)
}

// Unmarshaler provides transport decoding function
type Unmarshaler interface {
	// Unmarshal produces a watermill message from NATS wire format.
	Unmarshal(*nats.Msg) (*message.Message, error)
}

// MarshalerUnmarshaler provides both Marshaler and Unmarshaler implementations
type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

func defaultNatsMsg(topic string, data []byte, hdr nats.Header) *nats.Msg {
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/marshaler.go

When your messages have their own format, you can implement your own Marshaler to serialize them in that format. An example protobuf implementation with tests and benchmarks can be found here.
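
To make the header-based scheme concrete, here is a rough sketch of a marshal and unmarshal pair. It does not use the real interfaces: wmMessage and wireMsg are local stand-ins for *message.Message and *nats.Msg, and the functions marshal and unmarshal are hypothetical. Only the reserved header name _watermill_message_uuid comes from the text above. The UUID goes into the reserved header, each metadata entry becomes a header, and the payload is the body.

```go
package main

import "fmt"

// uuidHeader is the reserved header named in the documentation.
const uuidHeader = "_watermill_message_uuid"

// wmMessage is a local stand-in for a Watermill message.
type wmMessage struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// wireMsg is a local stand-in for a NATS message with headers.
type wireMsg struct {
	Subject string
	Header  map[string][]string
	Data    []byte
}

// marshal copies UUID and metadata into headers and the payload into the body.
func marshal(topic string, m *wmMessage) (*wireMsg, error) {
	if _, clash := m.Metadata[uuidHeader]; clash {
		return nil, fmt.Errorf("metadata key %q is reserved", uuidHeader)
	}
	hdr := map[string][]string{uuidHeader: {m.UUID}}
	for k, v := range m.Metadata {
		hdr[k] = []string{v}
	}
	return &wireMsg{Subject: topic, Header: hdr, Data: m.Payload}, nil
}

// unmarshal reverses marshal and fails when the reserved header is absent.
func unmarshal(w *wireMsg) (*wmMessage, error) {
	ids := w.Header[uuidHeader]
	if len(ids) == 0 {
		return nil, fmt.Errorf("header %q is missing", uuidHeader)
	}
	m := &wmMessage{UUID: ids[0], Metadata: map[string]string{}, Payload: w.Data}
	for k, vs := range w.Header {
		if k == uuidHeader || len(vs) == 0 {
			continue
		}
		m.Metadata[k] = vs[0]
	}
	return m, nil
}

func main() {
	in := &wmMessage{
		UUID:     "123",
		Metadata: map[string]string{"tenant": "acme"},
		Payload:  []byte("hello"),
	}
	w, err := marshal("orders", in)
	if err != nil {
		panic(err)
	}
	out, err := unmarshal(w)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.UUID, out.Metadata["tenant"], string(out.Payload))
}
```

A real implementation would satisfy the Marshaler and Unmarshaler interfaces shown earlier and operate on *nats.Msg instead of the stand-in types.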

When needed, you can bypass both UUID and Metadata and send just the message.Payload, but some standard middlewares may not work.

Core-Nats

This package also includes limited support for connecting to core-nats. Core-nats does not support many of the streaming features needed for a perfect fit with Watermill, and most acks end up implemented as no-ops. Still, in environments that mix JetStream and core-nats messaging, it can be nice to use Watermill consistently on the application side.

