RabbitMQ (AMQP)

RabbitMQ is the most widely deployed open source message broker.

We are providing Pub/Sub implementation based on github.com/rabbitmq/amqp091-go official library.

// AMQP implementation of Watermill's Pub/Sub interface.
//
// Supported features:
// - Reconnect support
// - Fully customizable configuration
// - Qos settings
// - TLS support
// - Publish Transactions support (optional, can be enabled in config)
//
// # Nomenclature
//
// Unfortunately, Watermill's nomenclature is not fully compatible with AMQP's nomenclature.
// Depending of the configuration, topic can be mapped to exchange name, routing key and queue name.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// In case of any problem to find to what exchange name, routing key and queue name are set,
// just enable logging with debug level and check it in logs.
package amqp
// ...

Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/doc.go

You can find a fully functional example with RabbitMQ in the Watermill examples .

Installation

go get github.com/ThreeDotsLabs/watermill-amqp/v3

Characteristics

FeatureImplementsNote
ConsumerGroupsyes*there are no literal consumer groups in AMQP, but we can achieve similar behaviour with GenerateQueueNameTopicNameWithSuffix. For more details please check AMQP “Consumer Groups” section
ExactlyOnceDeliveryno
GuaranteedOrderyesyes, please check https://www.rabbitmq.com/semantics.html#ordering
Persistentyes*when using NewDurablePubSubConfig or NewDurableQueueConfig

Configuration

Our AMQP is shipped with some pre-created configurations:

// ...
// NewDurablePubSubConfig creates config for durable PubSub.
// generateQueueName is optional, when passing to the publisher.
// Exchange name is set to the topic name and routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html
// with durable added for exchange, queue and amqp.Persistent DeliveryMode.
// Thanks to this, we don't lose messages on broker restart.
func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
	return Config{
		Connection: ConnectionConfig{
			AmqpURI: amqpURI,
		},

		Marshaler: DefaultMarshaler{},

		Exchange: ExchangeConfig{
			GenerateName: GenerateExchangeNameTopicName,
			Type:         "fanout",
			Durable:      true,
		},
		Queue: QueueConfig{
			GenerateName: generateQueueName,
			Durable:      true,
		},
		QueueBind: QueueBindConfig{
			GenerateRoutingKey: func(topic string) string {
				return ""
			},
		},
		Publish: PublishConfig{
			GenerateRoutingKey: func(topic string) string {
				return ""
			},
		},
		Consume: ConsumeConfig{
			Qos: QosConfig{
				PrefetchCount: 1,
			},
		},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}

// NewNonDurablePubSubConfig creates config for non-durable PubSub.
// generateQueueName is optional, when passing to the publisher.
// Exchange name is set to the topic name and the routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
	return Config{
		Connection: ConnectionConfig{
			AmqpURI: amqpURI,
		},

		Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

		Exchange: ExchangeConfig{
			GenerateName: GenerateExchangeNameTopicName,
			Type:         "fanout",
		},
		Queue: QueueConfig{
			GenerateName: generateQueueName,
		},
		QueueBind: QueueBindConfig{
			GenerateRoutingKey: func(topic string) string {
				return ""
			},
		},
		Publish: PublishConfig{
			GenerateRoutingKey: func(topic string) string {
				return ""
			},
		},
		Consume: ConsumeConfig{
			Qos: QosConfig{
				PrefetchCount: 1,
			},
		},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}

// NewDurableQueueConfig creates config for durable Queue.
// Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// with durable added for exchange, queue and amqp.Persistent DeliveryMode.
// Thanks to this, we don't lose messages on broker restart.
func NewDurableQueueConfig(amqpURI string) Config {
	return Config{
		Connection: ConnectionConfig{
			AmqpURI: amqpURI,
		},

		Marshaler: DefaultMarshaler{},

		Exchange: ExchangeConfig{
			GenerateName: GenerateExchangeNameConstant(""),
		},
		Queue: QueueConfig{
			GenerateName: GenerateQueueNameTopicName,
			Durable:      true,
		},
		QueueBind: QueueBindConfig{
			GenerateRoutingKey: func(topic string) string {
				return ""
			},
		},
		Publish: PublishConfig{
			GenerateRoutingKey: func(topic string) string {
				return topic
			},
		},
		Consume: ConsumeConfig{
			Qos: QosConfig{
				PrefetchCount: 1,
			},
		},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}

// NewNonDurableQueueConfig creates config for non-durable Queue.
// Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurableQueueConfig(amqpURI string) Config {
	return Config{
		Connection: ConnectionConfig{
			AmqpURI: amqpURI,
		},

		Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

		Exchange: ExchangeConfig{
			GenerateName: GenerateExchangeNameConstant(""),
		},
		Queue: QueueConfig{
			GenerateName: GenerateQueueNameTopicName,
		},
		QueueBind: QueueBindConfig{
			GenerateRoutingKey: func(topic string) string {
				return ""
			},
		},
		Publish: PublishConfig{
			GenerateRoutingKey: func(topic string) string {
				return topic
			},
		},
		Consume: ConsumeConfig{
			Qos: QosConfig{
				PrefetchCount: 1,
			},
		},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}

// NewDurableTopicConfig creates config for topic exchange for durable Queue.
// Queue name and Exchange are set to the parameters.
func NewDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
	return Config{
		Connection: ConnectionConfig{
			AmqpURI: amqpURI,
		},

		Marshaler: DefaultMarshaler{},

		Exchange: ExchangeConfig{
			GenerateName: GenerateExchangeNameConstant(exchange),
			Type:         "topic",
		},
		Queue: QueueConfig{
			GenerateName: GenerateQueueNameConstant(queue),
			Durable:      true,
		},
		QueueBind: QueueBindConfig{
			GenerateRoutingKey: func(topic string) string {
				return topic
			},
		},
		Publish: PublishConfig{
			GenerateRoutingKey: func(topic string) string {
				return topic
			},
		},
		Consume: ConsumeConfig{
			Qos: QosConfig{
				PrefetchCount: 1,
			},
		},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}

// NewNonDurableTopicConfig creates config for topic exchange for non-durable Queue.
// Queue name and Exchange are set to the parameters.
func NewNonDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
	return Config{
		Connection: ConnectionConfig{
			AmqpURI: amqpURI,
		},

		Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

		Exchange: ExchangeConfig{
			GenerateName: GenerateExchangeNameConstant(exchange),
			Type:         "topic",
		},
		Queue: QueueConfig{
			GenerateName: GenerateQueueNameConstant(queue),
		},
		QueueBind: QueueBindConfig{
			GenerateRoutingKey: func(topic string) string {
				return topic
			},
		},
		Publish: PublishConfig{
			GenerateRoutingKey: func(topic string) string {
				return topic
			},
		},
		Consume: ConsumeConfig{
			Qos: QosConfig{
				PrefetchCount: 1,
			},
		},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}

type Config struct {
// ...

Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/config.go

For detailed configuration description, please check watermill-amqp/pkg/amqp/config.go

TLS Config

TLS config can be passed to Config.TLSConfig.

Connecting

// ...
	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	subscriber, err := amqp.NewSubscriber(
		// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// It works as a simple queue.
		//
		// If you want to implement a Pub/Sub style service instead, check
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

Publishing

// ...
// Publish publishes messages to AMQP broker.
// Publish is blocking until the broker has received and saved the message.
// Publish is always thread safe.
//
// Watermill's topic in Publish is not mapped to AMQP's topic, but depending on configuration it can be mapped
// to exchange, queue or routing key.
// For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (p *Publisher) Publish(topic string, messages ...*message.Message) (err error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/publisher.go

Subscribing

// ...
// Subscribe consumes messages from AMQP broker.
//
// Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration, it can be mapped
// to exchange, queue or routing key.
// For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/subscriber.go

Marshaler

Marshaler is responsible for mapping AMQP’s messages to Watermill’s messages.

Marshaller can be changed via the Configuration. If you need to customize thing in amqp.Delivery, you can do it PostprocessPublishing function.

// ...
// Marshaler marshals Watermill's message to amqp.Publishing and unmarshals amqp.Delivery to Watermill's message.
type Marshaler interface {
	Marshal(msg *message.Message) (amqp.Publishing, error)
	Unmarshal(amqpMsg amqp.Delivery) (*message.Message, error)
}

type DefaultMarshaler struct {
	// PostprocessPublishing can be used to make some extra processing with amqp.Publishing,
	// for example add CorrelationId and ContentType:
	//
	//  amqp.DefaultMarshaler{
	//		PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing {
	//			publishing.CorrelationId = "correlation"
	//			publishing.ContentType = "application/json"
	//
	//			return publishing
	//		},
	//	}
	PostprocessPublishing func(amqp.Publishing) amqp.Publishing

	// When true, DeliveryMode will be not set to Persistent.
	//
	// DeliveryMode Transient means higher throughput, but messages will not be
	// restored on broker restart. The delivery mode of publishings is unrelated
	// to the durability of the queues they reside on. Transient messages will
	// not be restored to durable queues, persistent messages will be restored to
	// durable queues and lost on non-durable queues during server restart.
	NotPersistentDeliveryMode bool

	// Header used to store and read message UUID.
	//
	// If value is empty, DefaultMessageUUIDHeaderKey value is used.
	// If header doesn't exist, empty value is passed as message UUID.
	MessageUUIDHeaderKey string
}

func (d DefaultMarshaler) Marshal(msg *message.Message) (amqp.Publishing, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/marshaler.go

AMQP “Consumer Groups”

AMQP doesn’t provide mechanism like Kafka’s “consumer groups”. You can still achieve similar behaviour with GenerateQueueNameTopicNameWithSuffix and NewDurablePubSubConfig.

// ...
func createSubscriber(queueSuffix string) *amqp.Subscriber {
	subscriber, err := amqp.NewSubscriber(
		// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html
		// to create just a simple queue, you can use NewDurableQueueConfig or create your own config.
		amqp.NewDurablePubSubConfig(
			amqpURI,
			// Rabbit's queue name in this example is based on Watermill's topic passed to Subscribe
			// plus provided suffix.
			//
			// Exchange is Rabbit's "fanout", so when subscribing with suffix other than "test_consumer_group",
			// it will also receive all messages. It will work like separate consumer groups in Kafka.
			amqp.GenerateQueueNameTopicNameWithSuffix(queueSuffix),
		),
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}
	return subscriber
}

func main() {
	subscriber1 := createSubscriber("test_consumer_group_1")
	messages1, err := subscriber1.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}
	go process("subscriber_1", messages1)

	subscriber2 := createSubscriber("test_consumer_group_2")
	messages2, err := subscriber2.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}
	// subscriber2 will receive all messages independently from subscriber1
	go process("subscriber_2", messages2)
// ...

Full source: github.com/ThreeDotsLabs/watermill/docs/content/docs/snippets/amqp-consumer-groups/main.go

In this example both pubSub1 and pubSub2 will receive some messages independently.

AMQP TopologyBuilder

// ...
// TopologyBuilder is responsible for declaring exchange, queues and queues binding.
//
// Default TopologyBuilder is DefaultTopologyBuilder.
// If you need a custom-built topology, you should implement your own TopologyBuilder and pass it to the amqp.Config:
//
//	config := NewDurablePubSubConfig()
//	config.TopologyBuilder = MyProCustomBuilder{}
// ...

Full source: github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp/topology_builder.go


Check our online hands-on training