Monthly Archives: December 2010

Sagas, Event Sourcing, and Failed Commands

There is an interesting thread on the DDD/CQRS group involving sagas. I’m posting my reply below because I want to make it available as a solution to those who encounter this same problem. Please direct all replies to the CQRS thread rather than replying below.

To give a small amount of background, the question is related to how a failure or refusal on the part of the domain to perform some action is communicated back to the saga such that it can take the appropriate, compensating action.

Here’s the post:

We’ve run into almost this exact situation and considered a number of ways to solve the problem.

To sum up the problem, it’s that we need to communicate a failure or rejection of a command/instruction by the domain back to a saga such that the saga can take the appropriate, compensating action.

As previously mentioned in this thread, an aggregate must enforce its invariants and is not allowed to enter an invalid state. This presents somewhat of an issue because we want to communicate the failure back to the saga but this failure cannot be communicated *through* the domain.

One potential solution (although not the one that we use) was raised during Greg’s course when he was discussing sagas. He hinted at the idea that a call from the saga to the domain could be made via RPC. In essence, the command message is sent synchronously via some kind of RPC-like call or web service instead of asynchronously using a message bus or message queue. In this way the failure can easily be communicated back in the form of a fault that is understood by the saga. Ultimately we went with something a little bit different because we were exposing our domain behavior through message handlers that listened to a message bus and we wanted to maintain asynchronous messaging throughout our system.

Our solution was the following (and it’s worked quite well):

1. The saga asynchronously dispatches a command message to the domain.

2. The message handler receives the command message, loads the aggregate from the repository, and calls the appropriate method on the aggregate.

3. The domain object checks its invariants and ultimately decides to throw an intention-revealing, well-named, domain-specific exception.

4. The command handler catches the well-named exception. Because the command handler knows the type of message received, its intention, and the type of exception that was thrown and caught, it’s in a position to relay that exception back to the saga in the form of a message via bus.Reply(). But this time it’s not an event message, which represents something that happened. Instead it’s a message that describes something that didn’t happen. We didn’t have a name for this kind of message, but the terms notification and alert kept coming back. Ultimately we decided to call these kinds of messages alerts. These messages could also potentially be called faults or something else, but they should be considered separate from events.

5. The saga receives the alert/fault message and applies the appropriate action.
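The five steps above can be sketched roughly as follows. This is only an illustration in Python, not our production code: the Account aggregate, the WithdrawFunds command, the WithdrawalRefusedAlert message, and the FakeBus with its reply() method are all hypothetical names standing in for the real domain, message contracts, and bus.

```python
from dataclasses import dataclass


class InsufficientFundsError(Exception):
    """Domain-specific exception: the aggregate refuses the command."""


@dataclass
class WithdrawFunds:
    """Command dispatched by the saga (hypothetical contract)."""
    account_id: str
    amount: int


@dataclass
class WithdrawalRefusedAlert:
    """Alert: describes something that did NOT happen (hypothetical contract)."""
    account_id: str
    amount: int
    reason: str


class Account:
    """Toy aggregate that enforces a single invariant."""

    def __init__(self, balance: int) -> None:
        self.balance = balance

    def withdraw(self, amount: int) -> None:
        if amount > self.balance:
            raise InsufficientFundsError("balance too low")
        self.balance -= amount


class FakeBus:
    """Stand-in for a message bus; records replies instead of sending them."""

    def __init__(self) -> None:
        self.replies = []

    def reply(self, message) -> None:
        self.replies.append(message)


def handle_withdraw(command: WithdrawFunds, repository: dict, bus: FakeBus) -> None:
    """Command handler: load the aggregate, call it, translate refusals into alerts."""
    account = repository[command.account_id]
    try:
        account.withdraw(command.amount)
    except InsufficientFundsError as error:
        # The refusal never enters the domain's event history; it goes back to the saga.
        bus.reply(WithdrawalRefusedAlert(command.account_id, command.amount, str(error)))


bus = FakeBus()
handle_withdraw(WithdrawFunds("acc-1", 50), {"acc-1": Account(balance=10)}, bus)
print(bus.replies)
```

Note that the handler only catches the exception type it knows how to translate; anything unexpected still propagates as an ordinary failure.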

The one question that still remains is that we lose the alert message once it’s consumed by the saga, and the concern is that we want to keep track of everything that has happened. I couldn’t agree more. Keeping track of what’s happened is extremely important. But a better question is who’s responsible for keeping track of this fault/alert? Should the domain keep track of something that didn’t happen or something that it refused to do? Isn’t it the saga that cares about the command and associated failure? Shouldn’t the saga (or its infrastructure) be responsible for tracking all messages relative to the saga?

In our implementation, we actually implement our sagas using event sourcing. What this means is that all messages addressed to the saga are replayed in order to rebuild the state of the saga. In addition (and as an auditing benefit), we use an event store to dispatch outbound messages from the saga. This means that a saga is completely autonomous and separate from the domain and is only coupled by the message contracts. It also means that we can replay incoming messages against a new implementation of a saga if necessary to come up with an alternate model as our sagas and domain evolve to changing business requirements.
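To make the replay idea concrete, here is a minimal Python sketch. The saga classes, message types, and state names are invented for illustration; the only point is that saga state is a pure function of the stored incoming messages, so the same history can be replayed against a newer implementation.

```python
class OrderSaga:
    """Toy saga whose state is derived purely from the messages it has received."""

    def __init__(self) -> None:
        self.state = "started"

    def apply(self, message: dict) -> None:
        kind = message["type"]
        if kind == "PaymentReceived":
            self.state = "awaiting_shipment"
        elif kind == "ShipmentRefusedAlert":
            # An alert is part of the saga's history like any other message.
            self.state = "refunding"
        elif kind == "OrderShipped":
            self.state = "completed"


class OrderSagaV2(OrderSaga):
    """A later implementation that interprets the same history differently."""

    def apply(self, message: dict) -> None:
        if message["type"] == "ShipmentRefusedAlert":
            self.state = "escalated_to_support"
        else:
            super().apply(message)


def rebuild(saga_type, history):
    """Replay every stored incoming message, in order, against a fresh saga."""
    saga = saga_type()
    for message in history:
        saga.apply(message)
    return saga


history = [{"type": "PaymentReceived"}, {"type": "ShipmentRefusedAlert"}]
print(rebuild(OrderSaga, history).state)
print(rebuild(OrderSagaV2, history).state)
```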

To sum things up:

1. When the domain refuses to do something, it doesn’t generate an event. Instead, it throws an exception.

2. The command handler handles the exception and does a bus.Reply() with an “alert” or “fault” message.

3. Because the message is not generated by the domain, but by the layer just on top of the domain, we don’t track this alert/fault message inside of the domain.

4. The saga receives the message and takes appropriate, compensating action.

5. Because the saga is implemented using event sourcing, nothing is ever lost. We have a complete business/audit history of what happened and we can evolve our saga model and rebuild it with full confidence in our message history.

Event Store: Transaction Integrity Without Transactions

As outlined in the architectural overview document, the central model in version 2.0 of the EventStore is that of a “commit” consisting of a series of events. By focusing on a commit we are able to liberate ourselves from the heavy requirements introduced by a two-phase commit protocol. At the same time, by slightly reordering the sequence of operations when performing a commit to the configured persistence engine, the EventStore is able to avoid using transactions at all and yet still maintain transactional integrity. Our purpose here is to understand how we are able to maintain that integrity without transactions even when transactions are available, such as when using a relational database as the configured persistence mechanism.

First and foremost, in a relational database there is often a concept of foreign key constraints. These constraints are used as a guard to protect relational integrity. Fortunately, in our model these constraints are not necessary. As silly and ridiculous as this may sound, the reason lies in several factors implicit in the model. The primary factor is that there is only one central table in our relational database: the Commits table. The other tables, such as Dispatch and Streams, are actually only heuristics to aid in a few simple queries. The other reason is that, by removing these constraints, we are able to reorder the sequence of inserts, as described below.

The Commits “table” is our central source of truth. Without it, there is no event store. It is also what enables and facilitates optimistic concurrency control. By numbering each commit for a particular stream we are assured that no two commits will ever step on the other’s respective toes. This simplistic form of optimistic concurrency control is the foundation upon which we can build using alternate persistence mechanisms. If a commit does not make it into this table, the commit never occurred. No matter what other tables might say, the commit never took place.
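As a rough illustration of how numbering commits per stream gives optimistic concurrency, the following Python sketch uses an in-memory SQLite database. The column names and the composite primary key on (StreamId, CommitSequence) are assumptions made for the example, not the actual EventStore schema.

```python
import sqlite3

# Autocommit mode: each statement stands alone, no explicit transaction is opened.
connection = sqlite3.connect(":memory:", isolation_level=None)
connection.execute(
    "CREATE TABLE Commits ("
    " StreamId TEXT NOT NULL,"
    " CommitSequence INTEGER NOT NULL,"
    " Payload TEXT NOT NULL,"
    " PRIMARY KEY (StreamId, CommitSequence))"
)


def try_commit(stream_id: str, sequence: int, payload: str) -> bool:
    """Return True if this writer claimed the sequence number, False if it lost."""
    try:
        connection.execute(
            "INSERT INTO Commits VALUES (?, ?, ?)", (stream_id, sequence, payload)
        )
        return True
    except sqlite3.IntegrityError:
        # Another writer already holds this (StreamId, CommitSequence) pair.
        return False


first = try_commit("order-1", 1, "writer A")
second = try_commit("order-1", 1, "writer B")
stored = connection.execute("SELECT Payload FROM Commits").fetchall()
print(first, second, stored)
```

The losing writer has nothing to clean up: its row never made it into the table, so as far as the store is concerned that commit never occurred.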

When employing a relational database, we can also use a concept known as a Dispatch table. The Dispatch table contains weak references to a set of commits which have not been dispatched or put on the wire for interested subscribers. Typically the lifespan of an entry in this table is very small, perhaps a few dozen milliseconds. Generally, this same concept could be structured as a simple Boolean field on the commit row itself, which is how it is implemented in document persistence engines. But in a relational model, indexes on a Boolean are seldom utilized because of their low cardinality. As such, a separate table is employed. Furthermore, this table is only ever used for one purpose: recovering when a process fails to dispatch, or succeeds in dispatching but hasn’t marked the commit as dispatched. When the application starts, it can query the Dispatch table to determine what work has not yet been performed. Generally speaking, an entry in the Dispatch table indicates that there is an outstanding commit to be dispatched.

The Streams “table” or concept, both in a relational model and a document-oriented model, is also interesting in that it doesn’t authoritatively dictate the attributes of a particular stream. Typically the Streams table will have information such as diagnostic/name info about what kind of stream it is, e.g. a .NET Type. Furthermore, it will have several values which indicate the sequence or revision of the most recent commit as well as the revision of the most recent snapshot. It is for this purpose that we utilize a Streams table: to be able to determine how far the most recent revision is ahead of the most recent snapshot. Beyond this, the table does not serve any purpose. In contrast to Greg Young’s event store document, the Streams table does not aid in optimistic concurrency control; that’s what the Commits table does.

Interestingly enough, by making a commit the central concept, and yet still employing the stream identifier, e.g. StreamId as the single identifier for all things related to a stream, we’re able to take advantage of horizontal sharding capabilities of a number of storage engines such as MongoDB, MySQL Cluster, and others. Furthermore, because we’ve removed all foreign key constraints, the storage engine doesn’t need to enforce them, such as MySQL Cluster (NDB) which would otherwise handle this at the database level rather than at the storage engine level, e.g. InnoDB. In other words, we have sacrificed very little, if anything at all, to have massive gains in scalability.

The current source code master/trunk commit of the EventStore has reordered slightly the sequence of operations to take advantage of all of the above understanding thus avoiding the overhead of transactional integrity along with the complexity of isolation levels. But we now need to consider failure conditions and potential race conditions to see how they are handled elegantly by the order of operations when using a relational database.

The current sequence of operations is thus:

1. Insert a weak reference to the commit using StreamId and CommitSequence into the Dispatch table.

2. Insert the actual commit into the Commits table.

3. Insert or update the Streams table—insert for the first commit; update for subsequent commits.
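The three steps can be sketched in Python against an in-memory SQLite database. Everything here is an approximation for illustration: the table layouts are assumed, the statements are issued one at a time rather than as a single batch, the commit sequence doubles as the stream's head revision, and the crash_after_step_one flag exists only to simulate a failure between the first and second steps.

```python
import sqlite3

# Autocommit mode: no transaction spans the three steps.
db = sqlite3.connect(":memory:", isolation_level=None)
db.executescript(
    """
    CREATE TABLE Dispatch (StreamId TEXT, CommitSequence INTEGER,
                           PRIMARY KEY (StreamId, CommitSequence));
    CREATE TABLE Commits  (StreamId TEXT, CommitSequence INTEGER, Payload TEXT,
                           PRIMARY KEY (StreamId, CommitSequence));
    CREATE TABLE Streams  (StreamId TEXT PRIMARY KEY, HeadRevision INTEGER);
    """
)


def commit(stream_id, sequence, payload, crash_after_step_one=False):
    # Step 1: weak reference, inserted only if one does not already exist.
    db.execute("INSERT OR IGNORE INTO Dispatch VALUES (?, ?)", (stream_id, sequence))
    if crash_after_step_one:
        raise RuntimeError("simulated failure between steps 1 and 2")
    # Step 2: the commit itself; the primary key enforces optimistic concurrency.
    db.execute("INSERT INTO Commits VALUES (?, ?, ?)", (stream_id, sequence, payload))
    # Step 3: insert the stream row for the first commit, replace it afterwards.
    db.execute("INSERT OR REPLACE INTO Streams VALUES (?, ?)", (stream_id, sequence))


def count(table):
    return db.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


try:
    commit("order-1", 1, "first attempt", crash_after_step_one=True)
except RuntimeError:
    pass
dangling = (count("Dispatch"), count("Commits"))  # one reference, no commit yet

commit("order-1", 1, "retry")
after_retry = (count("Dispatch"), count("Commits"))  # still one reference, now one commit
print(dangling, after_retry)
```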

What happens if something fails between the first and second steps? The way the current SQL query is written, a Dispatch is only inserted if one doesn’t already exist for that particular commit sequence. In other words, if a commit is attempted and fails just after the first step, we have a dangling reference to a commit. But this dangling reference isn’t a problem as we shall see.

In most cases, the commit will be retried, but even if it’s not retried the next commit related to the stream in question will attempt to perform an insert into the Dispatch table and see that a corresponding row already exists and it will simply skip the insert for that commit into the Dispatch table. In other words, it will assume control of the Dispatch row. The bottom line is that whether our commit succeeds after retry or another commit succeeds first, a corresponding Dispatch row (which before was dangling) will now point to a commit. And once the commit is dispatched, it will remove the row from the Dispatch table. With the row removed, we no longer have a dangling row in the Dispatch table. Interestingly enough, we don’t have foreign key constraints, but we have perfect referential integrity.

In a race condition between two commits, both will attempt to write to the table. But the way the insert is written, only a single row will ever result. Following that, the first process to insert into the Commits table wins the optimistic concurrency “race”, while the other fails. This means that the Dispatch table is correctly pointing to the victorious insert in the Commits table. The failed insert has nothing to roll back because it never actually inserted anything into the Commits table and the associated Dispatch entry is correctly pointing to the successful Commit. As we can see, race conditions are also gracefully handled.

But what about a failure scenario after the Commit row has been inserted and the Streams table is never updated? As mentioned previously, the Streams table is only a heuristic to determine how far the most recent stream revision is away from a snapshot. In a failure condition, the Streams table is never updated. This will also only happen in extreme circumstances such as database failure (process termination, hardware failure, power loss, etc.) because the application has already sent the entire batch of SQL statements as a single transfer across the wire to the database process.

In the event the database somehow does fail to update the associated row in the Streams table, the worst possible consequence is that a snapshot that might have otherwise been taken is not taken. At the same time, the very next commit to occur when the database comes back online will properly update the Streams table.

The above can even be taken to an extreme whereby the Streams table is only updated every X commits, thus avoiding the overhead of an additional write for extremely low-latency systems. By revising the database insert statement slightly, the Streams table could be updated every 5 commits, for example by taking the StreamRevision modulo 5, e.g. (WHERE @StreamRevision % 5 = 0).
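The same check is trivial to express at the application level. In this small Python sketch the function name and the interval parameter are illustrative only; it simply shows which revisions would trigger a Streams update when the interval is 5.

```python
def should_update_streams(stream_revision: int, interval: int = 5) -> bool:
    """Mirror of the SQL predicate: StreamRevision % interval = 0."""
    return stream_revision % interval == 0


updated_at = [revision for revision in range(1, 16) if should_update_streams(revision)]
print(updated_at)  # [5, 10, 15]
```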

There are several other performance tuning techniques that can be done at the application level to reduce the number of IO operations such that the application becomes bound by CPU, memory speed, and network bandwidth, not to mention all of the streamlining that can be done for infrastructure services, operating system tweaks, and hardware.

So there we have it, full database consistency and integrity, and massive horizontal scalability, all without the use of foreign key constraints or transactions, etc. Event sourcing is an extremely powerful model that solves a number of significant problems in a very elegant and easy-to-understand manner.

CQRS: EventStore v2 Architectural Overview

[The following is a design document that gives an overview of version 2.0 of my EventStore project that is currently under active development.  Buckle up.  It’s long.]

EventStore v2 Architectural Overview

The EventStore is designed to store a series of events, known as an event stream, to durable storage. Because of the simplicity of the model of an event stream, it can easily be persisted to durable storage using a large number of storage engines, including relational databases, plain files, NoSQL document databases, or even key-value stores. Herein is a brief summary that describes some of the guiding architectural principles and the underlying philosophy that governs the implementation of the EventStore and dictates the programming interface and calling contracts.

Event Sourcing

Event sourcing is a concept whereby a series of events are applied to an object to bring it back to a known state. Event sourcing itself is beyond the scope of this document, but suffice it to say that event sourcing and the concept of storing events as a stream have a number of desirable and interesting properties. Among these are true, verifiable, and correct business audit logs, the ability to reconstruct alternate models from the stream, as well as easier consistency due to the immutability of the persisted events.

The Model

The model in a “typical” event store focuses on two key concepts: the stream and the events or messages for that stream. Ancillary concepts may include metadata such as dates and times, as well as snapshots, an optimization for large streams of events.

The model for the EventStore is somewhat different while allowing a great deal of flexibility with regards to underlying storage engine technology without sacrificing any of the valuable properties of event sourcing and storage of events. In fact, this new model is what allows many NoSQL implementations to be leveraged as a durable store for the events.

This new model focuses on two main concepts: streams and commits. At first glance this may not appear to be any different than the typical model. Indeed, it’s very similar. The critical difference lies in the focus on the commit rather than on the event or message. An event is an atomic unit, a “something happened” notification with application and business significance. This fundamental concept cannot be abandoned. But the focus of our new model is on the storage of events, not the events themselves.

By focusing on such a fine-grained unit of atomicity, a typical event store may require multiple roundtrips to the underlying storage engine to fully persist all events generated during a single unit of work. Herein lies the problem. Only a handful of storage engines, such as relational databases, have the capability to handle multiple, disparate writes as a single unit, and only a handful of NoSQL engines support multiple writes as part of a transaction. This severely limits our ability to choose a technology to better fit our needs.

The new model, by focusing on the concept of a commit, allows the implementation to be much simpler and allows development and operational teams to select almost any storage engine available based upon need and applicability. In essence, a commit is a collection of events resulting from a unit of work. But a commit need not consist of just a series of events. It may also consist of metadata or other attributes that have business or operational significance such as date and time of the commit, IP address and user agent of the user as forwarded by the web server (or client application), as well as the actual command that was issued to the aggregate which caused the events to be generated.
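One way to picture a commit is as a single immutable record that can be written in one operation. The Python dataclass below is a sketch only; the field names (stream_id, commit_sequence, commit_id, events, headers, commit_stamp) and the header keys are assumptions for illustration, not the EventStore's actual contract.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Commit:
    """All events from one unit of work, plus metadata, persisted as a single write."""
    stream_id: str
    commit_sequence: int
    commit_id: UUID
    events: tuple
    # Free-form metadata: originating command, client IP address, user agent, etc.
    headers: dict = field(default_factory=dict)
    commit_stamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


example = Commit(
    stream_id="order-1",
    commit_sequence=1,
    commit_id=uuid4(),
    events=({"type": "OrderPlaced"}, {"type": "InventoryReserved"}),
    headers={"command": "PlaceOrder", "ip": "203.0.113.7"},
)
print(len(example.events), example.headers["command"])
```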

Ancillary concepts of the new model include snapshots along with a corresponding snapshotter as well as a dispatcher which is responsible for notifying interested parties of the commits that have just occurred. The dispatcher deserves additional consideration because it also facilitates and enables choice among storage engine solutions. In a typical model, a message is received and events are generated. But a significant problem arises when those events are then persisted durably and simultaneously dispatched to interested parties. This creates what is known as a two-phase commit (2PC), which is often very expensive. Besides the additional latency incurred by having a two-phase commit, a fundamental problem is that only a few select storage engines and queuing systems are capable of participating as cohorts in the commit. This, again, limits our technological choices. By eliminating 2PC, we are able to utilize virtually any queuing infrastructure, including Amazon SQS, Azure Queues, RabbitMQ, ZeroMQ, etc.

The dispatcher liberates us from 2PC by allowing asynchronous dispatching of committed events. In fact, the commit and dispatcher concepts have a harmonious relationship. Unlike a simple set of events, the commit concept carries with it all metadata and context forward to the dispatcher. This allows the dispatcher to append metadata and other pertinent header information when publishing or dispatching the events held by the commit. If the dispatcher did not have all of the corresponding metadata from the commit and simply received a series of events to dispatch, recipients of the dispatched messages could be missing valuable context that might have otherwise enabled business decision-making processes.

Storage Engine Choices and Implementation Considerations

Several of the interfaces and programming contracts in the EventStore have been influenced by the need to keep a consistent, non-leaky abstraction of storage and queuing infrastructures. At the same time, it must be recognized and understood that different technological choices and implementations have different strengths and weaknesses. There are two critical examples of this that should be understood.

The Dispatcher Interface

The contract of the dispatcher is such that, as a commit is fully dispatched, it notifies the underlying storage of that fact so that it can be marked as dispatched. There is a small possibility during failure scenarios where a set of events may be dispatched but not yet marked as such. This may result in the same set of messages being redispatched. In a message-oriented system, the concept of at-least-once delivery is always something that should be considered and this situation is no different. Message consumers should be made idempotent (potentially by tracking a list of previous messages) to avoid duplicate incoming messages. Again, the trade-off that we gain by avoiding 2PC is the ability to utilize different storage and queuing systems.
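A minimal sketch of an idempotent consumer, in Python, might look like the following. The IdempotentConsumer class is hypothetical, and the in-memory set of seen identifiers is a simplification; a real consumer would need to track processed message identifiers durably.

```python
class IdempotentConsumer:
    """Wraps a handler so that a redelivered message is processed only once."""

    def __init__(self, handler) -> None:
        self._handler = handler
        self._seen = set()  # identifiers of messages already handled

    def consume(self, message_id: str, body) -> bool:
        """Return True if the message was handled, False if it was a duplicate."""
        if message_id in self._seen:
            return False
        self._handler(body)
        self._seen.add(message_id)
        return True


handled = []
consumer = IdempotentConsumer(handled.append)
results = [
    consumer.consume("m-1", "OrderPlaced"),
    consumer.consume("m-1", "OrderPlaced"),  # redelivered after a dispatcher failure
    consumer.consume("m-2", "OrderShipped"),
]
print(results, handled)
```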

Furthermore, in a relational database or some other kind of local, yet transactional system, it would seem logical to number all commits, regardless of stream identifier, with an order-preserving, monotonically increasing number, an auto-increment column of sorts. Then the dispatcher could simply dispatch all events for a given commit, mark the index as dispatched and continue to the next. Indeed, Greg Young’s paper on this very subject recommends this method and it is a solution that works very well. Nonetheless, our purpose here is to understand how the varying capabilities of storage systems, which may or may not be able to create auto-incrementing values, affect the programming contracts of the dispatcher and the EventStore in general.

Message Idempotency

One other core area where storage engine differences have affected the programming contracts slightly is that of duplicate commits. A duplicate commit occurs when a message is received twice and processed twice. With idempotent messages this is never a problem. But not all messages are idempotent. When utilizing a relational database as the storage engine for the EventStore, each commit is marked with a commit id based upon the identifier of the incoming message being handled. If an attempt is made to perform a commit resulting from a message that has already been handled, the relational database will throw an exception.

Not all storage engines have or enforce secondary indexes and their uniqueness. In fact, beyond relational databases, very few do. The simplicity of alternate storage engines results in additional implementation burden for the EventStore, but the burden is far from insurmountable. Yet this understanding has influenced the core interfaces utilized to call the EventStore and may be required to facilitate true message idempotency.

When loading a stream of events in order to process an incoming message, the EventStore will query the storage engine and load all the way from the previous snapshot, if any, until the most recent commit. It will then return what is known as a “committed event stream” which contains only the most basic information necessary to facilitate processing of the incoming message. Most incoming messages will be served using this call.

As an optimistic concurrency technique, incoming messages can be marked using the last known version (or revision) of an aggregate (a materialized stream). By leveraging this technique, we can create idempotency when using a NoSQL solution. Our purpose here is to avoid a duplicate commit, but most NoSQL solutions don’t enforce secondary indexes. Hence we have structured the calling API to elegantly address this issue. When the calling application detects that a command has been issued against a previous version of an aggregate, the calling application can invoke the EventStore and request all commits for a particular stream where the revision is at least the version indicated on the command. The EventStore obediently responds by retrieving all commits. On each commit is found a value which uniquely identifies it. This is how we can detect and prevent duplicate commits.

When the application calls and receives a set of all commits since the message version, we are guaranteed that if the message has already been processed, the corresponding commit will have been retrieved from storage. As a result, the EventStore simply identifies and tracks all commits being retrieved. Then, if a write is attempted to the same stream using a commit that has been identified, the EventStore will throw an exception thus maintaining the integrity of the stream.
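The tracking behavior described above can be approximated with a small in-memory Python sketch. The TrackingEventStore class, its get_from and write methods, and the use of the incoming message identifier as the commit identifier are assumptions for illustration; they are not the real EventStore API.

```python
class DuplicateCommitError(Exception):
    """Raised when a write reuses a commit id already seen in the stream."""


class TrackingEventStore:
    def __init__(self) -> None:
        self._commits = {}     # stream_id -> list of (revision, commit_id)
        self._identified = {}  # stream_id -> commit ids seen while reading

    def get_from(self, stream_id: str, min_revision: int):
        """Load commits at or after min_revision and remember their identifiers."""
        found = [c for c in self._commits.get(stream_id, []) if c[0] >= min_revision]
        self._identified.setdefault(stream_id, set()).update(cid for _, cid in found)
        return found

    def write(self, stream_id: str, revision: int, commit_id: str) -> None:
        if commit_id in self._identified.get(stream_id, set()):
            raise DuplicateCommitError(commit_id)
        self._commits.setdefault(stream_id, []).append((revision, commit_id))


store = TrackingEventStore()
store.write("order-1", 1, "msg-a")
store.write("order-1", 2, "msg-b")  # produced by a command stamped with version 1

# The same command arrives again, still stamped with version 1. The application
# notices the stale version and asks for every commit from that revision onward.
store.get_from("order-1", 1)
try:
    store.write("order-1", 3, "msg-b")
    duplicate_rejected = False
except DuplicateCommitError:
    duplicate_rejected = True
print(duplicate_rejected)
```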

While portions of the API may not be completely intuitive at first glance, they have been designed to fully encapsulate any underlying storage engine infrastructure preventing the implementation from leaking through the abstraction.

The Stream Document and Commit Sequence

In a typical event store, the stream or “aggregate” document or database row, is used to maintain optimistic concurrency. Such is not the case with the EventStore. Instead each commit is given an application-assigned numerically increasing value. In many ways this value is similar to an aggregate version. This allows optimistic concurrency to be maintained regardless of storage implementation choices.

The problem with, and significant strength of, most NoSQL solutions is that multiple writes are considered to be separate commits. Because we are creating a commit as a single write, it takes an additional write to update the associated stream document to indicate the correct version of the stream. But this isn’t necessarily a problem because the stream document (or aggregate row) has become a heuristic that enables us to determine if a snapshot should be taken rather than to know the version of the most recent commit. As such, the stream document’s revision value can be updated asynchronously and perhaps even by another process altogether. And if, for some reason, the value isn’t updated upon every single commit, that’s fine too because it doesn’t affect our ability to process messages.
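To see why a stale stream document is harmless, consider this small Python sketch. The StreamHead record and the threshold of 50 commits are hypothetical; the point is only that a lagging head revision delays a snapshot rather than breaking anything.

```python
from dataclasses import dataclass


@dataclass
class StreamHead:
    """Heuristic record for a stream; it may lag behind the Commits data."""
    stream_id: str
    head_revision: int
    snapshot_revision: int


def needs_snapshot(head: StreamHead, threshold: int = 50) -> bool:
    """Snapshot once the stream is at least `threshold` revisions past the last one."""
    return head.head_revision - head.snapshot_revision >= threshold


stale = StreamHead("order-1", head_revision=140, snapshot_revision=100)
fresh = StreamHead("order-1", head_revision=150, snapshot_revision=100)
print(needs_snapshot(stale), needs_snapshot(fresh))
```

With the stale value the snapshot is simply postponed until a later update brings the head revision forward.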

Snapshots

Snapshots are a performance optimization and can often be ignored altogether except in systems where latency is mission critical. Snapshots are a materialization of the stream at a certain revision. The snapshot can then be consumed by an aggregate to bring it back to a known state before applying all events which have occurred since the snapshot. Snapshots, if required, should be handled either by an outside process or, at a minimum, a different thread to avoid blocking main message processing.

The one trouble with snapshots is the ability to query for and retrieve the most recent snapshot along with all new commits since the snapshot. Relational databases have no trouble with this, but NoSQL solutions typically have only rudimentary querying capabilities. As such the structure of where snapshots are placed and how they are queried may vary from implementation to implementation. In a relational database, the EventStore places the serialized snapshot alongside the commit, as a separate column on the row to which it applies. This could be done in other ways, such as only having a single snapshot per stream on the Streams table or having a separate Snapshots table altogether.

When using a NoSQL solution and implementing a storage engine adapter for the EventStore, it may be necessary to issue two queries to the storage infrastructure in order to retrieve the most recent snapshot and all events since that snapshot. Fortunately, this may not be a large concern as snapshots can be cached in memory.

Another possibility for NoSQL would be to insert the snapshot between commits in a stream using a semi-alphabetic key. This may not be possible or even desirable in a SQL solution where advanced querying exists, but is actually quite practical in NoSQL. To do this, the document key should be “StreamId:CommitSequence” as normal, but this time, we append some kind of non-numeric value such as “Snapshot” to the key. This would have the effect of inlining the snapshot into the result set when a query was performed without the need for an additional query.
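A quick Python sketch shows how such keys would sort. It assumes the store returns documents in lexicographic key order and that the commit sequence is zero-padded to a fixed width (an assumption added here so that string order matches numeric order); the helper names are hypothetical.

```python
def commit_key(stream_id: str, sequence: int) -> str:
    # Zero-padding keeps lexicographic order aligned with numeric order.
    return f"{stream_id}:{sequence:010d}"


def snapshot_key(stream_id: str, sequence: int) -> str:
    # The suffix sorts the snapshot right after the commit it materializes.
    return commit_key(stream_id, sequence) + "Snapshot"


documents = {commit_key("order-1", n): f"commit {n}" for n in (1, 2, 3)}
documents[snapshot_key("order-1", 2)] = "snapshot at 2"

ordered_keys = sorted(documents)
last_snapshot = max(i for i, key in enumerate(ordered_keys) if key.endswith("Snapshot"))
to_load = ordered_keys[last_snapshot:]
print(to_load)  # ['order-1:0000000002Snapshot', 'order-1:0000000003']
```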

Eventual Consistency and Replication

One other area where the EventStore has a distinct advantage is that of eventual consistency in the store itself. Because the storage engine is not dictated, an engine such as CouchDB, Cassandra, or Riak could be utilized which support multi-master replication. One potential issue this could have is that of allowing concurrency violations to occur because two or more processes could write to commit #5 for a given stream, for example.

In a fully consistent store, this would be a problem and would actually result in an exception. But when leveraging a store such as CouchDB or a Dynamo clone such as Riak, we can take advantage of the fact that commits are dispatched asynchronously to correct the problem. When the dispatcher goes to dispatch the commit, it could perform a consistent read from its peers and determine if anyone else has written again to that particular commit. If so, it would deterministically pick a commit to dispatch as the winner and it would then “revoke” the other commit by providing it back to the application. The application could then extract from the metadata the underlying command(s) that were issued and re-enqueue them to be processed again. In this way, no command messages are lost, and we maintain a fully consistent event store but still have eventual consistency.
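The resolution step might be sketched like this in Python. The tie-breaking rule (lowest commit identifier wins), the dictionary layout, and the "commands" header are all assumptions chosen for illustration; any rule works as long as every node picks the same winner from the same set of conflicting commits.

```python
def resolve_conflict(conflicting_commits):
    """Pick one winner deterministically and collect commands from the revoked commits."""
    ordered = sorted(conflicting_commits, key=lambda commit: commit["commit_id"])
    winner, revoked = ordered[0], ordered[1:]
    # Commands behind revoked commits are handed back to be processed again.
    requeue = [
        command
        for commit in revoked
        for command in commit["headers"]["commands"]
    ]
    return winner, requeue


peers = [
    {"commit_id": "b7", "sequence": 5, "headers": {"commands": ["CancelOrder"]}},
    {"commit_id": "a3", "sequence": 5, "headers": {"commands": ["ShipOrder"]}},
]
winner, requeue = resolve_conflict(peers)
print(winner["commit_id"], requeue)
```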

Conclusion

While there are other event stores to choose from with varying degrees of community and maturity, the EventStore as outlined herein aims to be a first-class choice due to its focus on simplicity and how it enables implementers to utilize technologies with which they are already comfortable and proficient rather than forcing and dictating the use of a relational database.