When you run an HTTP server, you don’t deal directly with TCP sockets, parsing HTTP requests, or managing connections.
Instead, you use a high-level library like net/http that handles all that complexity for you.
That is what Watermill aims to be for messages.
It provides all you need to build an application based on events or other asynchronous patterns.
There are many different message queues, each with different features, client libraries, and APIs.
Watermill hides all that complexity behind an API that is easy to use and understand.
Watermill is NOT a framework.
It’s a lightweight library that’s easy to plug in or remove from your project.
The core part of Watermill is the Message.
It plays the same role that http.Request plays in the net/http package.
Most Watermill features work with this struct.
Watermill doesn’t enforce any message format. NewMessage expects a slice of bytes as the payload.
You can use strings, JSON, protobuf, Avro, gob, or anything else that serializes to []byte.
The message UUID is optional but recommended for debugging.
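As a minimal sketch of the "anything that serializes to []byte" point, the following standard-library example encodes an event to JSON and decodes it again. The OrderPlaced type and the encodePayload and decodePayload helpers are hypothetical names introduced only for this illustration; the resulting byte slice is the kind of value you would pass to NewMessage as the payload.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderPlaced is a hypothetical event type used only for this illustration.
type OrderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// encodePayload serializes the event to JSON bytes, suitable as a message payload.
func encodePayload(event OrderPlaced) ([]byte, error) {
	return json.Marshal(event)
}

// decodePayload restores the event from payload bytes on the consumer side.
func decodePayload(payload []byte) (OrderPlaced, error) {
	var event OrderPlaced
	err := json.Unmarshal(payload, &event)
	return event, err
}

func main() {
	payload, err := encodePayload(OrderPlaced{OrderID: "o-1", Total: 42})
	if err != nil {
		panic(err)
	}
	// payload is a []byte; it could be handed to NewMessage as the message payload.
	fmt.Println(string(payload))

	event, err := decodePayload(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(event.OrderID, event.Total)
}
```

The same shape should work for protobuf, Avro, or gob: only the two helper functions change, while the payload stays a plain byte slice.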
// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}
// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}
A more detailed explanation of how it works (and how to add live code reload) can be found in the Go Docker dev environment article.
// ...
package main

import (
	"context"
	"log"
	"time"

	stan "github.com/nats-io/stan.go"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-nats/pkg/nats"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	subscriber, err := nats.NewStreamingSubscriber(
		nats.StreamingSubscriberConfig{
			ClusterID:        "test-cluster",
			ClientID:         "example-subscriber",
			QueueGroup:       "example",
			DurableName:      "my-durable",
			SubscribersCount: 4, // how many goroutines should consume messages
			CloseTimeout:     time.Minute,
			AckWaitTimeout:   time.Second * 30,
			StanOptions: []stan.Option{
				stan.NatsURL("nats://nats-streaming:4222"),
			},
			Unmarshaler: nats.GobMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}
You can run the Google Cloud Pub/Sub emulator locally for development.
services:
  server:
    image: golang:1.25
    restart: unless-stopped
    depends_on:
      - googlecloud
    volumes:
      - .:/app
      - $GOPATH/pkg/mod:/go/pkg/mod
    environment:
      # use local emulator instead of google cloud engine
      PUBSUB_EMULATOR_HOST: "googlecloud:8085"
    working_dir: /app
    command: go run main.go

  googlecloud:
    image: google/cloud-sdk:414.0.0
    entrypoint: gcloud --quiet beta emulators pubsub start --host-port=0.0.0.0:8085 --verbosity=debug --log-http
    restart: unless-stopped
A more detailed explanation of how it works (and how to add live code reload) can be found in the Go Docker dev environment article.
// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-googlecloud/v2/pkg/googlecloud"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	logger := watermill.NewStdLogger(false, false)
	subscriber, err := googlecloud.NewSubscriber(
		googlecloud.SubscriberConfig{
			// custom function to generate Subscription Name,
			// there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
			GenerateSubscriptionName: func(topic string) string {
				return "test-sub_" + topic
			},
			ProjectID: "test-project",
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	// Subscribe will create the subscription. Only messages that are sent after the subscription is created may be received.
	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}
// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// It works as a simple queue.
		//
		// If you want to implement a Pub/Sub style service instead, check
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}
// ...
func process(prefix string, messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("%v received message: %s, payload: %s", prefix, msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}
// ...
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SignalsHandler will gracefully shutdown Router when SIGTERM is received.
	// You can also close the router by just calling `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// Router level middleware are executed for every message sent to the router
	router.AddMiddleware(
		// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
		middleware.CorrelationID,

		// The handler function is retried if it returns an error.
		// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer handles panics from handlers.
		// In this case, it passes them as errors to the Retry middleware.
		middleware.Recoverer,
	)
// ...
Set up handlers that the router uses.
Each handler independently handles incoming messages.
A handler listens to messages from the given subscriber and topic.
Any messages returned from the handler function will be published to the given publisher and topic.
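The flow described above can be pictured with plain Go channels. This is only a rough model under stated assumptions, not how the router is implemented: msg, handlerFunc, runHandler, and upper are hypothetical names, the incoming channel plays the role of the subscriber and topic, and the outgoing channel plays the role of the publisher and topic.

```go
package main

import (
	"fmt"
	"strings"
)

// msg is a hypothetical, simplified stand-in for a message; it is not Watermill's Message type.
type msg struct {
	ID      string
	Payload string
}

// handlerFunc mirrors the shape described above: one incoming message in,
// zero or more produced messages out.
type handlerFunc func(m msg) ([]msg, error)

// runHandler reads every message from in, calls the handler, and forwards
// each returned message to out. It closes out once in is drained.
func runHandler(in <-chan msg, out chan<- msg, h handlerFunc) {
	defer close(out)
	for m := range in {
		produced, err := h(m)
		if err != nil {
			continue // this sketch simply skips messages whose handler failed
		}
		for _, p := range produced {
			out <- p
		}
	}
}

// upper is an example handler that produces one upper-cased message per input.
func upper(m msg) ([]msg, error) {
	return []msg{{ID: m.ID + "-out", Payload: strings.ToUpper(m.Payload)}}, nil
}

func main() {
	in := make(chan msg, 2)
	out := make(chan msg, 2)
	in <- msg{ID: "1", Payload: "hello"}
	in <- msg{ID: "2", Payload: "world"}
	close(in)

	runHandler(in, out, upper)
	for m := range out {
		fmt.Println(m.ID, m.Payload)
	}
}
```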
// ...
	// AddHandler returns a handler which can be used to add handler level middleware
	// or to stop handler.
	handler := router.AddHandler(
		"struct_handler",          // handler name, must be unique
		"incoming_messages_topic", // topic from which we will read events
		pubSub,
		"outgoing_messages_topic", // topic to which we will publish events
		pubSub,
		structHandler{}.Handler,
	)
// ...
Note: the example above uses one pubSub argument for both the subscriber and publisher.
It’s because we use the GoChannel implementation, which is a simple in-memory Pub/Sub.
Alternatively, if you don’t plan to publish messages from within the handler, you can use the simpler AddConsumerHandler method.
A handler can take one of two forms:
a function func(msg *message.Message) ([]*message.Message, error)
a struct method func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Use the first one if your handler is a function without any dependencies.
The second option is useful when your handler requires dependencies such as a database handle or a logger.
// ...
func printMessages(msg *message.Message) error {
	fmt.Printf(
		"\n> Received message: %s\n> %s\n> metadata: %v\n\n",
		msg.UUID, string(msg.Payload), msg.Metadata,
	)
	return nil
}

type structHandler struct {
	// we can add some dependencies here
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
	log.Println("structHandler received message", msg.UUID)

	msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))
	return message.Messages{msg}, nil
}
The Outbox Pattern is a key pattern to know in event-driven applications.
We recommend checking the examples below to see how Watermill works in practice.
You can also try the free hands-on training to learn how to use Watermill in practice.
Check out the examples that will show you how to start using Watermill.
The recommended entry point is Your first Watermill application.
It contains the entire environment in docker-compose.yml, including Go and Kafka, which you can run with one command.
After that, you can see the Realtime feed example.
It uses more middlewares and contains two handlers.
For a different subscriber implementation (HTTP), see the receiving-webhooks example.
It is a straightforward application that saves webhooks to Kafka.
You can find the complete list of examples in the README.