// ...
// NewDurablePubSubConfig creates a Config for a durable Pub/Sub setup.
// generateQueueName is optional when the config is passed to a publisher.
// The exchange name is set to the topic name and the routing key is always empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to AMQP's topic exchange type.
// It is used to generate the exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, see Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html
// with durability added for the exchange and the queue, plus the amqp.Persistent DeliveryMode.
// Thanks to this, messages are not lost on broker restart.
func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
	// Both binding and publishing use an empty routing key, whatever the topic is.
	noRoutingKey := func(string) string { return "" }

	var cfg Config
	cfg.Connection = ConnectionConfig{AmqpURI: amqpURI}
	cfg.Marshaler = DefaultMarshaler{}
	cfg.Exchange = ExchangeConfig{
		GenerateName: GenerateExchangeNameTopicName,
		Type:         "fanout",
		Durable:      true,
	}
	cfg.Queue = QueueConfig{
		GenerateName: generateQueueName,
		Durable:      true,
	}
	cfg.QueueBind = QueueBindConfig{GenerateRoutingKey: noRoutingKey}
	cfg.Publish = PublishConfig{GenerateRoutingKey: noRoutingKey}
	cfg.Consume = ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}}
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
// NewNonDurablePubSubConfig creates a Config for a non-durable Pub/Sub setup.
// generateQueueName is optional when the config is passed to a publisher.
// The exchange name is set to the topic name and the routing key is always empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to AMQP's topic exchange type.
// It is used to generate the exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, see Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html.
// This config is not durable, so all messages will be lost when the broker restarts.
func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
	// Both binding and publishing use an empty routing key, whatever the topic is.
	noRoutingKey := func(string) string { return "" }

	var cfg Config
	cfg.Connection = ConnectionConfig{AmqpURI: amqpURI}
	// Opt out of the persistent delivery mode; nothing in this config is durable.
	cfg.Marshaler = DefaultMarshaler{NotPersistentDeliveryMode: true}
	cfg.Exchange = ExchangeConfig{
		GenerateName: GenerateExchangeNameTopicName,
		Type:         "fanout",
	}
	cfg.Queue = QueueConfig{GenerateName: generateQueueName}
	cfg.QueueBind = QueueBindConfig{GenerateRoutingKey: noRoutingKey}
	cfg.Publish = PublishConfig{GenerateRoutingKey: noRoutingKey}
	cfg.Consume = ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}}
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
// NewDurableQueueConfig creates a Config for a durable Queue setup.
// The queue name and the publish routing key are set to the topic name by default.
// The default ("") exchange is used and the queue-bind routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to AMQP's topic exchange type.
// It is used to generate the exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, see Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// with a durable queue and the amqp.Persistent DeliveryMode.
// Thanks to this, messages are not lost on broker restart.
// The Durable flag of ExchangeConfig is left unset here.
func NewDurableQueueConfig(amqpURI string) Config {
	// Publishing routes by topic name, which is also what names the queue.
	topicAsRoutingKey := func(topic string) string { return topic }
	// The queue-bind routing key is always empty.
	noRoutingKey := func(string) string { return "" }

	var cfg Config
	cfg.Connection = ConnectionConfig{AmqpURI: amqpURI}
	cfg.Marshaler = DefaultMarshaler{}
	cfg.Exchange = ExchangeConfig{GenerateName: GenerateExchangeNameConstant("")}
	cfg.Queue = QueueConfig{
		GenerateName: GenerateQueueNameTopicName,
		Durable:      true,
	}
	cfg.QueueBind = QueueBindConfig{GenerateRoutingKey: noRoutingKey}
	cfg.Publish = PublishConfig{GenerateRoutingKey: topicAsRoutingKey}
	cfg.Consume = ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}}
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
// NewNonDurableQueueConfig creates a Config for a non-durable Queue setup.
// The queue name and the publish routing key are set to the topic name by default.
// The default ("") exchange is used and the queue-bind routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to AMQP's topic exchange type.
// It is used to generate the exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, see Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html.
// This config is not durable, so all messages will be lost when the broker restarts.
func NewNonDurableQueueConfig(amqpURI string) Config {
	// Publishing routes by topic name, which is also what names the queue.
	topicAsRoutingKey := func(topic string) string { return topic }
	// The queue-bind routing key is always empty.
	noRoutingKey := func(string) string { return "" }

	var cfg Config
	cfg.Connection = ConnectionConfig{AmqpURI: amqpURI}
	// Opt out of the persistent delivery mode; nothing in this config is durable.
	cfg.Marshaler = DefaultMarshaler{NotPersistentDeliveryMode: true}
	cfg.Exchange = ExchangeConfig{GenerateName: GenerateExchangeNameConstant("")}
	cfg.Queue = QueueConfig{GenerateName: GenerateQueueNameTopicName}
	cfg.QueueBind = QueueBindConfig{GenerateRoutingKey: noRoutingKey}
	cfg.Publish = PublishConfig{GenerateRoutingKey: topicAsRoutingKey}
	cfg.Consume = ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}}
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
// NewDurableTopicConfig creates a Config that uses a "topic" exchange with a durable queue.
// The exchange and queue names are fixed to the exchange and queue parameters,
// and the Watermill topic is used as the routing key for both binding and publishing.
//
// Only the queue is marked durable; the Durable flag of ExchangeConfig is left unset.
func NewDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
	// The Watermill topic is passed through unchanged as the AMQP routing key.
	topicAsRoutingKey := func(topic string) string { return topic }

	var cfg Config
	cfg.Connection = ConnectionConfig{AmqpURI: amqpURI}
	cfg.Marshaler = DefaultMarshaler{}
	cfg.Exchange = ExchangeConfig{
		GenerateName: GenerateExchangeNameConstant(exchange),
		Type:         "topic",
	}
	cfg.Queue = QueueConfig{
		GenerateName: GenerateQueueNameConstant(queue),
		Durable:      true,
	}
	cfg.QueueBind = QueueBindConfig{GenerateRoutingKey: topicAsRoutingKey}
	cfg.Publish = PublishConfig{GenerateRoutingKey: topicAsRoutingKey}
	cfg.Consume = ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}}
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
// NewNonDurableTopicConfig creates a Config that uses a "topic" exchange with a non-durable queue.
// The exchange and queue names are fixed to the exchange and queue parameters,
// and the Watermill topic is used as the routing key for both binding and publishing.
func NewNonDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
	// The Watermill topic is passed through unchanged as the AMQP routing key.
	topicAsRoutingKey := func(topic string) string { return topic }

	var cfg Config
	cfg.Connection = ConnectionConfig{AmqpURI: amqpURI}
	// Opt out of the persistent delivery mode; nothing in this config is durable.
	cfg.Marshaler = DefaultMarshaler{NotPersistentDeliveryMode: true}
	cfg.Exchange = ExchangeConfig{
		GenerateName: GenerateExchangeNameConstant(exchange),
		Type:         "topic",
	}
	cfg.Queue = QueueConfig{GenerateName: GenerateQueueNameConstant(queue)}
	cfg.QueueBind = QueueBindConfig{GenerateRoutingKey: topicAsRoutingKey}
	cfg.Publish = PublishConfig{GenerateRoutingKey: topicAsRoutingKey}
	cfg.Consume = ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}}
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
type Config struct {
// ...