CQRS: Sagas with Event Sourcing (Part II of II)

In my first post, I explained how sagas can be leveraged to deal with the problem of nested transactions: “transactions” that span more than a single message. A few community questions suggested that I was redefining the concept of a saga. That’s definitely not what I’m trying to do. As I understand it, a saga handles multiple messages; it exists to coordinate multiple distinct transactions and to perform compensating actions relative to the entire “transaction” as a whole. While by definition a saga may not be responsible for ordering and de-duplicating messages, as a side effect of its implementation it can handle those things without difficulty. If anything, the way I’m using sagas is perhaps too narrow: a selected subset of the overall meaning and capability of a saga. Regardless, the main concern here is how to apply event sourcing as the persistence mechanism for sagas.

One final thought before we look at some code: why should we even bother using event sourcing for sagas? Why not just persist the state and be done with it? That’s a great question. In fact, there isn’t anything wrong with persisting the state and moving forward, per se. But as we have seen when event sourcing is applied to aggregates, it gives us, as programmers, the ability to re-evaluate our model, adapt it, and even re-implement it in a completely different way. Furthermore, the community has produced various examples of testing aggregates by asserting on the events that are persisted after handling a command. Anyone who has done this can see that testing in this way is much more natural, and it avoids the brittle tests that often result from purely state-based testing of aggregates. The same idea applies to sagas. In essence, applying event sourcing to sagas allows our sagas to be more agile.

Okay…now the code. How do we leverage event sourcing to implement a saga? It’s actually not that bad. Let’s examine what ISaga might look like:

```csharp
using System;
using System.Collections;

public interface ISaga
{
    Guid Id { get; }
    long Version { get; } // for optimistic concurrency

    void Transition(object message);

    ICollection GetUncommittedEvents();
    void ClearUncommittedEvents();

    ICollection GetUndispatchedMessages();
    void ClearUndispatchedMessages();
}
```

This should look somewhat similar to the interface of an aggregate root that utilizes event sourcing.  The primary difference is that we now have an additional set of methods related to getting and clearing undispatched messages.

When a message arrives, it is routed to the appropriate message handler, whose Handle method might look something like this:

```csharp
public void Handle(OrderReceivedEvent message)
{
    var sagaId = message.OrderId; // purchase correlation
    var saga = repository.GetById<OrderSaga>(sagaId); // OrderSaga: illustrative saga type name
    saga.Transition(message);
    repository.Save(saga);
}
```

Granted, this could potentially be inlined into the saga itself, but for our purposes here, I am choosing to keep the concept of handling a message separate from giving it to the saga.

We’re not attempting to use any kind of unit of work in this example.  We simply get a message, provide it to the saga, and save the saga using a repository.

The Transition method of such a saga is fairly straightforward:

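```csharp
// An explicit ISaga.Transition implementation must take "object", so this is an
// ordinary method that the base class routes OrderReceivedEvent messages to.
public void Transition(OrderReceivedEvent message)
{
    orderReceived = true;
    orderTotal = message.Total;
    stateMachine.Fire(Trigger.OrderReceived);

    // to avoid duplication, these should be in a base class
    version++;
    uncommittedEvents.Add(message);
}
```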

Typically the base class would be responsible for receiving event messages from the caller (in this case the bus message handler) and then routing each one, either dynamically or statically, to the specific Transition method capable of working with that event message.
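As a minimal sketch of that routing, assuming static routing through a handler dictionary rather than reflection or dynamic dispatch, the base class below maps each message type to a handler, bumps the version, and records the message as an uncommitted event. The names SagaBase, Register, and Dispatch are hypothetical and not taken from CommonDomain; OrderSaga is an illustrative subclass.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical base class: routes each message to the handler registered for its runtime type.
public abstract class SagaBase
{
    private readonly Dictionary<Type, Action<object>> handlers = new Dictionary<Type, Action<object>>();
    private readonly List<object> uncommittedEvents = new List<object>();
    private readonly List<object> undispatchedMessages = new List<object>();

    public Guid Id { get; protected set; }
    public long Version { get; private set; }

    // Subclasses call this from their constructor to map a message type to a handler.
    protected void Register<TMessage>(Action<TMessage> handler)
    {
        handlers[typeof(TMessage)] = message => handler((TMessage)message);
    }

    public void Transition(object message)
    {
        if (!handlers.TryGetValue(message.GetType(), out var handler))
            throw new InvalidOperationException("No handler registered for " + message.GetType().Name);

        handler(message);               // type-specific state change
        Version++;                      // shared bookkeeping lives here, not in each handler
        uncommittedEvents.Add(message);
    }

    // Subclasses queue outgoing commands here; the repository sends them on Save.
    protected void Dispatch(object message) => undispatchedMessages.Add(message);

    public ICollection GetUncommittedEvents() => uncommittedEvents.ToArray();
    public void ClearUncommittedEvents() => uncommittedEvents.Clear();
    public ICollection GetUndispatchedMessages() => undispatchedMessages.ToArray();
    public void ClearUndispatchedMessages() => undispatchedMessages.Clear();
}

// Illustrative message and saga types.
public class OrderReceivedEvent { public Guid OrderId; public decimal Total; }

public class OrderSaga : SagaBase
{
    public decimal OrderTotal { get; private set; }

    public OrderSaga() { Register<OrderReceivedEvent>(Handle); }

    private void Handle(OrderReceivedEvent message)
    {
        Id = message.OrderId;
        OrderTotal = message.Total;
    }
}
```

With this shape, each concrete saga only contains the state changes; version tracking and event recording happen once in the base class.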

Be aware that we’re not using a 100% pure state machine here. There are two member variables, “orderReceived” and “orderTotal”, that give the saga a little bit of “memory”. We will want those variables later: “orderReceived” helps us determine whether the state machine can transition to a Completed state, while “orderTotal” will be used on the resulting command message once we transition to that Completed state.

(Implementation note: rather than having a separate boolean variable to track each message that is received, you can use a [Flags] enum and combine its values with bitwise OR, e.g. orderProgress = orderProgress | Progress.OrderReceived.)
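A minimal sketch of that flags approach follows. The Progress values and the ProgressTracker class are hypothetical names chosen for illustration; ItemsShipped stands in for any additional message a saga might wait on.

```csharp
using System;

// Each bit records that one kind of message has arrived.
[Flags]
public enum Progress
{
    None = 0,
    OrderReceived = 1,
    PaymentReceived = 2,
    ItemsShipped = 4 // hypothetical third message
}

public class ProgressTracker
{
    private Progress progress = Progress.None;

    // Bitwise OR sets the bit; recording the same step twice changes nothing.
    public void Record(Progress step) => progress |= step;

    // True only when every bit in "required" has been set.
    public bool HasAll(Progress required) => (progress & required) == required;
}
```

Because OR is idempotent, a duplicate message leaves the progress unchanged, which fits the de-duplication point made later in this post.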

State Machines, Transitions, and Memory

In multi-message scenarios, especially as the number of messages being handled grows, it becomes increasingly difficult to use a pure state machine to know where things are without some kind of “memory”. This is like the “vending machine state machine” problem. In that problem, every possible sequence of coins must be modeled explicitly as its own pathway, up to the price of the item. It gets nasty fast, especially as more kinds of coins, such as nickels and pennies, are accepted and the price of the item increases.
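To make the contrast concrete, here is a minimal sketch of the vending machine with memory. Instead of one state per possible amount inserted, it keeps a single Accepting state plus a running balance and uses one guarded transition. The class and member names are hypothetical, and amounts are assumed to be in cents.

```csharp
using System;

public enum VendingState { Accepting, Vending }

public class VendingMachine
{
    private readonly int priceInCents;

    public int BalanceInCents { get; private set; } // the "memory"
    public VendingState State { get; private set; } = VendingState.Accepting;

    public VendingMachine(int priceInCents) { this.priceInCents = priceInCents; }

    public void InsertCoin(int cents)
    {
        if (State != VendingState.Accepting)
            throw new InvalidOperationException("Machine is not accepting coins.");

        BalanceInCents += cents;

        // One guarded transition replaces every coin-by-coin pathway.
        if (BalanceInCents >= priceInCents)
            State = VendingState.Vending;
    }
}
```

Raising the price or accepting pennies changes only the data, not the number of states.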

By utilizing a little bit of “memory”, we can side-step a large amount of complexity and keep things simple. This is where the “orderReceived” variable comes into play in the sample code above. By using it along with other memory variables, we can leverage the best parts of a state machine:

```csharp
stateMachine.Configure(OrderState.Open)
    .PermitIf(Trigger.OrderReceived, OrderState.Completed,
        () => orderReceived && paymentReceived);
```

The above code uses a state machine library called Stateless, which is still actively developed, unlike SimpleStateMachine, which its author has basically abandoned. The code says that when we are in the Open state and the OrderReceived trigger fires, the machine transitions to the Completed state only if the order has been received AND payment has been received.

Something observable happens only when we transition to the Completed state:

```csharp
stateMachine.Configure(OrderState.Completed)
    .OnEntry(OnCompleted);

// ....

private void OnCompleted()
{
    undispatchedMessages.Add(new DoSomethingInterestingCommand
    {
        OrderId = orderId,
        Total = orderTotal,
        // ...
    });
}
```

In the above code, we have configured the state machine to invoke the OnCompleted method when the state transitions to Completed. When that occurs, we add a new message to our list of undispatched messages. Later, during the repository “Save” operation, we’ll perform the actual dispatch.
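For readers who want to see the whole flow without pulling in a library, here is a minimal self-contained sketch. It deliberately swaps Stateless for a hand-rolled guard (TryComplete) so it compiles on its own; PaymentReceivedEvent and the other type names are hypothetical. The guard plays the role of PermitIf, and OnCompleted plays the role of the OnEntry action.

```csharp
using System;
using System.Collections.Generic;

public enum OrderState { Open, Completed }

public class OrderReceivedEvent { public Guid OrderId; public decimal Total; }
public class PaymentReceivedEvent { public Guid OrderId; }
public class DoSomethingInterestingCommand { public Guid OrderId; public decimal Total; }

public class OrderSaga
{
    private bool orderReceived, paymentReceived; // the saga's "memory"
    private Guid orderId;
    private decimal orderTotal;

    public OrderState State { get; private set; } = OrderState.Open;
    public List<object> UndispatchedMessages { get; } = new List<object>();

    public void Transition(OrderReceivedEvent message)
    {
        orderReceived = true;
        orderId = message.OrderId;
        orderTotal = message.Total;
        TryComplete();
    }

    public void Transition(PaymentReceivedEvent message)
    {
        paymentReceived = true;
        TryComplete();
    }

    // Guarded transition: Open -> Completed only once both messages have arrived.
    private void TryComplete()
    {
        if (State == OrderState.Open && orderReceived && paymentReceived)
        {
            State = OrderState.Completed;
            OnCompleted(); // acts like an on-entry action
        }
    }

    // Only queues the command; a repository's Save would actually send it.
    private void OnCompleted()
    {
        UndispatchedMessages.Add(new DoSomethingInterestingCommand { OrderId = orderId, Total = orderTotal });
    }
}
```

Because the guard also checks that the saga is still Open, a redelivered OrderReceivedEvent cannot queue a second command.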

Message Idempotency

In the above example, no matter how many times the OrderReceived message is handled, it will only result in the state transition to Completed once. For messages that increment or decrement values, such as CashDeposited and the like, we could easily keep track of each message’s identifier and only increment the value if the message hasn’t been handled before.
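A minimal sketch of that identifier tracking follows, assuming each message carries a unique MessageId. The CashDeposited shape and the AccountSaga class are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public class CashDeposited { public Guid MessageId; public decimal Amount; }

public class AccountSaga
{
    // Identifiers of messages that have already been applied.
    private readonly HashSet<Guid> handledMessageIds = new HashSet<Guid>();

    public decimal Balance { get; private set; }

    public void Transition(CashDeposited message)
    {
        // HashSet.Add returns false if the id is already present, so duplicates are skipped.
        if (!handledMessageIds.Add(message.MessageId))
            return;

        Balance += message.Amount;
    }
}
```

Since the set is populated by the same Transition calls used during replay, it is rebuilt along with the rest of the saga’s state when the saga is loaded from its events.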

Persistence

To load the saga from persistent storage, we read the committed events and replay them through a new instance of the particular type of saga:

```csharp
public TSaga GetById<TSaga>(Guid sagaId) where TSaga : class, ISaga, new()
{
    var saga = new TSaga(); // can be done in different ways

    var stream = this.eventStore.Read(sagaId);
    foreach (var @event in stream.Events)
        saga.Transition(@event);

    // Replayed events are already committed, and their messages were already sent.
    saga.ClearUncommittedEvents();
    saga.ClearUndispatchedMessages();
    return saga;
}
```

To persist a saga, we do the following:

```csharp
public void Save(ISaga saga)
{
    var events = saga.GetUncommittedEvents();
    eventStore.Write(new UncommittedEventStream
    {
        Id = saga.Id,
        Type = saga.GetType(),
        Events = events,
        ExpectedVersion = saga.Version - events.Count
    });

    // Reached only if the write above succeeded.
    foreach (var message in saga.GetUndispatchedMessages())
        bus.Send(message); // can be done in different ways

    saga.ClearUncommittedEvents();
    saga.ClearUndispatchedMessages();
}
```

In case of an optimistic concurrency exception, our messaging infrastructure will re-deliver the message. We then rebuild the saga, this time including the events committed by the competing process that caused the concurrency exception, and re-apply the message that was the victim of the conflict.
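The following is a minimal sketch of that load, apply, save, and retry cycle. It uses a hypothetical in-memory store in place of a real event store, a trivial CounterSaga, and a simple retry loop standing in for the bus’s redelivery. None of these types come from CommonDomain, and undispatched messages are omitted for brevity.

```csharp
using System;
using System.Collections.Generic;

public class ConcurrencyException : Exception { }

// Hypothetical in-memory stand-in for an event store, keyed by stream id.
public class InMemoryEventStore
{
    private readonly Dictionary<Guid, List<object>> streams = new Dictionary<Guid, List<object>>();

    public IReadOnlyList<object> Read(Guid id) =>
        streams.TryGetValue(id, out var events) ? new List<object>(events) : new List<object>();

    // Rejects the write if another writer appended since our version was read.
    public void Write(Guid id, long expectedVersion, IEnumerable<object> events)
    {
        if (!streams.TryGetValue(id, out var stream))
            streams[id] = stream = new List<object>();
        if (stream.Count != expectedVersion)
            throw new ConcurrencyException();
        stream.AddRange(events);
    }
}

public class CounterSaga
{
    public Guid Id { get; set; }
    public long Version { get; private set; }
    public int Count { get; private set; }
    public List<object> Uncommitted { get; } = new List<object>();

    public void Transition(object message)
    {
        Count++;
        Version++;
        Uncommitted.Add(message);
    }
}

public class SagaRepository
{
    private readonly InMemoryEventStore store;
    public SagaRepository(InMemoryEventStore store) { this.store = store; }

    public CounterSaga GetById(Guid id)
    {
        var saga = new CounterSaga { Id = id };
        foreach (var e in store.Read(id)) saga.Transition(e); // replay committed events
        saga.Uncommitted.Clear();                              // they are not new
        return saga;
    }

    public void Save(CounterSaga saga)
    {
        store.Write(saga.Id, saga.Version - saga.Uncommitted.Count, saga.Uncommitted);
        saga.Uncommitted.Clear();
    }
}

// Stand-in for redelivery: on conflict, reload (picking up the competitor's events) and re-apply.
public static class Handler
{
    public static void Handle(SagaRepository repo, Guid id, object message, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                var saga = repo.GetById(id);
                saga.Transition(message);
                repo.Save(saga);
                return;
            }
            catch (ConcurrencyException) when (attempt < maxAttempts) { }
        }
    }
}
```

The expected version is the version the saga had when it was loaded, so any event written by a competing process in the meantime makes the write fail instead of silently interleaving.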

What About “Now”?

One quirk related to event sourcing is the concept of “now”. Because you’re rebuilding the saga from scratch every time, you can’t persist what “now” is. If you use DateTime.UtcNow anywhere inside your saga’s transitions, it can result in subtle bugs, because DateTime.UtcNow changes each time you rebuild the saga from the events. A rule of thumb is to ensure that each incoming event message always carries the concept of time with it, e.g. when the order was accepted. Any timeout messages can also carry the time at which they timed out. Lastly, messages that are going to be dispatched can safely be stamped with DateTime.UtcNow, because undispatched messages are discarded when the saga is rebuilt.
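A minimal sketch of that rule of thumb follows. The messages carry their own timestamps and the saga never reads the clock, so replaying the same events always produces the same state. The message types and the two-day limit are assumptions made for illustration.

```csharp
using System;

public class OrderAccepted { public Guid OrderId; public DateTime AcceptedAtUtc; }
public class OrderTimedOut { public Guid OrderId; public DateTime TimedOutAtUtc; }

public class OrderDeadlineSaga
{
    private static readonly TimeSpan Allowed = TimeSpan.FromDays(2); // assumed business rule
    private DateTime acceptedAtUtc;

    public bool Expired { get; private set; }

    public void Transition(OrderAccepted message)
    {
        // Use the time recorded on the message, not DateTime.UtcNow.
        acceptedAtUtc = message.AcceptedAtUtc;
    }

    public void Transition(OrderTimedOut message)
    {
        // Both times come from messages, so a replay months later yields the same answer.
        Expired = message.TimedOutAtUtc - acceptedAtUtc >= Allowed;
    }
}
```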

Conclusion

Why use event sourcing for sagas?  Options.  Just like event sourcing for aggregates opens up a world of possibilities, it can do the same thing with sagas.  Even if you don’t buy into the idea of using event sourcing with sagas, do try to understand the importance of sagas as a pattern and where they fit in a messaging-oriented world.

I have implemented the above concepts, including ISaga, SagaBase, and SagaRepository, in my CommonDomain project on GitHub. Perhaps this isn’t quite the best place for the code, but I wanted to get something out for people to see.

  • James Bradt

    Jonathan,

    Well written post. I have a question.

    How do you envision the state machine handling the concept of message timeouts? (e.g. if the DoSomethingInterestingCommand message is lost)

  • http://www.blogger.com/profile/16836313591238262040 Jonathan Oliver

    James,

    Excellent question. If you're using something like NServiceBus, you can send a timeout message. When the timeout elapses the timeout manager will send the message back to you.

    At that point, just have your saga handle the timeout and take appropriate action.

    If you're not using NServiceBus, you'll have to create something like this yourself, but it's not terribly difficult.

  • seagile

    Do you consider the undispatched commands part of the saga state? From your code, I derive you don't. But what if one of those commands fails? Who's responsible for tracking the command that failed?

  • http://www.blogger.com/profile/16836313591238262040 Jonathan Oliver

    @seagile,

    A saga will only dispatch a command as a result of a state transition. Once it has been dispatched, the saga is in a state where it is aware that the dispatched command has been sent. In other words, it knows or remembers what was sent. If a command fails, it needs to be made aware of that failure through some kind of fault message.

    Your question is probably more related to the tracking of correlation identifiers within the saga so that you can have a single "command rejected" fault message that can be tied back to the command that was sent using the correlation ID, correct?

  • seagile

    No, it was really about tracking the entire command message. How is the saga to track the command identifiers if the only thing it uses to rebuild state are the events/faults it received? One could argue that there should be an alternate identifier (embedded in the event) that is used when issuing commands. But a correlation identifier and a business identifier should not be mixed (see "Idempotency" in Hohpe & Woolf's Enterprise Integration Patterns). Another approach could be to apply a state change – as an aggregate would do internally – which carries the correlation identifiers. But in that case I'd be inclined to just treat the produced commands as part of the saga changeset upon persistence. That way events, faults, and commands can get replayed to bring the saga back to the appropriate state.
    I'm really balancing between having specialized state change events (which encapsulate both the command correlation identifiers and the event that triggered the saga transition) to rebuild saga state, and considering the generated commands part of that state.

    Another thing, when a command gets sent from a saga, who is responsible for tracking it? The saga if it fails and the aggregate if it succeeds? But how will the saga be able to log the entire command message if it has already been sent? By putting all of its details back into the fault, or by logging it as part of the saga changeset in the first place (before it is sent)? The command plays a dual role, i.e. rebuilding state (in the form of correlation identifiers) and asking an aggregate to execute something in reaction to an event.

    I get the impression I'm confusing a MessageStore (which tracks all messages sent/received in messaging environment) with the role of an EventStore (or more appropriately a StreamStore).

    Anyway, will let this sink in some more (sorry for rambling on your blog) …

  • http://www.blogger.com/profile/10007606396724463142 DesDesDes

    I was trying to get sagas working in the CommonDomain project, and I got into trouble because the saga has a stream ID which is the same as the stream ID of the aggregate root that generated the event.

    If you take the sample code you wrote here the saga id will be the orderid but it is very likely there will be an aggregateroot which produces a stream with the same id.

    I am thinking about two solutions. The first is adding a string column to the commits table which holds the .NET type name of the type generating the event. After that I would have to change the unique indexes to include this column. This would allow storing the same stream ID twice. The second is adding a GUID column to the commits table which holds an alternative search ID for the event. After that I would add a unique index on that column so I can search fast. This would allow giving sagas their own stream ID while still allowing me to search for a saga based on the order ID stored in the newly created column.

    I am leaning towards the second solution. The second solution would require fewer changes and allow me to create snapshots on sagas without any modification to the snapshot table.

    Do you have any advice in which road I should take?

  • http://www.blogger.com/profile/16836313591238262040 Jonathan Oliver

    @DesDesDes,

    There are two good solutions beyond what you mention. The first (and easiest) solution is to have a completely separate EventStore database for sagas. In this way, the events from the Sagas and Domains don't mix.

    The other possible solution is by using the "Correlation" pattern as defined in Enterprise Integration Patterns.

  • Pingback: CQRS: Sagas with Event Sourcing (Part I of II)