SQLite
Beta Version Warning: this Pub/Sub is stable, but it has not been widely tested in production environments. It may be sensitive to certain edge cases and combinations of configuration parameters.
SQLite is a C-language library that implements a small, fast, self-contained, high-reliability, full-featured SQL database engine. Our SQLite Pub/Sub implementation provides two CGO-free driver variants optimized for different use cases. Both drivers use pure Go implementations of SQLite, enabling cross-compilation and avoiding CGO dependencies while maintaining full SQLite functionality.
SQLite Pub/Subs provide the easiest way to publish and process events durably, since you do not have to set up or manage a separate database. The database is just a file on disk. Some cloud compute providers offer distributed SQLite clusters, which can provide both durability and unmatched read performance. Tuned SQLite is ~35% faster than the Linux file system.
You can find a fully functional example with SQLite in the Watermill examples .
ModernC vs ZombieZen Driver
The vanilla ModernC driver is compatible with the Golang standard library SQL package and works without CGO. It has fewer dependencies than the ZombieZen variant and uses the modernc.org/sqlite pure Go SQLite implementation. Most users should pick this driver for full compatibility with the standard library.
The advanced ZombieZen driver abandons the standard Golang library SQL conventions in favor of the more orthogonal API and higher performance potential . Under the hood, it also uses the ModernC SQLite3 implementation and does not need CGO. Advanced SQLite users might prefer this driver for its performance benefits. It is about 6 times faster than the ModernC variant. It is currently more stable due to lower level control. It is faster than even the CGO SQLite variants on standard library interfaces, and with some tuning should become the absolute speed champion of persistent message brokers over time.
Characteristics
| Feature | Implements | Note |
|---|---|---|
| ConsumerGroups | yes | See ConsumerGroupMatcher in SubscriberOptions |
| ExactlyOnceDelivery | no | |
| GuaranteedOrder | yes | |
| Persistent | yes |
Like the BoltDB implementation, the SQLite drivers are imbeddable into your application. They do not require any additional infrastructure other than a mounted persistent disk. Their advantage over the BoltDB Pub/Sub is supporting consumer groups and guaranteed order.
Vanilla ModernC Driver
Installation
go get github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc@latestUsage
// ...
import (
"context"
"database/sql"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc"
"github.com/ThreeDotsLabs/watermill/message"
_ "modernc.org/sqlite"
)
func main() {
db := createDB()
defer db.Close()
logger := watermill.NewStdLogger(false, false)
subscriber, err := wmsqlitemodernc.NewSubscriber(
db,
wmsqlitemodernc.SubscriberOptions{
InitializeSchema: true,
Logger: logger,
},
)
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite/main.go
Configuration
// ...
type PublisherOptions struct {
// TableNameGenerators is a set of functions that generate table names for topics and offsets.
// Defaults to [TableNameGenerators.WithDefaultGeneratorsInsteadOfNils].
TableNameGenerators TableNameGenerators
// InitializeSchema enables initialization of schema database during publish.
// Schema is initialized once per topic per publisher instance.
// InitializeSchema is forbidden if using an ongoing transaction as database handle.
// It could result in an implicit commit of the transaction by a CREATE TABLE statement.
InitializeSchema bool
// Logger reports message publishing errors and traces. Defaults value is [watermill.NopLogger].
Logger watermill.LoggerAdapter
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc/publisher.go
// ...
type SubscriberOptions struct {
// ConsumerGroupMatcher differentiates message consumers within the same topic.
// Messages are processed in batches.
// Therefore, another subscriber with the same consumer group name may only obtain
// messages whenever it is able to acquire the row lock.
// Default value is a static consumer group matcher that
// always returns [DefaultConsumerGroupName].
ConsumerGroupMatcher ConsumerGroupMatcher
// BatchSize is the number of messages to read in a single batch.
// Default value is [DefaultMessageBatchSize].
BatchSize int
// TableNameGenerators is a set of functions that generate table names for topics and offsets.
// Default value is [TableNameGenerators.WithDefaultGeneratorsInsteadOfNils].
TableNameGenerators TableNameGenerators
// PollInterval is the interval to wait between subsequent SELECT queries, if
// no more messages were found in the database (Prefer using the BackoffManager instead).
// Must be non-negative. Defaults to one second.
PollInterval time.Duration
// LockTimeout is the maximum duration of the row lock. If the subscription
// is unable to extend the lock before this time out ends, the lock will expire.
// Then, another subscriber in the same consumer group name may
// acquire the lock and continue processing messages.
//
// Duration must not be less than one second, because seconds are added
// to the SQLite `unixepoch` function, rounded to the nearest second.
// A zero duration would create a lock that expires immediately.
// There is no reason to set higher precision fractional duration,
// because the lock timeout will rarely ever trigger in a healthy system.
//
// Normally, the row lock is set to zero after each batch of messages is processed.
// LockTimeout might occur if a consuming node shuts down unexpectedly,
// before it is able to complete processing a batch of messages. Only
// in such rare cases the time out matters. And, it is better to set it
// to a higher value in order to avoid unnecessary batch re-processing.
// Therefore, a value lower than one second is impractical.
//
// Default value is [DefaultLockTimeout].
LockTimeout time.Duration
// AckDeadline is the time to wait for acking a message.
// If message is not acked within this time, it will be nacked and re-delivered.
//
// When messages are read in bulk, this time is calculated for each message separately.
//
// If you want to disable the acknowledgement deadline, set it to 0.
// Warning: when acknowledgement deadline is disabled, messages may block and
// prevent the subscriber from accepting new messages.
//
// Must be non-negative. Default value is [DefaultAckDeadline].
AckDeadline *time.Duration
// InitializeSchema option enables initializing schema on making a subscription.
InitializeSchema bool
// Logger reports message consumption errors and traces.
//
// Defaults to [watermill.NopLogger].
Logger watermill.LoggerAdapter
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc/subscriber.go
Publishing
// ...
// NewPublisher creates a [message.Publisher] instance from a connection interface which could be
// a database handle or a transaction.
func NewPublisher(db SQLiteConnection, options PublisherOptions) (message.Publisher, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc/publisher.go
// ...
// Publish pushes messages into a topic. Returns [ErrPublisherIsClosed] if the publisher is closed.
func (p *publisher) Publish(topic string, messages ...*message.Message) (err error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc/publisher.go
Example:
// ...
publisher, err := wmsqlitemodernc.NewPublisher(
db,
wmsqlitemodernc.PublisherOptions{
InitializeSchema: true,
Logger: logger,
},
)
if err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite/main.go
// ...
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite/main.go
Publishing in transaction
// ...
import (
"context"
"database/sql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc"
"github.com/ThreeDotsLabs/watermill/message"
)
func publishWithInTransaction(db *sql.DB) {
tx, err := db.BeginTx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
})
if err != nil {
panic(err)
}
defer func() {
_ = tx.Commit()
}()
publisher, err := wmsqlitemodernc.NewPublisher(
tx, // transaction presented as database
wmsqlitemodernc.PublisherOptions{
// schema must be initialized elsewhere before using
// the publisher within the transaction
InitializeSchema: false,
},
)
if err != nil {
panic(err)
}
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
_ = tx.Rollback()
panic(err)
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite/transaction.go
Subscribing
// ...
// NewSubscriber creates a new subscriber with the given options.
func NewSubscriber(db SQLiteDatabase, options SubscriberOptions) (message.Subscriber, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc/subscriber.go
Example:
// ...
subscriber, err := wmsqlitemodernc.NewSubscriber(
db,
wmsqlitemodernc.SubscriberOptions{
InitializeSchema: true,
Logger: logger,
},
)
if err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite/main.go
// ...
// Subscribe streams messages from the topic. Satisfies [watermill.Subscriber] interface.
// Returns [ErrSubscriberIsClosed] if the subscriber is closed.
func (s *subscriber) Subscribe(ctx context.Context, topic string) (c <-chan *message.Message, err error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc/subscriber.go
Advanced ZombieZen Driver
Installation
go get -u github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen@latestUsage
// ...
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen"
"github.com/ThreeDotsLabs/watermill/message"
_ "modernc.org/sqlite"
"zombiezen.com/go/sqlite"
)
func main() {
logger := watermill.NewStdLogger(false, false)
// &cache=shared is critical, see: https://github.com/zombiezen/go-sqlite/issues/92#issuecomment-2052330643
// connectionDSN := "file:db.sqlite3?journal_mode=WAL&busy_timeout=1000&secure_delete=true&foreign_keys=true&cache=shared"
connectionDSN := "file:ephemeral?mode=memory&cache=shared"
conn, err := sqlite.OpenConn(connectionDSN)
if err != nil {
panic(err)
}
defer conn.Close()
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite-zombiezen/main.go
Configuration
// ...
type PublisherOptions struct {
// TableNameGenerators is a set of functions that generate table names for topics and offsets.
// Defaults to [TableNameGenerators.WithDefaultGeneratorsInsteadOfNils].
TableNameGenerators TableNameGenerators
// InitializeSchema enables initialization of schema database during publish.
// Schema is initialized once per topic per publisher instance.
// InitializeSchema is forbidden if using an ongoing transaction as database handle.
// It could result in an implicit commit of the transaction by a CREATE TABLE statement.
InitializeSchema bool
// Logger reports message publishing errors and traces. Defaults value is [watermill.NopLogger].
Logger watermill.LoggerAdapter
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen/publisher.go
// ...
type SubscriberOptions struct {
// ConsumerGroupMatcher differentiates message consumers within the same topic.
// Messages are processed in batches.
// Therefore, another subscriber with the same consumer group name may only obtain
// messages whenever it is able to acquire the row lock.
// Default value is a static consumer group matcher that
// always returns [DefaultConsumerGroupName].
ConsumerGroupMatcher ConsumerGroupMatcher
// BatchSize is the number of messages to read in a single batch.
// Default value is [DefaultMessageBatchSize].
BatchSize int
// TableNameGenerators is a set of functions that generate table names for topics and offsets.
// Default value is [TableNameGenerators.WithDefaultGeneratorsInsteadOfNils].
TableNameGenerators TableNameGenerators
// PollInterval is the interval to wait between subsequent SELECT queries,
// if no more messages were found in the database (Prefer using the BackoffManager instead).
// Must be non-negative. Default value is one second.
PollInterval time.Duration
// LockTimeout is the maximum duration of the row lock. If the subscription
// is unable to extend the lock before this time out ends, the lock will expire.
// Then, another subscriber in the same consumer group name may
// acquire the lock and continue processing messages.
//
// Duration must not be less than one second, because seconds are added
// to the SQLite `unixepoch` function, rounded to the nearest second.
// A zero duration would create a lock that expires immediately.
// There is no reason to set higher precision fractional duration,
// because the lock timeout will rarely ever trigger in a healthy system.
//
// Normally, the row lock is set to zero after each batch of messages is processed.
// LockTimeout might occur if a consuming node shuts down unexpectedly,
// before it is able to complete processing a batch of messages. Only
// in such rare cases the time out matters. And, it is better to set it
// to a higher value in order to avoid unnecessary batch re-processing.
// Therefore, a value lower than one second is impractical.
//
// Defaults to [DefaultLockTimeout].
LockTimeout time.Duration
// AckDeadline is the time to wait for acking a message.
// If message is not acked within this time, it will be nacked and re-delivered.
//
// When messages are read in bulk, this time is calculated for each message separately.
//
// If you want to disable the acknowledgement deadline, set it to 0.
// Warning: when acknowledgement deadline is disabled, messages may block and
// prevent the subscriber from accepting new messages.
//
// Must be non-negative. Default value is [DefaultAckDeadline].
AckDeadline *time.Duration
// BufferPool is a pool of buffers used for reading message payload and
// metadata from the database. If not provided, a default pool will be used.
// The pool may leak message metadata, but never the payload.
//
// Warning: If sync.Pool does not return a buffer, subscription will panic.
BufferPool *sync.Pool
// InitializeSchema option enables initializing schema on making a subscription.
InitializeSchema bool
// Logger reports message consumption errors and traces.
//
// Defaults to [watermill.NopLogger].
Logger watermill.LoggerAdapter
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen/subscriber.go
Publishing
// ...
// NewPublisher creates a [message.Publisher] instance from a [SQLiteDatabase] connection handler.
func NewPublisher(conn *sqlite.Conn, options PublisherOptions) (message.Publisher, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen/publisher.go
// ...
// Publish pushes messages into a topic. Returns [ErrPublisherIsClosed] if the publisher is closed.
//
// This implementation uses a mutex to ensure safety
// when publishing messages concurrently. It ignores
// message context because ZombieZen SQLite treats
// the connection as a file handle. For logging purposes,
// it passes down the context of the first message.
func (p *publisher) Publish(topic string, messages ...*message.Message) (err error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen/publisher.go
Example:
// ...
publisher, err := wmsqlitezombiezen.NewPublisher(conn, wmsqlitezombiezen.PublisherOptions{
InitializeSchema: true,
Logger: logger,
})
if err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite-zombiezen/main.go
// ...
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello from ZombieZen!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite-zombiezen/main.go
Publishing in transaction
// ...
import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen"
"github.com/ThreeDotsLabs/watermill/message"
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitex"
)
func publishWithInTransaction(connectionDSN string) {
var err error
// create a new connection for each transaction
// unless you guard it with a sync.Mutex
conn, err := sqlite.OpenConn(connectionDSN)
if err != nil {
panic(err)
}
defer conn.Close()
closer := sqlitex.Transaction(conn)
defer func() {
if closer(&err); err != nil {
panic(err)
}
}()
publisher, err := wmsqlitezombiezen.NewPublisher(
conn,
wmsqlitezombiezen.PublisherOptions{
// schema must be initialized elsewhere before using
// the publisher within the transaction
InitializeSchema: false,
},
)
if err != nil {
panic(err)
}
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))
if err = publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
}
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite-zombiezen/transaction.go
Subscribing
// ...
// NewSubscriber creates a new subscriber with the given options.
func NewSubscriber(connectionDSN string, options SubscriberOptions) (message.Subscriber, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen/subscriber.go
Example:
// ...
subscriber, err := wmsqlitezombiezen.NewSubscriber(connectionDSN, wmsqlitezombiezen.SubscriberOptions{
InitializeSchema: true,
Logger: logger,
})
if err != nil {
panic(err)
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sqlite-zombiezen/main.go
// ...
// Subscribe streams messages from the topic. Satisfies [watermill.Subscriber] interface.
// Returns [ErrSubscriberIsClosed] if the subscriber is closed.
func (s *subscriber) Subscribe(ctx context.Context, topic string) (c <-chan *message.Message, err error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitezombiezen/subscriber.go
Marshaler
Watermill’s messages are stored in SQLite using JSON serialization. Both drivers use the same marshaling approach - messages are automatically marshaled to and from JSON format when publishing and subscribing.
The default marshaler handles:
- Message payload (stored as JSON blob)
- Message metadata (stored as JSON object)
- Message UUID (stored as TEXT)
- Timestamps for ordering and consumer group management
Both drivers automatically handle message marshaling and unmarshaling, so no custom marshaler configuration is typically required.
Caveats
SQLite3 does not support querying FOR UPDATE, which is used for row locking when subscribers in the same consumer group read an event batch in official Watermill SQL PubSub implementations. Current architectural decision is to lock a consumer group offset using unixepoch()+lockTimeout time stamp. While one consumed message is processing per group, the offset lock time is extended by lockTimeout periodically by time.Ticker. If the subscriber is unable to finish the consumer group batch, other subscribers will take over the lock as soon as the grace period runs out. A time lock fulfills the role of a traditional database network timeout that terminates transactions when its client disconnects.
All the normal SQLite limitations apply to Watermill. The connections are file handles. Create new connections for concurrent processing. If you must share a connection, protect it with a mutual exclusion lock. If you are writing within a transaction, create a connection for that transaction only.
