SQL

SQL Pub/Sub executes queries on any SQL database, using it like a messaging system. At the moment, MySQL and PostgreSQL are supported.

While the performance of this approach isn’t the best, it fits many use cases, where eventual consistency is acceptable. It can also be useful for projects that are not using any specialized message queue at the moment, but have access to a SQL database.

The SQL subscriber runs a SELECT query within short periods, remembering the position of the last record. If it finds any new records, they are returned. One handy use case is consuming events from a database table, that can be later published on some kind of message queue.

The SQL publisher simply inserts consumed messages into the chosen table. A common approach would be to use it as a persistent log of events that were published on a queue with short message expiration time.

SQL Pub/Sub is also a good choice for implementing Outbox pattern with Forwarder component.

See also the SQL example .

Installation

go get github.com/ThreeDotsLabs/watermill-sql/v3

Characteristics

FeatureImplementsNote
ConsumerGroupsyesSee ConsumerGroup in SubscriberConfig (not supported by the queue schema)
ExactlyOnceDeliveryyes*Just for MySQL implementation
GuaranteedOrderyes
Persistentyes

Schema

SQL Pub/Sub uses user-defined schema to handle select and insert queries. You need to implement SchemaAdapter and pass it to SubscriberConfig or PublisherConfig.

// ...
// DefaultMySQLSchema is a default implementation of SchemaAdapter based on MySQL.
// If you need some customization, you can use composition to change schema and method of unmarshaling.
//
//	type MyMessagesSchema struct {
//		DefaultMySQLSchema
//	}
//
//	func (m MyMessagesSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) []string {
//		createMessagesTable := strings.Join([]string{
//			"CREATE TABLE IF NOT EXISTS " + m.MessagesTable(topic) + " (",
//			"`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
//			"`uuid` BINARY(16) NOT NULL,",
//			"`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,",
//			"`payload` JSON DEFAULT NULL,",
//			"`metadata` JSON DEFAULT NULL",
//			");",
//		}, "\n")
//
//		return []string{createMessagesTable}
//	}
//
//	func (m MyMessagesSchema) UnmarshalMessage(params UnmarshalMessageParams) (offset int, msg *message.Message, err error) {
//		// ...
//
// For debugging your custom schema, we recommend to inject logger with trace logging level
// which will print all SQL queries.
type DefaultMySQLSchema struct {
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/schema_adapter_mysql.go

There is a default schema provided for each supported engine (DefaultMySQLSchema and DefaultPostgreSQLSchema). It supports the most common use case (storing events in a table). You can base your schema on one of these, extending only chosen methods.

Extending schema

Consider an example project, where you’re fine with using the default schema, but would like to use BINARY(16) for storing the uuid column, instead of VARCHAR(36). In that case, you have to define two methods:

  • SchemaInitializingQueries that creates the table.
  • UnmarshalMessage method that produces a Message from the database record.

Note that you don’t have to use the initialization queries provided by Watermill. They will be run only if you set the InitializeSchema field to true in the config. Otherwise, you can use your own solution for database migrations.

// ...
// DefaultMySQLSchema is a default implementation of SchemaAdapter based on MySQL.
// If you need some customization, you can use composition to change schema and method of unmarshaling.
//
//	type MyMessagesSchema struct {
//		DefaultMySQLSchema
//	}
//
//	func (m MyMessagesSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) []string {
//		createMessagesTable := strings.Join([]string{
//			"CREATE TABLE IF NOT EXISTS " + m.MessagesTable(topic) + " (",
//			"`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
//			"`uuid` BINARY(16) NOT NULL,",
//			"`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,",
//			"`payload` JSON DEFAULT NULL,",
//			"`metadata` JSON DEFAULT NULL",
//			");",
//		}, "\n")
//
//		return []string{createMessagesTable}
//	}
//
//	func (m MyMessagesSchema) UnmarshalMessage(params UnmarshalMessageParams) (offset int, msg *message.Message, err error) {
//		// ...
//
// For debugging your custom schema, we recommend to inject logger with trace logging level
// which will print all SQL queries.
type DefaultMySQLSchema struct {
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/schema_adapter_mysql.go

Configuration

// ...
type PublisherConfig struct {
	// SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc.
	SchemaAdapter SchemaAdapter

	// AutoInitializeSchema enables initialization of schema database during publish.
	// Schema is initialized once per topic per publisher instance.
	// AutoInitializeSchema is forbidden if using an ongoing transaction as database handle;
	// That could result in an implicit commit of the transaction by a CREATE TABLE statement.
	AutoInitializeSchema bool
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

// ...
type SubscriberConfig struct {
	ConsumerGroup string

	// AckDeadline is the time to wait for acking a message.
	// If message is not acked within this time, it will be nacked and re-delivered.
	//
	// When messages are read in bulk, this time is calculated for each message separately.
	//
	// If you want to disable ack deadline, set it to 0.
	// Warning: when ack deadline is disabled, messages which are not acked may block PostgreSQL subscriber from reading new messages
	// due to not increasing `pg_snapshot_xmin(pg_current_snapshot())` value.
	//
	// Must be non-negative. Nil value defaults to 30s.
	AckDeadline *time.Duration

	// PollInterval is the interval to wait between subsequent SELECT queries, if no more messages were found in the database (Prefer using the BackoffManager instead).
	// Must be non-negative. Defaults to 1s.
	PollInterval time.Duration

	// ResendInterval is the time to wait before resending a nacked message.
	// Must be non-negative. Defaults to 1s.
	ResendInterval time.Duration

	// RetryInterval is the time to wait before resuming querying for messages after an error (Prefer using the BackoffManager instead).
	// Must be non-negative. Defaults to 1s.
	RetryInterval time.Duration

	// BackoffManager defines how much to backoff when receiving errors.
	BackoffManager BackoffManager

	// SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc.
	SchemaAdapter SchemaAdapter

	// OffsetsAdapter provides mechanism for saving acks and offsets of consumers.
	OffsetsAdapter OffsetsAdapter

	// InitializeSchema option enables initializing schema on making subscription.
	InitializeSchema bool
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

Publishing

// ...
func NewPublisher(db ContextExecutor, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

Example:

// ...
	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
// Publish inserts the messages as rows into the MessagesTable.
// Order is guaranteed for messages within one call.
// Publish is blocking until all rows have been added to the Publisher's transaction.
// Publisher doesn't guarantee publishing messages in a single transaction,
// but the constructor accepts both *sql.DB and *sql.Tx, so transactions may be handled upstream by the user.
func (p *Publisher) Publish(topic string, messages ...*message.Message) (err error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

Transactions

If you need to publish messages within a database transaction, you have to pass a *sql.Tx in the NewPublisher constructor. You have to create one publisher for each transaction.

Example:

// ...
func simulateEvents(db *stdSQL.DB) {
	for {
		tx, err := db.Begin()
		if err != nil {
			panic(err)
		}

		// In an actual application, this is the place where some aggregate would be persisted
		// using the same transaction.
		// tx.Exec("INSERT INTO (...)")

		err = publishEvent(tx)
		if err != nil {
			rollbackErr := tx.Rollback()
			if rollbackErr != nil {
				panic(rollbackErr)
			}
			panic(err)
		}

		err = tx.Commit()
		if err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
	}
}

// publishEvent publishes a new event.
// To publish the event in a separate transaction, a new SQL Publisher
// has to be created each time, passing the proper transaction handle.
func publishEvent(tx *stdSQL.Tx) error {
	pub, err := sql.NewPublisher(tx, sql.PublisherConfig{
		SchemaAdapter: sql.DefaultMySQLSchema{},
	}, logger)
	if err != nil {
		return err
	}

	e := event{
		Name:       "UserSignedUp",
		OccurredAt: time.Now().UTC().Format(time.RFC3339),
	}
	payload, err := json.Marshal(e)
	if err != nil {
		return err
	}

	return pub.Publish(mysqlTable, message.NewMessage(
		watermill.NewUUID(),
		payload,
	))
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events/main.go

Subscribing

To create a subscriber, you need to pass not only proper schema adapter, but also an offsets adapter.

  • For MySQL schema use DefaultMySQLOffsetsAdapter
  • For PostgreSQL schema use DefaultPostgreSQLOffsetsAdapter
// ...
func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

Example:

// ...
	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}
// ...

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *message.Message, err error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

Offsets Adapter

The logic for storing offsets of messages is provided by the OffsetsAdapter. If your schema uses auto-incremented integer as the row ID, it should work out of the box with default offset adapters.

// ...
type OffsetsAdapter interface {
	// AckMessageQuery the SQL query and arguments that will mark a message as read for a given consumer group.
	AckMessageQuery(params AckMessageQueryParams) (Query, error)

	// ConsumedMessageQuery will return the SQL query and arguments which be executed after consuming message,
	// but before ack.
	//
	// ConsumedMessageQuery is optional, and will be not executed if query is empty.
	ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)

	// NextOffsetQuery returns the SQL query and arguments which should return offset of next message to consume.
	NextOffsetQuery(params NextOffsetQueryParams) (Query, error)

	// SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS)
	// that the appropriate tables exist to write messages to the given topic.
	SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)

	// BeforeSubscribingQueries returns queries which will be executed before subscribing to a topic.
	// All queries will be executed in a single transaction.
	BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
}

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/offsets_adapter.go

Queue

Instead of the default Pub/Sub schema, you can use the queue schema and offsets adapters.

It’s a simpler schema that doesn’t support consumer groups. However, it has other advantages.

It lets you specify a custom WHERE clause for getting the messages. You can use it to filter messages by some condition in the payload or in the metadata.

Additionally, you can choose to delete messages from the table after they are acknowledged. Thanks to this, the table doesn’t grow in size with time.

Currently, this schema is supported only for PostgreSQL.

// ...
// PostgreSQLQueueSchema is a schema adapter for PostgreSQL that allows filtering messages by some condition.
// It DOES NOT support consumer groups.
// It supports deleting messages on ack.
type PostgreSQLQueueSchema struct {
	// GenerateWhereClause is a function that returns a where clause and arguments for the SELECT query.
	// It may be used to filter messages by some condition.
	// If empty, no where clause will be added.
	GenerateWhereClause func(params GenerateWhereClauseParams) (string, []any)

	// GeneratePayloadType is the type of the payload column in the messages table.
	// By default, it's JSON. If your payload is not JSON, you can use BYTEA.
	GeneratePayloadType func(topic string) string

	// GenerateMessagesTableName may be used to override how the messages table name is generated.
	GenerateMessagesTableName func(topic string) string

	// SubscribeBatchSize is the number of messages to be queried at once.
	//
	// Higher value, increases a chance of message re-delivery in case of crash or networking issues.
	// 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages.
	//
	// Default value is 100.
	SubscribeBatchSize int
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/queue_schema_adapter_postgresql.go

// ...
// PostgreSQLQueueOffsetsAdapter is an OffsetsAdapter for the PostgreSQLQueueSchema.
type PostgreSQLQueueOffsetsAdapter struct {
	// DeleteOnAck determines whether the message should be deleted from the table when it is acknowledged.
	// If false, the message will be marked as acked.
	DeleteOnAck bool

	// GenerateMessagesTableName may be used to override how the messages table name is generated.
	GenerateMessagesTableName func(topic string) string
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/queue_offsets_adapter_postgresql.go


Check our online hands-on training