RabbitMQ (AMQP)
RabbitMQ is the most widely deployed open source message broker.
We are providing Pub/Sub implementation based on github.com/rabbitmq/amqp091-go official library.
Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/doc.go
// AMQP implementation of Watermill's Pub/Sub interface.
//
// Supported features:
// - Reconnect support
// - Fully customizable configuration
// - Qos settings
// - TLS support
// - Publish Transactions support (optional, can be enabled in config)
//
// Nomenclature
//
// Unfortunately, Watermill's nomenclature is not fully compatible with AMQP's nomenclature.
// Depending of the configuration, topic can be mapped to exchange name, routing key and queue name.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// In case of any problem to find to what exchange name, routing key and queue name are set,
// just enable logging with debug level and check it in logs.
package amqp
// ...
Installation
go get github.com/ThreeDotsLabs/watermill-amqp/v2
Characteristics
Feature | Implements | Note |
---|---|---|
ConsumerGroups | yes* | there are no literal consumer groups in AMQP, but we can achieve similar behaviour with GenerateQueueNameTopicNameWithSuffix . For more details please check AMQP “Consumer Groups” section |
ExactlyOnceDelivery | no | |
GuaranteedOrder | yes | yes, please check https://www.rabbitmq.com/semantics.html#ordering |
Persistent | yes* | when using NewDurablePubSubConfig or NewDurableQueueConfig |
Configuration
Our AMQP is shipped with some pre-created configurations:
Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/config.go
// ...
// NewDurablePubSubConfig creates config for durable PubSub.
// generateQueueName is optional, when passing to the publisher.
// Exchange name is set to the topic name and routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html
// with durable added for exchange, queue and amqp.Persistent DeliveryMode.
// Thanks to this, we don't lose messages on broker restart.
func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
return Config{
Connection: ConnectionConfig{
AmqpURI: amqpURI,
},
Marshaler: DefaultMarshaler{},
Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return topic
},
Type: "fanout",
Durable: true,
},
Queue: QueueConfig{
GenerateName: generateQueueName,
Durable: true,
},
QueueBind: QueueBindConfig{
GenerateRoutingKey: func(topic string) string {
return ""
},
},
Publish: PublishConfig{
GenerateRoutingKey: func(topic string) string {
return ""
},
},
Consume: ConsumeConfig{
Qos: QosConfig{
PrefetchCount: 1,
},
},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}
// NewNonDurablePubSubConfig creates config for non durable PubSub.
// generateQueueName is optional, when passing to the publisher.
// Exchange name is set to the topic name and routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
return Config{
Connection: ConnectionConfig{
AmqpURI: amqpURI,
},
Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},
Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return topic
},
Type: "fanout",
},
Queue: QueueConfig{
GenerateName: generateQueueName,
},
QueueBind: QueueBindConfig{
GenerateRoutingKey: func(topic string) string {
return ""
},
},
Publish: PublishConfig{
GenerateRoutingKey: func(topic string) string {
return ""
},
},
Consume: ConsumeConfig{
Qos: QosConfig{
PrefetchCount: 1,
},
},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}
// NewDurableQueueConfig creates config for durable Queue.
// Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// with durable added for exchange, queue and amqp.Persistent DeliveryMode.
// Thanks to this, we don't lose messages on broker restart.
func NewDurableQueueConfig(amqpURI string) Config {
return Config{
Connection: ConnectionConfig{
AmqpURI: amqpURI,
},
Marshaler: DefaultMarshaler{},
Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return ""
},
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameTopicName,
Durable: true,
},
QueueBind: QueueBindConfig{
GenerateRoutingKey: func(topic string) string {
return ""
},
},
Publish: PublishConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Consume: ConsumeConfig{
Qos: QosConfig{
PrefetchCount: 1,
},
},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}
// NewNonDurableQueueConfig creates config for non durable Queue.
// Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurableQueueConfig(amqpURI string) Config {
return Config{
Connection: ConnectionConfig{
AmqpURI: amqpURI,
},
Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},
Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return ""
},
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameTopicName,
},
QueueBind: QueueBindConfig{
GenerateRoutingKey: func(topic string) string {
return ""
},
},
Publish: PublishConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Consume: ConsumeConfig{
Qos: QosConfig{
PrefetchCount: 1,
},
},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}
type Config struct {
// ...
For detailed configuration description, please check watermill-amqp/pkg/amqp/config.go
TLS Config
TLS config can be passed to Config.TLSConfig
.
Connecting
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
subscriber, err := amqp.NewSubscriber(
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// It works as a simple queue.
//
// If you want to implement a Pub/Sub style service instead, check
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
// ...
Publishing
Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/publisher.go
// ...
// Publish publishes messages to AMQP broker.
// Publish is blocking until the broker has received and saved the message.
// Publish is always thread safe.
//
// Watermill's topic in Publish is not mapped to AMQP's topic, but depending on configuration it can be mapped
// to exchange, queue or routing key.
// For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (p *Publisher) Publish(topic string, messages ...*message.Message) (err error) {
// ...
Subscribing
Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/subscriber.go
// ...
// Subscribe consumes messages from AMQP broker.
//
// Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration it can be mapped
// to exchange, queue or routing key.
// For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
Marshaler
Marshaler is responsible for mapping AMQP’s messages to Watermill’s messages.
Marshaller can be changed via the Configuration.
If you need to customize thing in amqp.Delivery
, you can do it PostprocessPublishing
function.
Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/marshaler.go
// ...
// Marshaler marshals Watermill's message to amqp.Publishing and unmarshals amqp.Delivery to Watermill's message.
type Marshaler interface {
Marshal(msg *message.Message) (amqp.Publishing, error)
Unmarshal(amqpMsg amqp.Delivery) (*message.Message, error)
}
type DefaultMarshaler struct {
// PostprocessPublishing can be used to make some extra processing with amqp.Publishing,
// for example add CorrelationId and ContentType:
//
// amqp.DefaultMarshaler{
// PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing {
// publishing.CorrelationId = "correlation"
// publishing.ContentType = "application/json"
//
// return publishing
// },
// }
PostprocessPublishing func(amqp.Publishing) amqp.Publishing
// When true, DeliveryMode will be not set to Persistent.
//
// DeliveryMode Transient means higher throughput, but messages will not be
// restored on broker restart. The delivery mode of publishings is unrelated
// to the durability of the queues they reside on. Transient messages will
// not be restored to durable queues, persistent messages will be restored to
// durable queues and lost on non-durable queues during server restart.
NotPersistentDeliveryMode bool
// Header used to store and read message UUID.
//
// If value is empty, DefaultMessageUUIDHeaderKey value is used.
// If header doesn't exist, empty value is passed as message UUID.
MessageUUIDHeaderKey string
}
func (d DefaultMarshaler) Marshal(msg *message.Message) (amqp.Publishing, error) {
// ...
AMQP “Consumer Groups”
AMQP doesn’t provide mechanism like Kafka’s “consumer groups”. You can still achieve similar behaviour with GenerateQueueNameTopicNameWithSuffix
and NewDurablePubSubConfig
.
Full source: github.com/ThreeDotsLabs/watermill/docs/content/docs/snippets/amqp-consumer-groups/main.go
// ...
func createSubscriber(queueSuffix string) *amqp.Subscriber {
subscriber, err := amqp.NewSubscriber(
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html
// to create just a simple queue, you can use NewDurableQueueConfig or create your own config.
amqp.NewDurablePubSubConfig(
amqpURI,
// Rabbit's queue name in this example is based on Watermill's topic passed to Subscribe
// plus provided suffix.
//
// Exchange is Rabbit's "fanout", so when subscribing with suffix other than "test_consumer_group",
// it will also receive all messages. It will work like separate consumer groups in Kafka.
amqp.GenerateQueueNameTopicNameWithSuffix(queueSuffix),
),
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
return subscriber
}
func main() {
subscriber1 := createSubscriber("test_consumer_group_1")
messages1, err := subscriber1.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process("subscriber_1", messages1)
subscriber2 := createSubscriber("test_consumer_group_2")
messages2, err := subscriber2.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
// subscriber2 will receive all messages independently from subscriber1
go process("subscriber_2", messages2)
// ...
In this example both pubSub1
and pubSub2
will receive some messages independently.
AMQP TopologyBuilder
Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/topology_builder.go
// ...
// TopologyBuilder is responsible for declaring exchange, queues and queues binding.
//
// Default TopologyBuilder is DefaultTopologyBuilder.
// If you need custom built topology, you should implement your own TopologyBuilder and pass it to the amqp.Config:
//
// config := NewDurablePubSubConfig()
// config.TopologyBuilder = MyProCustomBuilder{}
// ...