SQL Pub/Sub executes queries on any SQL database, using it like a messaging system. At the moment, MySQL and PostgreSQL are supported.
It can be useful for projects that don't use a specialized message queue but have access to a SQL database.
The SQL subscriber periodically runs a SELECT query, remembering the position of the last record it has seen. If it finds
any new records, they are returned. One handy use case is consuming events from a database table, which can later be published
on some kind of message queue.
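For illustration, here is a minimal subscriber sketch. It is hedged: the import path assumes watermill-sql v3, and the exact SubscriberConfig fields may differ slightly between versions.

```go
package main

import (
	"context"
	stdSQL "database/sql"

	_ "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	wmsql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
)

func main() {
	db, err := stdSQL.Open("mysql", "user:pass@tcp(localhost:3306)/db")
	if err != nil {
		panic(err)
	}

	logger := watermill.NewStdLogger(false, false)

	subscriber, err := wmsql.NewSubscriber(db, wmsql.SubscriberConfig{
		SchemaAdapter:    wmsql.DefaultMySQLSchema{},
		OffsetsAdapter:   wmsql.DefaultMySQLOffsetsAdapter{},
		InitializeSchema: true, // run Watermill's initialization queries (see below)
	}, logger)
	if err != nil {
		panic(err)
	}

	// Each poll runs a SELECT starting after the last stored offset.
	messages, err := subscriber.Subscribe(context.Background(), "events")
	if err != nil {
		panic(err)
	}

	for msg := range messages {
		// Process the message, then acknowledge it so the offset advances.
		msg.Ack()
	}
}
```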
The SQL publisher simply inserts consumed messages into the chosen table. A common approach is to use it as a persistent
log of events that were published on a queue with a short message expiration time.
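A corresponding publisher sketch, reusing db and logger from the snippet above (message is github.com/ThreeDotsLabs/watermill/message; the AutoInitializeSchema field is an assumption about your watermill-sql version, where it creates the topic's table on first publish):

```go
publisher, err := wmsql.NewPublisher(db, wmsql.PublisherConfig{
	SchemaAdapter:        wmsql.DefaultMySQLSchema{},
	AutoInitializeSchema: true,
}, logger)
if err != nil {
	panic(err)
}

// Each published message becomes one row in the topic's table.
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"user_id": 1}`))
if err := publisher.Publish("events", msg); err != nil {
	panic(err)
}
```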
SQL Pub/Sub is also a good choice for implementing the Outbox pattern together with the Forwarder component.
SQL Pub/Sub uses a user-defined schema to handle SELECT and INSERT queries. You need to implement the SchemaAdapter interface and pass
it to SubscriberConfig or PublisherConfig.
There is a default schema provided for each supported engine (DefaultMySQLSchema and DefaultPostgreSQLSchema).
It supports the most common use case (storing events in a table). You can base your own schema on one of these, overriding only the methods you need.
Consider an example project where you're fine with the default schema but would like to store the uuid column as BINARY(16)
instead of VARCHAR(36). In that case, you have to override two methods:
- SchemaInitializingQueries, which creates the table.
- UnmarshalMessage, which produces a Message from a database record.
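A minimal sketch of such an extension follows. The method signatures below follow the v1-style SchemaAdapter interface and have changed between watermill-sql major versions, so treat this as an outline and check the interface in your release; the scan order in UnmarshalMessage must match the columns produced by the schema's SelectQuery.

```go
import (
	stdSQL "database/sql"
	"encoding/hex"
	"encoding/json"
	"strings"

	wmsql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

// binaryUUIDSchema stores message UUIDs as BINARY(16) instead of VARCHAR(36).
type binaryUUIDSchema struct {
	wmsql.DefaultMySQLSchema
}

func (s binaryUUIDSchema) SchemaInitializingQueries(topic string) []string {
	createQuery := strings.Join([]string{
		"CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (",
		"`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
		"`uuid` BINARY(16) NOT NULL,",
		"`payload` JSON DEFAULT NULL,",
		"`metadata` JSON DEFAULT NULL",
		");",
	}, "\n")
	return []string{createQuery}
}

func (s binaryUUIDSchema) UnmarshalMessage(row *stdSQL.Row) (offset int, msg *message.Message, err error) {
	var uuid, payload, metadata []byte
	if err := row.Scan(&offset, &uuid, &payload, &metadata); err != nil {
		return 0, nil, err
	}

	msg = message.NewMessage(formatUUID(uuid), payload)
	if len(metadata) > 0 {
		if err := json.Unmarshal(metadata, &msg.Metadata); err != nil {
			return 0, nil, err
		}
	}
	return offset, msg, nil
}

// formatUUID renders 16 raw bytes in the canonical 8-4-4-4-12 form.
func formatUUID(b []byte) string {
	h := hex.EncodeToString(b)
	if len(h) != 32 {
		return string(b) // fall back for unexpected lengths
	}
	return strings.Join([]string{h[0:8], h[8:12], h[12:16], h[16:20], h[20:32]}, "-")
}
```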
Note that you don’t have to use the initialization queries provided by Watermill. They will be run only if you set the
InitializeSchema field to true in the config. Otherwise, you can use your own solution for database migrations.
If you need to publish messages within a database transaction, pass a *sql.Tx to the NewPublisher
constructor. You have to create a separate publisher for each transaction.
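A sketch of that flow (db, logger, and msg are assumed to be set up as in the earlier snippets; *sql.Tx satisfies the executor interface that NewPublisher accepts, but verify against your version):

```go
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	return err
}

// The publisher writes through the transaction, so the message is
// committed or rolled back together with the rest of the work.
txPublisher, err := wmsql.NewPublisher(tx, wmsql.PublisherConfig{
	SchemaAdapter: wmsql.DefaultMySQLSchema{},
}, logger)
if err != nil {
	_ = tx.Rollback()
	return err
}

// ... other queries in the same transaction ...

if err := txPublisher.Publish("events", msg); err != nil {
	_ = tx.Rollback()
	return err
}

// Subscribers see the message only after a successful commit.
return tx.Commit()
```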
The logic for storing message offsets is provided by the OffsetsAdapter. If your schema uses an auto-incremented integer as the row ID,
it should work out of the box with the default offset adapters.
Instead of the default Pub/Sub schema, you can use the queue schema and offsets adapters.
It's a simpler schema that doesn't support consumer groups, but it has other advantages:
it lets you specify a custom WHERE clause for fetching messages (for example, to filter messages by some condition in the payload or the metadata),
and it can delete messages from the table after they are acknowledged, so the table doesn't grow in size over time.
Currently, this schema is supported only for PostgreSQL.
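A sketch of wiring the queue adapters; the identifiers PostgreSQLQueueSchema, PostgreSQLQueueOffsetsAdapter, GenerateWhereClause, and DeleteOnAck follow recent watermill-sql releases, so verify them against your version. Here db must be a PostgreSQL connection.

```go
queueSubscriber, err := wmsql.NewSubscriber(db, wmsql.SubscriberConfig{
	SchemaAdapter: wmsql.PostgreSQLQueueSchema{
		// Hypothetical filter: only consume messages whose metadata
		// JSON contains a matching "type" field.
		GenerateWhereClause: func(params wmsql.GenerateWhereClauseParams) (string, []any) {
			return "metadata->>'type' = 'UserSignedUp'", nil
		},
	},
	OffsetsAdapter: wmsql.PostgreSQLQueueOffsetsAdapter{
		// Delete rows once acked, so the table doesn't grow over time.
		DeleteOnAck: true,
	},
}, logger)
if err != nil {
	panic(err)
}
```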
Using the last processed transaction ID in PostgreSQL to ensure no messages are lost
In some cases, PostgreSQL SERIAL values are not monotonic.
A SERIAL value is generated while the transaction is in progress, not when it is committed.
If transactions are committed in a different order than they were started, message offsets based on SERIAL values will not be monotonically increasing.
To keep storing acknowledgment information efficient, Watermill keeps only the last message's acknowledgment information.
To make sure no messages are missed when commit order differs from insertion order, Watermill additionally tracks the transaction ID.
For more details, see Watermill#311.
It is important to note that very long-running transactions may result in delayed message delivery.
For instance, if a transaction runs for an hour, no messages will be delivered until it is committed.
While we do not recommend such long transactions, if they are necessary, we advise using the Queue schema adapter, which does not depend on the transaction ID.
You have nothing to worry about if you don’t have such long transactions.
If you are migrating your data to a new database, you may need to set last_processed_transaction_id in your offsets table.