Google Cloud Pub/Sub
Cloud Pub/Sub brings the flexibility and reliability of enterprise message-oriented middleware to the cloud.
At the same time, Cloud Pub/Sub is a scalable, durable event ingestion and delivery system that serves as a foundation for modern stream analytics pipelines. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independently written applications.
Cloud Pub/Sub delivers low-latency, durable messaging that helps developers quickly integrate systems hosted on the Google Cloud Platform and externally.
Documentation: https://cloud.google.com/pubsub/docs/
Installation
go get github.com/ThreeDotsLabs/watermill-googlecloud
Characteristics
Feature | Implements | Note |
---|---|---|
ConsumerGroups | yes | multiple subscribers within the same Subscription name |
ExactlyOnceDelivery | no | |
GuaranteedOrder | no | |
Persistent | yes* | maximum retention time is 7 days |
Configuration
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/publisher.go
// ...
type PublisherConfig struct {
// ProjectID is the Google Cloud Engine project ID.
ProjectID string
// If false (default), `Publisher` tries to create a topic if there is none with the requested name.
// Otherwise, trying to publish to a non-existent topic results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool
// Enables the topic message ordering
EnableMessageOrdering bool
// Enables automatic resume publish upon error
EnableMessageOrderingAutoResumePublishOnError bool
// ConnectTimeout defines the timeout for connecting to Pub/Sub
ConnectTimeout time.Duration
// PublishTimeout defines the timeout for publishing messages.
PublishTimeout time.Duration
// Settings for cloud.google.com/go/pubsub client library.
PublishSettings *pubsub.PublishSettings
ClientOptions []option.ClientOption
Marshaler Marshaler
}
func (c *PublisherConfig) setDefaults() {
if c.Marshaler == nil {
c.Marshaler = DefaultMarshalerUnmarshaler{}
}
if c.ConnectTimeout == 0 {
c.ConnectTimeout = time.Second * 10
}
if c.PublishTimeout == 0 {
c.PublishTimeout = time.Second * 5
}
}
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/subscriber.go
// ...
type SubscriberConfig struct {
// GenerateSubscriptionName generates subscription name for a given topic.
// The subscription connects the topic to a subscriber application that receives and processes
// messages published to the topic.
//
// By default, subscriptions expire after 31 days of inactivity.
//
// A topic can have multiple subscriptions, but a given subscription belongs to a single topic.
GenerateSubscriptionName SubscriptionNameFn
// ProjectID is the Google Cloud Engine project ID.
ProjectID string
// TopicProjectID is an optional configuration value representing
// the underlying topic Google Cloud Engine project ID.
// This can be helpful when the subscription is linked to a topic in another project.
TopicProjectID string
// If false (default), `Subscriber` tries to create a subscription if there is none with the requested name.
// Otherwise, trying to use non-existent subscription results in `ErrSubscriptionDoesNotExist`.
DoNotCreateSubscriptionIfMissing bool
// If false (default), `Subscriber` tries to create a topic if there is none with the requested name
// and it is trying to create a new subscription with this topic name.
// Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool
// Deprecated: ConnectTimeout is no longer used; set a timeout on the context passed to Subscribe() instead.
ConnectTimeout time.Duration
// InitializeTimeout defines the timeout for initializing topics.
InitializeTimeout time.Duration
// Settings for cloud.google.com/go/pubsub client library.
ReceiveSettings pubsub.ReceiveSettings
SubscriptionConfig pubsub.SubscriptionConfig
ClientOptions []option.ClientOption
// Unmarshaler transforms the client library format into watermill/message.Message.
// Use a custom unmarshaler if needed, otherwise the default Unmarshaler should cover most use cases.
Unmarshaler Unmarshaler
}
func (sc SubscriberConfig) topicProjectID() string {
if sc.TopicProjectID != "" {
return sc.TopicProjectID
}
return sc.ProjectID
}
type SubscriptionNameFn func(topic string) string
// TopicSubscriptionName uses the topic name as the subscription name.
func TopicSubscriptionName(topic string) string {
return topic
}
// TopicSubscriptionNameWithSuffix uses the topic name with a chosen suffix as the subscription name.
func TopicSubscriptionNameWithSuffix(suffix string) SubscriptionNameFn {
return func(topic string) string {
return topic + suffix
}
}
func (c *SubscriberConfig) setDefaults() {
if c.GenerateSubscriptionName == nil {
c.GenerateSubscriptionName = TopicSubscriptionName
}
if c.InitializeTimeout == 0 {
c.InitializeTimeout = time.Second * 10
}
if c.Unmarshaler == nil {
c.Unmarshaler = DefaultMarshalerUnmarshaler{}
}
}
func NewSubscriber(
// ...
Subscription name
To receive messages published to a topic, you must create a subscription to that topic. Only messages published to the topic after the subscription is created are available to subscriber applications.
The subscription connects the topic to a subscriber application that receives and processes messages published to the topic.
A topic can have multiple subscriptions, but a given subscription belongs to a single topic.
In Watermill, the subscription is created automatically when Subscribe() is called.
The subscription name is generated by the function passed to SubscriberConfig.GenerateSubscriptionName.
By default, it is just the topic name (TopicSubscriptionName).
When several independent subscribers should each receive every message from a topic, use TopicSubscriptionNameWithSuffix or a custom function so that each of them gets its own subscription name.
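As a minimal sketch of the naming rules above, the two predefined helpers can be reproduced with the standard library alone. The function bodies mirror the ones shown in the configuration listing; the topic name `orders` and the suffixes `_billing` and `_audit` are hypothetical. Subscribers that share a subscription name split the messages between them, while each distinct name receives its own copy.

```go
package main

import "fmt"

// SubscriptionNameFn mirrors the function type from SubscriberConfig.
type SubscriptionNameFn func(topic string) string

// TopicSubscriptionName uses the topic name as the subscription name.
func TopicSubscriptionName(topic string) string {
	return topic
}

// TopicSubscriptionNameWithSuffix appends a fixed suffix to the topic name.
func TopicSubscriptionNameWithSuffix(suffix string) SubscriptionNameFn {
	return func(topic string) string {
		return topic + suffix
	}
}

func main() {
	// Two hypothetical services that should both see every message.
	billing := TopicSubscriptionNameWithSuffix("_billing")
	audit := TopicSubscriptionNameWithSuffix("_audit")

	fmt.Println(TopicSubscriptionName("orders")) // default: same as the topic
	fmt.Println(billing("orders"))               // separate subscription for billing
	fmt.Println(audit("orders"))                 // separate subscription for audit
}
```

In a real application, one of these functions would be assigned to SubscriberConfig.GenerateSubscriptionName.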
Connecting
Watermill will connect to the instance of Google Cloud Pub/Sub indicated by the environment variables. For a production setup, set the GOOGLE_APPLICATION_CREDENTIALS environment variable, as described in the official Google Cloud Pub/Sub docs. Note that you won’t need to install the Cloud SDK, as Watermill will take care of the administrative tasks (creating topics/subscriptions) with the default settings and proper permissions.
For development, you can use a Docker image with the emulator and the PUBSUB_EMULATOR_HOST environment variable (check out the Getting Started guide).
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go
// ...
publisher, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{
ProjectID: "test-project",
}, logger)
if err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go
// ...
subscriber, err := googlecloud.NewSubscriber(
googlecloud.SubscriberConfig{
// custom function to generate Subscription Name,
// there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
GenerateSubscriptionName: func(topic string) string {
return "test-sub_" + topic
},
ProjectID: "test-project",
},
logger,
)
if err != nil {
panic(err)
}
// ...
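To make the role of the two environment variables easier to see, here is a small diagnostic sketch. `connectionTarget` is a hypothetical helper, not part of Watermill or the Google client library; it only reports which target the variables point at, assuming the emulator host takes precedence when both are set (this is how the Go client library is understood to behave).

```go
package main

import (
	"fmt"
	"os"
)

// connectionTarget is a hypothetical helper that describes where a client
// would connect, based only on the environment variables named above.
// The lookup function is injected so the logic can be exercised without
// touching the real process environment.
func connectionTarget(getenv func(string) string) string {
	// Assumption: a non-empty emulator host wins over any credentials.
	if host := getenv("PUBSUB_EMULATOR_HOST"); host != "" {
		return "emulator at " + host
	}
	if path := getenv("GOOGLE_APPLICATION_CREDENTIALS"); path != "" {
		return "Google Cloud using credentials file " + path
	}
	return "Google Cloud using ambient default credentials"
}

func main() {
	fmt.Println(connectionTarget(os.Getenv))
}
```

Logging this string at startup is one way to catch a development machine accidentally pointing at a production project.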
Publishing
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/publisher.go
// ...
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
config.setDefaults()
if logger == nil {
logger = watermill.NopLogger{}
}
pub := &Publisher{
topics: map[string]*pubsub.Topic{},
config: config,
logger: logger,
}
ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
defer cancel()
cc, errc, err := connect(ctx, config)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
return nil, ErrConnectTimeout
case pub.client = <-cc:
case err = <-errc:
return nil, err
}
return pub, nil
}
func connect(ctx context.Context, config PublisherConfig) (<-chan *pubsub.Client, <-chan error, error) {
out := make(chan *pubsub.Client)
errc := make(chan error, 1)
go func() {
defer close(out)
defer close(errc)
// blocking
c, err := pubsub.NewClient(context.Background(), config.ProjectID, config.ClientOptions...)
if err != nil {
errc <- err
return
}
select {
case out <- c:
// ok, carry on
case <-ctx.Done():
return
}
}()
return out, errc, nil
}
// Publish publishes a set of messages on a Google Cloud Pub/Sub topic.
// It blocks until all the messages are successfully published or an error occurred.
//
// To receive messages published to a topic, you must create a subscription to that topic.
// Only messages published to the topic after the subscription is created are available to subscriber applications.
//
// See https://cloud.google.com/pubsub/docs/publisher to find out more about how Google Cloud Pub/Sub Publishers work.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
// ...
Subscribing
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/subscriber.go
// ...
// Subscribe consumes messages from Google Cloud Pub/Sub and outputs them as Watermill Message objects on the returned channel.
//
// In Google Cloud Pub/Sub, it is impossible to subscribe directly to a topic. Instead, a *subscription* is used.
// Each subscription has one topic, but there may be multiple subscriptions to one topic (with different names).
//
// The `topic` argument is transformed into subscription name with the configured `GenerateSubscriptionName` function.
// By default, if the subscription or topic don't exist, they are created. This behavior may be changed in the config.
//
// Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.
//
// See https://cloud.google.com/pubsub/docs/subscriber to find out more about how Google Cloud Pub/Sub Subscriptions work.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
Marshaler
Watermill’s messages cannot be sent directly to Google Cloud Pub/Sub; they need to be marshaled first. You can implement your own marshaler or use the default implementation.
Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/marshaler.go
// ...
// Marshaler transforms a Watermill Message into the Google Cloud client library Message.
type Marshaler interface {
Marshal(topic string, msg *message.Message) (*pubsub.Message, error)
}
// Unmarshaler transforms a Google Cloud client library Message into the Watermill Message.
type Unmarshaler interface {
Unmarshal(*pubsub.Message) (*message.Message, error)
}
// UUIDHeaderKey is the key of the Pub/Sub attribute that carries Watermill UUID.
const UUIDHeaderKey = "_watermill_message_uuid"
// GoogleMessageIDHeaderKey is the key of the Pub/Sub attribute that carries Google Cloud Message ID.
// This ID is assigned by the server when the message is published and is guaranteed to be unique within the topic.
const GoogleMessageIDHeaderKey = "_watermill_message_google_message_id"
// DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way:
// All Google Cloud Pub/Sub attributes are equivalent to Watermill Message metadata.
// Watermill Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key.
type DefaultMarshalerUnmarshaler struct{}
// ...
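The mapping described in the DefaultMarshalerUnmarshaler comment can be illustrated with a round trip. This is a sketch under stated assumptions, not the library's implementation: `watermillMessage` and `pubsubMessage` are local stand-ins for message.Message and pubsub.Message, the reserved-key check is an assumption about sensible behavior, and the Google message ID attribute is left out for brevity.

```go
package main

import "fmt"

// uuidHeaderKey has the same value as UUIDHeaderKey in the listing above.
const uuidHeaderKey = "_watermill_message_uuid"

// watermillMessage is a stand-in for watermill's message.Message.
type watermillMessage struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// pubsubMessage is a stand-in for the client library's pubsub.Message.
type pubsubMessage struct {
	Data       []byte
	Attributes map[string]string
}

// marshal copies metadata into attributes and stores the UUID under uuidHeaderKey.
func marshal(msg watermillMessage) (pubsubMessage, error) {
	// Assumption: user metadata must not collide with the reserved UUID key.
	if _, clash := msg.Metadata[uuidHeaderKey]; clash {
		return pubsubMessage{}, fmt.Errorf("metadata key %q is reserved", uuidHeaderKey)
	}
	attrs := make(map[string]string, len(msg.Metadata)+1)
	for k, v := range msg.Metadata {
		attrs[k] = v
	}
	attrs[uuidHeaderKey] = msg.UUID
	return pubsubMessage{Data: msg.Payload, Attributes: attrs}, nil
}

// unmarshal restores the UUID from its attribute and treats every other
// attribute as metadata.
func unmarshal(pm pubsubMessage) watermillMessage {
	msg := watermillMessage{Payload: pm.Data, Metadata: map[string]string{}}
	for k, v := range pm.Attributes {
		if k == uuidHeaderKey {
			msg.UUID = v
			continue
		}
		msg.Metadata[k] = v
	}
	return msg
}

func main() {
	in := watermillMessage{
		UUID:     "example-uuid", // hypothetical value
		Metadata: map[string]string{"correlation_id": "42"},
		Payload:  []byte(`{"hello":"world"}`),
	}
	pm, err := marshal(in)
	if err != nil {
		panic(err)
	}
	out := unmarshal(pm)
	fmt.Println(out.UUID, out.Metadata, string(out.Payload))
}
```

A custom marshaler would follow the same shape while implementing the Marshaler and Unmarshaler interfaces shown above, for example to rename attributes expected by a non-Watermill consumer.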