Firestore Pub/Sub

Cloud Firestore is a cloud-hosted, NoSQL database from Google.

This Pub/Sub comes with two publishers. To publish messages in a transaction use the TransactionalPublisher. If you do not want to publish messages in transaction use the normal Publisher.

Using Firestore as a Pub/Sub instead of using a dedicated Pub/Sub system can be useful to publish messages in transaction while at the same time saving other data in Firestore. Thanks to that the data and the messages can be consistently persisted. If the messages and the data weren’t being published transactionally you could end up in situations where messages were emitted even though the data wasn’t saved or messages weren’t emitted even though the data was saved. After transactionally publishing messages in Firestore you can then subscribe to them and relay them to a different Pub/Sub system.

Godoc: https://pkg.go.dev/github.com/ThreeDotsLabs/watermill-firestore

Firestore documentation: https://firebase.google.com/docs/firestore/

Installation

go get github.com/ThreeDotsLabs/watermill-firestore

Characteristics

FeatureImplementsNote
ConsumerGroupsyes
ExactlyOnceDeliveryno
GuaranteedOrderno
Persistentyes

Configuration

Publisher configuration

// ...
type PublisherConfig struct {
	// ProjectID is an ID of a Google Cloud project with Firestore database.
	ProjectID string

	// PubSubRootCollection is a name of a collection which will be used as a root collection for the PubSub.
	// It defaults to "pubsub".
	PubSubRootCollection string

	// MessagePublishTimeout is a timeout used for a single `Publish` call.
	// It defaults to 1 minute.
	MessagePublishTimeout time.Duration

	// SubscriptionsCacheValidityDuration is used for internal subscriptions cache
	// in order to reduce fetch calls to Firestore on each `Publish` method call.
	//
	// If you prefer to not cache subscriptions and fetch them each time `Publish`
	// is called, please set `DontCacheSubscriptions` to true.
	//
	// It defaults to 500 milliseconds.
	SubscriptionsCacheValidityDuration time.Duration

	// DontCacheSubscriptions should be set to true when you don't want
	// Publisher to keep an internal cache of subscribers.
	DontCacheSubscriptions bool

	// GoogleClientOpts are options passed directly to firestore client.
	GoogleClientOpts []option.ClientOption

	// Marshaler marshals message from Watermill to Firestore format and vice versa.
	Marshaler Marshaler

	// CustomFirestoreClient can be used to override a default client.
	CustomFirestoreClient client
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-firestore/pkg/firestore/publisher.go

Subscriber configuration

// ...
type SubscriberConfig struct {
	// ProjectID is an ID of a Google Cloud project with Firestore database.
	ProjectID string

	// GenerateSubscriptionName should accept topic name and construct a subscription name basing on it.
	//
	// It defaults to topic -> topic + "_sub".
	GenerateSubscriptionName GenerateSubscriptionNameFn

	// PubSubRootCollection is a name of a collection which will be used as a root collection for the PubSub.
	//
	// It defaults to "pubsub".
	PubSubRootCollection string

	// Timeout is used for single Firestore operations.
	//
	// It defaults to 30 seconds.
	Timeout time.Duration

	// GoogleClientOpts are options passed directly to firestore client.
	GoogleClientOpts []option.ClientOption

	// Marshaler marshals message from Watermill to Firestore format and vice versa.
	Marshaler Marshaler

	// CustomFirestoreClient can be used to override a default client.
	CustomFirestoreClient client

	// ReadAllPeriod is a period of time between two read-all operations of a subscriber.
	// Read-all operation means that a subscription collection is read and all messages are consumed.
	// It's needed as a workaround for Firestore sometimes ignoring collection changes.
	// Thanks to that we're sure that all messages are consumed with at most ReadAllPeriod delay.
	//
	// It defaults to 10 seconds.
	ReadAllPeriod time.Duration
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-firestore/pkg/firestore/subscriber.go

Subscription name

To receive messages published to a topic, you must create a subscription to that topic. Only messages published to the topic after the subscription is created will be received by the subscribers.

A topic can have multiple subscriptions, but a given subscription belongs to a single topic.

In Watermill, the subscription is created automatically during calling Subscribe(). Subscription name is generated by function passed to SubscriberConfig.GenerateSubscriptionName. By default, it is just the topic name with a suffix _sub appended to it.

If you want to consume messages from a topic with multiple subscribers processing the incoming messages in a different way, you should use a custom function to generate unique subscription names for each subscriber.

Marshaler

Watermill’s messages cannot be stored directly in Firestore. The marshaler is responsible for converting them to a type which can be stored by Firestore. The default implementation should be enough for most applications so it is unlikely that you need to implement your own marshaler.

// ...
// Marshaler marshals and unmarshals Watermill messages for storage in
// Firestore.
type Marshaler interface {
	// Marshal should return a data type which is supported by Firestore.
	// See the docstring on cloud.google.com/go/firestore.DocumentRef.Create.
	Marshal(msg *message.Message) (interface{}, error)

	// Unmarshal should set the following message fields: UUID, Metadata,
	// Payload.
	Unmarshal(doc *firestore.DocumentSnapshot) (*message.Message, error)
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-firestore/pkg/firestore/marshaler.go


Check our online hands-on training