CQRS Component

Golang CQRS implementation in Watermill.

CQRS

CQRS means “Command-query responsibility segregation”. We segregate the responsibility between commands (write requests) and queries (read requests). The write requests and the read requests are handled by different objects.

That’s it. We can further split up the data storage, having separate read and write stores. Once that happens, there may be many read stores, optimized for handling different types of queries or spanning many bounded contexts. Though separate read/write stores are often discussed in relation with CQRS, this is not CQRS itself. CQRS is just the first split of commands and queries.

Source: www.cqrs.nu FAQ

Glossary

CQRS Schema

Command

The command is a simple data structure, representing the request for executing some operation.

Command Bus

Full source: components/cqrs/command_bus.go

// ...
// CommandBus transports commands to command handlers.
type CommandBus struct {
// ...

Command Processor

Full source: components/cqrs/command_processor.go

// ...
// CommandProcessor determines which CommandHandler should handle the command received from the command bus.
type CommandProcessor struct {
// ...

Command Handler

Full source: components/cqrs/command_processor.go

// ...
// CommandHandler receives a command defined by NewCommand and handles it with the Handle method.
// If using DDD, CommandHandler may modify and persist the aggregate.
//
// In contrast to EvenHandler, every Command must have only one CommandHandler.
type CommandHandler interface {
// ...

Event

The event represents something that already took place. Events are immutable.

Event Bus

Full source: components/cqrs/event_bus.go

// ...
// EventBus transports events to event handlers.
type EventBus struct {
// ...

Event Processor

Full source: components/cqrs/event_processor.go

// ...
// EventProcessor determines which EventHandler should handle event received from event bus.
type EventProcessor struct {
// ...

Event Handler

Full source: components/cqrs/event_processor.go

// ...
// EventHandler receive event defined by NewEvent and handle it with Handle method.
// If using DDD, CommandHandler may modify and persist the aggregate.
// It can also invoke process manager, saga or just build a read model.
//
// In contrast to CommandHandler, every Event can have multiple EventHandlers.
type EventHandler interface {
// ...

CQRS Facade

Full source: components/cqrs/cqrs.go

// ...
// Facade is a facade for creating the Command and Event buses and processors.
// It was created to avoid boilerplate, when using CQRS in the standard way.
// You can also create buses and processors manually, drawing inspiration from how it's done in NewFacade.
type Facade struct {
// ...

Command and Event Marshaler

Full source: components/cqrs/marshaler.go

// ...
// CommandEventMarshaler marshals Commands and Events to Watermill's messages and vice versa.
// Payload of the command needs to be marshaled to []bytes.
type CommandEventMarshaler interface {
    // Marshal marshals Command or Event to Watermill's message.
   Marshal(v interface{}) (*message.Message, error)

    // Unmarshal unmarshals watermill's message to v Command or Event.
   Unmarshal(msg *message.Message, v interface{}) (err error)

    // Name returns the name of Command or Event.
   // Name is used to determine, that received command or event is event which we want to handle.
   Name(v interface{}) string

    // NameFromMessage return the name of Command or Event from Watermill's message (generated by Marshal).
   //
   // When we have Commnad or Event marshaled to Watermill's message,
   // we should use NameFromMessage instead of Name to avoid unnecessary unmarshaling.
   NameFromMessage(msg *message.Message) string
}
// ...

Usage

Example domain

As an example, we will use a simple domain, that is responsible for handing room booking in a hotel.

We will use Event Storming notation to show the model of this domain.

Legend:

  • blue post-its are commands
  • orange post-its are events
  • green post-its are read models, asynchronously generated from events
  • violet post-its are policies, which are triggered by events and produce commands
  • pink post its are hot-spots; we mark places where problems often occur

CQRS Event Storming

The domain is simple:

  • A Guest is able to book a room.
  • Whenever a room is booked, we order a beer for the guest (because we love our guests).
    • We know that sometimes there are not enough beers.
  • We generate a financial report based on the bookings.

Sending a command

For the beginning, we need to simulate the guest’s action.

Full source: _examples/cqrs-protobuf/main.go

// ...
       bookRoomCmd := &BookRoom{
            RoomId:    fmt.Sprintf("%d", i),
            GuestName: "John",
            StartDate: startDate,
            EndDate:   endDate,
        }
        if err := commandBus.Send(bookRoomCmd); err != nil {
            panic(err)
        }
// ...

Command handler

BookRoomHandler will handle our command.

Full source: _examples/cqrs-protobuf/main.go

// ...
// BookRoomHandler is a command handler, which handles BookRoom command and emits RoomBooked.
//
// In CQRS, one command must be handled by only one handler.
// When another handler with this command is added to command processor, error will be retuerned.
type BookRoomHandler struct {
    eventBus *cqrs.EventBus
}

// NewCommand returns type of command which this handle should handle. It must be a pointer.
func (b BookRoomHandler) NewCommand() interface{} {
    return &BookRoom{}
}

func (b BookRoomHandler) Handle(c interface{}) error {
    // c is always the type returned by `NewCommand`, so casting is always safe
   cmd := c.(*BookRoom)

    // some random price, in production you probably will calculate in wiser way
   price := (rand.Int63n(40) + 1) * 10

    log.Printf("Booked %s for %s from %s to %s", cmd.RoomId, cmd.GuestName, cmd.StartDate, cmd.EndDate)

    // RoomBooked will be handled by OrderBeerOnRoomBooked event handler,
   // in future RoomBooked may be handled by multiple event handler
   if err := b.eventBus.Publish(&RoomBooked{
        ReservationId: watermill.NewUUID(),
        RoomId:        cmd.RoomId,
        GuestName:     cmd.GuestName,
        Price:         price,
        StartDate:     cmd.StartDate,
        EndDate:       cmd.EndDate,
    }); err != nil {
        return err
    }

    return nil
}

// OrderBeerOnRoomBooked is a event handler, which handles RoomBooked event and emits OrderBeer command.
// ...

Event handler

As mentioned before, we want to order a beer every time when a room is booked (“Whenever a Room is booked” post-it). We do it by using the OrderBeer command.

Full source: _examples/cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked is a event handler, which handles RoomBooked event and emits OrderBeer command.
type OrderBeerOnRoomBooked struct {
    commandBus *cqrs.CommandBus
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
    return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(e interface{}) error {
    event := e.(*RoomBooked)

    orderBeerCmd := &OrderBeer{
        RoomId: event.RoomId,
        Count:  rand.Int63n(10) + 1,
    }

    return o.commandBus.Send(orderBeerCmd)
}

// OrderBeerHandler is a command handler, which handles OrderBeer command and emits BeerOrdered.
// ...

OrderBeerHandler is very similar to BookRoomHandler. The only difference is, that it sometimes returns an error when there are not enough beers, which causes redelivery of the command. You can find the entire implementation in the example source code.

Building a read model with the event handler

Full source: _examples/cqrs-protobuf/main.go

// ...
// BookingsFinancialReport is a read model, which calculates how much money we may earn from bookings.
// Like OrderBeerOnRoomBooked, it listens for RoomBooked event.
//
// This implementation is just writing to the memory. In production, you will probably will use some persistent storage.
type BookingsFinancialReport struct {
    handledBookings map[string]struct{}
    totalCharge     int64
    lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
    return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (BookingsFinancialReport) NewEvent() interface{} {
    return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(e interface{}) error {
    // Handle may be called concurrently, so it need to be thread safe.
   b.lock.Lock()
    defer b.lock.Unlock()

    event := e.(*RoomBooked)

    // When we are using Pub/Sub which doesn't provide exactly-once delivery semantics, we need to deduplicate messages.
   // GoChannel Pub/Sub provides exactly-once delivery,
   // but let's make this example ready for other Pub/Sub implementations.
   if _, ok := b.handledBookings[event.ReservationId]; ok {
        return nil
    }
    b.handledBookings[event.ReservationId] = struct{}{}

    b.totalCharge += event.Price

    fmt.Printf("Already booked rooms for $%d\n", b.totalCharge)
    return nil
}

func main() {
// ...

Wiring it up - the CQRS facade

We have all the blocks to build our CQRS application. We now need to use some kind of glue to wire it up.

We will use the simplest in-memory messaging infrastructure: GoChannel.

Under the hood, CQRS is using Watermill’s message router. If you are not familiar with it and want to learn how it works, you should check Getting Started guide. It will also show you how to use some standard messaging patterns, like metrics, poison queue, throttling, correlation and other tools used by every message-driven application. Those come built-in with Watermill.

Let’s go back to the CQRS. As you already know, CQRS is built from multiple components, like Command or Event buses, handlers, processors, etc. To simplify creating all these building blocks, we created cqrs.Facade, which creates all of them.

Full source: _examples/cqrs-protobuf/main.go

// ...
func main() {
    logger := watermill.NewStdLogger(true, false)
    marshaler := cqrs.ProtobufMarshaler{}

    // You can use any Pub/Sub implementation from here: https://watermill.io/docs/pub-sub-implementations/
   pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

    // CQRS is built on already existing messages router: https://watermill.io/docs/messages-router/
   router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

    // Simple middleware which will recover panics from event or command handlers.
   // More about router middlewares you can find in the documentation:
   // https://watermill.io/docs/messages-router/#middleware
   //
   // List of available middlewares you can find in message/router/middleware.
   router.AddMiddleware(middleware.Recoverer)

    // cqrs.Facade is facade for Command and Event buses and processors.
   // You can use facade, or create buses and processors manually (you can inspire with cqrs.NewFacade)
   cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{
        CommandsTopic: "commands",
        EventsTopic:   "events",
        CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {
            return []cqrs.CommandHandler{
                BookRoomHandler{eb},
                OrderBeerHandler{eb},
            }
        },
        EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {
            return []cqrs.EventHandler{
                OrderBeerOnRoomBooked{cb},
                NewBookingsFinancialReport(),
            }
        },
        Router:                router,
        CommandsPubSub:        pubSub,
        EventsPubSub:          pubSub,
        Logger:                logger,
        CommandEventMarshaler: marshaler,
    })
    if err != nil {
        panic(err)
    }

    // publish BookRoom commands every second to simulate incoming traffic
   go publishCommands(cqrsFacade.CommandBus())

    // processors are based on router, so they will work when router will start
   if err := router.Run(); err != nil {
        panic(err)
    }
}
// ...

And that’s all. We have a working CQRS application.

What’s next?

As mentioned before, if you are not familiar with Watermill, we highly recommend reading Getting Started guide.