Forwarder component

Emitting events along with storing data in a database in one transaction.

Publishing messages in transactions (and why we should care)

When working on an event-driven application, you may at some point need to store application state and publish a message telling the rest of the system what has just happened. Ideally, you’d persist the state and publish the message in a single transaction, because doing otherwise can easily get you into trouble with data consistency. To commit both the data and the event in one transaction, you’d have to be able to publish messages to the same database you use for data storage, or implement two-phase commit (2PC) on your own. If you don’t want to replace your message broker with a database or reinvent the wheel, you can make your life easier by using Watermill’s Forwarder component!

Forwarder component

You can think of the Forwarder as a daemon running in the background that waits for messages published to a database and makes sure they eventually reach a message broker.

[Figure: Watermill Forwarder component]
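
The "eventually reach a message broker" part can be illustrated with a minimal, standard-library-only sketch. It is not Watermill’s implementation: `storedMessage`, `broker` and `forwardAll` are hypothetical names, a slice stands in for the database, and the broker is a fake that fails a configurable number of times. The point it tries to show is that a message leaves the database only after the broker has accepted it.

```go
package main

import (
	"errors"
	"fmt"
)

// storedMessage is a hypothetical row waiting in the intermediate database.
type storedMessage struct {
	Topic   string
	Payload string
}

// broker is a hypothetical stand-in for a message broker that is
// temporarily unavailable for the first failuresLeft publish calls.
type broker struct {
	failuresLeft int
	delivered    []storedMessage
}

func (b *broker) publish(m storedMessage) error {
	if b.failuresLeft > 0 {
		b.failuresLeft--
		return errors.New("broker unavailable")
	}
	b.delivered = append(b.delivered, m)
	return nil
}

// forwardAll delivers pending messages in order and returns the ones still
// pending. A message is dropped from the outbox only after a successful
// publish, so a failed publish is retried instead of being lost.
func forwardAll(outbox []storedMessage, b *broker, maxAttempts int) []storedMessage {
	for attempt := 0; attempt < maxAttempts && len(outbox) > 0; attempt++ {
		if err := b.publish(outbox[0]); err != nil {
			continue // keep the message and try again
		}
		outbox = outbox[1:]
	}
	return outbox
}

func main() {
	outbox := []storedMessage{
		{Topic: "lottery_concluded", Payload: `{"lottery_id":1}`},
		{Topic: "lottery_concluded", Payload: `{"lottery_id":2}`},
	}
	b := &broker{failuresLeft: 2}

	remaining := forwardAll(outbox, b, 10)
	fmt.Println("pending:", len(remaining), "delivered:", len(b.delivered))
}
```

A real forwarder would wait between retries and run until stopped; the bounded loop here only keeps the sketch short.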

To make the Forwarder universal and transparent to use, it listens to a single topic on an intermediate database-based Pub/Sub, to which enveloped messages are sent by a decorated Forwarder Publisher. The Forwarder unwraps them and sends them to the specified destination topic on the message broker.

[Figure: Forwarder envelope]
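
To make the enveloping idea concrete, here is a small sketch using only the standard library. The `envelope` type, its JSON field names, and the `wrap`/`unwrap` helpers are assumptions made for illustration; Watermill’s actual envelope format may differ. The sketch shows how a destination topic can travel together with the original payload through a single intermediate topic.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope is a hypothetical wire format: the original payload plus the
// topic the message should finally be published to.
type envelope struct {
	DestinationTopic string `json:"destination_topic"`
	Payload          []byte `json:"payload"` // encoded as base64 by encoding/json
}

// wrap is what a decorated publisher could do before writing the message
// to the single forwarder topic in the database.
func wrap(destinationTopic string, payload []byte) ([]byte, error) {
	return json.Marshal(envelope{DestinationTopic: destinationTopic, Payload: payload})
}

// unwrap is what the forwarder could do after reading a message from the
// database: recover the destination topic and the original payload.
func unwrap(data []byte) (string, []byte, error) {
	var e envelope
	if err := json.Unmarshal(data, &e); err != nil {
		return "", nil, err
	}
	if e.DestinationTopic == "" {
		return "", nil, errors.New("envelope has no destination topic")
	}
	return e.DestinationTopic, e.Payload, nil
}

func main() {
	wrapped, err := wrap("lottery_concluded", []byte(`{"lottery_id":42}`))
	if err != nil {
		panic(err)
	}

	topic, payload, err := unwrap(wrapped)
	if err != nil {
		panic(err)
	}
	fmt.Println(topic, string(payload))
}
```

Because the destination lives inside the envelope, the application code keeps publishing to its usual topic name, and only the forwarder needs to know about the intermediate topic.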

Example

Let’s consider the following example: there’s a command whose responsibility is to run a lottery. It has to pick a random user registered in the system as the winner. While doing so, it should also persist its decision by storing a database entry that associates a unique lottery ID with the picked user’s ID. Additionally, as it’s an event-driven system, it should emit a LotteryConcluded event so that other components can react appropriately. To be precise, there will be a component responsible for sending prizes to lottery winners. It will receive LotteryConcluded events and, using the lottery ID embedded in the event, look up the winner in the database entry.

In our case, the database is MySQL and the message broker is Google Pub/Sub, but it could be any two other technologies.

There are various ways to implement such a command. Below we’re going to cover three possible attempts and point out their vulnerabilities.

Publishing an event first, storing data next

In this approach, the command publishes an event first and stores the data right after that. While this will probably work just fine in most cases, let’s try to find out what could go wrong.

There are three basic actions that the command has to do:

  1. Pick a random user A as a lottery winner.
  2. Publish a LotteryConcluded event telling that lottery B has been concluded.
  3. Store in the database that the lottery B has been won by the user A.

Each of these steps could fail, breaking the flow of our command. A failure of the first step wouldn’t have serious repercussions: we would just return an error and consider the whole command failed. No data would be stored, no event would be emitted. We can simply rerun the command.

If the second step fails, we still have no event emitted and no data stored in the database. We can rerun the command and try once again.

The most interesting case is a failure of the third step. The event has already been emitted in the second step, but no data ends up stored in the database. Other components would get a signal that the lottery has been concluded, but no winner would be associated with the lottery ID sent in the event. They wouldn’t be able to verify who the winner is, so their action would have to be considered failed as well.

We can still get out of this situation, but it will most probably require some manual action, e.g., rerunning the command with the lottery ID carried by the emitted event.

Full source: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 1. Publishes event to Google Cloud Pub/Sub first, then stores data in MySQL.
func publishEventAndPersistData(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	publisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	// In case this fails, we have an event emitted, but no data persisted yet.
	if err = simulateError(); err != nil {
		logger.Error("Failed to persist data", err, nil)
		return err
	}

	_, err = db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}
// ...

Storing data first, publishing an event next

In the second approach, we’re going to try to address the first approach’s drawbacks. We won’t leak our failure to outside components, because we won’t emit an event unless the state has been persisted properly in the database. That means we’ll change the order of our actions to the following:

  1. Pick a random user A as a lottery winner.
  2. Store in the database that the lottery B has been won by the user A.
  3. Publish a LotteryConcluded event telling that lottery B has been concluded.

If either of the first two steps fails, there are no repercussions, just as in the first approach. If the third step fails, we have data persisted in the database, but no event emitted. In this case, we don’t leak our failure outside the lottery component. However, considering the expected system behavior, no prize would be sent to our winner, because no event would be delivered to the component responsible for this action.

That can probably be fixed by some manual action as well, e.g., emitting the event manually. We can still do better.

Full source: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 2. Persists data to MySQL first, then publishes an event straight to Google Cloud Pub/Sub.
func persistDataAndPublishEvent(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) error {
	_, err := db.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	if err != nil {
		return err
	}

	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	// In case this fails, we have data persisted, but no event emitted.
	if err = simulateError(); err != nil {
		logger.Error("Failed to emit event", err, nil)
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}
// ...
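
The two failure modes described so far can be reproduced with a small in-memory model. This is only a sketch: the `system` type and its methods are hypothetical, a map stands in for MySQL and a slice for the broker, and the `failSecond` flag plays the role of `simulateError` from the listings above.

```go
package main

import (
	"errors"
	"fmt"
)

// system is a hypothetical in-memory model of the two storages involved.
type system struct {
	winners map[int]string // stands in for the lotteries table
	events  []int          // lottery IDs announced on the broker
}

func newSystem() *system { return &system{winners: map[int]string{}} }

var errSimulated = errors.New("simulated failure")

// publishThenStore mirrors the first approach; failSecond breaks the store step.
func (s *system) publishThenStore(lotteryID int, winner string, failSecond bool) error {
	s.events = append(s.events, lotteryID)
	if failSecond {
		return errSimulated
	}
	s.winners[lotteryID] = winner
	return nil
}

// storeThenPublish mirrors the second approach; failSecond breaks the publish step.
func (s *system) storeThenPublish(lotteryID int, winner string, failSecond bool) error {
	s.winners[lotteryID] = winner
	if failSecond {
		return errSimulated
	}
	s.events = append(s.events, lotteryID)
	return nil
}

// describe reports whether the database and the broker agree about a lottery.
func (s *system) describe(lotteryID int) string {
	_, stored := s.winners[lotteryID]
	published := false
	for _, id := range s.events {
		if id == lotteryID {
			published = true
		}
	}
	switch {
	case published && !stored:
		return "event emitted, no data persisted"
	case stored && !published:
		return "data persisted, no event emitted"
	case stored && published:
		return "consistent"
	default:
		return "nothing happened"
	}
}

func main() {
	a := newSystem()
	_ = a.publishThenStore(1, "user-a", true)
	fmt.Println("publish first:", a.describe(1))

	b := newSystem()
	_ = b.storeThenPublish(1, "user-a", true)
	fmt.Println("store first:", b.describe(1))
}
```

Whichever order is chosen, a failure between the two steps leaves the storages disagreeing, which is why reordering alone cannot solve the problem.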

Storing data and publishing an event in one transaction

Let’s imagine our command could perform the second and the third step at the same time. They would be committed atomically, meaning that neither of them can succeed if the other one fails. This can be achieved with transactions, a mechanism implemented by most databases today. One of them is MySQL, which is used in our example.

In order to commit both storing data and emitting an event in one transaction, we’d have to be able to publish our messages to MySQL. Because we don’t want to change our message broker to be backed by MySQL in the whole system, we have to find a way to do that differently.
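
The idea of publishing "to the database" can be sketched without any real database. In the following stand-alone example, all names (`database`, `tx`, `runLottery`) are hypothetical, and the staged writes are a simplified stand-in for a MySQL transaction. It only tries to show that when the data row and the outgoing message go through the same transaction, they are either both visible or both discarded.

```go
package main

import (
	"errors"
	"fmt"
)

// database is a hypothetical in-memory store with two "tables": the
// business data and the messages waiting to be forwarded to the broker.
type database struct {
	winners map[int]string
	outbox  []string
}

// tx stages writes and applies them to the database only on commit.
type tx struct {
	db      *database
	winners map[int]string
	outbox  []string
}

func (d *database) begin() *tx {
	return &tx{db: d, winners: map[int]string{}}
}

func (t *tx) storeWinner(lotteryID int, winner string) { t.winners[lotteryID] = winner }

func (t *tx) publish(payload string) { t.outbox = append(t.outbox, payload) }

// commit makes the staged data and the staged messages visible together.
func (t *tx) commit() {
	for id, w := range t.winners {
		t.db.winners[id] = w
	}
	t.db.outbox = append(t.db.outbox, t.outbox...)
}

// runLottery stores the winner and publishes the event in one transaction.
// The fail flag simulates an error that happens before the commit.
func runLottery(d *database, lotteryID int, winner string, fail bool) error {
	t := d.begin()
	t.storeWinner(lotteryID, winner)
	t.publish(fmt.Sprintf(`{"lottery_id":%d}`, lotteryID))
	if fail {
		// Returning without commit discards both staged writes (a rollback).
		return errors.New("simulated failure before commit")
	}
	t.commit()
	return nil
}

func main() {
	d := &database{winners: map[int]string{}}

	// Failed run: neither the winner nor the message is stored.
	err := runLottery(d, 1, "user-a", true)
	fmt.Println(err != nil, len(d.winners), len(d.outbox))

	// Successful run: both are stored together.
	err = runLottery(d, 2, "user-b", false)
	fmt.Println(err != nil, len(d.winners), len(d.outbox))
}
```

The messages in `outbox` still have to reach the real broker, which is the job a separate forwarding process takes over.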

There’s good news: Watermill provides all the tools out of the box! If the database you’re using is MySQL, PostgreSQL (or any other SQL database), Firestore or Bolt, you can publish messages to it. The Forwarder component picks up all the messages you publish to the database and forwards them to your message broker.

All you have to do is make sure that:

  1. Your command uses a publisher working in the context of a database transaction (e.g. SQL, Firestore, Bolt).
  2. The Forwarder component is running, using a database subscriber and a message broker publisher.

In this case, the command could look like this:

Full source: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
// 3. Persists data in MySQL and emits an event through MySQL to Google Cloud Pub/Sub, all in one transaction.
func persistDataAndPublishEventInTransaction(lotteryID int, pickedUser string, logger watermill.LoggerAdapter) (err error) {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	// err is a named result, so the deferred function sees the final outcome of the command.
	defer func() {
		if err == nil {
			// Report a failed commit to the caller instead of silently ignoring it.
			err = tx.Commit()
		} else {
			logger.Info("Rolling transaction back due to error", watermill.LogFields{"error": err.Error()})
			// In case of an error, we're 100% sure that thanks to MySQL transaction rollback, we won't have any of the undesired situations:
			// - event is emitted, but no data is persisted,
			// - data is persisted, but no event is emitted.
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(`INSERT INTO lotteries (lottery_id, winner) VALUES(?, ?)`, lotteryID, pickedUser)
	if err != nil {
		return err
	}

	var publisher message.Publisher
	publisher, err = sql.NewPublisher(
		tx,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		return err
	}

	// Decorate publisher so it wraps an event in an envelope understood by the Forwarder component.
	publisher = forwarder.NewPublisher(publisher, forwarder.PublisherConfig{
		ForwarderTopic: forwarderSQLTopic,
	})

	// Publish an event announcing the lottery winner. Please note we're publishing to a Google Cloud topic here,
	// while using decorated MySQL publisher.
	event := LotteryConcludedEvent{LotteryID: lotteryID}
	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}

	err = publisher.Publish(googleCloudEventTopic, message.NewMessage(watermill.NewULID(), payload))
	if err != nil {
		return err
	}

	return nil
// ...

To make the Forwarder component work in the background and forward messages from MySQL to Google Pub/Sub, you have to set it up as follows:

Full source: github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder/main.go

// ...
	// Setup the Forwarder component so it takes messages from MySQL subscription and pushes them to Google Pub/Sub.
	sqlSubscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	expectNoErr(err)

	gcpPublisher, err := googlecloud.NewPublisher(
		googlecloud.PublisherConfig{
			ProjectID: projectID,
		},
		logger,
	)
	expectNoErr(err)

	fwd, err := forwarder.NewForwarder(sqlSubscriber, gcpPublisher, logger, forwarder.Config{
		ForwarderTopic: forwarderSQLTopic,
	})
	expectNoErr(err)

	go func() {
		err := fwd.Run(context.Background())
		expectNoErr(err)
	}()

// ...

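If you’d like to explore the example further, the full implementation is available at github.com/ThreeDotsLabs/watermill/_examples/real-world-examples/transactional-events-forwarder.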