CQRS: Sagas with Event Sourcing (Part II of II)

In my first post, I explained a little bit about how sagas can be leveraged to deal with the problem of nested transactions—"transactions" that span more than a single message. There were a few community questions related to me redefining the concept of a saga. That's definitely not what I'm trying to do. Per my understanding, a saga sits there and handles multiple messages and exists to coordinate multiple distinct transactions and perform compensating actions relative to the entire "transaction" as a whole. While by definition a saga may not be responsible for message ordering and de-duplication of messages, as a side-effect of implementation it can handle those things without difficulty. If anything, the way I'm utilizing sagas is perhaps somewhat too narrow—a selected subset of the overall meaning and capability of a saga. Regardless, the main concern herein is how to apply event sourcing as the persistence mechanism for sagas.

One final thought before we look at some code: why should we even bother using event sourcing for sagas? Why not just persist the state and be done with it? That's a great question. In fact, there isn't anything wrong with persisting the state and moving forward, per se. But as we have seen when event sourcing is applied to aggregates, it gives us, as programmers, the ability to re-evaluate our model, adapt it, and even re-implement it in a completely different way. Furthermore, in the community there have been various examples of the benefits of testing aggregates through watching the events that are persisted after handling a command. Anyone who has done this can see that testing in this way is much more natural and it avoids the brittle tests that often exist when doing purely state-based testing against aggregates. This same idea can be applied to sagas. In essence, applying event sourcing to sagas allows our sagas to be more agile.

Okay…now the code. How do we leverage event sourcing to implement a saga? It's actually not that bad. Let's look at what ISaga might look like:

    public interface ISaga
    {
        Guid Id { get; }
        long Version { get; } // for optimistic concurrency

        void Transition(object message);

        ICollection GetUncommittedEvents();
        void ClearUncommittedEvents();

        ICollection GetUndispatchedMessages();
        void ClearUndispatchedMessages();
    }


This should look somewhat similar to the interface of an aggregate root that utilizes event sourcing. The primary difference is that we now have an additional set of methods related to getting and clearing undispatched messages.



When a message arrives to be processed, it is routed to the appropriate message handler, whose "Handle" method would look something like this:



    public void Handle(OrderReceivedEvent message)
    {
        var sagaId = message.OrderId; // purchase correlation
        var saga = repository.GetById(sagaId); // GetById is generic; supply the concrete saga type argument
        saga.Transition(message);
        repository.Save(saga);
    }


Granted, this could potentially be inlined into the saga itself, but for our purposes here, I am choosing to keep the concept of handling a message separate from giving it to the saga.



We're not attempting to use any kind of unit of work in this example. We simply get a message, provide it to the saga, and save the saga using a repository.



The Transition method of such a saga is fairly straightforward:



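    // Typed overload; ISaga.Transition(object) routes to it. (An explicit
    // interface implementation would have to take object, not the event type.)
    private void Transition(OrderReceivedEvent message)
    {
        orderReceived = true;
        orderTotal = message.Total;
        stateMachine.Fire(Trigger.OrderReceived);

        // to avoid duplication, these should be in a base class
        version++;
        uncommittedEvents.Add(message);
    }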


Typically the base class would be responsible for receiving event messages from the caller (in this case the bus message handler) and then routing each one, either dynamically or statically, to the specific Transition method capable of working with that type of event message.
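As a rough illustration of that routing, here is a minimal sketch that assumes dynamic dispatch through a dictionary of handlers keyed by message type. The names SagaBase, Register, Dispatch, and OrderSaga are hypothetical and are not taken from CommonDomain; the class exposes the same members as ISaga but does not declare the interface so that the snippet compiles on its own.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical base class: routes each message to a handler registered by the
// concrete saga, then performs the bookkeeping every transition needs.
public abstract class SagaBase
{
    private readonly Dictionary<Type, Action<object>> handlers =
        new Dictionary<Type, Action<object>>();
    private readonly List<object> uncommittedEvents = new List<object>();
    private readonly List<object> undispatchedMessages = new List<object>();

    public Guid Id { get; protected set; }
    public long Version { get; private set; }

    // Concrete sagas call this from their constructor, once per message type.
    protected void Register<TMessage>(Action<TMessage> handler)
    {
        handlers[typeof(TMessage)] = message => handler((TMessage)message);
    }

    public void Transition(object message)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));

        Action<object> handler;
        if (!handlers.TryGetValue(message.GetType(), out handler))
            throw new InvalidOperationException(
                "No handler registered for " + message.GetType().Name);

        handler(message);               // saga-specific state change
        Version++;                      // shared bookkeeping lives here
        uncommittedEvents.Add(message);
    }

    // Queues an outgoing message; the repository sends it during Save.
    protected void Dispatch(object message) { undispatchedMessages.Add(message); }

    public ICollection GetUncommittedEvents() { return uncommittedEvents; }
    public void ClearUncommittedEvents() { uncommittedEvents.Clear(); }
    public ICollection GetUndispatchedMessages() { return undispatchedMessages; }
    public void ClearUndispatchedMessages() { undispatchedMessages.Clear(); }
}

// Hypothetical message and saga, only to show how registration would look.
public class OrderReceivedEvent
{
    public Guid OrderId { get; set; }
    public decimal Total { get; set; }
}

public class OrderSaga : SagaBase
{
    public decimal OrderTotal { get; private set; }

    public OrderSaga() { Register<OrderReceivedEvent>(Apply); }

    private void Apply(OrderReceivedEvent message)
    {
        Id = message.OrderId;
        OrderTotal = message.Total;
    }
}
```

With this shape, the concrete saga only contains state changes, while the version increment and the uncommitted event list are handled in one place.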



Be aware that we're not using a 100% pure state machine here. There are two member variables—"orderReceived" and "orderTotal". These variables give the saga a little bit of "memory". We will want those variables later. "orderReceived" is used to help us determine if the state machine can transition to a completed state, while "orderTotal" will be used on the resulting command message once we transition to a Completed state.



(Implementation note: rather than having a separate boolean variable for each message that is received, you can use a flags enum and combine its values using bitwise operations, e.g. orderProgress = orderProgress | Progress.OrderReceived.)
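A small sketch of that flags approach follows. The enum members other than OrderReceived, and the OrderProgressTracker class itself, are assumptions made for illustration.

```csharp
using System;

[Flags]
public enum Progress
{
    None = 0,
    OrderReceived = 1,
    PaymentReceived = 2,
    All = OrderReceived | PaymentReceived
}

// Hypothetical holder for the saga's "memory" of which messages have arrived.
public class OrderProgressTracker
{
    private Progress orderProgress = Progress.None;

    // OR-ing a flag that is already set changes nothing, so marking is idempotent.
    public void Mark(Progress step) { orderProgress |= step; }

    public bool IsComplete
    {
        get { return (orderProgress & Progress.All) == Progress.All; }
    }
}
```

Adding a new step then means adding one enum member and including it in All, rather than adding another boolean field and extending every guard.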



State Machines, Transitions, and Memory



In multi-message scenarios, especially as the number of messages being handled grows, it becomes increasingly difficult to use a pure state machine to know where things are without having some kind of "memory". This is like the "vending machine state machine" problem. In that problem, every pathway for every possible permutation of coins must be explicitly modeled up to the price of the item. It gets nasty fast, especially as more kinds of coins, such as nickels and pennies, are accepted and the price of the item increases.



By utilizing a little bit of "memory", we are able to side-step a large amount of complexity to keep things simple. This is where the "orderReceived" variable comes into play in the sample code above. By using it along with other memory variables, we can leverage the best parts of a state machine:



    stateMachine.Configure(OrderState.Open)
        .PermitIf(Trigger.OrderReceived, OrderState.Completed,
            () => orderReceived && paymentReceived);


The above code uses a state machine library called Stateless, which is still actively developed, unlike SimpleStateMachine, which its author has basically abandoned. It says that when we are in the Open state and the OrderReceived trigger fires, we transition to the Completed state only if the order has been received AND payment has been received.
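The same idea can be applied to the vending machine problem mentioned earlier. The following is a dependency-free sketch (it does not use Stateless) with hypothetical names: two states plus a single "memory" field replace the one-state-per-coin-combination model.

```csharp
using System;

public enum VendingState { AcceptingCoins, Dispensing }

// Hypothetical vending machine: the running balance is the "memory", so no
// state is needed for each possible combination of coins.
public class VendingMachine
{
    private readonly int priceInCents;
    private int balanceInCents;

    public VendingMachine(int priceInCents)
    {
        if (priceInCents <= 0)
            throw new ArgumentOutOfRangeException(nameof(priceInCents));
        this.priceInCents = priceInCents;
        State = VendingState.AcceptingCoins;
    }

    public VendingState State { get; private set; }

    public void InsertCoin(int valueInCents)
    {
        if (valueInCents <= 0)
            throw new ArgumentOutOfRangeException(nameof(valueInCents));
        if (State != VendingState.AcceptingCoins)
            return; // coins inserted while dispensing are ignored in this sketch

        balanceInCents += valueInCents;

        // Guard: transition only once the remembered balance covers the price.
        if (balanceInCents >= priceInCents)
            State = VendingState.Dispensing;
    }
}
```

Supporting a new coin or a higher price changes no states or transitions here; only the values flowing through the guard differ.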



Something observable happens only when we transition to the Completed state:



    stateMachine.Configure(OrderState.Completed)
        .OnEntry(OnCompleted);

    ....

    private void OnCompleted()
    {
        undispatchedMessages.Add(new DoSomethingInterestingCommand
        {
            OrderId = orderId,
            Total = orderTotal,
            ...
        });
    }


In the above code, we have configured the state machine to invoke the OnCompleted method when the state transitions to Completed. When that occurs, we add a new message to our list of undispatched messages. Later, during the repository "Save" operation, we'll perform the actual dispatch.



Message Idempotency



In the above example, no matter how many times the OrderReceived message is handled, it will only result in the state transition to Completed once. For messages that increment or decrement values, such as CashDeposited and the like, we could easily keep track of the identifier of each message and only change the value if the message hasn't been handled before.
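One way to sketch that de-duplication is shown below. The MessageId and Amount properties and the BalanceTracker class are assumptions for illustration; the post names CashDeposited but does not describe its fields.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shape.
public class CashDeposited
{
    public Guid MessageId { get; set; }
    public decimal Amount { get; set; }
}

// Hypothetical slice of a saga that tracks a running balance.
public class BalanceTracker
{
    private readonly HashSet<Guid> handledMessageIds = new HashSet<Guid>();

    public decimal Balance { get; private set; }

    public void Transition(CashDeposited message)
    {
        // HashSet<T>.Add returns false when the id is already present,
        // so a redelivered message leaves the balance untouched.
        if (!handledMessageIds.Add(message.MessageId))
            return;

        Balance += message.Amount;
    }
}
```

Because the set of handled identifiers is rebuilt as the events are replayed, it does not need to be persisted separately in this sketch.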



Persistence



To load the saga from persistent storage, we grab the committed events and use them to transition a new instance of the particular type of saga:



    public TSaga GetById<TSaga>(Guid sagaId) where TSaga : class, ISaga, new()
    {
        var saga = new TSaga(); // can be done in different ways

        // replay every committed event to rebuild the saga's state
        var stream = this.eventStore.Read(sagaId);
        foreach (var @event in stream.Events)
            saga.Transition(@event);

        // replaying repopulates both lists; discard them so nothing is re-saved or re-sent
        saga.ClearUncommittedEvents();
        saga.ClearUndispatchedMessages();
        return saga;
    }


To persist a saga, we do the following:



    public void Save(ISaga saga)
    {
        var events = saga.GetUncommittedEvents();
        eventStore.Write(new UncommittedEventStream
        {
            Id = saga.Id,
            Type = saga.GetType(),
            Events = events,
            ExpectedVersion = saga.Version - events.Count
        });

        foreach (var message in saga.GetUndispatchedMessages())
            bus.Send(message); // can be done in different ways

        saga.ClearUncommittedEvents();
        saga.ClearUndispatchedMessages();
    }


In case of an optimistic concurrency exception, our messaging infrastructure re-delivers the message and we rebuild the saga, this time including the committed events from the competing process that caused the concurrency exception. Then we re-apply the message that was the victim of the concurrency exception.
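To make the expected-version check concrete, here is a minimal in-memory sketch. InMemoryEventStore and ConcurrencyException are hypothetical names, and the Write signature is simplified compared with the UncommittedEventStream used above; a real store would enforce the same check in its database.

```csharp
using System;
using System.Collections.Generic;

public class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Hypothetical in-memory event store used only to illustrate the version check.
public class InMemoryEventStore
{
    private readonly Dictionary<Guid, List<object>> streams =
        new Dictionary<Guid, List<object>>();
    private readonly object sync = new object();

    public IReadOnlyList<object> Read(Guid streamId)
    {
        lock (sync)
        {
            List<object> events;
            return streams.TryGetValue(streamId, out events)
                ? events.ToArray()   // copy, so callers never see later writes
                : new object[0];
        }
    }

    public void Write(Guid streamId, long expectedVersion, IEnumerable<object> newEvents)
    {
        lock (sync)
        {
            List<object> events;
            long currentVersion =
                streams.TryGetValue(streamId, out events) ? events.Count : 0;

            // Another writer committed first if the stream is not at the
            // version this writer loaded before applying its message.
            if (currentVersion != expectedVersion)
                throw new ConcurrencyException(
                    "Expected version " + expectedVersion + " but stream is at " + currentVersion);

            if (events == null)
                streams[streamId] = events = new List<object>();
            events.AddRange(newEvents);
        }
    }
}
```

A losing writer gets the exception, and on redelivery it reads the stream again, now including the winner's events, before applying its own message.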



What About "Now"?



One quirk related to event sourcing is the concept of "now". Because you're rebuilding the saga from scratch every time, you can't persist what "now" is. If you use DateTime.UtcNow anywhere inside your saga's transition logic, it can result in subtle bugs because DateTime.UtcNow returns a different value each time you rebuild the saga from the events. A rule of thumb is to ensure that each incoming event message always carries with it the concept of time, e.g. when the order was accepted. Any timeout messages that occur can also carry the time at which they timed out. Lastly, any outgoing messages that are about to be dispatched can be stamped with DateTime.UtcNow.
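A short sketch of that rule of thumb follows. The message types, their timestamp properties, and the three-day payment window are all assumptions made for illustration.

```csharp
using System;

// Hypothetical messages that carry their own timestamps.
public class OrderAccepted
{
    public Guid OrderId { get; set; }
    public DateTime AcceptedAtUtc { get; set; }
}

public class PaymentTimedOut
{
    public Guid OrderId { get; set; }
    public DateTime TimedOutAtUtc { get; set; }
}

// Hypothetical slice of a saga that never reads the system clock.
public class OrderDeadlineTracker
{
    private static readonly TimeSpan PaymentWindow = TimeSpan.FromDays(3);

    public DateTime? PaymentDueUtc { get; private set; }
    public bool Expired { get; private set; }

    public void Transition(OrderAccepted message)
    {
        // Derived from the message, so replaying the same event always
        // yields the same deadline regardless of when the replay happens.
        PaymentDueUtc = message.AcceptedAtUtc + PaymentWindow;
    }

    public void Transition(PaymentTimedOut message)
    {
        // Compare against the time carried by the timeout, not DateTime.UtcNow.
        Expired = PaymentDueUtc.HasValue
            && message.TimedOutAtUtc >= PaymentDueUtc.Value;
    }
}
```

Had the deadline been computed from DateTime.UtcNow, every rebuild of the saga would push it further into the future.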



Conclusion



Why use event sourcing for sagas? Options. Just like event sourcing for aggregates opens up a world of possibilities, it can do the same thing with sagas. Even if you don't buy into the idea of using event sourcing with sagas, do try to understand the importance of sagas as a pattern and where they fit in a messaging-oriented world.



I have added the above concepts, including ISaga, SagaBase, and SagaRepository, to my CommonDomain project on GitHub. Perhaps this isn't quite the best place for the code, but I wanted to get something out for people to see.