SQL

SQL-based Pub/Sub.

SQL Pub/Sub executes queries on any SQL database, using it like a messaging system.

While this approach doesn't offer the best performance, it fits many use cases where eventual consistency is acceptable. It can also be useful for projects that don't use a specialized message queue yet but do have access to a SQL database.

The SQL subscriber runs a SELECT query at short intervals, remembering the position of the last record it has read. If it finds any new records, it returns them. One handy use case is consuming events from a database table so that they can later be published to a message queue.
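The polling idea can be sketched with the standard library alone. This is not the subscriber's implementation: the `row` type, the in-memory slice standing in for the messages table, and the `poll` function are hypothetical names, and a real subscriber issues a SELECT against the database instead of scanning a slice.

```go
package main

import "fmt"

// row is a hypothetical stand-in for one record of the messages table.
type row struct {
	Offset  int
	Payload string
}

// poll returns the records with an offset greater than lastOffset, plus the
// offset to remember for the next call. It plays the role of a
// "SELECT ... WHERE offset > ? ORDER BY offset" query, assuming the slice
// is already sorted by offset.
func poll(table []row, lastOffset int) ([]row, int) {
	var found []row
	for _, r := range table {
		if r.Offset > lastOffset {
			found = append(found, r)
			lastOffset = r.Offset
		}
	}
	return found, lastOffset
}

func main() {
	table := []row{{1, "a"}, {2, "b"}}
	last := 0

	batch, last := poll(table, last)
	fmt.Println(len(batch), last) // 2 2

	// A new record arrives: only that record is returned.
	table = append(table, row{3, "c"})
	batch, last = poll(table, last)
	fmt.Println(len(batch), last) // 1 3

	// Nothing new: a real subscriber would wait before querying again.
	batch, last = poll(table, last)
	fmt.Println(len(batch), last) // 0 3
}
```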

The SQL publisher simply inserts the messages it receives as rows into the chosen table. A common approach is to use it as a persistent log of events that were published on a queue with a short message expiration time.
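One way to insert a batch of messages as rows is a single multi-row INSERT with one placeholder group per message. The sketch below only builds the query string; `buildInsertQuery`, the table name and the column names are hypothetical, and the `?` placeholders assume a MySQL driver.

```go
package main

import (
	"fmt"
	"strings"
)

// buildInsertQuery builds one INSERT statement for n messages, with one
// "(?, ?, ?)" group per message. The column names are hypothetical and
// must match the schema you actually use.
func buildInsertQuery(table string, n int) string {
	rows := make([]string, n)
	for i := range rows {
		rows[i] = "(?, ?, ?)"
	}
	return "INSERT INTO " + table + " (`uuid`, `payload`, `metadata`) VALUES " +
		strings.Join(rows, ", ")
}

func main() {
	fmt.Println(buildInsertQuery("`watermill_events`", 2))
	// INSERT INTO `watermill_events` (`uuid`, `payload`, `metadata`) VALUES (?, ?, ?), (?, ?, ?)
}
```

The arguments would then be passed in the same order as the messages, three values per message.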

See also the SQL example.

Characteristics

Feature              Implements  Note
ConsumerGroups       yes         See ConsumerGroup in SubscriberConfig
ExactlyOnceDelivery  yes
GuaranteedOrder      yes
Persistent           yes

Schema

SQL Pub/Sub uses a user-defined schema to build its SELECT and INSERT queries. You need to implement SchemaAdapter and pass it to SubscriberConfig or PublisherConfig.

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/schema_adapter_mysql.go

// ...
// DefaultMySQLSchema is a default implementation of SchemaAdapter based on MySQL.
// If you need some customization, you can use composition to change schema and method of unmarshaling.
//
//    type MyMessagesSchema struct {
//        DefaultMySQLSchema
//    }
//
//    func (m MyMessagesSchema) SchemaInitializingQueries(topic string) []string {
//        createMessagesTable := strings.Join([]string{
//            "CREATE TABLE IF NOT EXISTS " + m.MessagesTable(topic) + " (",
//            "`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
//            "`uuid` BINARY(16) NOT NULL,",
//            "`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,",
//            "`payload` JSON DEFAULT NULL,",
//            "`metadata` JSON DEFAULT NULL",
//            ");",
//        }, "\n")
//
//        return []string{createMessagesTable}
//    }
//
//    func (m MyMessagesSchema) UnmarshalMessage(row *sql.Row) (offset int, msg *message.Message, err error) {
//        // ...
//
// For debugging your custom schema, we recommend injecting a logger with the trace logging level,
// which prints all SQL queries.
type DefaultMySQLSchema struct {
// ...

A DefaultSchema is provided for the most common use case (storing events in a table). You can base your schema on it and override only the methods you need.
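The composition approach relies on Go struct embedding: methods of the embedded type are promoted, and defining a method with the same name on the outer type overrides it. The sketch below is self-contained, so `baseSchema`, `customSchema` and the `schema` interface are hypothetical stand-ins rather than the library's types.

```go
package main

import "fmt"

// baseSchema is a hypothetical stand-in for a default schema type.
type baseSchema struct{}

func (baseSchema) MessagesTable(topic string) string {
	return "`watermill_" + topic + "`"
}

func (s baseSchema) SchemaInitializingQueries(topic string) []string {
	return []string{"CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (...)"}
}

// customSchema embeds baseSchema: MessagesTable is promoted unchanged,
// while SchemaInitializingQueries is overridden.
type customSchema struct {
	baseSchema
}

func (s customSchema) SchemaInitializingQueries(topic string) []string {
	return []string{"CREATE TABLE IF NOT EXISTS " + s.MessagesTable(topic) + " (`uuid` BINARY(16) NOT NULL)"}
}

// schema is a hypothetical subset of a schema adapter interface.
type schema interface {
	MessagesTable(topic string) string
	SchemaInitializingQueries(topic string) []string
}

func main() {
	var s schema = customSchema{}
	fmt.Println(s.MessagesTable("events"))                // promoted from baseSchema
	fmt.Println(s.SchemaInitializingQueries("events")[0]) // the override
}
```

Embedding is not inheritance: if a method of the embedded type calls another of its own methods, it always calls the embedded version, never your override.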

Extending schema

Consider an example project where you're fine with the default schema, but would like to store the uuid column as BINARY(16) instead of VARCHAR(36). In that case, you have to define two methods:

  • SchemaInitializingQueries, which returns the queries that create the table.
  • UnmarshalMessage, which builds a Message from a database record.
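With a BINARY(16) column, the UUID has to be converted between its 36-character text form and 16 raw bytes when inserting and again in UnmarshalMessage. The helpers below show only that conversion; `uuidToBinary` and `binaryToUUID` are hypothetical names, and the sketch assumes UUIDs in the canonical 8-4-4-4-12 hex layout.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
)

// uuidToBinary converts a canonical UUID string (8-4-4-4-12 hex digits,
// 36 characters) into the 16 raw bytes stored in a BINARY(16) column.
func uuidToBinary(s string) ([]byte, error) {
	if len(s) != 36 || s[8] != '-' || s[13] != '-' || s[18] != '-' || s[23] != '-' {
		return nil, errors.New("not a canonical 36-character UUID")
	}
	// Drop the four dashes, leaving 32 hex digits; DecodeString rejects anything else.
	return hex.DecodeString(s[0:8] + s[9:13] + s[14:18] + s[19:23] + s[24:36])
}

// binaryToUUID is the inverse, as needed when reading the column back.
func binaryToUUID(b []byte) (string, error) {
	if len(b) != 16 {
		return "", errors.New("expected 16 bytes")
	}
	h := hex.EncodeToString(b) // 32 lowercase hex digits
	return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32], nil
}

func main() {
	id := "6ba7b810-9dad-11d1-80b4-00c04fd430c8"
	raw, err := uuidToBinary(id)
	if err != nil {
		panic(err)
	}
	back, err := binaryToUUID(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(raw), back == id) // 16 true
}
```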

Note that you don't have to use the initialization queries provided by Watermill. They are run only if you set InitializeSchema (in SubscriberConfig) or AutoInitializeSchema (in PublisherConfig) to true. Otherwise, you can use your own solution for database migrations.

Configuration

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

// ...
type PublisherConfig struct {
    // SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc.
    SchemaAdapter SchemaAdapter

    // AutoInitializeSchema enables initialization of the database schema during publishing.
    // The schema is initialized once per topic per publisher instance.
    AutoInitializeSchema bool
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

// ...
type SubscriberConfig struct {
    ConsumerGroup string

    // PollInterval is the interval to wait between subsequent SELECT queries, if no more messages were found in the database.
    // Must be non-negative. Defaults to 1s.
    PollInterval time.Duration

    // ResendInterval is the time to wait before resending a nacked message.
    // Must be non-negative. Defaults to 1s.
    ResendInterval time.Duration

    // RetryInterval is the time to wait before resuming querying for messages after an error.
    // Must be non-negative. Defaults to 1s.
    RetryInterval time.Duration

    // SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc.
    SchemaAdapter SchemaAdapter

    // OffsetsAdapter provides the mechanism for saving acks and offsets of consumers.
    OffsetsAdapter OffsetsAdapter

    // InitializeSchema enables initializing the schema when subscribing.
    InitializeSchema bool
}
// ...

Publishing

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

// ...
func NewPublisher(db db, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
    publisher, err := sql.NewPublisher(
        db,
        sql.PublisherConfig{
            SchemaAdapter: sql.DefaultSchema{},
        },
        logger,
    )
    if err != nil {
        panic(err)
    }
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/publisher.go

// ...
// Publish inserts the messages as rows into the MessagesTable.
// Order is guaranteed for messages within one call.
// Publish blocks until all rows have been added to the Publisher's transaction.
// Publisher doesn't guarantee publishing messages in a single transaction,
// but the constructor accepts both *sql.DB and *sql.Tx, so transactions may be handled upstream by the user.
func (p *Publisher) Publish(topic string, messages ...*message.Message) (err error) {
// ...

Transactions

If you need to publish messages within a database transaction, pass a *sql.Tx to the NewPublisher constructor. A publisher writes through the handle it was created with, so you have to create a new publisher for each transaction.

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events/main.go

// ...
func simulateEvents(db *stdSQL.DB) {
    for {
        tx, err := db.Begin()
        if err != nil {
            panic(err)
        }

        err = publishEvent(tx)
        if err != nil {
            rollbackErr := tx.Rollback()
            if rollbackErr != nil {
                panic(rollbackErr)
            }
            panic(err)
        }

        err = tx.Commit()
        if err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}

// publishEvent publishes a new event.
// To publish the event in a separate transaction, a new SQL Publisher
// has to be created each time, passing the proper transaction handle.
func publishEvent(tx *stdSQL.Tx) error {
    pub, err := sql.NewPublisher(tx, sql.PublisherConfig{
        SchemaAdapter: sql.DefaultSchema{},
    }, logger)
    if err != nil {
        return err
    }

    e := event{
        Name:       "UserSignedUp",
        OccurredAt: time.Now().UTC().Format(time.RFC3339),
    }
    payload, err := json.Marshal(e)
    if err != nil {
        return err
    }

    return pub.Publish(mysqlTable, message.NewMessage(
        watermill.NewUUID(),
        payload,
    ))
}
// ...

Subscribing

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

// ...
func NewSubscriber(db *sql.DB, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {
// ...

Example:

Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
    subscriber, err := sql.NewSubscriber(
        db,
        sql.SubscriberConfig{
            SchemaAdapter:    sql.DefaultSchema{},
            OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
            InitializeSchema: true,
        },
        logger,
    )
    if err != nil {
        panic(err)
    }
// ...

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/subscriber.go

// ...
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *message.Message, err error) {
// ...

Offsets Adapter

The logic for storing message offsets is provided by the OffsetsAdapter. A MySQL implementation, DefaultMySQLOffsetsAdapter, is ready to use.

Full source: github.com/ThreeDotsLabs/watermill-sql/pkg/sql/offsets_adapter.go

// ...
type OffsetsAdapter interface {
    // AckMessageQuery returns the SQL query and arguments that will mark a message as read for a given consumer group.
    AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{})

    // ConsumedMessageQuery returns the SQL query and arguments which will be executed after consuming a message,
    // but before ack.
    //
    // ConsumedMessageQuery is optional, and will not be executed if the query is empty.
    ConsumedMessageQuery(topic string, offset int, consumerGroup string, consumerULID []byte) (string, []interface{})

    // NextOffsetQuery returns the SQL query and arguments which should return the offset of the next message to consume.
    NextOffsetQuery(topic, consumerGroup string) (string, []interface{})

    // SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS)
    // that the appropriate tables exist to write messages to the given topic.
    SchemaInitializingQueries(topic string) []string
}
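A custom adapter only has to return query strings and arguments for these four methods. The sketch below is a hypothetical implementation, not DefaultMySQLOffsetsAdapter: the `offsets_<topic>` table, its columns and the MySQL-flavoured SQL are assumptions, and the interface is copied locally so the example compiles on its own.

```go
package main

import "fmt"

// offsetsAdapter copies the method set of OffsetsAdapter shown above.
type offsetsAdapter interface {
	AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{})
	ConsumedMessageQuery(topic string, offset int, consumerGroup string, consumerULID []byte) (string, []interface{})
	NextOffsetQuery(topic, consumerGroup string) (string, []interface{})
	SchemaInitializingQueries(topic string) []string
}

// tableOffsetsAdapter keeps one row per consumer group in a per-topic
// offsets table. Table and column names are hypothetical.
type tableOffsetsAdapter struct{}

// Compile-time check that the sketch satisfies the interface.
var _ offsetsAdapter = tableOffsetsAdapter{}

func (tableOffsetsAdapter) offsetsTable(topic string) string {
	return "`offsets_" + topic + "`"
}

// AckMessageQuery upserts the last acknowledged offset for the group.
func (a tableOffsetsAdapter) AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{}) {
	q := "INSERT INTO " + a.offsetsTable(topic) + " (consumer_group, offset_acked) VALUES (?, ?) " +
		"ON DUPLICATE KEY UPDATE offset_acked = VALUES(offset_acked)"
	return q, []interface{}{consumerGroup, offset}
}

// ConsumedMessageQuery is optional; an empty query means nothing is executed.
func (tableOffsetsAdapter) ConsumedMessageQuery(topic string, offset int, consumerGroup string, consumerULID []byte) (string, []interface{}) {
	return "", nil
}

// NextOffsetQuery reads the last acknowledged offset; FOR UPDATE takes a row
// lock when run inside a transaction. This sketch assumes the messages SELECT
// then picks rows with a greater offset.
func (a tableOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) {
	q := "SELECT offset_acked FROM " + a.offsetsTable(topic) + " WHERE consumer_group = ? FOR UPDATE"
	return q, []interface{}{consumerGroup}
}

func (a tableOffsetsAdapter) SchemaInitializingQueries(topic string) []string {
	return []string{
		"CREATE TABLE IF NOT EXISTS " + a.offsetsTable(topic) + " (" +
			"consumer_group VARCHAR(255) NOT NULL PRIMARY KEY, " +
			"offset_acked BIGINT NOT NULL)",
	}
}

func main() {
	var oa offsetsAdapter = tableOffsetsAdapter{}
	q, args := oa.AckMessageQuery("events", 7, "billing")
	fmt.Println(q)
	fmt.Println(args) // [billing 7]
}
```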