Google Cloud Pub/Sub On this page Cloud Pub/Sub brings the flexibility and reliability of enterprise message-oriented middleware to
the cloud.
At the same time, Cloud Pub/Sub is a scalable, durable event ingestion and delivery
system that serves as a foundation for modern stream analytics pipelines.
By providing many-to-many, asynchronous messaging that decouples senders and receivers,
it allows for secure and highly available communication among independently written applications.
Cloud Pub/Sub delivers low-latency, durable messaging that helps developers quickly integrate
systems hosted on the Google Cloud Platform and externally.
Official Documentation: https://cloud.google.com/pubsub/docs/
You can find a fully functional example with Google Cloud Pub/Sub in the Watermill examples
.
Installation# go get github.com/ThreeDotsLabs/watermill-googlecloud
Characteristics# Feature Implements Note ConsumerGroups yes multiple subscribers within the same Subscription name ExactlyOnceDelivery no GuaranteedOrder no Persistent yes* maximum retention time is 7 days
Configuration# // ...
type PublisherConfig struct {
// ProjectID is the Google Cloud Engine project ID.
ProjectID string
// If true, `Publisher` does not check if the topic exists before publishing.
DoNotCheckTopicExistence bool
// If false (default), `Publisher` tries to create a topic if there is none with the requested name.
// Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool
// Enables the topic message ordering
EnableMessageOrdering bool
// Enables automatic resume publish upon error
EnableMessageOrderingAutoResumePublishOnError bool
// ConnectTimeout defines the timeout for connecting to Pub/Sub
ConnectTimeout time . Duration
// PublishTimeout defines the timeout for publishing messages.
PublishTimeout time . Duration
// Settings for cloud.google.com/go/pubsub client library.
PublishSettings * pubsub . PublishSettings
ClientOptions [] option . ClientOption
ClientConfig * pubsub . ClientConfig
Marshaler Marshaler
}
func ( c * PublisherConfig ) setDefaults () {
if c . Marshaler == nil {
c . Marshaler = DefaultMarshalerUnmarshaler {}
}
if c . ConnectTimeout == 0 {
c . ConnectTimeout = time . Second * 10
}
if c . PublishTimeout == 0 {
c . PublishTimeout = time . Second * 5
}
}
func NewPublisher ( config PublisherConfig , logger watermill . LoggerAdapter ) ( * Publisher , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/publisher.go
// ...
type SubscriberConfig struct {
// GenerateSubscriptionName generates subscription name for a given topic.
// The subscription connects the topic to a subscriber application that receives and processes
// messages published to the topic.
//
// By default, subscriptions expire after 31 days of inactivity.
//
// A topic can have multiple subscriptions, but a given subscription belongs to a single topic.
GenerateSubscriptionName SubscriptionNameFn
// ProjectID is the Google Cloud Engine project ID.
ProjectID string
// TopicProjectID is an optionnal configuration value representing
// the underlying topic Google Cloud Engine project ID.
// This can be helpful when subscription is linked to a topic for another project.
TopicProjectID string
// If false (default), `Subscriber` tries to create a subscription if there is none with the requested name.
// Otherwise, trying to use non-existent subscription results in `ErrSubscriptionDoesNotExist`.
DoNotCreateSubscriptionIfMissing bool
// If false (default), `Subscriber` tries to create a topic if there is none with the requested name
// and it is trying to create a new subscription with this topic name.
// Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool
// deprecated: ConnectTimeout is no longer used, please use timeout on context in Subscribe() method
ConnectTimeout time . Duration
// InitializeTimeout defines the timeout for initializing topics.
InitializeTimeout time . Duration
// Settings for cloud.google.com/go/pubsub client library.
ReceiveSettings pubsub . ReceiveSettings
SubscriptionConfig pubsub . SubscriptionConfig
ClientOptions [] option . ClientOption
ClientConfig * pubsub . ClientConfig
// Unmarshaler transforms the client library format into watermill/message.Message.
// Use a custom unmarshaler if needed, otherwise the default Unmarshaler should cover most use cases.
Unmarshaler Unmarshaler
}
func ( sc SubscriberConfig ) topicProjectID () string {
if sc . TopicProjectID != "" {
return sc . TopicProjectID
}
return sc . ProjectID
}
type SubscriptionNameFn func ( topic string ) string
// TopicSubscriptionName uses the topic name as the subscription name.
func TopicSubscriptionName ( topic string ) string {
return topic
}
// TopicSubscriptionNameWithSuffix uses the topic name with a chosen suffix as the subscription name.
func TopicSubscriptionNameWithSuffix ( suffix string ) SubscriptionNameFn {
return func ( topic string ) string {
return topic + suffix
}
}
func ( c * SubscriberConfig ) setDefaults () {
if c . GenerateSubscriptionName == nil {
c . GenerateSubscriptionName = TopicSubscriptionName
}
if c . InitializeTimeout == 0 {
c . InitializeTimeout = time . Second * 10
}
if c . Unmarshaler == nil {
c . Unmarshaler = DefaultMarshalerUnmarshaler {}
}
}
func NewSubscriber (
// ...
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/subscriber.go
Subscription name# To receive messages published to a topic, you must create a subscription to that topic.
Only messages published to the topic after the subscription is created are available to subscriber
applications.
The subscription connects the topic to a subscriber application that receives and processes
messages published to the topic.
A topic can have multiple subscriptions, but a given subscription belongs to a single topic.
In Watermill, the subscription is created automatically during calling Subscribe()
.
Subscription name is generated by function passed to SubscriberConfig.GenerateSubscriptionName
.
By default, it is just the topic name (TopicSubscriptionName
).
When you want to consume messages from a topic with multiple subscribers, you should use
TopicSubscriptionNameWithSuffix
or your custom function to generate the subscription name.
Connecting# Watermill will connect to the instance of Google Cloud Pub/Sub indicated by the environment variables. For production setup, set the GOOGLE_APPLICATION_CREDENTIALS
env, as described in the official Google Cloud Pub/Sub docs
. Note that you won’t need to install the Cloud SDK, as Watermill will take care of the administrative tasks (creating topics/subscriptions) with the default settings and proper permissions.
For development, you can use a Docker image with the emulator and the PUBSUB_EMULATOR_HOST
env (check out the Getting Started guide
).
// ...
publisher , err := googlecloud . NewPublisher ( googlecloud . PublisherConfig {
ProjectID : "test-project" ,
}, logger )
if err != nil {
panic ( err )
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go
// ...
subscriber , err := googlecloud . NewSubscriber (
googlecloud . SubscriberConfig {
// custom function to generate Subscription Name,
// there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
GenerateSubscriptionName : func ( topic string ) string {
return "test-sub_" + topic
},
ProjectID : "test-project" ,
},
logger ,
)
if err != nil {
panic ( err )
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go
Publishing# // ...
// PublishTimeout defines the timeout for publishing messages.
PublishTimeout time . Duration
// Settings for cloud.google.com/go/pubsub client library.
PublishSettings * pubsub . PublishSettings
ClientOptions [] option . ClientOption
ClientConfig * pubsub . ClientConfig
Marshaler Marshaler
}
func ( c * PublisherConfig ) setDefaults () {
if c . Marshaler == nil {
c . Marshaler = DefaultMarshalerUnmarshaler {}
}
if c . ConnectTimeout == 0 {
c . ConnectTimeout = time . Second * 10
}
if c . PublishTimeout == 0 {
c . PublishTimeout = time . Second * 5
}
}
func NewPublisher ( config PublisherConfig , logger watermill . LoggerAdapter ) ( * Publisher , error ) {
config . setDefaults ()
if logger == nil {
logger = watermill . NopLogger {}
}
pub := & Publisher {
topics : map [ string ] * pubsub . Topic {},
config : config ,
logger : logger ,
}
ctx , cancel := context . WithTimeout ( context . Background (), config . ConnectTimeout )
defer cancel ()
cc , errc , err := connect ( ctx , config )
if err != nil {
return nil , err
}
select {
case <- ctx . Done ():
return nil , ErrConnectTimeout
case pub . client = <- cc :
case err = <- errc :
return nil , err
}
return pub , nil
}
func connect ( ctx context . Context , config PublisherConfig ) ( <- chan * pubsub . Client , <- chan error , error ) {
out := make ( chan * pubsub . Client )
errc := make ( chan error , 1 )
go func () {
defer close ( out )
defer close ( errc )
// blocking
c , err := pubsub . NewClientWithConfig ( context . Background (), config . ProjectID , config . ClientConfig , config . ClientOptions ... )
if err != nil {
errc <- err
return
}
select {
case out <- c :
// ok, carry on
case <- ctx . Done ():
return
}
}()
return out , errc , nil
}
// Publish publishes a set of messages on a Google Cloud Pub/Sub topic.
// It blocks until all the messages are successfully published or an error occurred.
//
// To receive messages published to a topic, you must create a subscription to that topic.
// Only messages published to the topic after the subscription is created are available to subscriber applications.
//
// See https://cloud.google.com/pubsub/docs/publisher to find out more about how Google Cloud Pub/Sub Publishers work.
func ( p * Publisher ) Publish ( topic string , messages ...* message . Message ) error {
// ...
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/publisher.go
Subscribing# // ...
// Subscribe consumes Google Cloud Pub/Sub and outputs them as Waterfall Message objects on the returned channel.
//
// In Google Cloud Pub/Sub, it is impossible to subscribe directly to a topic. Instead, a *subscription* is used.
// Each subscription has one topic, but there may be multiple subscriptions to one topic (with different names).
//
// The `topic` argument is transformed into subscription name with the configured `GenerateSubscriptionName` function.
// By default, if the subscription or topic don't exist, the are created. This behavior may be changed in the config.
//
// Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.
//
// See https://cloud.google.com/pubsub/docs/subscriber to find out more about how Google Cloud Pub/Sub Subscriptions work.
func ( s * Subscriber ) Subscribe ( ctx context . Context , topic string ) ( <- chan * message . Message , error ) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/subscriber.go
Marshaler# Watermill’s messages cannot be directly sent to Google Cloud Pub/Sub - they need to be marshaled. You can implement your marshaler or use the default implementation.
// ...
// Marshaler transforms a Waterfall Message into the Google Cloud client library Message.
type Marshaler interface {
Marshal ( topic string , msg * message . Message ) ( * pubsub . Message , error )
}
// Unmarshaler transforms a Google Cloud client library Message into the Waterfall Message.
type Unmarshaler interface {
Unmarshal ( * pubsub . Message ) ( * message . Message , error )
}
// UUIDHeaderKey is the key of the Pub/Sub attribute that carries Waterfall UUID.
const UUIDHeaderKey = "_watermill_message_uuid"
// GoogleMessageIDHeaderKey is the key of the Pub/Sub attribute that carries Google Cloud Message ID.
// This ID is assigned by the server when the message is published and is guaranteed to be unique within the topic.
const GoogleMessageIDHeaderKey = "_watermill_message_google_message_id"
// DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way:
// All Google Cloud Pub/Sub attributes are equivalent to Waterfall Message metadata.
// Waterfall Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key.
type DefaultMarshalerUnmarshaler struct {}
// ...
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/marshaler.go