Getting started On this page What is Watermill?# Watermill is a Go library for working with message streams.
You can use it to build event-driven systems with popular Pub/Sub implementations like Kafka or RabbitMQ, as well as HTTP or Postgres if that fits your use case.
It comes with a set of Pub/Sub implementations and can be easily extended.
Watermill also ships with standard middlewares like instrumentation, poison queue, throttling, correlation,
and other tools used by every message-driven application.
Why use Watermill?# When using microservices, synchronous communication is not always the right choice.
Asynchronous methods became a new standard way to communicate.
While there are many tools and libraries for synchronous communication, like HTTP, correctly setting up
a message-oriented project can be challenging. There are many different message queues and streaming systems,
each with different features, client libraries, and APIs.
Watermill aims to be the standard messaging library for Go, hiding all that complexity behind an API that is easy to understand.
It provides all you need to build an application based on events or other asynchronous patterns.
Watermill is NOT a framework .
It’s a lightweight library that’s easy to plug in or remove from your project.
Install# go get -u github.com/ThreeDotsLabs/watermill
One-Minute Background# The idea behind event-driven applications is always the same: listen to and react to incoming messages.
Watermill supports this behavior for multiple publishers and subscribers
.
The core part of Watermill is the Message
.
It is what http.Request
is for the net/http
package.
Most Watermill features work with this struct.
Watermill provides a few APIs for working with messages.
They build on top of each other, each step providing a higher-level API:
At the bottom, the Publisher
and Subscriber
interfaces. It’s the “raw” way of working with messages. You get full control, but also need to handle everything yourself. The Router
is similar to HTTP routers you probably know. It introduces message handlers. The CQRS
component adds generic handlers without needing to marshal and unmarshal messages yourself. Publisher & Subscriber# Most Pub/Sub libraries come with complex features. For Watermill, it’s enough to implement two interfaces to start
working with them: the Publisher
and Subscriber
.
type Publisher interface {
Publish ( topic string , messages ...* Message ) error
Close () error
}
type Subscriber interface {
Subscribe ( ctx context . Context , topic string ) ( <- chan * Message , error )
Close () error
}
Subscribing for Messages# Subscribe
expects a topic name and returns a channel of incoming messages.
What topic exactly means depends on the Pub/Sub implementation.
Usually, it needs to match the topic name used by the publisher.
messages , err := subscriber . Subscribe ( ctx , "example.topic" )
if err != nil {
panic ( err )
}
for msg := range messages {
fmt . Printf ( "received message: %s, payload: %s\n" , msg . UUID , string ( msg . Payload ))
msg . Ack ()
}
See detailed examples below for supported PubSubs.
Go Channel
Kafka
NATS Streaming
Google Cloud Pub/Sub
RabbitMQ (AMQP)
SQL
AWS SQS
AWS SNS
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main () {
pubSub := gochannel . NewGoChannel (
gochannel . Config {},
watermill . NewStdLogger ( false , false ),
)
messages , err := pubSub . Subscribe ( context . Background (), "example.topic" )
if err != nil {
panic ( err )
}
go process ( messages )
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process ( messages <- chan * message . Message ) {
for msg := range messages {
fmt . Printf ( "received message: %s, payload: %s\n" , msg . UUID , string ( msg . Payload ))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg . Ack ()
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
Running in Docker The easiest way to run Watermill locally with Kafka is by using Docker.
services :
server :
image : golang:1.23
restart : unless-stopped
depends_on :
- kafka
volumes :
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir : /app
command : go run main.go
zookeeper :
image : confluentinc/cp-zookeeper:7.3.1
restart : unless-stopped
logging :
driver : none
environment :
ZOOKEEPER_CLIENT_PORT : 2181
kafka :
image : confluentinc/cp-kafka:7.3.1
restart : unless-stopped
depends_on :
- zookeeper
logging :
driver : none
environment :
KAFKA_ZOOKEEPER_CONNECT : zookeeper:2181
KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE : "true"
Full source: _examples/pubsubs/kafka/docker-compose.yml
The source should go to main.go
.
To run, execute the docker-compose up
command.
A more detailed explanation of how it works (and how to add live code reload) can be found in the Go Docker dev environment article
.
// ...
package main
import (
"context"
"log"
"time"
"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main () {
saramaSubscriberConfig := kafka . DefaultSaramaSubscriberConfig ()
// equivalent of auto.offset.reset: earliest
saramaSubscriberConfig . Consumer . Offsets . Initial = sarama . OffsetOldest
subscriber , err := kafka . NewSubscriber (
kafka . SubscriberConfig {
Brokers : [] string { "kafka:9092" },
Unmarshaler : kafka . DefaultMarshaler {},
OverwriteSaramaConfig : saramaSubscriberConfig ,
ConsumerGroup : "test_consumer_group" ,
},
watermill . NewStdLogger ( false , false ),
)
if err != nil {
panic ( err )
}
messages , err := subscriber . Subscribe ( context . Background (), "example.topic" )
if err != nil {
panic ( err )
}
go process ( messages )
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process ( messages <- chan * message . Message ) {
for msg := range messages {
log . Printf ( "received message: %s, payload: %s" , msg . UUID , string ( msg . Payload ))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg . Ack ()
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
Running in Docker The easiest way to run Watermill locally with NATS is using Docker.
services :
server :
image : golang:1.23
restart : unless-stopped
depends_on :
- nats-streaming
volumes :
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir : /app
command : go run main.go
nats-streaming :
image : nats-streaming:0.11.2
restart : unless-stopped
Full source: _examples/pubsubs/nats-streaming/docker-compose.yml
The source should go to main.go
.
To run, execute the docker-compose up
command.
A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article
.
// ...
package main
import (
"context"
"log"
"time"
stan "github.com/nats-io/stan.go"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-nats/pkg/nats"
"github.com/ThreeDotsLabs/watermill/message"
)
func main () {
subscriber , err := nats . NewStreamingSubscriber (
nats . StreamingSubscriberConfig {
ClusterID : "test-cluster" ,
ClientID : "example-subscriber" ,
QueueGroup : "example" ,
DurableName : "my-durable" ,
SubscribersCount : 4 , // how many goroutines should consume messages
CloseTimeout : time . Minute ,
AckWaitTimeout : time . Second * 30 ,
StanOptions : [] stan . Option {
stan . NatsURL ( "nats://nats-streaming:4222" ),
},
Unmarshaler : nats . GobMarshaler {},
},
watermill . NewStdLogger ( false , false ),
)
if err != nil {
panic ( err )
}
messages , err := subscriber . Subscribe ( context . Background (), "example.topic" )
if err != nil {
panic ( err )
}
go process ( messages )
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-streaming/main.go
// ...
func process ( messages <- chan * message . Message ) {
for msg := range messages {
log . Printf ( "received message: %s, payload: %s" , msg . UUID , string ( msg . Payload ))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg . Ack ()
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/nats-streaming/main.go
Running in Docker You can run the Google Cloud Pub/Sub emulator locally for development.
services :
server :
image : golang:1.23
restart : unless-stopped
depends_on :
- googlecloud
volumes :
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
environment :
# use local emulator instead of google cloud engine
PUBSUB_EMULATOR_HOST : "googlecloud:8085"
working_dir : /app
command : go run main.go
googlecloud :
image : google/cloud-sdk:414.0.0
entrypoint : gcloud --quiet beta emulators pubsub start --host-port=0.0.0.0:8085 --verbosity=debug --log-http
restart : unless-stopped
Full source: _examples/pubsubs/googlecloud/docker-compose.yml
The source should go to main.go
.
To run, execute docker-compose up
.
A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article
.
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
"github.com/ThreeDotsLabs/watermill/message"
)
func main () {
logger := watermill . NewStdLogger ( false , false )
subscriber , err := googlecloud . NewSubscriber (
googlecloud . SubscriberConfig {
// custom function to generate Subscription Name,
// there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
GenerateSubscriptionName : func ( topic string ) string {
return "test-sub_" + topic
},
ProjectID : "test-project" ,
},
logger ,
)
if err != nil {
panic ( err )
}
// Subscribe will create the subscription. Only messages that are sent after the subscription is created may be received.
messages , err := subscriber . Subscribe ( context . Background (), "example.topic" )
if err != nil {
panic ( err )
}
go process ( messages )
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go
// ...
func process ( messages <- chan * message . Message ) {
for msg := range messages {
log . Printf ( "received message: %s, payload: %s" , msg . UUID , string ( msg . Payload ))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg . Ack ()
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go
Running in Docker services :
server :
image : golang:1.23
restart : unless-stopped
depends_on :
- rabbitmq
volumes :
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir : /app
command : go run main.go
rabbitmq :
image : rabbitmq:3.7
restart : unless-stopped
Full source: _examples/pubsubs/amqp/docker-compose.yml
The source should go to main.go
.
To run, execute docker-compose up
.
A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article
.
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main () {
amqpConfig := amqp . NewDurableQueueConfig ( amqpURI )
subscriber , err := amqp . NewSubscriber (
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// It works as a simple queue.
//
// If you want to implement a Pub/Sub style service instead, check
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig ,
watermill . NewStdLogger ( false , false ),
)
if err != nil {
panic ( err )
}
messages , err := subscriber . Subscribe ( context . Background (), "example.topic" )
if err != nil {
panic ( err )
}
go process ( messages )
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process ( messages <- chan * message . Message ) {
for msg := range messages {
log . Printf ( "received message: %s, payload: %s" , msg . UUID , string ( msg . Payload ))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg . Ack ()
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
Running in Docker services :
server :
image : golang:1.23
restart : unless-stopped
depends_on :
- mysql
volumes :
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir : /app
command : go run main.go
mysql :
image : mysql:8.0
restart : unless-stopped
ports :
- 3306 : 3306
environment :
MYSQL_DATABASE : watermill
MYSQL_ALLOW_EMPTY_PASSWORD : "yes"
Full source: _examples/pubsubs/sql/docker-compose.yml
The source should go to main.go
.
To run, execute docker-compose up
.
A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article
.
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main () {
db := createDB ()
logger := watermill . NewStdLogger ( false , false )
subscriber , err := sql . NewSubscriber (
db ,
sql . SubscriberConfig {
SchemaAdapter : sql . DefaultMySQLSchema {},
OffsetsAdapter : sql . DefaultMySQLOffsetsAdapter {},
InitializeSchema : true ,
},
logger ,
)
if err != nil {
panic ( err )
}
messages , err := subscriber . Subscribe ( context . Background (), "example_topic" )
if err != nil {
panic ( err )
}
go process ( messages )
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process ( messages <- chan * message . Message ) {
for msg := range messages {
log . Printf ( "received message: %s, payload: %s" , msg . UUID , string ( msg . Payload ))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg . Ack ()
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
Running in Docker services :
server :
image : golang:1.23
restart : unless-stopped
volumes :
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir : /app
command : go run main.go
localstack :
image : localstack/localstack:latest
environment :
- SERVICES=sqs,sns
- AWS_DEFAULT_REGION=us-east-1
- EDGE_PORT=4566
ports :
- "4566-4597:4566-4597"
healthcheck :
test : awslocal sqs list-queues && awslocal sns list-topics
interval : 5s
timeout : 5s
retries : 5
start_period : 30s
Full source: _examples/pubsubs/aws-sqs/docker-compose.yml
The source should go to main.go
.
To run, execute docker-compose up
.
A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article
.
// ...
package main
import (
"context"
"log"
"net/url"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs"
transport "github.com/aws/smithy-go/endpoints"
"github.com/samber/lo"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-aws/sqs"
"github.com/ThreeDotsLabs/watermill/message"
)
func main () {
logger := watermill . NewStdLogger ( false , false )
sqsOpts := [] func ( * amazonsqs . Options ){
amazonsqs . WithEndpointResolverV2 ( sqs . OverrideEndpointResolver {
Endpoint : transport . Endpoint {
URI : * lo . Must ( url . Parse ( "http://localstack:4566" )),
},
}),
}
subscriberConfig := sqs . SubscriberConfig {
AWSConfig : aws . Config {
Credentials : aws . AnonymousCredentials {},
},
OptFns : sqsOpts ,
}
subscriber , err := sqs . NewSubscriber ( subscriberConfig , logger )
if err != nil {
panic ( err )
}
messages , err := subscriber . Subscribe ( context . Background (), "example-topic" )
if err != nil {
panic ( err )
}
go process ( messages )
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/aws-sqs/main.go
// ...
func process ( messages <- chan * message . Message ) {
for msg := range messages {
log . Printf ( "received message: %s, payload: %s" , msg . UUID , string ( msg . Payload ))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg . Ack ()
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/aws-sqs/main.go
Running in Docker services :
server :
image : golang:1.23
restart : unless-stopped
volumes :
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir : /app
command : go run main.go
localstack :
image : localstack/localstack:latest
environment :
- SERVICES=sqs,sns
- AWS_DEFAULT_REGION=us-east-1
- EDGE_PORT=4566
ports :
- "4566-4597:4566-4597"
healthcheck :
test : awslocal sqs list-queues && awslocal sns list-topics
interval : 5s
timeout : 5s
retries : 5
start_period : 30s
Full source: _examples/pubsubs/aws-sns/docker-compose.yml
The source should go to main.go
.
To run, execute docker-compose up
.
A more detailed explanation of how it is working (and how to add live code reload) can be found in Go Docker dev environment article
.
// ...
package main
import (
"context"
"fmt"
"log"
"net/url"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
amazonsns "github.com/aws/aws-sdk-go-v2/service/sns"
amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs"
transport "github.com/aws/smithy-go/endpoints"
"github.com/samber/lo"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-aws/sns"
"github.com/ThreeDotsLabs/watermill-aws/sqs"
"github.com/ThreeDotsLabs/watermill/message"
)
func main () {
logger := watermill . NewStdLogger ( false , false )
snsOpts := [] func ( * amazonsns . Options ){
amazonsns . WithEndpointResolverV2 ( sns . OverrideEndpointResolver {
Endpoint : transport . Endpoint {
URI : * lo . Must ( url . Parse ( "http://localstack:4566" )),
},
}),
}
sqsOpts := [] func ( * amazonsqs . Options ){
amazonsqs . WithEndpointResolverV2 ( sqs . OverrideEndpointResolver {
Endpoint : transport . Endpoint {
URI : * lo . Must ( url . Parse ( "http://localstack:4566" )),
},
}),
}
topicResolver , err := sns . NewGenerateArnTopicResolver ( "000000000000" , "us-east-1" )
if err != nil {
panic ( err )
}
newSubscriber := func ( name string ) ( message . Subscriber , error ) {
subscriberConfig := sns . SubscriberConfig {
AWSConfig : aws . Config {
Credentials : aws . AnonymousCredentials {},
},
OptFns : snsOpts ,
TopicResolver : topicResolver ,
GenerateSqsQueueName : func ( ctx context . Context , snsTopic sns . TopicArn ) ( string , error ) {
topic , err := sns . ExtractTopicNameFromTopicArn ( snsTopic )
if err != nil {
return "" , err
}
return fmt . Sprintf ( "%v-%v" , topic , name ), nil
},
}
sqsSubscriberConfig := sqs . SubscriberConfig {
AWSConfig : aws . Config {
Credentials : aws . AnonymousCredentials {},
},
OptFns : sqsOpts ,
}
return sns . NewSubscriber ( subscriberConfig , sqsSubscriberConfig , logger )
}
subA , err := newSubscriber ( "subA" )
if err != nil {
panic ( err )
}
subB , err := newSubscriber ( "subB" )
if err != nil {
panic ( err )
}
messagesA , err := subA . Subscribe ( context . Background (), "example-topic" )
if err != nil {
panic ( err )
}
messagesB , err := subB . Subscribe ( context . Background (), "example-topic" )
if err != nil {
panic ( err )
}
go process ( "A" , messagesA )
go process ( "B" , messagesB )
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/aws-sns/main.go
// ...
func process ( prefix string , messages <- chan * message . Message ) {
for msg := range messages {
log . Printf ( "%v received message: %s, payload: %s" , prefix , msg . UUID , string ( msg . Payload ))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg . Ack ()
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/aws-sns/main.go
Creating Messages# Watermill doesn’t enforce any message format. NewMessage
expects a slice of bytes as the payload.
You can use strings, JSON, protobuf, Avro, gob, or anything else that serializes to []byte
.
The message UUID is optional but recommended for debugging.
msg := message . NewMessage ( watermill . NewUUID (), [] byte ( "Hello, world!" ))
Publishing Messages# Publish
expects a topic and one or more Message
s to be published.
err := publisher . Publish ( "example.topic" , msg )
if err != nil {
panic ( err )
}
Go Channel
Kafka
NATS Streaming
Google Cloud Pub/Sub
RabbitMQ (AMQP)
SQL
AWS SQS
AWS SNS
Router# Publishers and subscribers
are the low-level parts of Watermill.
For most cases, you want to use a high-level API: Router
component.
Router configuration# Start with configuring the router and adding plugins and middlewares.
A middleware is a function executed for each incoming message.
You can use one of the existing ones for things like correlation, metrics, poison queue, retrying, throttling, etc.
.
You can also create your own.
// ...
router , err := message . NewRouter ( message . RouterConfig {}, logger )
if err != nil {
panic ( err )
}
// SignalsHandler will gracefully shutdown Router when SIGTERM is received.
// You can also close the router by just calling `r.Close()`.
router . AddPlugin ( plugin . SignalsHandler )
// Router level middleware are executed for every message sent to the router
router . AddMiddleware (
// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
middleware . CorrelationID ,
// The handler function is retried if it returns an error.
// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.
middleware . Retry {
MaxRetries : 3 ,
InitialInterval : time . Millisecond * 100 ,
Logger : logger ,
}. Middleware ,
// Recoverer handles panics from handlers.
// In this case, it passes them as errors to the Retry middleware.
middleware . Recoverer ,
)
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
Handlers# Set up handlers that the router uses.
Each handler independently handles incoming messages.
A handler listens to messages from the given subscriber and topic.
Any messages returned from the handler function will be published to the given publisher and topic.
// ...
// AddHandler returns a handler which can be used to add handler level middleware
// or to stop handler.
handler := router . AddHandler (
"struct_handler" , // handler name, must be unique
"incoming_messages_topic" , // topic from which we will read events
pubSub ,
"outgoing_messages_topic" , // topic to which we will publish events
pubSub ,
structHandler {}. Handler ,
)
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
Note: the example above uses one pubSub
argument for both the subscriber and publisher.
It’s because we use the GoChannel
implementation, which is a simple in-memory Pub/Sub.
Alternatively, if you don’t plan to publish messages from within the handler, you can use the simpler AddNoPublisherHandler
method.
// ...
router . AddNoPublisherHandler (
"print_incoming_messages" ,
"incoming_messages_topic" ,
pubSub ,
printMessages ,
)
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
You can use two types of handler functions :
a function func(msg *message.Message) ([]*message.Message, error)
a struct method func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Use the first one if your handler is a function without any dependencies.
The second option is useful when your handler requires dependencies such as a database handle or a logger.
// ...
func printMessages ( msg * message . Message ) error {
fmt . Printf (
"\n> Received message: %s\n> %s\n> metadata: %v\n\n" ,
msg . UUID , string ( msg . Payload ), msg . Metadata ,
)
return nil
}
type structHandler struct {
// we can add some dependencies here
}
func ( s structHandler ) Handler ( msg * message . Message ) ([] * message . Message , error ) {
log . Println ( "structHandler received message" , msg . UUID )
msg = message . NewMessage ( watermill . NewUUID (), [] byte ( "message produced by structHandler" ))
return message . Messages { msg }, nil
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
Finally, run the router.
// ...
if err := router . Run ( ctx ); err != nil {
panic ( err )
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
The complete example’s source can be found at /_examples/basic/3-router/main.go
.
Logging# To see Watermill’s logs, pass any logger that implements the LoggerAdapter
.
For experimental development, you can use NewStdLogger
.
Watermill provides ready-to-use slog
adapter. You can create it with watermill.NewSlogLogger
.
You can also map Watermill’s log levels to slog
levels with watermill.NewSlogLoggerWithLevelMapping
.
What’s next?# For more details, see documentation topics
.
See the CQRS component
for another high-level API.
Examples# Check out the examples
that will show you how to start using Watermill.
The recommended entry point is Your first Watermill application
.
It contains the entire environment in docker-compose.yml
, including Go and Kafka, which you can run with one command.
After that, you can see the Realtime feed
example.
It uses more middlewares and contains two handlers.
For a different subscriber implementation (HTTP ), see the receiving-webhooks
example.
It is a straightforward application that saves webhooks to Kafka.
You can find the complete list of examples in the README
.
Support# If anything is not clear, feel free to use any of our support channels
; we will be glad to help.