Getting started

Watermill up and running.

What is Watermill?

Watermill is a Go library for working efficiently with message streams. It is intended for building event-driven applications, enabling event sourcing, RPC over messages, sagas, and whatever else comes to mind. You can use conventional Pub/Sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

It comes with a set of Pub/Sub implementations, which you can easily extend with your own.

Watermill also ships with a set of standard tools (middlewares) such as instrumentation, poison queue, throttling, correlation and other tools used by every message-driven application.

Install

go get -u github.com/ThreeDotsLabs/watermill/

Subscribing to messages

One of the most important parts of Watermill is the Message. It is as important as http.Request is for the http package. Almost every part of Watermill uses this type in some way.

When you are building a reactive, event-driven application (or [insert your buzzword here]), you always want to listen for incoming messages and react to them. Watermill supports multiple publisher and subscriber implementations with compatible interfaces and abstractions, which provide similar behaviour.

Let’s start with subscribing to messages.

Full source: docs/content/docs/getting-started/go-channel/main.go

// ...
package main

import (
    "context"
    "log"

    "github.com/ThreeDotsLabs/watermill/message"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel"
)

func main() {
    pubSub := gochannel.NewGoChannel(
        gochannel.Config{},
        watermill.NewStdLogger(false, false),
    )

    messages, err := pubSub.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

Full source: docs/content/docs/getting-started/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // we need to Acknowledge that we received and processed the message,
        // otherwise we will not receive next message
        msg.Ack()
    }
}
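
The comment about acknowledging messages can be illustrated with a small model. The following is a minimal sketch that uses only the standard library; it is not Watermill's implementation, and the `msg` type and the `deliver` and `consume` functions are hypothetical names introduced for illustration. It shows a sender that waits for an acknowledgement before handing over the next message, which is why a missing Ack stalls the stream.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for a message that can be acknowledged.
type msg struct {
	payload string
	acked   chan struct{}
}

// Ack signals that the message was processed. Closing the channel
// unblocks the sender that is waiting on it.
func (m *msg) Ack() { close(m.acked) }

// deliver sends payloads one at a time and waits for an Ack after each,
// so an unacknowledged message stalls everything queued behind it.
func deliver(payloads []string) <-chan *msg {
	out := make(chan *msg)
	go func() {
		defer close(out)
		for _, p := range payloads {
			m := &msg{payload: p, acked: make(chan struct{})}
			out <- m
			<-m.acked // block until the consumer acknowledges
		}
	}()
	return out
}

// consume reads every message, acknowledges it, and returns the payloads
// in the order in which they were received.
func consume(in <-chan *msg) []string {
	var got []string
	for m := range in {
		got = append(got, m.payload)
		m.Ack()
	}
	return got
}

func main() {
	fmt.Println(consume(deliver([]string{"a", "b", "c"})))
}
```

If the call to `m.Ack()` in `consume` were removed, the loop would receive the first payload and then block forever, because `deliver` never sends the second one.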

Running in Docker  

The easiest way to run Watermill locally with Kafka is using Docker.

Full source: docs/content/docs/getting-started/kafka/docker-compose.yml

version: '3'
services:
  server:
    image: golang:1.11
    restart: on-failure
    depends_on:
      - kafka
    volumes:
      - .:/app
      - $GOPATH/pkg/mod:/go/pkg/mod
    working_dir: /app
    command: go run main.go

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: on-failure
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    restart: on-failure
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

The source should go in main.go.

To run it, execute the docker-compose up command.

A more detailed explanation of how it works (and how to add live code reload) can be found in the Go Docker dev environment article.

Full source: docs/content/docs/getting-started/kafka/main.go

// ...
package main

import (
    "context"
    "log"

    "github.com/Shopify/sarama"

    "github.com/ThreeDotsLabs/watermill"

    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
)

func main() {
    saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
    // equivalent of auto.offset.reset: earliest
    saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:       []string{"kafka:9092"},
            ConsumerGroup: "test_consumer_group",
        },
        saramaSubscriberConfig,
        kafka.DefaultMarshaler{},
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

Full source: docs/content/docs/getting-started/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // we need to Acknowledge that we received and processed the message,
        // otherwise we will not receive next message
        msg.Ack()
    }
}

Running in Docker  

The easiest way to run Watermill locally with NATS is using Docker.

Full source: docs/content/docs/getting-started/nats-streaming/docker-compose.yml

version: '3'
services:
  server:
    image: golang:1.11
    restart: on-failure
    depends_on:
      - nats-streaming
    volumes:
      - .:/app
      - $GOPATH/pkg/mod:/go/pkg/mod
    working_dir: /app
    command: go run main.go

  nats-streaming:
    image: nats-streaming:0.11.2
    restart: on-failure

The source should go in main.go.

To run it, execute the docker-compose up command.

A more detailed explanation of how it works (and how to add live code reload) can be found in the Go Docker dev environment article.

Full source: docs/content/docs/getting-started/nats-streaming/main.go

// ...
package main

import (
    "context"
    "log"
    "time"

    "github.com/nats-io/go-nats-streaming"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message/infrastructure/nats"

    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    subscriber, err := nats.NewStreamingSubscriber(
        nats.StreamingSubscriberConfig{
            ClusterID:        "test-cluster",
            ClientID:         "example-subscriber",
            QueueGroup:       "example",
            DurableName:      "my-durable",
            SubscribersCount: 4, // how many goroutines should consume messages
            CloseTimeout:     time.Minute,
            AckWaitTimeout:   time.Second * 30,
            StanOptions: []stan.Option{
                stan.NatsURL("nats://nats-streaming:4222"),
            },
            Unmarshaler: nats.GobMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

Full source: docs/content/docs/getting-started/nats-streaming/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // we need to Acknowledge that we received and processed the message,
        // otherwise we will not receive next message
        msg.Ack()
    }
}

Running in Docker  

You can run Google Cloud Pub/Sub emulator locally for development.

Full source: docs/content/docs/getting-started/googlecloud/docker-compose.yml

version: '3'
services:
  server:
    image: golang:1.11
    restart: on-failure
    depends_on:
      - googlecloud
    volumes:
      - .:/app
      - $GOPATH/pkg/mod:/go/pkg/mod
    environment:
      # use local emulator instead of google cloud engine
      PUBSUB_EMULATOR_HOST: "googlecloud:8085"
    working_dir: /app
    command: go run main.go

  googlecloud:
    image: google/cloud-sdk:228.0.0
    entrypoint: gcloud --quiet beta emulators pubsub start --host-port=googlecloud:8085 --verbosity=debug --log-http
    restart: on-failure

The source should go in main.go.

To run it, execute docker-compose up.

A more detailed explanation of how it works (and how to add live code reload) can be found in the Go Docker dev environment article.

Full source: docs/content/docs/getting-started/googlecloud/main.go

// ...
package main

import (
    "context"
    "log"

    "github.com/ThreeDotsLabs/watermill/message/infrastructure/googlecloud"

    "github.com/ThreeDotsLabs/watermill"

    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    subscriber, err := googlecloud.NewSubscriber(
        context.Background(),
        googlecloud.SubscriberConfig{
            // custom function to generate Subscription Name,
            // there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
            GenerateSubscriptionName: func(topic string) string {
                return "test-sub_" + topic
            },
            ProjectID: "test-project",
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    // Subscribe will create the subscription. Only messages that are sent after the subscription is created may be received.
    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

Full source: docs/content/docs/getting-started/googlecloud/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // we need to Acknowledge that we received and processed the message,
        // otherwise, it will be resent over and over again.
        msg.Ack()
    }
}

Running in Docker  

Full source: docs/content/docs/getting-started/amqp/docker-compose.yml

version: '3'
services:
  server:
    image: golang:1.11
    restart: on-failure
    depends_on:
      - rabbitmq
    volumes:
      - .:/app
      - $GOPATH/pkg/mod:/go/pkg/mod
    working_dir: /app
    command: go run main.go

  rabbitmq:
    image: rabbitmq:3.7
    restart: on-failure

The source should go in main.go.

To run it, execute docker-compose up.

A more detailed explanation of how it works (and how to add live code reload) can be found in the Go Docker dev environment article.

Full source: docs/content/docs/getting-started/amqp/main.go

// ...
package main

import (
    "context"
    "log"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/infrastructure/amqp"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
    amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

    subscriber, err := amqp.NewSubscriber(
        // This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
        // It works as a simple queue.
        //
        // If you want to implement a Pub/Sub style service instead, check
        // https://watermill.io/docs/pub-sub-implementations/#amqp-consumer-groups
        amqpConfig,
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

Full source: docs/content/docs/getting-started/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // we need to Acknowledge that we received and processed the message,
        // otherwise we will not receive the next message
        msg.Ack()
    }
}

Publishing messages

Full source: docs/content/docs/getting-started/go-channel/main.go

// ...
    go process(messages)

    publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
    }
}
// ...

Full source: docs/content/docs/getting-started/kafka/main.go

// ...
    go process(messages)

    publisher, err := kafka.NewPublisher(
        []string{"kafka:9092"},
        kafka.DefaultMarshaler{},
        nil, // no custom sarama config
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
    }
}
// ...

Full source: docs/content/docs/getting-started/nats-streaming/main.go

// ...
    go process(messages)

    publisher, err := nats.NewStreamingPublisher(
        nats.StreamingPublisherConfig{
            ClusterID: "test-cluster",
            ClientID:  "example-publisher",
            StanOptions: []stan.Option{
                stan.NatsURL("nats://nats-streaming:4222"),
            },
            Marshaler: nats.GobMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
    }
}
// ...

Full source: docs/content/docs/getting-started/googlecloud/main.go

// ...
    go process(messages)

    publisher, err := googlecloud.NewPublisher(context.Background(), googlecloud.PublisherConfig{
        ProjectID: "test-project",
    })
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
    }
}
// ...

Full source: docs/content/docs/getting-started/amqp/main.go

// ...
    go process(messages)

    publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
    }
}
// ...

Message format

We don’t enforce any message format. You can use strings, JSON, protobuf, Avro, gob or anything else that serializes to []byte.
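
As one possible illustration, the sketch below serializes an event to JSON bytes and restores it again, using only the standard library. The `orderPlaced` type and the `encode` and `decode` helpers are hypothetical names chosen for this example and are not part of Watermill.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderPlaced is a hypothetical event type, not part of Watermill.
type orderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// encode serializes the event to the []byte form a message payload expects.
func encode(e orderPlaced) ([]byte, error) {
	return json.Marshal(e)
}

// decode restores the event from a received payload.
func decode(payload []byte) (orderPlaced, error) {
	var e orderPlaced
	err := json.Unmarshal(payload, &e)
	return e, err
}

func main() {
	payload, err := encode(orderPlaced{OrderID: "o-1", Total: 42})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))

	e, err := decode(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(e.OrderID, e.Total)
}
```

The resulting bytes could be passed as the payload argument of message.NewMessage in place of []byte("Hello, world!") from the publishing examples, and decoded from msg.Payload on the subscriber side.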

Using Messages Router

Publishers and subscribers are rather low-level parts of Watermill. In production, you’d usually want to use a high-level interface and features like correlation, metrics, poison queue, retrying, throttling, etc.

You also might not want to send an Ack manually every time processing succeeds. Sometimes, you’d like to send a message after the processing of another message finishes.

To handle these requirements, there is a component named Router.

The flow of our application looks like this:

  1. We produce a message to the topic example.topic_1 every second.
  2. The struct_handler handler listens to example.topic_1. When a message is received, its UUID is printed and a new message is produced to example.topic_2.
  3. The print_events_topic_1 handler listens to example.topic_1 and prints the message UUID, payload and metadata. The correlation ID should be the same as in the message sent to example.topic_1.
  4. The print_events_topic_2 handler listens to example.topic_2 and prints the message UUID, payload and metadata. The correlation ID should be the same as in the message from example.topic_1 that triggered it.
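
The correlation ID part of this flow can be modelled with a few lines of plain Go. This is a minimal sketch under stated assumptions, not Watermill's middleware: the `message` and `handlerFunc` types, the `correlationKey` metadata key, and the `withCorrelationID` and `forward` functions are all hypothetical. It shows a wrapper that copies the correlation ID from the consumed message to every message the handler produces.

```go
package main

import "fmt"

// message is a hypothetical, simplified stand-in for a Watermill message.
type message struct {
	UUID     string
	Metadata map[string]string
}

// handlerFunc consumes one message and may produce others.
type handlerFunc func(in *message) ([]*message, error)

// correlationKey is an assumed metadata key, chosen for this sketch only.
const correlationKey = "correlation_id"

// withCorrelationID wraps a handler so that every produced message
// inherits the correlation ID of the message that triggered it.
func withCorrelationID(h handlerFunc) handlerFunc {
	return func(in *message) ([]*message, error) {
		produced, err := h(in)
		for _, out := range produced {
			if out.Metadata == nil {
				out.Metadata = map[string]string{}
			}
			out.Metadata[correlationKey] = in.Metadata[correlationKey]
		}
		return produced, err
	}
}

// forward models the struct_handler step: it emits one new message
// that has its own UUID and no metadata yet.
func forward(in *message) ([]*message, error) {
	return []*message{{UUID: "produced-1"}}, nil
}

func main() {
	in := &message{
		UUID:     "incoming-1",
		Metadata: map[string]string{correlationKey: "corr-42"},
	}
	out, err := withCorrelationID(forward)(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(out[0].UUID, out[0].Metadata[correlationKey])
}
```

The produced message gets a new UUID but keeps the correlation ID of its trigger, which is what lets you trace a chain of messages across topics.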

Router configuration

First, we configure the router and choose which plugins and middlewares we want to use.

We will also set up the handlers that this router supports. Each handler handles messages independently.
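
To make "independently" concrete, here is a minimal standard-library sketch of fan-out delivery, in which each subscriber of a topic receives its own copy of every payload. The `fanOut` type and its methods are hypothetical and only model the idea; they are not how Watermill or its Go channel Pub/Sub is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical in-memory topic that copies each published
// payload to every subscriber.
type fanOut struct {
	mu   sync.Mutex
	subs []chan string
}

// subscribe registers a new subscriber with a buffered channel.
func (f *fanOut) subscribe(buffer int) <-chan string {
	f.mu.Lock()
	defer f.mu.Unlock()
	ch := make(chan string, buffer)
	f.subs = append(f.subs, ch)
	return ch
}

// publish delivers the payload to each subscriber's own channel.
// It blocks if a subscriber's buffer is full, which keeps the sketch short.
func (f *fanOut) publish(payload string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, ch := range f.subs {
		ch <- payload
	}
}

func main() {
	topic := &fanOut{}
	a := topic.subscribe(1)
	b := topic.subscribe(1)

	topic.publish("hello")

	// Both subscribers see the same payload without affecting each other.
	fmt.Println(<-a, <-b)
}
```

In the router example, struct_handler and print_events_topic_1 play the roles of the two subscribers: both listen to example.topic_1.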

Full source: docs/content/docs/getting-started/router/main.go

// ...
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/infrastructure/gochannel"
    "github.com/ThreeDotsLabs/watermill/message/router/middleware"
    "github.com/ThreeDotsLabs/watermill/message/router/plugin"
)

var (
    // this is just the simplest implementation,
    // you probably want to ship your own implementation of `watermill.LoggerAdapter`
    logger = watermill.NewStdLogger(false, false)
)

func main() {
    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

    // this plugin gracefully shuts down the router when SIGTERM is received,
    // you can also close the router by just calling `router.Close()`
    router.AddPlugin(plugin.SignalsHandler)

    router.AddMiddleware(
        // CorrelationID copies the correlation ID from the consumed message's metadata to the produced messages
        middleware.CorrelationID,

        // when an error occurs, the handler function is retried,
        // after max retries (or if no Retry middleware is added) a Nack is sent and the message will be resent
        middleware.Retry{
            MaxRetries: 3,
            WaitTime:   time.Millisecond * 100,
            Backoff:    3,
            Logger:     logger,
        }.Middleware,

        // this middleware handles panics from handlers
        // and, in this case, passes them as errors to the Retry middleware
        middleware.Recoverer,
    )

    // for simplicity we are using gochannel Pub/Sub here,
    // you can replace it with any Pub/Sub implementation, it will work the same
    pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

    // producing some messages in background
    go publishMessages(pubSub)

    router.AddHandler(
        "struct_handler",  // handler name, must be unique
        "example.topic_1", // topic from which we will read events
        pubSub,
        "example.topic_2", // topic to which we will publish event
        pubSub,
        structHandler{}.Handler,
    )

    // just for debug, we are printing all events sent to `example.topic_1`
    router.AddNoPublisherHandler(
        "print_events_topic_1",
        "example.topic_1",
        pubSub,
        printMessages,
    )

    // just for debug, we are printing all events sent to `example.topic_2`
    router.AddNoPublisherHandler(
        "print_events_topic_2",
        "example.topic_2",
        pubSub,
        printMessages,
    )

    // when everything is ready, let's run the router,
    // this call blocks for as long as the router is running
    if err := router.Run(); err != nil {
        panic(err)
    }
}

// ...

Producing messages

Producing messages works just like before. The only addition is a call to middleware.SetCorrelationID, which sets the correlation ID on each message. The middleware.CorrelationID middleware then adds that correlation ID to all messages produced by the router.

Full source: docs/content/docs/getting-started/router/main.go

// ...
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("example.topic_1", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// ...

Handlers

You may notice that we have two types of handler functions:

  1. function func(msg *message.Message) ([]*message.Message, error)
  2. method func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

The second option is useful when your function requires some dependencies like a database, a logger, etc. If you use a simple function without dependencies, it’s fine to use the first option.

Full source: docs/content/docs/getting-started/router/main.go

// ...
func printMessages(msg *message.Message) ([]*message.Message, error) {
    fmt.Printf(
        "\n> Received message: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil, nil
}

type structHandler struct {
    // we can add some dependencies here
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler received message", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))
    return message.Messages{msg}, nil
}

Done!

You can run this example with go run main.go.

You’ve just created your first application with Watermill. You can find the full source in /docs/content/docs/getting-started/router/main.go.

Deployment

Watermill is not a framework. We don’t enforce any type of deployment; it’s entirely up to you.

What’s next?

For more detailed documentation, check the documentation topics list.

Examples

We’ve also created some examples that show you how to start using Watermill. The recommended entry point is Your first Watermill application. It contains the entire environment in docker-compose.yml, including Go and Kafka, which you can run with one command.

After that, you can check the Simple app example. It uses more middlewares and contains two handlers. There is also a separate application for publishing messages.

The third example showcases the use of a different Subscriber implementation, namely HTTP. It is a very simple application, which can save GitLab webhooks to Kafka.

You may also find some useful information in our README.

Support

If anything is not clear, feel free to use any of our support channels; we will be glad to help.