A few weeks ago I started a blog post series to summarize my experiences in building an event-sourced web application using Scala and Akka. This was done based on an example application (see branch part-1). There, I gave an overview of the application architecture and presented some details of the immutable domain model and the service layer. Since then, the example application has been extended (see branch part-2) with a number of new features and enhancements. Here's an overview:
- The STM-based state management was completely revised and generalized into the traits UpdateProjection and EventProjection. An UpdateProjection is similar to an Akka Agent: it applies state transition functions asynchronously and can participate in STM transactions. The major difference is that an UpdateProjection is specialized on domain object updates and can log captured events to a persistent event log. By default, update events are logged before state changes are visible via STM references (this is an important difference from the service layer of part 1). UpdateProjection implementors (such as the example application's InvoiceService) are domain event producers. EventProjection implementors, on the other hand, are domain event consumers. They internally use plain Akka Agents to manage state and derive new state values from received domain events (using an application-defined event projection function). EventProjection implementors are usually components that manage read models or coordinate business processes in event-driven architectures, for example.
- The domain model was enhanced by introducing the domain classes DraftInvoice, SentInvoice and PaidInvoice to represent the states an invoice can have. Valid state transitions are defined by the methods on these domain classes and can therefore be checked by the compiler. This approach is explained in more detail here, although the implementation used in our example application differs slightly. In part 1, we only had a single Invoice class and valid state transitions had to be checked at runtime.
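To illustrate how state-specific classes move transition checks to compile time, here is a minimal sketch. It is not the example application's code: the fields, the method names addItem, sendTo and pay, and the use of Either in place of the application's domain validation type are all assumptions made for illustration.

```scala
// Hypothetical, simplified domain classes; fields and method names are assumptions.
sealed trait Invoice {
  def id: String
  def total: BigDecimal
}

// A draft can be modified and sent, but it offers no way to be paid.
final case class DraftInvoice(id: String, total: BigDecimal = BigDecimal(0)) extends Invoice {
  def addItem(amount: BigDecimal): DraftInvoice = copy(total = total + amount)
  def sendTo(address: String): SentInvoice = SentInvoice(id, total, address)
}

// Only a sent invoice can be paid. An insufficient amount is a domain error,
// reported as Left (a stand-in for the application's validation type).
final case class SentInvoice(id: String, total: BigDecimal, address: String) extends Invoice {
  def pay(amount: BigDecimal): Either[String, PaidInvoice] =
    if (amount >= total) Right(PaidInvoice(id, total, address))
    else Left(s"paid amount $amount is lower than total amount $total")
}

// Terminal state: no further transitions are defined.
final case class PaidInvoice(id: String, total: BigDecimal, address: String) extends Invoice
```

With these definitions, an expression such as DraftInvoice("inv-1").pay(BigDecimal(10)) does not compile, because pay only exists on SentInvoice. Checks that depend on values, such as the paid amount, remain runtime checks.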
New features include:
- A revised EventLog trait together with two implementations: JournalioEventLog is based on Journal.IO and BookkeeperEventLog on Apache BookKeeper. An EventLog supports synchronous and asynchronous appending of events as well as iterating over stored events, either from the beginning or from an application-defined position in the event history. Event log entries are also assigned sequence numbers so that event consumers can detect gaps in event streams or re-order (resequence) them, if needed.
- Event consumers. One example is an EventProjection that derives invoice update statistics from domain events. Here, a separate read model is used (following the CQRS pattern) to serve invoice statistic queries. Another example is InvoiceReplicator, an EventProjection that simply reconstructs the invoice map (as maintained by the InvoiceService) from invoice events at a different location. It can be used to replicate application state across different nodes and to serve (eventually consistent) reads. The replicated state could also be used by a snapshot service to take snapshots of application state. InvoiceReplicator needs to receive events in the correct order and is therefore configured to resequence the received event stream. A third example is the PaymentProcess. It coordinates the activities of several services, among them PaymentService. Instead of having these services send commands to each other, it is the PaymentProcess that sends commands to (i.e. calls methods on) these services in reaction to domain events. This event-driven approach to implementing business processes not only decouples the services from each other but also lets other components extend (or monitor) the business process by subscribing and reacting to the relevant domain events. The PaymentProcess is currently stateless. Processes that need to maintain state should implement EventProjection and recover the process state during application start (or failover) from the event history.
- A RESTful web interface for invoices and invoice statistics with support for HTML, XML and JSON representations. These can be negotiated with the HTTP Accept header. The web layer is based on the Jersey web framework (the JAX-RS reference implementation). HTML representations are rendered with the Scalate template engine. The mapping between XML and JSON representations and immutable domain classes is based on JAXB annotations. A JAXB-based XML provider must be supported by any JAX-RS implementation but Jersey additionally comes with a JAXB-based JSON provider so that the same metadata (JAXB annotations) can be used to generate both XML and JSON representations. Following some simple rules, it is possible to JAXB-annotate Scala case classes without polluting them with Java collection types or getters and setters. One major drawback of the current JAX-RS specification is that it doesn't support asynchronous responses yet. This will change with JAX-RS 2.0 and then we can make full use of the asynchronous
- A communication Channel for connecting domain event producers to consumers. The example application provides a SimpleChannel implementation for local communication. Alternative implementations, for example, could connect to a distributed event bus to communicate events across components of a distributed application.
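The sequence numbers assigned to event log entries are what make resequencing on the consumer side possible. The following sketch shows one way a consumer could buffer out-of-order events and deliver them in order. The names SequencedEvent and Resequencer are hypothetical and not taken from the example application, whose resequencing may be implemented differently.

```scala
import scala.collection.mutable

// Hypothetical event envelope; the example application's message type may differ.
final case class SequencedEvent[A](sequenceNr: Long, event: A)

// Buffers out-of-order events and delivers them in sequence-number order.
// Not thread-safe: meant to be used by a single consumer (e.g. inside an actor).
final class Resequencer[A](firstSequenceNr: Long)(deliver: A => Unit) {
  private val pending = mutable.Map.empty[Long, A]
  private var expected = firstSequenceNr

  def receive(e: SequencedEvent[A]): Unit = {
    // Events below the expected number were already delivered and are ignored.
    if (e.sequenceNr >= expected) pending.update(e.sequenceNr, e.event)
    // Deliver the longest gap-free run starting at the expected sequence number.
    while (pending.contains(expected)) {
      deliver(pending.remove(expected).get)
      expected += 1
    }
  }

  // Number of events held back because of a gap in the stream.
  def buffered: Int = pending.size
}
```

A non-zero buffered count that persists indicates a gap in the stream, which a consumer could use as a trigger to re-read the missing entries from the event log.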
Running the example application

The example application can be started with
sbt run-main dev.example.eventsourcing.server.Webserver
Two classes relevant for starting the application are:
Appserver: configures the event log, services, read models and processes and connects them via an event channel. It also recovers application state from the event history.
Webserver: configures the web service layer and starts an embedded web server (Jetty).
To use the BookkeeperEventLog, configure it in Appserver and additionally start a test BookKeeper instance with
sbt run-main dev.example.eventsourcing.server.Zookeeper
sbt run-main dev.example.eventsourcing.server.Bookkeeper
Examples of how to interact with the RESTful web interface can be found here.
Service Layer Enhancements

In the service layer implementation from part 1 we've seen how to keep the order of logged events in correspondence with the order of updates to the application state. We used a transient event log that could participate in STM transactions. After the transaction committed, the events from the transient event log were transferred to a persistent event log. A drawback of this approach is that one can lose updates in case of crashes after an STM reference has been updated but before the changes have been written to the persistent event log. This can lead to situations where clients can already see application state that cannot be recovered from the event log. While some applications may tolerate this, others may require that any visible application state must be fully recoverable from the event log. Therefore, an alternative approach must be chosen.
We need a way to write events, captured during domain object updates, to a persistent event log before the STM reference is updated. But writing to the persistent event log must be done outside an STM transaction for reasons explained in part 1. Updates must also be based on the current (i.e. latest) application state. We therefore need to
- Get the current state value from a transactional reference (STM transaction 1)
- Update domain object(s) obtained from the current state (no STM transaction)
- Write the captured update event(s) to a persistent event log (no STM transaction)
- Compute a new state value from the domain object update and write it to the transactional reference (STM transaction 2)
- Instances of UpdateProjection manage (part of) application state with a transactional ref, where S is the state value type. Clients concurrently read application state via currentState. Sequential writes to the transactional ref are done exclusively by the updater actor (more on write concurrency below). UpdateProjection implementors change application state with the transacted method. Its update parameter is a function that computes a domain object Update[Event, B] from current state S, where B is a domain object type. The update result (either a success value or Failure[DomainError]) is returned as a future value from the transacted method. The update function and the underlying Future implementation object (promise) are sent to the updater actor in a message. The updater then reads the current state and applies the update function to it. If the update succeeds, it writes the captured events to an EventLog and projects the update result onto the current state. The projection is done with the project function. It creates a new state value from the current state and the update result. The new state value is then finally set on the transactional ref and the promise is completed with the update result.
- Furthermore, the transacted method can participate in STM transactions. If there's an enclosing transaction, the updater actor will only be triggered if the enclosing transaction successfully commits. If there's no enclosing transaction, the updater actor will always be triggered.
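The update flow described above (read the current state, update the domain object, log the events, then publish the new state) can be sketched with the standard library alone. This is a simplified stand-in and not the example application's UpdateProjection. It makes several assumptions: an AtomicReference replaces the STM reference, a single-threaded executor replaces the updater actor, Either[String, _] replaces the domain validation type, and InMemoryEventLog and Counters are hypothetical. Participation in an enclosing STM transaction is not modelled.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}

// Result of a successful domain object update: captured events plus the updated object.
final case class Update[E, B](events: List[E], result: B)

// Hypothetical in-memory stand-in for a persistent event log.
final class InMemoryEventLog[E] {
  private var entries = Vector.empty[E]
  def append(events: List[E]): Unit = synchronized { entries = entries ++ events }
  def stored: Vector[E] = synchronized(entries)
}

abstract class UpdateProjectionSketch[S, E, B](initialState: S, log: InMemoryEventLog[E]) {
  private val ref = new AtomicReference[S](initialState)
  // A single thread plays the role of the updater actor: writes are strictly sequential.
  private val executor = Executors.newSingleThreadExecutor()
  private val updater = ExecutionContext.fromExecutor(executor)

  // Projects an update result onto the current state, yielding the new state.
  def project(state: S, updated: B): S

  // Concurrent, non-blocking reads.
  def currentState: S = ref.get

  def transacted(update: S => Either[String, Update[E, B]]): Future[Either[String, B]] =
    Future {
      update(ref.get) match {                  // read current state, update domain object
        case Right(Update(events, updated)) =>
          log.append(events)                   // log events before the change becomes visible
          ref.set(project(ref.get, updated))   // publish the new state
          Right(updated)
        case Left(error) =>
          Left(error)                          // failed updates leave log and state untouched
      }
    }(updater)

  def shutdown(): Unit = executor.shutdown()
}

// Usage: the state is a map of named counters, events are plain strings.
class Counters(log: InMemoryEventLog[String])
    extends UpdateProjectionSketch[Map[String, Int], String, (String, Int)](Map.empty, log) {
  def project(state: Map[String, Int], updated: (String, Int)): Map[String, Int] =
    state + updated

  def increment(name: String): Future[Either[String, (String, Int)]] = transacted { state =>
    val next = state.getOrElse(name, 0) + 1
    Right(Update(List(s"$name-incremented"), name -> next))
  }
}
```

Because all writes run on one thread, the state read at the start of an update is still the latest state when the new value is set, so events are logged in the same order in which state changes become visible.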
The example application uses UpdateProjection to implement the stateful InvoiceService. The state is of type Map[String, Invoice] i.e. a single map containing draft, sent and paid invoices. In a simplified version of the InvoiceService, the updateInvoice method uses the transacted method of the UpdateProjection trait. It tries to get an invoice with the given invoiceId from the current state and applies the supplied update function f to it. The updateInvoice method is used by updateDraftInvoice for updating draft invoices in the invoice map. The updateDraftInvoice method is in turn used by the service methods, among them sendInvoiceTo. Adding an item to an existing draft invoice yields a future value of an updated draft invoice (return type Future[DomainValidation[DraftInvoice]]). Sending an existing draft invoice, on the other hand, causes a state transition of that invoice to a sent invoice (return type Future[DomainValidation[SentInvoice]]) i.e. the service methods make use of the newly introduced domain object types. The InvoiceService must also implement the abstract members of UpdateProjection. Its project implementation projects an updated invoice onto the current state by simply adding it to the map of invoices (replacing an old invoice if present). The initialState (empty map by default) and an EventLog instance are provided during construction of an InvoiceService.

This InvoiceService implementation supports concurrent reads but only sequential writes to the whole invoice map. This may be sufficient for many applications but if a higher degree of write concurrency is needed, one could choose to have a separate UpdateProjection instance per invoice object (which is comparable to having a separate Akka Agent for each invoice object). This allows both concurrent reads and writes to the invoices of an application. The general idea (not part of the example project) is the following.
Here, the InvoiceService maintains a map of PersistentInvoice instances where a PersistentInvoice is an UpdateProjection that contains a reference to a single (draft, sent or paid) invoice. Consequently, updates to different invoices can now be made concurrently. The projection function degenerates to a function that simply returns the updated invoice.
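A rough sketch of this per-invoice arrangement, using only the standard library, might look as follows. It is an illustration under stated assumptions rather than the author's snippet: the Invoice class here is a simplified single class, PersistentInvoice serializes writes by chaining futures instead of using an updater actor, Either[String, _] stands in for the domain validation type, InvoiceServiceSketch and its methods are hypothetical, and event logging is left out to keep the focus on write concurrency.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Simplified invoice with an assumed shape; the real domain classes are richer.
final case class Invoice(id: String, items: List[String] = Nil)

// One write queue per invoice: updates to the same invoice run one after another,
// updates to different invoices may run in parallel.
final class PersistentInvoice(initial: Invoice)(implicit ec: ExecutionContext) {
  private val ref = new AtomicReference[Invoice](initial)
  private var tail: Future[Any] = Future.successful(())

  def currentState: Invoice = ref.get

  def transacted(update: Invoice => Either[String, Invoice]): Future[Either[String, Invoice]] =
    synchronized {
      // Chain this update behind the previous one for the same invoice,
      // regardless of whether the previous one succeeded.
      val next = tail.transform { _ =>
        Try {
          val result = update(ref.get)
          // The projection degenerates to "take the updated invoice".
          result.foreach(updated => ref.set(updated))
          result
        }
      }
      tail = next
      next
    }
}

final class InvoiceServiceSketch(implicit ec: ExecutionContext) {
  private val invoices = TrieMap.empty[String, PersistentInvoice]

  // Returns true if the invoice was created, false if the id was already taken.
  def createInvoice(id: String): Boolean =
    invoices.putIfAbsent(id, new PersistentInvoice(Invoice(id))).isEmpty

  def addItem(id: String, item: String): Future[Either[String, Invoice]] =
    invoices.get(id) match {
      case Some(pi) => pi.transacted(inv => Right(inv.copy(items = inv.items :+ item)))
      case None     => Future.successful(Left(s"no invoice with id $id"))
    }

  def invoice(id: String): Option[Invoice] = invoices.get(id).map(_.currentState)
}
```

The trade-off is that state changes spanning several invoices are no longer applied in one sequential step, so any ordering across invoices has to come from the event log's sequence numbers.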