Amazon AWS SNS/SQS
AWS SQS and SNS are fully-managed message queuing and Pub/Sub-like services that make it easy to decouple
and scale microservices, distributed systems, and serverless applications.
Watermill provides a simple way to use AWS SQS and SNS with Go.
It handles the AWS SDK internals and provides a simple API for publishing messages and subscribing to them.
Official Documentation:
You can find a fully functional example with AWS SNS in the Watermill examples:
Installation#
go get github.com/ThreeDotsLabs/watermill-aws
SQS vs SNS#
While both SQS and SNS are messaging services provided by AWS, they serve different purposes and are best suited for different scenarios in your Watermill applications.
How SNS is connected with SQS#
To use SNS as a Pub/Sub (so that multiple subscribers receive the same message), you need to create an SNS topic and subscribe SQS queues to it.
When a message is published to the SNS topic, it is delivered to all subscribed SQS queues.
The watermill-aws package implements this logic out of the box.
When you subscribe to an SNS topic, Watermill AWS creates an SQS queue, subscribes it to the topic, and consumes messages from that queue.
In effect, a single SQS queue acts as a consumer group or subscription in other Pub/Sub implementations.
The mechanism is detailed in the AWS documentation.
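The fan-out described above can be pictured with a small in-memory model. This is only a sketch of the idea, not the watermill-aws implementation: the Topic and Queue types and their methods are hypothetical stand-ins for an SNS topic and SQS queues.

```go
package main

import "fmt"

// Queue is a stand-in for an SQS queue: an ordered list of message bodies.
type Queue struct {
	Name     string
	Messages []string
}

// Topic is a stand-in for an SNS topic together with its subscribed queues.
type Topic struct {
	Name   string
	Queues []*Queue
}

// Subscribe attaches a queue to the topic, like an SNS-to-SQS subscription.
func (t *Topic) Subscribe(q *Queue) {
	t.Queues = append(t.Queues, q)
}

// Publish copies the message into every subscribed queue (fan-out).
func (t *Topic) Publish(body string) {
	for _, q := range t.Queues {
		q.Messages = append(q.Messages, body)
	}
}

// Receive pops the oldest message. Consumers sharing one queue compete for it,
// which is why a single queue behaves like a consumer group.
func (q *Queue) Receive() (string, bool) {
	if len(q.Messages) == 0 {
		return "", false
	}
	body := q.Messages[0]
	q.Messages = q.Messages[1:]
	return body, true
}

func main() {
	topic := &Topic{Name: "user-registered"}
	emails := &Queue{Name: "emails"}
	billing := &Queue{Name: "billing"}
	topic.Subscribe(emails)
	topic.Subscribe(billing)

	topic.Publish("user-42")

	// Each queue holds its own copy of the message.
	for _, q := range []*Queue{emails, billing} {
		body, ok := q.Receive()
		fmt.Println(q.Name, body, ok)
	}
}
```

Two services that should both see every message each need their own queue; two instances of the same service share one queue and split the messages between them.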
How to choose between SQS and SNS#
SQS (Simple Queue Service)#
- Use when you need a simple message queue with a single consumer.
- Great for task queues or background job processing.
- Supports exactly-once processing and strict ordering with FIFO queues; standard queues offer best-effort ordering.
- Example use case: Processing user uploads in the background.
SNS (Simple Notification Service)#
- Use when you need to broadcast messages to multiple subscribers.
- Perfect for implementing pub/sub patterns.
- Useful for event-driven architectures.
- Supports multiple types of subscribers (SQS, Lambda, HTTP/S, email, SMS, etc.).
- Example use case: Notifying multiple services about a new user registration.
Our SNS implementation in Watermill automatically creates and manages SQS queues for each subscriber, simplifying the process of using SNS with multiple SQS queues.
Remember, you can use both in the same application where appropriate. For instance, you might use SNS to broadcast events and SQS to process specific tasks triggered by those events.
To learn how SNS and SQS work together, see the How SNS is connected with SQS section.
SQS#
Characteristics#
| Feature | Implements | Note |
|---|---|---|
| ConsumerGroups | no | it’s a queue, for consumer groups-like functionality use SNS |
| ExactlyOnceDelivery | no | yes |
| GuaranteedOrder | yes* | from AWS Docs: "(…) due to the highly distributed architecture, more than one copy of a message might be delivered, and messages may occasionally arrive out of order. Despite this, standard queues make a best-effort attempt to maintain the order in which messages are sent." |
| Persistent | yes | |
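Because the AWS note above says that more than one copy of a message might be delivered, handlers usually need to tolerate duplicates. The following is a minimal sketch of one way to do that, assuming each message carries a stable ID; the Deduplicator type and Process function are hypothetical and keep state in memory only.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduplicator remembers message IDs that were already processed.
// It is an in-memory sketch; a real service would likely use a persistent store.
type Deduplicator struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduplicator() *Deduplicator {
	return &Deduplicator{seen: make(map[string]struct{})}
}

// FirstTime reports whether id is seen for the first time and records it.
func (d *Deduplicator) FirstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

// Process calls handle once per distinct message ID and returns how many
// deliveries were actually handled.
func Process(d *Deduplicator, ids []string, handle func(id string)) int {
	handled := 0
	for _, id := range ids {
		if !d.FirstTime(id) {
			continue // duplicate delivery: skip the work
		}
		handle(id)
		handled++
	}
	return handled
}

func main() {
	d := NewDeduplicator()
	deliveries := []string{"a", "b", "a", "c", "b"}
	n := Process(d, deliveries, func(id string) { fmt.Println("handling", id) })
	fmt.Println("handled", n, "of", len(deliveries))
}
```

The sketch records an ID before the handler runs, so a handler that fails would not be retried; recording the ID only after success is the safer order when handlers can fail.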
Required permissions#
"sqs:ReceiveMessage"
"sqs:DeleteMessage"
"sqs:GetQueueUrl"
"sqs:CreateQueue"
"sqs:GetQueueAttributes"
"sqs:SendMessage"
"sqs:ChangeMessageVisibility"
[todo - verify]
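As an illustration, the actions listed above could be collected into an IAM policy document like the one generated below. This is a sketch only: the list is marked as unverified in the text, and the queue ARN, account ID, and the Policy and Statement types are placeholders introduced for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Statement is one entry of an IAM policy document.
type Statement struct {
	Effect   string   `json:"Effect"`
	Action   []string `json:"Action"`
	Resource string   `json:"Resource"`
}

// Policy is a minimal IAM policy document.
type Policy struct {
	Version   string      `json:"Version"`
	Statement []Statement `json:"Statement"`
}

// SQSPolicy allows the SQS actions listed in this section on one queue.
func SQSPolicy(queueArn string) Policy {
	return Policy{
		Version: "2012-10-17",
		Statement: []Statement{{
			Effect: "Allow",
			Action: []string{
				"sqs:ReceiveMessage",
				"sqs:DeleteMessage",
				"sqs:GetQueueUrl",
				"sqs:CreateQueue",
				"sqs:GetQueueAttributes",
				"sqs:SendMessage",
				"sqs:ChangeMessageVisibility",
			},
			Resource: queueArn,
		}},
	}
}

func main() {
	// Placeholder ARN; replace region, account ID, and queue name with your own.
	p := SQSPolicy("arn:aws:sqs:us-east-1:123456789012:example-queue")
	out, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```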
SQS Configuration#
// ...
type SubscriberConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config
	// OptFns are options for the SQS client.
	OptFns []func(*sqs.Options)
	// DoNotCreateQueueIfNotExists disables creating the queue if it does not exist.
	DoNotCreateQueueIfNotExists bool
	// QueueUrlResolver is a function that resolves the queue name to the queue URL.
	QueueUrlResolver QueueUrlResolver
	// ReconnectRetrySleep is the time to sleep between reconnect attempts.
	ReconnectRetrySleep time.Duration
	// QueueConfigAttributes is a struct that holds the attributes of an SQS queue.
	QueueConfigAttributes QueueConfigAttributes
	// GenerateCreateQueueInput generates *sqs.CreateQueueInput for AWS SDK.
	GenerateCreateQueueInput GenerateCreateQueueInputFunc
	// GenerateReceiveMessageInput generates *sqs.ReceiveMessageInput for AWS SDK.
	GenerateReceiveMessageInput GenerateReceiveMessageInputFunc
	// GenerateDeleteMessageInput generates *sqs.DeleteMessageInput for AWS SDK.
	GenerateDeleteMessageInput GenerateDeleteMessageInputFunc
	Unmarshaler Unmarshaler
}
func (c *SubscriberConfig) SetDefaults() {
	if c.Unmarshaler == nil {
		c.Unmarshaler = DefaultMarshalerUnmarshaler{}
	}
	if c.ReconnectRetrySleep == 0 {
		c.ReconnectRetrySleep = time.Second
	}
	if c.GenerateCreateQueueInput == nil {
		c.GenerateCreateQueueInput = GenerateCreateQueueInputDefault
	}
	if c.GenerateReceiveMessageInput == nil {
		c.GenerateReceiveMessageInput = GenerateReceiveMessageInputDefault
	}
	if c.GenerateDeleteMessageInput == nil {
		c.GenerateDeleteMessageInput = GenerateDeleteMessageInputDefault
	}
	if c.QueueUrlResolver == nil {
		c.QueueUrlResolver = NewGetQueueUrlByNameUrlResolver(GetQueueUrlByNameUrlResolverConfig{})
	}
}
func (c SubscriberConfig) Validate() error {
	var err error
	if c.AWSConfig.Credentials == nil {
		err = errors.Join(err, errors.New("missing Config.Credentials"))
	}
	if c.Unmarshaler == nil {
		err = errors.Join(err, errors.New("missing Config.Marshaler"))
	}
	if c.QueueUrlResolver == nil {
		err = errors.Join(err, fmt.Errorf("sqs.SubscriberConfig.QueueUrlResolver is nil"))
	}
	return err
}
type PublisherConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config
	// OptFns are options for the SQS client.
	OptFns []func(*sqs.Options)
	// QueueConfigAttributes is a struct that holds the attributes of an SQS queue.
	CreateQueueConfig QueueConfigAttributes
	// DoNotCreateQueueIfNotExists disables creating the queue if it does not exist.
	DoNotCreateQueueIfNotExists bool
	// QueueUrlResolver is a function that resolves queue URL.
	QueueUrlResolver QueueUrlResolver
	// GenerateSendMessageInput generates *sqs.SendMessageInput for AWS SDK.
	GenerateSendMessageInput GenerateSendMessageInputFunc
	// GenerateCreateQueueInput generates *sqs.CreateQueueInput for AWS SDK.
	GenerateCreateQueueInput GenerateCreateQueueInputFunc
	Marshaler Marshaler
}
func (c *PublisherConfig) setDefaults() {
	if c.Marshaler == nil {
		c.Marshaler = DefaultMarshalerUnmarshaler{}
	}
	if c.GenerateSendMessageInput == nil {
		c.GenerateSendMessageInput = GenerateSendMessageInputDefault
	}
	if c.GenerateCreateQueueInput == nil {
		c.GenerateCreateQueueInput = GenerateCreateQueueInputDefault
	}
	if c.QueueUrlResolver == nil {
		c.QueueUrlResolver = NewGetQueueUrlByNameUrlResolver(GetQueueUrlByNameUrlResolverConfig{})
	}
}
func (c *PublisherConfig) Validate() error {
	var err error
	if c.QueueUrlResolver == nil {
		err = errors.Join(err, fmt.Errorf("sqs.SubscriberConfig.QueueUrlResolver is nil"))
	}
	return err
}
type GenerateCreateQueueInputFunc func(ctx context.Context, queueName QueueName, attrs QueueConfigAttributes) (*sqs.CreateQueueInput, error)
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sqs/config.go
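The configuration above follows a defaults-then-validate pattern in which Validate collects every problem with errors.Join instead of stopping at the first one. The sketch below shows that pattern in isolation; exampleConfig and its fields are hypothetical stand-ins and are not the library's types.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// exampleConfig is a hypothetical stand-in, not the library's SubscriberConfig.
type exampleConfig struct {
	Credentials         string
	ReconnectRetrySleep time.Duration
	Resolver            func(topic string) string
}

// setDefaults fills in values the caller left at their zero value.
func (c *exampleConfig) setDefaults() {
	if c.ReconnectRetrySleep == 0 {
		c.ReconnectRetrySleep = time.Second
	}
}

// validate reports all missing required fields at once.
func (c exampleConfig) validate() error {
	var err error
	if c.Credentials == "" {
		err = errors.Join(err, errors.New("missing credentials"))
	}
	if c.Resolver == nil {
		err = errors.Join(err, errors.New("missing resolver"))
	}
	return err
}

func main() {
	cfg := exampleConfig{}
	cfg.setDefaults()
	fmt.Println("retry sleep:", cfg.ReconnectRetrySleep)

	// Both problems are reported together, one per line.
	if err := cfg.validate(); err != nil {
		fmt.Println(err)
	}
}
```

Reporting all problems in one error saves a round trip for each missing field when wiring up a new service.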
Resolving Queue URL#
In the Watermill model, the AWS queue URL is normalized to the topic used in the Publish and Subscribe methods.
To let you choose what to use as a topic in Watermill, the way the queue URL is resolved can be customized.
// ...
// QueueUrlResolver resolves queue URL by topic passed to Publisher.Publish, Subscriber.Subscribe.
type QueueUrlResolver interface {
	ResolveQueueUrl(ctx context.Context, params ResolveQueueUrlParams) (QueueUrlResolverResult, error)
}
type ResolveQueueUrlParams struct {
	// Topic passed to Publisher.Publish, Subscriber.Subscribe, etc.
	// It may be mapped to a different name by QueueUrlResolver.
	Topic     string
	SqsClient *sqs.Client
	Logger    watermill.LoggerAdapter
}
type QueueUrlResolverResult struct {
	QueueName QueueName
	// QueueURL is not present if queue doesn't exist.
	QueueURL *QueueURL
	// Exists says if queue exists.
	// May be nil, if resolver doesn't have information about queue existence.
	Exists *bool
}
// GenerateQueueUrlResolver is a resolver that generates queue URL based on AWS region and account ID.
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sqs/url_resolver.go
You can implement your own QueueUrlResolver or use one of the provided resolvers.
By default, the GetQueueUrlByNameUrlResolver resolver is used:
// ...
// GetQueueUrlByNameUrlResolver resolves queue url by calling AWS API.
// Topic name passed to Publisher.Publish, Subscriber.Subscribe is mapped to queue name.
//
// By default, queue URL is cached. It can be changed with GetQueueUrlByNameUrlResolverConfig.
type GetQueueUrlByNameUrlResolver struct {
	config          GetQueueUrlByNameUrlResolverConfig
	queuesCache     map[string]QueueURL
	queuesCacheLock sync.RWMutex
}
func NewGetQueueUrlByNameUrlResolver(
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sqs/url_resolver.go
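The struct above pairs a map with a sync.RWMutex, which suggests a read-mostly cache in front of the AWS lookup. The sketch below shows that caching idea in isolation. It is not the library's code: lookupFunc stands in for the AWS API call, and cachingResolver is a hypothetical type.

```go
package main

import (
	"fmt"
	"sync"
)

// lookupFunc stands in for the remote call that maps a queue name to its URL.
type lookupFunc func(queueName string) (string, error)

// cachingResolver caches successful lookups so each queue is resolved once.
type cachingResolver struct {
	lookup lookupFunc
	mu     sync.RWMutex
	cache  map[string]string
}

func newCachingResolver(lookup lookupFunc) *cachingResolver {
	return &cachingResolver{lookup: lookup, cache: make(map[string]string)}
}

// Resolve returns the cached URL if present, otherwise performs the lookup
// and stores the result. Failed lookups are not cached.
func (r *cachingResolver) Resolve(queueName string) (string, error) {
	r.mu.RLock()
	url, ok := r.cache[queueName]
	r.mu.RUnlock()
	if ok {
		return url, nil
	}

	url, err := r.lookup(queueName)
	if err != nil {
		return "", err
	}

	r.mu.Lock()
	r.cache[queueName] = url
	r.mu.Unlock()
	return url, nil
}

func main() {
	calls := 0
	r := newCachingResolver(func(name string) (string, error) {
		calls++
		return "https://example.invalid/queues/" + name, nil
	})

	for i := 0; i < 3; i++ {
		url, err := r.Resolve("orders")
		if err != nil {
			panic(err)
		}
		fmt.Println(url)
	}
	fmt.Println("remote lookups:", calls)
}
```

Two goroutines that miss the cache at the same moment may both perform the lookup; that is harmless here because both store the same value.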
There are two more resolvers available:
// ...
// GenerateQueueUrlResolver is a resolver that generates queue URL based on AWS region and account ID.
type GenerateQueueUrlResolver struct {
	AwsRegion    string
	AwsAccountID string
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sqs/url_resolver.go
// ...
// TransparentUrlResolver is a resolver that uses topic passed to Publisher.Publish, Subscriber.Subscribe as queue URL.
// In this case, you should pass queue URL as topic.
type TransparentUrlResolver struct{}
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sqs/url_resolver.go
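To make the idea of generating a queue URL from region and account ID concrete, here is a small sketch. It assumes the common public-region URL layout https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>; the QueueURLFor function is hypothetical, and the library's GenerateQueueUrlResolver may build the URL differently.

```go
package main

import (
	"errors"
	"fmt"
)

// QueueURLFor builds a queue URL without calling AWS.
// Assumption: the standard public-region URL layout applies.
func QueueURLFor(region, accountID, queueName string) (string, error) {
	if region == "" || accountID == "" || queueName == "" {
		return "", errors.New("region, account ID, and queue name are required")
	}
	return fmt.Sprintf("https://sqs.%s.amazonaws.com/%s/%s", region, accountID, queueName), nil
}

func main() {
	// Placeholder region and account ID.
	url, err := QueueURLFor("us-east-1", "123456789012", "orders")
	if err != nil {
		panic(err)
	}
	fmt.Println(url)
}
```

Generating the URL avoids one API call per queue, at the cost of not knowing whether the queue actually exists.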
Using with SQS emulator#
You may want to use goaws or localstack for local development or testing.
You can override the endpoint using the OptFns option in the SubscriberConfig or PublisherConfig.
package main

import (
	"fmt"
	"net/url"

	amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs"
	// Assumption: transport aliases smithy-go's endpoints package, which defines Endpoint.
	transport "github.com/aws/smithy-go/endpoints"
	// Assumption: lo is github.com/samber/lo, which provides lo.Must.
	"github.com/samber/lo"

	"github.com/ThreeDotsLabs/watermill-aws/sqs"
)

func main() {
	// ... (cfg and logger are created in the elided part)

	// Point the SQS client at the local emulator instead of AWS.
	sqsOpts := []func(*amazonsqs.Options){
		amazonsqs.WithEndpointResolverV2(sqs.OverrideEndpointResolver{
			Endpoint: transport.Endpoint{
				URI: *lo.Must(url.Parse("http://localstack:4566")),
			},
		}),
	}

	sqsConfig := sqs.SubscriberConfig{
		AWSConfig: cfg,
		OptFns:    sqsOpts,
	}

	sub, err := sqs.NewSubscriber(sqsConfig, logger)
	if err != nil {
		panic(fmt.Errorf("unable to create new subscriber: %w", err))
	}
	// ...
}
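The example above hard-codes the emulator address and panics on a malformed URL. As a sketch of an alternative, the endpoint could be read from the environment and validated before it is handed to the endpoint resolver. The SQS_ENDPOINT variable name and the parseEndpoint helper are assumptions made for this example.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// parseEndpoint validates an emulator endpoint such as http://localstack:4566.
func parseEndpoint(raw string) (*url.URL, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, fmt.Errorf("invalid endpoint %q: %w", raw, err)
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return nil, fmt.Errorf("invalid endpoint %q: scheme must be http or https", raw)
	}
	if u.Host == "" {
		return nil, fmt.Errorf("invalid endpoint %q: missing host", raw)
	}
	return u, nil
}

func main() {
	// SQS_ENDPOINT is a hypothetical variable name used only in this sketch.
	raw := os.Getenv("SQS_ENDPOINT")
	if raw == "" {
		raw = "http://localstack:4566"
	}

	u, err := parseEndpoint(raw)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("using emulator endpoint:", u.String())
}
```

The scheme check catches a common mistake: a value like localstack:4566 parses without error but is treated as a URL with the scheme "localstack".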
SNS#
Characteristics#
| Feature | Implements | Note |
|---|---|---|
| ConsumerGroups | yes | yes |
| ExactlyOnceDelivery | no | yes |
| GuaranteedOrder | yes* | from AWS Docs: "(…) due to the highly distributed architecture, more than one copy of a message might be delivered, and messages may occasionally arrive out of order. Despite this, standard queues make a best-effort attempt to maintain the order in which messages are sent." |
| Persistent | yes | |
Required permissions#
sns:Subscribe
sns:ConfirmSubscription
sns:Receive
sns:Unsubscribe
and all permissions required for SQS:
sqs:ReceiveMessage
sqs:DeleteMessage
sqs:GetQueueUrl
sqs:CreateQueue
sqs:GetQueueAttributes
sqs:SendMessage
sqs:ChangeMessageVisibility
Additionally, if sns.SubscriberConfig.DoNotSetQueueAccessPolicy is not enabled, you also need the following permission:
sqs:SetQueueAttributes
SNS Configuration#
// ...
type SubscriberConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config
	// OptFns are options for the SNS client.
	OptFns []func(*sns.Options)
	// TopicResolver is a function that resolves the topic name to the topic ARN.
	TopicResolver TopicResolver
	// GenerateSqsQueueName generates the name of the SQS queue for the SNS subscription.
	GenerateSqsQueueName GenerateSqsQueueNameFn
	// GenerateSubscribeInput generates the input for the Subscribe operation.
	GenerateSubscribeInput GenerateSubscribeInputFn
	// GenerateQueueAccessPolicy generates the access policy for the SQS queue.
	GenerateQueueAccessPolicy GenerateQueueAccessPolicyFn
	// DoNotCreateSqsSubscription disables creating the SQS subscription.
	DoNotCreateSqsSubscription bool
	// DoNotSetQueueAccessPolicy disables setting the queue access policy.
	// Described in AWS docs: https://docs.aws.amazon.com/sns/latest/dg/subscribe-sqs-queue-to-sns-topic.html#SendMessageToSQS.sqs.permissions
	// Creating access policy requires "sqs:SetQueueAttributes" permission.
	DoNotSetQueueAccessPolicy bool
}
func (c *SubscriberConfig) SetDefaults() {
	if c.GenerateSubscribeInput == nil {
		c.GenerateSubscribeInput = GenerateSubscribeInputDefault
	}
	if c.GenerateQueueAccessPolicy == nil {
		c.GenerateQueueAccessPolicy = GenerateQueueAccessPolicyDefault
	}
}
func (c *SubscriberConfig) Validate() error {
	var err error
	if c.AWSConfig.Credentials == nil {
		err = errors.Join(err, fmt.Errorf("sns.SubscriberConfig.AWSConfig.Credentials is nil"))
	}
	if c.GenerateSqsQueueName == nil {
		err = errors.Join(err, fmt.Errorf("sns.SubscriberConfig.GenerateSqsQueueName is nil"))
	}
	if c.TopicResolver == nil {
		err = errors.Join(err, fmt.Errorf("sns.SubscriberConfig.TopicResolver is nil"))
	}
	return err
}
type GenerateSqsQueueNameFn func(ctx context.Context, snsTopic TopicArn) (string, error)
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sns/config.go
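Validate requires GenerateSqsQueueName to be set, so every SNS subscriber has to decide how its queue is named. One possible approach, sketched below, derives the queue name from the topic name plus a per-service suffix, so that each service gets its own queue. The TopicArn and GenerateSqsQueueNameFn types here are local stand-ins that mirror the signature shown above, and queueNameFor and suffixQueueName are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local stand-ins mirroring the signature from the configuration excerpt.
type TopicArn string

type GenerateSqsQueueNameFn func(ctx context.Context, snsTopic TopicArn) (string, error)

// queueNameFor extracts the topic name from an ARN of the form
// arn:<partition>:sns:<region>:<account-id>:<topic-name> and appends a suffix.
func queueNameFor(topic TopicArn, suffix string) (string, error) {
	parts := strings.Split(string(topic), ":")
	if len(parts) != 6 || parts[0] != "arn" || parts[2] != "sns" || parts[5] == "" {
		return "", fmt.Errorf("invalid SNS topic ARN: %q", topic)
	}
	return parts[5] + "-" + suffix, nil
}

// suffixQueueName adapts queueNameFor to the function type used by the config.
func suffixQueueName(suffix string) GenerateSqsQueueNameFn {
	return func(_ context.Context, snsTopic TopicArn) (string, error) {
		return queueNameFor(snsTopic, suffix)
	}
}

func main() {
	gen := suffixQueueName("billing")
	name, err := gen(context.Background(), "arn:aws:sns:us-east-1:123456789012:user-registered")
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
```

All instances of one service should use the same suffix so that they share a queue, while different services use different suffixes so that each receives every message.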
Additionally, because the SNS Subscriber uses SQS queues as “subscriptions”, you need to pass the SQS configuration as well.
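The DoNotSetQueueAccessPolicy comment in the configuration refers to the access policy that allows an SNS topic to deliver messages into an SQS queue. The sketch below generates a policy in the shape described in the AWS documentation linked from that comment. It is an illustration and not the library's GenerateQueueAccessPolicyDefault; the ARNs are placeholders and the Policy and Statement types are defined only for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Statement is one entry of a queue access policy.
type Statement struct {
	Effect    string                       `json:"Effect"`
	Principal map[string]string            `json:"Principal"`
	Action    string                       `json:"Action"`
	Resource  string                       `json:"Resource"`
	Condition map[string]map[string]string `json:"Condition"`
}

// Policy is a minimal queue access policy document.
type Policy struct {
	Version   string      `json:"Version"`
	Statement []Statement `json:"Statement"`
}

// AllowTopicToSend lets the SNS service send messages to one queue,
// restricted to messages originating from one topic.
func AllowTopicToSend(queueArn, topicArn string) Policy {
	return Policy{
		Version: "2012-10-17",
		Statement: []Statement{{
			Effect:    "Allow",
			Principal: map[string]string{"Service": "sns.amazonaws.com"},
			Action:    "sqs:SendMessage",
			Resource:  queueArn,
			Condition: map[string]map[string]string{
				"ArnEquals": {"aws:SourceArn": topicArn},
			},
		}},
	}
}

func main() {
	// Placeholder ARNs; substitute your own region, account ID, and names.
	p := AllowTopicToSend(
		"arn:aws:sqs:us-east-1:123456789012:user-registered-billing",
		"arn:aws:sns:us-east-1:123456789012:user-registered",
	)
	out, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Without a policy of this kind on the queue, SNS accepts the subscription but its deliveries to the queue are rejected.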
Resolving Queue URL#
In the Watermill model, the AWS topic ARN is normalized to the topic used in the Publish and Subscribe methods.
// ...
// TopicResolver resolves topic name to topic ARN by topic passed to Publisher.Publish, Subscriber.Subscribe.
type TopicResolver interface {
	ResolveTopic(ctx context.Context, topic string) (snsTopic TopicArn, err error)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sns/topic.go
Two resolvers are provided out of the box:
// ...
// TransparentTopicResolver is a TopicResolver that passes the topic as is.
type TransparentTopicResolver struct{}
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sns/topic.go
// ...
// GenerateArnTopicResolver is a TopicResolver that generates ARN for the topic
// using the provided account ID and region.
type GenerateArnTopicResolver struct {
	accountID string
	region    string
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-aws/sns/topic.go
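Generating an ARN from the account ID and region amounts to filling in the standard SNS ARN layout, arn:aws:sns:<region>:<account-id>:<topic-name>. The sketch below shows that mapping. It assumes the default "aws" partition, and TopicARNFor is a hypothetical helper rather than the library's GenerateArnTopicResolver.

```go
package main

import (
	"errors"
	"fmt"
)

// TopicARNFor builds an SNS topic ARN from its parts.
// Assumption: the topic lives in the default "aws" partition.
func TopicARNFor(region, accountID, topic string) (string, error) {
	if region == "" || accountID == "" || topic == "" {
		return "", errors.New("region, account ID, and topic are required")
	}
	return fmt.Sprintf("arn:aws:sns:%s:%s:%s", region, accountID, topic), nil
}

func main() {
	// Placeholder region and account ID.
	arn, err := TopicARNFor("us-east-1", "123456789012", "user-registered")
	if err != nil {
		panic(err)
	}
	fmt.Println(arn)
}
```

With a generating resolver the application code can keep using short topic names, while a transparent resolver expects the full ARN to be passed as the topic.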
Using with SNS emulator#
You may want to use goaws or localstack for local development or testing.
You can override the endpoint using the OptFns option in the SubscriberConfig or PublisherConfig.
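package main

import (
	"fmt"
	"net/url"

	amazonsns "github.com/aws/aws-sdk-go-v2/service/sns"
	// Assumption: transport aliases smithy-go's endpoints package, which defines Endpoint.
	transport "github.com/aws/smithy-go/endpoints"
	// Assumption: lo is github.com/samber/lo, which provides lo.Must.
	"github.com/samber/lo"

	"github.com/ThreeDotsLabs/watermill-aws/sns"
)

func main() {
	// ... (cfg, sqsConfig, and logger are created in the elided part)

	// Point the SNS client at the local emulator instead of AWS.
	snsOpts := []func(*amazonsns.Options){
		amazonsns.WithEndpointResolverV2(sns.OverrideEndpointResolver{
			Endpoint: transport.Endpoint{
				URI: *lo.Must(url.Parse("http://localstack:4566")),
			},
		}),
	}

	snsConfig := sns.SubscriberConfig{
		AWSConfig: cfg,
		OptFns:    snsOpts,
	}

	// The SNS subscriber also needs the SQS configuration for its queue.
	sub, err := sns.NewSubscriber(snsConfig, sqsConfig, logger)
	if err != nil {
		panic(fmt.Errorf("unable to create new subscriber: %w", err))
	}
	// ...
}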