CQRS: Out of Sequence Messages and Read Models

One of the tricks to message-based solutions is handling messages that come out of order.  In some instances the application code must be made resilient to this situation.  In other cases, it can be easier to re-sequence the messages and then apply each messages in sequence.  In one of my current systems this is the route we took—re-sequencing messages if they arrive out of order.

One of the easier possibilities is to have the read model poll the event store for new commits.  I dislike this solution because it doesn’t leverage the innate capabilities of push-based notifications and feels like a resource drain.  Nonetheless, it’s probably one of the easier ones to handle until you start dealing with scaling out the read model.  At that point, a different approach must be taken.

The solution that we use is to dequeue a message and to place it in a “holding table” until all messages with a previous sequence are received.  When all previous messages have been received we take all messages out of the holding table and run them in sequence through the appropriate handlers.  Once all handlers have been executed successfully, we remove the messages from the holding table and commit the updates to the read models.

This works for us because the domain publishes events and marks them with the appropriate sequence number.  Without this, the solution below would be much more difficult—if not impossible.

This solution is using a relational database as a persistence storage mechanism, but we’re not using any of the relational aspects of the storage engine.  At the same time, there’s a caveat in all of this.  If message 2, 3, and 4 arrive but message 1 never does, we don’t apply any of them.  The scenario should only happen if there’s an error processing message 1 or if message 1 somehow gets lost.  Fortunately, it’s easy enough to correct any errors in our message handlers and re-run the messages.  Or, in the case of a lost message, to re-build the read models from the event store directly.

One interesting and very positive side effect that we discovered when we implemented this solution was full message idempotency.  Basically we have a small “Sequence” table that gives us the most recent min/max message sequence received for a given aggregate.  Only when we have received a full sequence do we run the messages through the handlers.  If a message arrives more than once and it have already been handled, the sequence table’s “min” value would be greater than the current message’s “sequence” and we would know to drop the message.

The other caveat that we have yet to address is how to apply this pattern to a NoSQL solution.  It definitely would involve some additional overhead that we don’t need at this point.  The solution was quite straightforward to implement using pure SQL and would have been much more difficult using something like NHibernate.

  • http://www.blogger.com/profile/13094952618057460795 CARFIELD

    HI, so you store the whole message in single column? Just wonder why not separate individual attributes and store in individual column?

  • http://www.blogger.com/profile/16836313591238262040 Jonathan Oliver

    I just store the payload of each message as a blob and then deserialize all the messages when they have been resequenced. At that point, I just each through the appropriate handler(s).

  • James

    Jonathan,

    A few thoughts/questions:

    1) I don't recall off the top of my head. Does the EventStore have the capability to provide sequence numbers?

    2) How would we deal with a read model (or other subscriber) that was only concerned with specific messages dispatched from the commit stream. Would the subscriber need to listen for all messages? Otherwise, it seems the sequence number strategy would be fragile.

    3) How would we deal with a read model (or other subscriber) that is receiving messages from multiple different sources of commit streams and the read model 'needs' to have temporal order across all streams? I'm wonder if this is a situation where using a saga is appropriate. Also, it may be a good indication that further analysis is required. (referring to Udi's 'Race Conditions Don't Exist" post http://www.udidahan.com/2010/08/31/race-conditions-dont-exist/ )

  • http://www.blogger.com/profile/16836313591238262040 Jonathan Oliver

    James,

    Excellent questions. The EventStore pushes the events to NServiceBus (or whatever dispatching mechanism you use). This dispatching mechanism can append sequence numbers if you choose based upon the metadata provided by the EventStore:
    http://stackoverflow.com/questions/5564130/eventstore-nservicebus-setup

    In our case the read models get everything from the domain. Further our read models only ever listen to the domain which "owns" them. They don't listen to messages from other domains.

    Sagas are by far the best way to handle the supposed "race conditions". We just wanted to something lightweight and fast that was handled exclusively within the read model.

  • http://www.blogger.com/profile/05012641151191187213 devirr

    I think a posible solution using redis could be. A key/value pair for the version, some key/value pairs for messages and a sorted set for the sequence.
    key convensions:
    - versionKey = "version:@aggregateId"
    - msgKey = "msg:@aggregateId:@version"
    - seqKey = "seq:@aggregateId"

    pseudo code:

    public class ReSequenceHandler {
    IMapHandlers handlerMapper;

    public ReSequenceHandler(IMapHandlers handlerMapper) {
    this.handlerMapper = handlerMapper;
    }
    public void Handle(message msg) {
    int currentVersion = redis.Get(msg.AggregateId.ToVersionKey());

    if(msg.SeqNo == currentVersion + 1) {
    IList heldMsgVersions = redis.ZRange(msg.AggregateId.ToSeqKey(), msg.SeqNo, int.MaxValue);
    IList heldMsgs = redis.MGet(heldMsgVersions.ToMsgKeys(msg.AggregateId)).OrderBy(m => m.SeqNo);
    (msg + heldMsgs).ForEach(m => handlerMapper.ByType(m.GetType()).Handle(m));
    redis.Del(heldMsgs.ToMsgKeys() + msg.AggregateId.ToSeqKey());
    redis.Set(msg.AggregateId.ToVersionKey(), msg.SeqNo);
    }
    else {
    redis.Set(msg.AggregateId.ToMsgKey(), msg);
    redis.ZAdd(msg.AggregateId.ToSeqKey(), msg.SeqNo, msg.SeqNo);
    }
    //outer handler should acknowledge message
    }
    }