Google Cloud Pub/Sub

The fully-managed real-time messaging service from Google.


Cloud Pub/Sub brings the flexibility and reliability of enterprise message-oriented middleware to the cloud.

At the same time, Cloud Pub/Sub is a scalable, durable event ingestion and delivery system that serves as a foundation for modern stream analytics pipelines. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independently written applications.

Cloud Pub/Sub delivers low-latency, durable messaging that helps developers quickly integrate systems hosted on Google Cloud Platform and elsewhere.

Documentation: https://cloud.google.com/pubsub/docs/

Installation

go get github.com/ThreeDotsLabs/watermill-googlecloud

Characteristics

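| Feature             | Implements | Note                                                   |
|---------------------|------------|--------------------------------------------------------|
| ConsumerGroups      | yes        | multiple subscribers within the same Subscription name |
| ExactlyOnceDelivery | no         |                                                        |
| GuaranteedOrder     | no         |                                                        |
| Persistent          | yes*       | maximum retention time is 7 days                       |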

Configuration

Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/publisher.go

// ...
type PublisherConfig struct {
	// ProjectID is the Google Cloud project ID.
	ProjectID string

	// If false (default), `Publisher` tries to create a topic if there is none with the requested name.
	// Otherwise, trying to publish to a non-existent topic results in `ErrTopicDoesNotExist`.
	DoNotCreateTopicIfMissing bool
	// Enables topic message ordering.
	EnableMessageOrdering bool
	// Enables automatically resuming publishing for an ordering key after a publish error.
	EnableMessageOrderingAutoResumePublishOnError bool

	// ConnectTimeout defines the timeout for connecting to Pub/Sub.
	ConnectTimeout time.Duration
	// PublishTimeout defines the timeout for publishing messages.
	PublishTimeout time.Duration

	// Settings for cloud.google.com/go/pubsub client library.
	PublishSettings *pubsub.PublishSettings
	ClientOptions   []option.ClientOption

	Marshaler Marshaler
}

func (c *PublisherConfig) setDefaults() {
	if c.Marshaler == nil {
		c.Marshaler = DefaultMarshalerUnmarshaler{}
	}
	if c.ConnectTimeout == 0 {
		c.ConnectTimeout = time.Second * 10
	}
	if c.PublishTimeout == 0 {
		c.PublishTimeout = time.Second * 5
	}
}

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/subscriber.go

// ...
type SubscriberConfig struct {
	// GenerateSubscriptionName generates subscription name for a given topic.
	// The subscription connects the topic to a subscriber application that receives and processes
	// messages published to the topic.
	//
	// By default, subscriptions expire after 31 days of inactivity.
	//
	// A topic can have multiple subscriptions, but a given subscription belongs to a single topic.
	GenerateSubscriptionName SubscriptionNameFn

	// ProjectID is the Google Cloud project ID.
	ProjectID string

	// TopicProjectID is an optional configuration value representing
	// the Google Cloud project ID of the underlying topic.
	// This can be helpful when the subscription is linked to a topic in another project.
	TopicProjectID string

	// If false (default), `Subscriber` tries to create a subscription if there is none with the requested name.
	// Otherwise, trying to use non-existent subscription results in `ErrSubscriptionDoesNotExist`.
	DoNotCreateSubscriptionIfMissing bool

	// If false (default), `Subscriber` tries to create a topic if there is none with the requested name
	// and it is trying to create a new subscription with this topic name.
	// Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
	DoNotCreateTopicIfMissing bool

	// Deprecated: ConnectTimeout is no longer used; set a timeout on the context passed to Subscribe() instead.
	ConnectTimeout time.Duration

	// InitializeTimeout defines the timeout for initializing topics.
	InitializeTimeout time.Duration

	// Settings for cloud.google.com/go/pubsub client library.
	ReceiveSettings    pubsub.ReceiveSettings
	SubscriptionConfig pubsub.SubscriptionConfig
	ClientOptions      []option.ClientOption

	// Unmarshaler transforms the client library format into watermill/message.Message.
	// Use a custom unmarshaler if needed, otherwise the default Unmarshaler should cover most use cases.
	Unmarshaler Unmarshaler
}

func (sc SubscriberConfig) topicProjectID() string {
	if sc.TopicProjectID != "" {
		return sc.TopicProjectID
	}

	return sc.ProjectID
}

type SubscriptionNameFn func(topic string) string

// TopicSubscriptionName uses the topic name as the subscription name.
func TopicSubscriptionName(topic string) string {
	return topic
}

// TopicSubscriptionNameWithSuffix uses the topic name with a chosen suffix as the subscription name.
func TopicSubscriptionNameWithSuffix(suffix string) SubscriptionNameFn {
	return func(topic string) string {
		return topic + suffix
	}
}

func (c *SubscriberConfig) setDefaults() {
	if c.GenerateSubscriptionName == nil {
		c.GenerateSubscriptionName = TopicSubscriptionName
	}
	if c.InitializeTimeout == 0 {
		c.InitializeTimeout = time.Second * 10
	}
	if c.Unmarshaler == nil {
		c.Unmarshaler = DefaultMarshalerUnmarshaler{}
	}
}

func NewSubscriber(
// ...
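The TopicProjectID fallback in topicProjectID() is easiest to see with fully qualified Pub/Sub resource names (projects/<project>/topics/<topic> and projects/<project>/subscriptions/<name>). The stdlib-only sketch below uses a hypothetical trimmed-down config type rather than the library's SubscriberConfig, and it assumes, in line with the TopicProjectID comment, that the subscription lives in ProjectID while its topic may live in TopicProjectID.

```go
package main

import "fmt"

// subscriberConfig is a hypothetical stand-in holding only the two project
// fields of SubscriberConfig; it is not the library type.
type subscriberConfig struct {
	ProjectID      string
	TopicProjectID string
}

// topicProjectID falls back to ProjectID when TopicProjectID is empty,
// following the same rule as the method in the excerpt above.
func (sc subscriberConfig) topicProjectID() string {
	if sc.TopicProjectID != "" {
		return sc.TopicProjectID
	}
	return sc.ProjectID
}

// topicPath builds a fully qualified topic resource name.
func topicPath(project, topic string) string {
	return fmt.Sprintf("projects/%s/topics/%s", project, topic)
}

// subscriptionPath builds a fully qualified subscription resource name.
func subscriptionPath(project, sub string) string {
	return fmt.Sprintf("projects/%s/subscriptions/%s", project, sub)
}

func main() {
	// Hypothetical project names: the consumer reads a topic owned by another project.
	cfg := subscriberConfig{ProjectID: "consumer-project", TopicProjectID: "producer-project"}

	// The subscription is assumed to live in ProjectID, the topic in TopicProjectID.
	fmt.Println(subscriptionPath(cfg.ProjectID, "orders"))
	fmt.Println(topicPath(cfg.topicProjectID(), "orders"))
}
```

Leaving TopicProjectID empty makes both names point at the same project, which is the common single-project setup.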
Subscription name

To receive messages published to a topic, you must create a subscription to that topic. Only messages published to the topic after the subscription is created are available to subscriber applications.

The subscription connects the topic to a subscriber application that receives and processes messages published to the topic.

A topic can have multiple subscriptions, but a given subscription belongs to a single topic.

In Watermill, the subscription is created automatically when Subscribe() is called. The subscription name is generated by the function passed to SubscriberConfig.GenerateSubscriptionName. By default, it is just the topic name (TopicSubscriptionName).

If several independent subscribers should each receive every message published to a topic, give each of them a different subscription name by using TopicSubscriptionNameWithSuffix or your own function. Subscribers that share a subscription name split the messages between them instead (the consumer-group behavior listed under Characteristics).
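A short sketch of the naming rule follows. It uses local copies of SubscriptionNameFn, TopicSubscriptionName and TopicSubscriptionNameWithSuffix taken from the subscriber.go excerpt, so it runs without the watermill-googlecloud module; the service names and suffixes are hypothetical.

```go
package main

import "fmt"

// Local copies of the naming helpers from subscriber.go, so the sketch is self-contained.
type SubscriptionNameFn func(topic string) string

// TopicSubscriptionName uses the topic name as the subscription name.
func TopicSubscriptionName(topic string) string { return topic }

// TopicSubscriptionNameWithSuffix appends a fixed suffix to the topic name.
func TopicSubscriptionNameWithSuffix(suffix string) SubscriptionNameFn {
	return func(topic string) string { return topic + suffix }
}

func main() {
	const topic = "orders"

	// Two hypothetical services that must each see every "orders" message
	// get distinct subscription names, so each gets its own copy.
	billing := TopicSubscriptionNameWithSuffix("_billing")
	shipping := TopicSubscriptionNameWithSuffix("_shipping")
	fmt.Println(billing(topic), shipping(topic))

	// Replicas of one service that all use the default function share a
	// single subscription and therefore split the messages between them.
	fmt.Println(TopicSubscriptionName(topic))
}
```

In a real setup, the chosen function would be passed as SubscriberConfig.GenerateSubscriptionName.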

Connecting

Watermill connects to the Google Cloud Pub/Sub instance indicated by environment variables. For a production setup, set the GOOGLE_APPLICATION_CREDENTIALS environment variable, as described in the official Google Cloud Pub/Sub docs. You don't need to install the Cloud SDK: with the default settings, Watermill takes care of the administrative tasks (creating topics and subscriptions), provided the credentials have the proper permissions.

For development, you can use a Docker image with the emulator and set the PUBSUB_EMULATOR_HOST environment variable (check out the Getting Started guide).

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go

// ...
	publisher, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{
		ProjectID: "test-project",
	}, logger)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/googlecloud/main.go

// ...
	subscriber, err := googlecloud.NewSubscriber(
		googlecloud.SubscriberConfig{
			// custom function to generate Subscription Name,
			// there are also predefined TopicSubscriptionName and TopicSubscriptionNameWithSuffix available.
			GenerateSubscriptionName: func(topic string) string {
				return "test-sub_" + topic
			},
			ProjectID: "test-project",
		},
		logger,
	)
	if err != nil {
		panic(err)
	}
// ...

Publishing

Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/publisher.go

// ...
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
	config.setDefaults()

	if logger == nil {
		logger = watermill.NopLogger{}
	}

	pub := &Publisher{
		topics: map[string]*pubsub.Topic{},
		config: config,
		logger: logger,
	}

	ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
	defer cancel()

	cc, errc, err := connect(ctx, config)
	if err != nil {
		return nil, err
	}

	select {
	case <-ctx.Done():
		return nil, ErrConnectTimeout
	case pub.client = <-cc:
	case err = <-errc:
		return nil, err
	}

	return pub, nil
}

func connect(ctx context.Context, config PublisherConfig) (<-chan *pubsub.Client, <-chan error, error) {
	out := make(chan *pubsub.Client)
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)

		// blocking
		c, err := pubsub.NewClient(context.Background(), config.ProjectID, config.ClientOptions...)
		if err != nil {
			errc <- err
			return
		}
		select {
		case out <- c:
			// ok, carry on
		case <-ctx.Done():
			return
		}

	}()

	return out, errc, nil
}

// Publish publishes a set of messages on a Google Cloud Pub/Sub topic.
// It blocks until all the messages are successfully published or an error occurred.
//
// To receive messages published to a topic, you must create a subscription to that topic.
// Only messages published to the topic after the subscription is created are available to subscriber applications.
//
// See https://cloud.google.com/pubsub/docs/publisher to find out more about how Google Cloud Pub/Sub Publishers work.
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
// ...

Subscribing

Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/subscriber.go

// ...
// Subscribe consumes messages from Google Cloud Pub/Sub and outputs them as Watermill Message objects on the returned channel.
//
// In Google Cloud Pub/Sub, it is impossible to subscribe directly to a topic. Instead, a *subscription* is used.
// Each subscription has one topic, but there may be multiple subscriptions to one topic (with different names).
//
// The `topic` argument is transformed into a subscription name with the configured `GenerateSubscriptionName` function.
// By default, if the subscription or topic doesn't exist, it is created. This behavior may be changed in the config.
//
// Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.
//
// See https://cloud.google.com/pubsub/docs/subscriber to find out more about how Google Cloud Pub/Sub Subscriptions work.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
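The delivery rules in the Subscribe comment (each subscription receives its own copy of every message, and only messages published after the subscription exists are delivered) can be modeled with a toy in-memory broker. This is a hypothetical sketch of the semantics, not the Pub/Sub service or the Watermill API: it ignores acknowledgements, redelivery, retention limits and concurrency.

```go
package main

import "fmt"

// broker is a hypothetical in-memory model: topic -> subscription name -> queued messages.
type broker struct {
	subs map[string]map[string][]string
}

func newBroker() *broker { return &broker{subs: map[string]map[string][]string{}} }

// createSubscription attaches a named subscription to a topic. Creating an
// existing name again is a no-op, so subscribers reusing a name share one queue.
func (b *broker) createSubscription(topic, name string) {
	if b.subs[topic] == nil {
		b.subs[topic] = map[string][]string{}
	}
	if _, ok := b.subs[topic][name]; !ok {
		b.subs[topic][name] = nil
	}
}

// publish copies msg into every subscription that currently exists on topic.
// With no subscriptions yet, the message reaches nobody.
func (b *broker) publish(topic, msg string) {
	for name := range b.subs[topic] {
		b.subs[topic][name] = append(b.subs[topic][name], msg)
	}
}

// pull removes and returns the next message queued for a subscription.
func (b *broker) pull(topic, name string) (string, bool) {
	q := b.subs[topic][name]
	if len(q) == 0 {
		return "", false
	}
	b.subs[topic][name] = q[1:]
	return q[0], true
}

func main() {
	b := newBroker()
	b.publish("orders", "m1") // no subscription yet, so m1 is never delivered

	b.createSubscription("orders", "orders_billing")
	b.createSubscription("orders", "orders_shipping")
	b.publish("orders", "m2") // both subscriptions get a copy

	m, ok := b.pull("orders", "orders_billing")
	fmt.Println(m, ok)
	m, ok = b.pull("orders", "orders_shipping")
	fmt.Println(m, ok)
	_, ok = b.pull("orders", "orders_billing")
	fmt.Println(ok)
}
```

Running it prints m2 for both subscriptions and then false, because m1 was published before any subscription existed.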

Marshaler

Watermill messages cannot be sent to Google Cloud Pub/Sub directly; they need to be marshaled first. You can implement your own marshaler or use the default implementation.

Full source: github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud/marshaler.go

// ...
// Marshaler transforms a Watermill Message into the Google Cloud client library Message.
type Marshaler interface {
	Marshal(topic string, msg *message.Message) (*pubsub.Message, error)
}

// Unmarshaler transforms a Google Cloud client library Message into the Watermill Message.
type Unmarshaler interface {
	Unmarshal(*pubsub.Message) (*message.Message, error)
}

// UUIDHeaderKey is the key of the Pub/Sub attribute that carries the Watermill message UUID.
const UUIDHeaderKey = "_watermill_message_uuid"

// DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way:
// All Google Cloud Pub/Sub attributes are equivalent to Watermill Message metadata.
// The Watermill Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key.
type DefaultMarshalerUnmarshaler struct{}
// ...
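The mapping described in the DefaultMarshalerUnmarshaler comment can be sketched with simplified stand-in types. This is not the library's implementation: Message and PubsubMessage are hypothetical structs with only the fields needed here, and rejecting metadata that already uses UUIDHeaderKey is a choice made for this sketch so the round trip stays unambiguous.

```go
package main

import (
	"errors"
	"fmt"
)

// UUIDHeaderKey matches the constant in the marshaler.go excerpt above.
const UUIDHeaderKey = "_watermill_message_uuid"

// Message is a simplified stand-in for watermill's message.Message.
type Message struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// PubsubMessage is a simplified stand-in for pubsub.Message.
type PubsubMessage struct {
	Data       []byte
	Attributes map[string]string
}

// marshal copies metadata into attributes and stores the UUID under UUIDHeaderKey.
func marshal(msg *Message) (*PubsubMessage, error) {
	if _, ok := msg.Metadata[UUIDHeaderKey]; ok {
		return nil, errors.New("metadata must not use the reserved UUID key")
	}
	attrs := make(map[string]string, len(msg.Metadata)+1)
	for k, v := range msg.Metadata {
		attrs[k] = v
	}
	attrs[UUIDHeaderKey] = msg.UUID
	return &PubsubMessage{Data: msg.Payload, Attributes: attrs}, nil
}

// unmarshal reverses marshal: the UUID attribute becomes Message.UUID and
// every other attribute becomes metadata.
func unmarshal(pm *PubsubMessage) *Message {
	md := make(map[string]string, len(pm.Attributes))
	var uuid string
	for k, v := range pm.Attributes {
		if k == UUIDHeaderKey {
			uuid = v
			continue
		}
		md[k] = v
	}
	return &Message{UUID: uuid, Metadata: md, Payload: pm.Data}
}

func main() {
	in := &Message{UUID: "1234", Metadata: map[string]string{"source": "billing"}, Payload: []byte("hello")}
	pm, err := marshal(in)
	if err != nil {
		panic(err)
	}
	out := unmarshal(pm)
	fmt.Println(out.UUID, out.Metadata["source"], string(out.Payload))
}
```

A custom Marshaler/Unmarshaler pair would follow the same shape, for example to move selected metadata into the payload instead of attributes.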