SQL

Pub/Sub based on MySQL or PostgreSQL.

SQL Pub/Sub executes queries on any SQL database, using it like a messaging system. At the moment, MySQL and PostgreSQL are supported.

While this approach is not the fastest, it fits many use cases where eventual consistency is acceptable. It can also be useful for projects that don't use a specialized message queue yet, but have access to a SQL database.

The SQL subscriber runs a SELECT query at short intervals, remembering the position of the last record. If it finds any new records, they are returned. One handy use case is consuming events from a database table so that they can later be published on a message queue.
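The polling behaviour described above can be sketched with an in-memory table. This is a minimal illustration, not the library's implementation: `table`, `poller`, and their methods are hypothetical names, and a slice stands in for the SQL table.

```go
package main

import "fmt"

// row models one record of a hypothetical messages table.
type row struct {
	Offset  int64
	Payload string
}

// table is an in-memory stand-in for the SQL table; rows are ordered by offset.
type table struct {
	rows []row
}

// insert appends a record with the next auto-incremented offset.
func (t *table) insert(payload string) {
	t.rows = append(t.rows, row{Offset: int64(len(t.rows) + 1), Payload: payload})
}

// selectAfter mimics "SELECT ... WHERE offset > ? ORDER BY offset".
func (t *table) selectAfter(offset int64) []row {
	var out []row
	for _, r := range t.rows {
		if r.Offset > offset {
			out = append(out, r)
		}
	}
	return out
}

// poller remembers the offset of the last record it returned.
type poller struct {
	lastOffset int64
}

// poll runs one query and advances the remembered position.
func (p *poller) poll(t *table) []row {
	rows := t.selectAfter(p.lastOffset)
	if len(rows) > 0 {
		p.lastOffset = rows[len(rows)-1].Offset
	}
	return rows
}

func main() {
	tbl := &table{}
	sub := &poller{}

	tbl.insert("first")
	tbl.insert("second")
	fmt.Println(len(sub.poll(tbl))) // prints 2: both records are new
	fmt.Println(len(sub.poll(tbl))) // prints 0: nothing was added since the last poll

	tbl.insert("third")
	fmt.Println(sub.poll(tbl)[0].Payload) // prints third
}
```

A real subscriber would repeat `poll` in a loop and sleep between empty results, which is the role of `PollInterval` in the configuration.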

The SQL publisher simply inserts consumed messages into the chosen table. A common approach is to use it as a persistent log of events that were published on a queue with a short message expiration time.

SQL Pub/Sub is also a good choice for implementing the Outbox pattern with the Forwarder component.

See also the SQL example.

Installation

go get github.com/ThreeDotsLabs/watermill-sql/v2

Characteristics

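| Feature             | Implements | Note                                      |
|---------------------|------------|-------------------------------------------|
| ConsumerGroups      | yes        | See ConsumerGroup in SubscriberConfig     |
| ExactlyOnceDelivery | yes*       | Only for the MySQL implementation         |
| GuaranteedOrder     | yes        |                                           |
| Persistent          | yes        |                                           |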

Schema

SQL Pub/Sub uses a user-defined schema to handle select and insert queries. You need to implement SchemaAdapter and pass it to SubscriberConfig or PublisherConfig.

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/schema_adapter_mysql.go

// ...
// DefaultMySQLSchema is a default implementation of SchemaAdapter based on MySQL.
// If you need some customization, you can use composition to change schema and method of unmarshaling.
//
//	type MyMessagesSchema struct {
//		DefaultMySQLSchema
//	}
//
//	func (m MyMessagesSchema) SchemaInitializingQueries(topic string) []string {
//		createMessagesTable := strings.Join([]string{
//			"CREATE TABLE IF NOT EXISTS " + m.MessagesTable(topic) + " (",
//			"`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
//			"`uuid` BINARY(16) NOT NULL,",
//			"`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,",
//			"`payload` JSON DEFAULT NULL,",
//			"`metadata` JSON DEFAULT NULL",
//			");",
//		}, "\n")
//
//		return []string{createMessagesTable}
//	}
//
//	func (m MyMessagesSchema) UnmarshalMessage(row *sql.Row) (offset int, msg *message.Message, err error) {
//		// ...
//
// For debugging your custom schema, we recommend to inject logger with trace logging level
// which will print all SQL queries.
type DefaultMySQLSchema struct {
// ...

There is a default schema provided for each supported engine (DefaultMySQLSchema and DefaultPostgreSQLSchema). It supports the most common use case (storing events in a table). You can base your schema on one of these, extending only chosen methods.

Extending schema

Consider an example project, where you’re fine with using the default schema, but would like to use BINARY(16) for storing the uuid column, instead of VARCHAR(36). In that case, you have to define two methods:

  • SchemaInitializingQueries that creates the table.
  • UnmarshalMessage method that produces a Message from the database record.

Note that you don’t have to use the initialization queries provided by Watermill. They run only if you set InitializeSchema (in SubscriberConfig) or AutoInitializeSchema (in PublisherConfig) to true. Otherwise, you can use your own solution for database migrations.

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/schema_adapter_mysql.go

// ...
// DefaultMySQLSchema is a default implementation of SchemaAdapter based on MySQL.
// If you need some customization, you can use composition to change schema and method of unmarshaling.
//
//	type MyMessagesSchema struct {
//		DefaultMySQLSchema
//	}
//
//	func (m MyMessagesSchema) SchemaInitializingQueries(topic string) []string {
//		createMessagesTable := strings.Join([]string{
//			"CREATE TABLE IF NOT EXISTS " + m.MessagesTable(topic) + " (",
//			"`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
//			"`uuid` BINARY(16) NOT NULL,",
//			"`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,",
//			"`payload` JSON DEFAULT NULL,",
//			"`metadata` JSON DEFAULT NULL",
//			");",
//		}, "\n")
//
//		return []string{createMessagesTable}
//	}
//
//	func (m MyMessagesSchema) UnmarshalMessage(row *sql.Row) (offset int, msg *message.Message, err error) {
//		// ...
//
// For debugging your custom schema, we recommend to inject logger with trace logging level
// which will print all SQL queries.
type DefaultMySQLSchema struct {
// ...

Configuration

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

// ...
type PublisherConfig struct {
	// SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc.
	SchemaAdapter SchemaAdapter

	// AutoInitializeSchema enables initialization of schema database during publish.
	// Schema is initialized once per topic per publisher instance.
	// AutoInitializeSchema is forbidden if using an ongoing transaction as database handle;
	// That could result in an implicit commit of the transaction by a CREATE TABLE statement.
	AutoInitializeSchema bool
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

// ...
type SubscriberConfig struct {
	ConsumerGroup string

	// AckDeadline is the time to wait for acking a message.
	// If message is not acked within this time, it will be nacked and re-delivered.
	//
	// When messages are read in bulk, this time is calculated for each message separately.
	//
	// If you want to disable ack deadline, set it to 0.
	// Warning: when ack deadline is disabled, messages which are not acked may block PostgreSQL subscriber from reading new messages
	// due to not increasing `pg_snapshot_xmin(pg_current_snapshot())` value.
	//
	// Must be non-negative. Nil value defaults to 30s.
	AckDeadline *time.Duration

	// PollInterval is the interval to wait between subsequent SELECT queries, if no more messages were found in the database (Prefer using the BackoffManager instead).
	// Must be non-negative. Defaults to 1s.
	PollInterval time.Duration

	// ResendInterval is the time to wait before resending a nacked message.
	// Must be non-negative. Defaults to 1s.
	ResendInterval time.Duration

	// RetryInterval is the time to wait before resuming querying for messages after an error (Prefer using the BackoffManager instead).
	// Must be non-negative. Defaults to 1s.
	RetryInterval time.Duration

	// BackoffManager defines how much to backoff when receiving errors.
	BackoffManager BackoffManager

	// SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc.
	SchemaAdapter SchemaAdapter

	// OffsetsAdapter provides mechanism for saving acks and offsets of consumers.
	OffsetsAdapter OffsetsAdapter

	// InitializeSchema option enables initializing schema on making subscription.
	InitializeSchema bool
}
// ...

Publishing

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

// ...
func NewPublisher(db ContextExecutor, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

// ...
// Publish inserts the messages as rows into the MessagesTable.
// Order is guaranteed for messages within one call.
// Publish is blocking until all rows have been added to the Publisher's transaction.
// Publisher doesn't guarantee publishing messages in a single transaction,
// but the constructor accepts both *sql.DB and *sql.Tx, so transactions may be handled upstream by the user.
func (p *Publisher) Publish(topic string, messages ...*message.Message) (err error) {
// ...

Transactions

If you need to publish messages within a database transaction, you have to pass a *sql.Tx to the NewPublisher constructor. You have to create one publisher for each transaction.

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events/main.go

// ...
func simulateEvents(db *stdSQL.DB) {
	for {
		tx, err := db.Begin()
		if err != nil {
			panic(err)
		}

		// In an actual application, this is the place where some aggregate would be persisted
		// using the same transaction.
		// tx.Exec("INSERT INTO (...)")

		err = publishEvent(tx)
		if err != nil {
			rollbackErr := tx.Rollback()
			if rollbackErr != nil {
				panic(rollbackErr)
			}
			panic(err)
		}

		err = tx.Commit()
		if err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
	}
}

// publishEvent publishes a new event.
// To publish the event in a separate transaction, a new SQL Publisher
// has to be created each time, passing the proper transaction handle.
func publishEvent(tx *stdSQL.Tx) error {
	pub, err := sql.NewPublisher(tx, sql.PublisherConfig{
		SchemaAdapter: sql.DefaultMySQLSchema{},
	}, logger)
	if err != nil {
		return err
	}

	e := event{
		Name:       "UserSignedUp",
		OccurredAt: time.Now().UTC().Format(time.RFC3339),
	}
	payload, err := json.Marshal(e)
	if err != nil {
		return err
	}

	return pub.Publish(mysqlTable, message.NewMessage(
		watermill.NewUUID(),
		payload,
	))
// ...

Subscribing

To create a subscriber, you need to pass not only a proper schema adapter, but also an offsets adapter.

  • For MySQL schema use DefaultMySQLOffsetsAdapter
  • For PostgreSQL schema use DefaultPostgreSQLOffsetsAdapter

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

// ...
func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

// ...
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *message.Message, err error) {
// ...

Offsets Adapter

The logic for storing offsets of messages is provided by the OffsetsAdapter. If your schema uses an auto-incremented integer as the row ID, it should work out of the box with the default offsets adapters.

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/offsets_adapter.go

// ...
type OffsetsAdapter interface {
	// AckMessageQuery returns the SQL query and arguments that will mark a message as read for a given consumer group.
	AckMessageQuery(topic string, row Row, consumerGroup string) Query

	// ConsumedMessageQuery will return the SQL query and arguments which will be executed after consuming a message,
	// but before ack.
	//
	// ConsumedMessageQuery is optional, and will not be executed if the query is empty.
	ConsumedMessageQuery(topic string, row Row, consumerGroup string, consumerULID []byte) Query

	// NextOffsetQuery returns the SQL query and arguments which should return offset of next message to consume.
	NextOffsetQuery(topic, consumerGroup string) Query

	// SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS)
	// that the appropriate tables exist to write messages to the given topic.
	SchemaInitializingQueries(topic string) []Query

	// BeforeSubscribingQueries returns queries which will be executed before subscribing to a topic.
	// All queries will be executed in a single transaction.
	BeforeSubscribingQueries(topic string, consumerGroup string) []Query
}
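The idea behind per-consumer-group offsets can be sketched with a map in place of an offsets table. This is an illustrative model only: `offsets`, `ack`, and `lastAcked` are hypothetical names, and the choice to never move an offset backwards is an assumption of this sketch rather than a statement about the default adapters.

```go
package main

import "fmt"

// offsetKey identifies one consumer group's position in one topic.
type offsetKey struct {
	topic, group string
}

// offsets is an in-memory stand-in for an offsets table.
type offsets struct {
	acked map[offsetKey]int64
}

func newOffsets() *offsets {
	return &offsets{acked: map[offsetKey]int64{}}
}

// ack records that a group has processed the message at the given offset.
// It behaves like an upsert that never moves the position backwards.
func (o *offsets) ack(topic, group string, offset int64) {
	k := offsetKey{topic, group}
	if offset > o.acked[k] {
		o.acked[k] = offset
	}
}

// lastAcked returns the offset of the last acked message for a group.
// Zero means the group has not consumed anything from the topic yet.
func (o *offsets) lastAcked(topic, group string) int64 {
	return o.acked[offsetKey{topic, group}]
}

func main() {
	o := newOffsets()

	o.ack("events", "billing", 1)
	o.ack("events", "billing", 2)
	o.ack("events", "emails", 1)

	// Each consumer group keeps its own position in the same topic.
	fmt.Println(o.lastAcked("events", "billing")) // prints 2
	fmt.Println(o.lastAcked("events", "emails"))  // prints 1
	fmt.Println(o.lastAcked("events", "reports")) // prints 0
}
```

Because positions are stored per group, two subscribers with different ConsumerGroup values each receive every message, while subscribers sharing a group share one position.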