NATS Streaming

A simple, secure and high performance open source messaging system.

NATS Streaming is a data streaming system powered by NATS, and written in the Go programming language. The executable name for the NATS Streaming server is nats-streaming-server. NATS Streaming embeds, extends, and interoperates seamlessly with the core NATS platform.

Characteristics

Feature              Implements  Note
ConsumerGroups       yes         you need to set DurableName and QueueGroup name
ExactlyOnceDelivery  no
GuaranteedOrder      no          with the redelivery feature, order can’t be guaranteed
Persistent           yes         DurableName is required
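
Because exactly-once delivery is not provided and unacknowledged messages are redelivered, a handler may see the same message more than once. The following is a minimal sketch of one way to cope with that, assuming every message carries a unique UUID: remember the UUIDs already handled and skip repeats. The Deduplicator type and its in-memory map are hypothetical and not part of Watermill; a real service would likely need persistent storage instead.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduplicator remembers message UUIDs that were already processed.
// It is a hypothetical helper for illustration, not part of Watermill.
type Deduplicator struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

// NewDeduplicator returns an empty Deduplicator.
func NewDeduplicator() *Deduplicator {
	return &Deduplicator{seen: make(map[string]struct{})}
}

// FirstTime reports whether uuid has not been seen before, and records it.
func (d *Deduplicator) FirstTime(uuid string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[uuid]; ok {
		return false
	}
	d.seen[uuid] = struct{}{}
	return true
}

func main() {
	d := NewDeduplicator()
	// "b" arrives twice, simulating a redelivery after a missed ack.
	for _, uuid := range []string{"a", "b", "b", "c"} {
		if d.FirstTime(uuid) {
			fmt.Println("processing", uuid)
		} else {
			fmt.Println("skipping duplicate", uuid)
		}
	}
}
```

In a handler, the duplicate would still be acknowledged so that the server stops redelivering it.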

Configuration

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/publisher.go

// ...
type StreamingPublisherConfig struct {
    // ClusterID is the NATS Streaming cluster ID.
    ClusterID string

    // ClientID is the NATS Streaming client ID to connect with.
    // ClientID can contain only alphanumeric and `-` or `_` characters.
    ClientID string

    // StanOptions are custom options for a connection.
    StanOptions []stan.Option

    // Marshaler is marshaler used to marshal messages to stan format.
    Marshaler Marshaler
}

type StreamingPublisherPublishConfig struct {
    // Marshaler is marshaler used to marshal messages to stan format.
    Marshaler Marshaler
}

func (c StreamingPublisherConfig) Validate() error {
    if c.Marshaler == nil {
        return errors.New("StreamingPublisherConfig.Marshaler is missing")
    }

    return nil
}

func (c StreamingPublisherConfig) GetStreamingPublisherPublishConfig() StreamingPublisherPublishConfig {
    return StreamingPublisherPublishConfig{
        Marshaler: c.Marshaler,
    }
}

type StreamingPublisher struct {
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/subscriber.go

// ...
type StreamingSubscriberConfig struct {
    // ClusterID is the NATS Streaming cluster ID.
    ClusterID string

    // ClientID is the NATS Streaming client ID to connect with.
    // ClientID can contain only alphanumeric and `-` or `_` characters.
    //
    // Using DurableName causes the NATS Streaming server to track
    // the last acknowledged message for that ClientID + DurableName.
    ClientID string

    // QueueGroup is the NATS Streaming queue group.
    //
    // All subscriptions with the same queue name (regardless of the connection they originate from)
    // will form a queue group. Each message will be delivered to only one subscriber per queue group,
    // using queuing semantics.
    //
    // It is recommended to set it with DurableName.
    // For non-durable queue subscribers, when the last member leaves the group,
    // that group is removed. A durable queue group (DurableName) allows you to have all members leave
    // but still maintain state. When a member re-joins, it starts at the last position in that group.
    //
    // When QueueGroup is empty, the subscription is made without a queue group.
    QueueGroup string

    // DurableName is the NATS Streaming durable name.
    //
    // Subscriptions may also specify a “durable name” which will survive client restarts.
    // Durable subscriptions cause the server to track the last acknowledged message
    // sequence number for a client and durable name. When the client restarts/resubscribes,
    // and uses the same client ID and durable name, the server will resume delivery beginning
    // with the earliest unacknowledged message for this durable subscription.
    //
    // Doing this causes the NATS Streaming server to track
    // the last acknowledged message for that ClientID + DurableName.
    DurableName string

    // SubscribersCount determines how many concurrent subscribers should be started.
    SubscribersCount int

    // CloseTimeout determines how long the subscriber will wait for Ack/Nack on close.
    // When no Ack/Nack is received after CloseTimeout, the subscriber will be closed.
    CloseTimeout time.Duration

    // AckWaitTimeout determines how long the subscriber should wait for Ack/Nack.
    // When no Ack/Nack is received, the message will be redelivered.
    // It is mapped to the stan.AckWait option.
    AckWaitTimeout time.Duration

    // StanOptions are custom []stan.Option passed to the connection.
    // It is also used to provide connection parameters, for example:
    //       stan.NatsURL("nats://localhost:4222")
    StanOptions []stan.Option

    // StanSubscriptionOptions are custom []stan.SubscriptionOption passed to subscription.
    StanSubscriptionOptions []stan.SubscriptionOption

    // Unmarshaler is the unmarshaler used to unmarshal messages from NATS format to Watermill format.
    Unmarshaler Unmarshaler
}

type StreamingSubscriberSubscriptionConfig struct {
    // StanSubscriptionOptions are custom []stan.SubscriptionOption passed to subscription.
    StanSubscriptionOptions []stan.SubscriptionOption

    // Unmarshaler is the unmarshaler used to unmarshal messages from NATS format to Watermill format.
    Unmarshaler Unmarshaler

    // QueueGroup is the NATS Streaming queue group.
    //
    // All subscriptions with the same queue name (regardless of the connection they originate from)
    // will form a queue group. Each message will be delivered to only one subscriber per queue group,
    // using queuing semantics.
    //
    // It is recommended to set it with DurableName.
    // For non-durable queue subscribers, when the last member leaves the group,
    // that group is removed. A durable queue group (DurableName) allows you to have all members leave
    // but still maintain state. When a member re-joins, it starts at the last position in that group.
    //
    // When QueueGroup is empty, the subscription is made without a queue group.
    QueueGroup string

    // DurableName is the NATS Streaming durable name.
    //
    // Subscriptions may also specify a “durable name” which will survive client restarts.
    // Durable subscriptions cause the server to track the last acknowledged message
    // sequence number for a client and durable name. When the client restarts/resubscribes,
    // and uses the same client ID and durable name, the server will resume delivery beginning
    // with the earliest unacknowledged message for this durable subscription.
    //
    // Doing this causes the NATS Streaming server to track
    // the last acknowledged message for that ClientID + DurableName.
    DurableName string

    // SubscribersCount determines how many concurrent subscribers should be started.
    SubscribersCount int

    // AckWaitTimeout determines how long the subscriber should wait for Ack/Nack.
    // When no Ack/Nack is received, the message will be redelivered.
    // It is mapped to the stan.AckWait option.
    AckWaitTimeout time.Duration

    // CloseTimeout determines how long the subscriber will wait for Ack/Nack on close.
    // When no Ack/Nack is received after CloseTimeout, the subscriber will be closed.
    CloseTimeout time.Duration
}

func (c *StreamingSubscriberConfig) GetStreamingSubscriberSubscriptionConfig() StreamingSubscriberSubscriptionConfig {
    return StreamingSubscriberSubscriptionConfig{
        StanSubscriptionOptions: c.StanSubscriptionOptions,
        Unmarshaler:             c.Unmarshaler,
        QueueGroup:              c.QueueGroup,
        DurableName:             c.DurableName,
        SubscribersCount:        c.SubscribersCount,
        AckWaitTimeout:          c.AckWaitTimeout,
        CloseTimeout:            c.CloseTimeout,
    }
}

func (c *StreamingSubscriberSubscriptionConfig) setDefaults() {
    if c.SubscribersCount <= 0 {
        c.SubscribersCount = 1
    }
    if c.CloseTimeout <= 0 {
        c.CloseTimeout = time.Second * 30
    }
    if c.AckWaitTimeout <= 0 {
        c.AckWaitTimeout = time.Second * 30
    }

    c.StanSubscriptionOptions = append(
        c.StanSubscriptionOptions,
        stan.SetManualAckMode(), // manual AckMode is required to support acking/nacking by client
        stan.AckWait(c.AckWaitTimeout),
    )

    if c.DurableName != "" {
        c.StanSubscriptionOptions = append(c.StanSubscriptionOptions, stan.DurableName(c.DurableName))
    }
}

func (c *StreamingSubscriberSubscriptionConfig) Validate() error {
    if c.Unmarshaler == nil {
        return errors.New("StreamingSubscriberConfig.Unmarshaler is missing")
    }

    if c.QueueGroup == "" && c.SubscribersCount > 1 {
        return errors.New(
            "to set StreamingSubscriberConfig.SubscribersCount " +
                "you need to also set StreamingSubscriberConfig.QueueGroup, " +
                "in other case you will receive duplicated messages",
        )
    }

    return nil
}

type StreamingSubscriber struct {
// ...
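
The Validate rule above (SubscribersCount greater than 1 requires QueueGroup) follows from the queue-group semantics described in the field comments. The sketch below simulates both cases with plain Go channels and no NATS connection, so it only illustrates the delivery counts; the deliver function and its parameters are hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// deliver simulates sending msgs to a number of subscribers and returns the
// total number of deliveries. With queueGroup set, all subscribers share one
// channel, so each message is handled exactly once. Without it, every
// subscriber gets its own copy of every message.
func deliver(msgs []string, subscribers int, queueGroup bool) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	handle := func(ch <-chan string) {
		defer wg.Done()
		for range ch {
			mu.Lock()
			total++
			mu.Unlock()
		}
	}

	if queueGroup {
		shared := make(chan string)
		for i := 0; i < subscribers; i++ {
			wg.Add(1)
			go handle(shared)
		}
		for _, m := range msgs {
			shared <- m
		}
		close(shared)
	} else {
		chans := make([]chan string, subscribers)
		for i := range chans {
			chans[i] = make(chan string)
			wg.Add(1)
			go handle(chans[i])
		}
		for _, m := range msgs {
			for _, ch := range chans {
				ch <- m
			}
		}
		for _, ch := range chans {
			close(ch)
		}
	}

	wg.Wait()
	return total
}

func main() {
	msgs := []string{"a", "b", "c"}
	fmt.Println(deliver(msgs, 4, true))  // 3: one delivery per message
	fmt.Println(deliver(msgs, 4, false)) // 12: every subscriber sees every message
}
```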

Connecting

By default, the NATS client will try to connect to localhost:4222. If you are using a different hostname or port, pass a custom stan.Option, stan.NatsURL("nats://your-nats-hostname:4222"), in the StanOptions of both StreamingSubscriberConfig and StreamingPublisherConfig.

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/publisher.go

// ...
// NewStreamingPublisher creates a new StreamingPublisher.
//
// When using a custom NATS hostname, pass it via StreamingPublisherConfig.StanOptions:
//        // ...
//        StanOptions: []stan.Option{
//            stan.NatsURL("nats://your-nats-hostname:4222"),
//        }
//        // ...
func NewStreamingPublisher(config StreamingPublisherConfig, logger watermill.LoggerAdapter) (*StreamingPublisher, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-streaming/main.go

// ...
    publisher, err := nats.NewStreamingPublisher(
        nats.StreamingPublisherConfig{
            ClusterID: "test-cluster",
            ClientID:  "example-publisher",
            StanOptions: []stan.Option{
                stan.NatsURL("nats://nats-streaming:4222"),
            },
            Marshaler: nats.GobMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/subscriber.go

// ...
// NewStreamingSubscriber creates a new StreamingSubscriber.
//
// When using a custom NATS hostname, pass it via StreamingSubscriberConfig.StanOptions:
//        // ...
//        StanOptions: []stan.Option{
//            stan.NatsURL("nats://your-nats-hostname:4222"),
//        }
//        // ...
func NewStreamingSubscriber(config StreamingSubscriberConfig, logger watermill.LoggerAdapter) (*StreamingSubscriber, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-streaming/main.go

// ...
    subscriber, err := nats.NewStreamingSubscriber(
        nats.StreamingSubscriberConfig{
            ClusterID:        "test-cluster",
            ClientID:         "example-subscriber",
            QueueGroup:       "example",
            DurableName:      "my-durable",
            SubscribersCount: 4, // how many goroutines should consume messages
            CloseTimeout:     time.Minute,
            AckWaitTimeout:   time.Second * 30,
            StanOptions: []stan.Option{
                stan.NatsURL("nats://nats-streaming:4222"),
            },
            Unmarshaler: nats.GobMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
// ...

You can also use NewStreamingSubscriberWithStanConn and NewStreamingPublisherWithStanConn to use a custom stan.Conn created by NewStanConnection.

Publishing

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/publisher.go

// ...
// Publish publishes messages to NATS.
//
// Publish will not return until an ack has been received from NATS Streaming.
// When delivery of one of the messages fails, the function is interrupted.
func (p StreamingPublisher) Publish(topic string, messages ...*message.Message) error {
// ...

Subscribing

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/subscriber.go

// ...
// Subscribe subscribes to messages from NATS Streaming.
//
// Subscribe will spawn SubscribersCount goroutines, each making its own subscription.
func (s *StreamingSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Marshaler

NATS doesn’t provide a mechanism for message metadata or headers, so the entire message has to be marshaled to a []byte.

The default implementation is based on Go’s gob encoding.

Full source: github.com/ThreeDotsLabs/watermill-nats/pkg/nats/marshaler.go

// ...
type Marshaler interface {
    Marshal(topic string, msg *message.Message) ([]byte, error)
}

type Unmarshaler interface {
    Unmarshal(*stan.Msg) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
    Marshaler
    Unmarshaler
}

// GobMarshaler is a marshaler which uses Gob to marshal Watermill messages.
type GobMarshaler struct{}
// ...

If your messages use a different format, you can implement your own Marshaler, which will serialize messages in that format.

When needed, you can bypass both UUID and Metadata and send only message.Payload, but some standard middlewares may then not work.
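
As an illustration of what a custom format has to carry, here is a minimal JSON-based sketch using only the standard library. The envelope type, its field names, and the encodeEnvelope/decodeEnvelope functions are assumptions made for this example; to plug such a format into Watermill you would wrap equivalent logic in types that satisfy the Marshaler and Unmarshaler interfaces shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical wire format carrying the three parts of a
// message: UUID, metadata, and payload. Field names are assumptions.
type envelope struct {
	UUID     string            `json:"uuid"`
	Metadata map[string]string `json:"metadata"`
	Payload  []byte            `json:"payload"`
}

// encodeEnvelope packs the whole message into a single []byte,
// because the transport only carries a raw body.
func encodeEnvelope(uuid string, metadata map[string]string, payload []byte) ([]byte, error) {
	return json.Marshal(envelope{UUID: uuid, Metadata: metadata, Payload: payload})
}

// decodeEnvelope restores the message parts from the raw body.
func decodeEnvelope(data []byte) (envelope, error) {
	var e envelope
	err := json.Unmarshal(data, &e)
	return e, err
}

func main() {
	data, err := encodeEnvelope("id-1", map[string]string{"source": "example"}, []byte("hello"))
	if err != nil {
		panic(err)
	}

	e, err := decodeEnvelope(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(e.UUID, e.Metadata["source"], string(e.Payload)) // id-1 example hello
}
```

Dropping UUID and Metadata from the envelope corresponds to the payload-only option mentioned above, at the cost of losing whatever middlewares expect to find in them.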