Monthly Archives: April 2011

CQRS: Out of Sequence Messages and Read Models

One of the tricks to message-based solutions is handling messages that arrive out of order.  In some instances the application code must be made resilient to this situation.  In other cases, it can be easier to re-sequence the messages and then apply each message in sequence.  In one of my current systems this is the route we took: re-sequencing messages if they arrive out of order.

One of the easier possibilities is to have the read model poll the event store for new commits.  I dislike this solution because it doesn’t leverage the innate capabilities of push-based notifications and feels like a resource drain.  Nonetheless, it’s probably one of the easier ones to handle until you start dealing with scaling out the read model.  At that point, a different approach must be taken.

The solution that we use is to dequeue a message and to place it in a “holding table” until all messages with a previous sequence are received.  When all previous messages have been received we take all messages out of the holding table and run them in sequence through the appropriate handlers.  Once all handlers have been executed successfully, we remove the messages from the holding table and commit the updates to the read models.
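To make the holding-table idea concrete, here is a minimal sketch in Python.  It is not our production code: the `Resequencer` class, the in-memory dictionary standing in for the holding table, and the assumption that sequence numbers start at 1 per aggregate are all illustrative.  Duplicate handling is deliberately left out.

```python
from collections import defaultdict


class Resequencer:
    """Holds out-of-order messages per aggregate until the gap is filled."""

    def __init__(self, handler, first_sequence=1):
        self.handler = handler                 # called once per message, in order
        self.holding = defaultdict(dict)       # aggregate_id -> {sequence: message}
        # Assumption: each aggregate's sequence starts at first_sequence.
        self.next_expected = defaultdict(lambda: first_sequence)

    def receive(self, aggregate_id, sequence, message):
        held = self.holding[aggregate_id]
        held[sequence] = message

        # Collect the contiguous run that starts at the next expected sequence.
        ready = []
        seq = self.next_expected[aggregate_id]
        while seq in held:
            ready.append(seq)
            seq += 1

        # Run the handlers in order; only then clear the holding table.
        for s in ready:
            self.handler(aggregate_id, s, held[s])
        for s in ready:
            del held[s]
        self.next_expected[aggregate_id] = seq


applied = []
resequencer = Resequencer(lambda agg, seq, msg: applied.append((seq, msg)))
resequencer.receive("order-1", 2, "ItemAdded")     # held: 1 is missing
resequencer.receive("order-1", 3, "ItemRemoved")   # held: 1 is still missing
resequencer.receive("order-1", 1, "OrderCreated")  # gap filled: 1, 2, 3 run in order
print(applied)
```

In a real system the handler calls, the removal from the holding table, and the read model updates would all sit inside one database transaction so that a handler failure leaves everything in place for a re-run.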

This works for us because the domain publishes events and marks them with the appropriate sequence number.  Without this, the solution below would be much more difficult—if not impossible.

This solution uses a relational database as a persistent storage mechanism, but we’re not using any of the relational aspects of the storage engine.  At the same time, there’s a caveat in all of this.  If messages 2, 3, and 4 arrive but message 1 never does, we don’t apply any of them.  This scenario should only happen if there’s an error processing message 1 or if message 1 somehow gets lost.  Fortunately, it’s easy enough to correct any errors in our message handlers and re-run the messages or, in the case of a lost message, to rebuild the read models from the event store directly.

One interesting and very positive side effect that we discovered when we implemented this solution was full message idempotency.  Basically, we have a small “Sequence” table that gives us the most recent min/max message sequence received for a given aggregate.  Only when we have received a full sequence do we run the messages through the handlers.  If a message arrives more than once and it has already been handled, the sequence table’s “min” value will be greater than the current message’s “sequence” and we know to drop the message.
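The duplicate check against the “Sequence” table can be sketched roughly like this.  The table layout, the column names, and the reading of “min” as the lowest sequence not yet handled are my assumptions for illustration; SQLite stands in for whatever relational database you use.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sequence ("
    " aggregate_id TEXT PRIMARY KEY,"
    " min_seq INTEGER NOT NULL,"   # assumed meaning: lowest sequence not yet handled
    " max_seq INTEGER NOT NULL)"   # assumed meaning: highest sequence handled so far
)


def is_duplicate(conn, aggregate_id, sequence):
    """True when the message falls below the aggregate's min, i.e. was already handled."""
    row = conn.execute(
        "SELECT min_seq FROM sequence WHERE aggregate_id = ?", (aggregate_id,)
    ).fetchone()
    return row is not None and sequence < row[0]


def mark_handled_through(conn, aggregate_id, sequence):
    """Record that everything up to and including `sequence` has been handled."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO sequence VALUES (?, ?, ?)",
            (aggregate_id, sequence + 1, sequence),
        )


mark_handled_through(conn, "order-1", 3)
print(is_duplicate(conn, "order-1", 2))  # a redelivery of message 2
print(is_duplicate(conn, "order-1", 4))  # the next new message
```

The first check reports a duplicate and the second does not, so a redelivered message 2 is dropped while message 4 proceeds to the holding table.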

The other caveat that we have yet to address is how to apply this pattern to a NoSQL solution.  It definitely would involve some additional overhead that we don’t need at this point.  The solution was quite straightforward to implement using pure SQL and would have been much more difficult using something like NHibernate.

How I Avoid Two-Phase Commit

In my last blog post I ranted about two-phase commit (2PC) and MSDTC.  This is going to be a quick post about a few techniques that I use.  The basic premise is that I break a two-phase commit apart into multiple transactions such that each is handled independently but with resiliency against failure scenarios.

There are a number of different methods and patterns that can be used to avoid 2PC and each method is different depending upon the application-level requirements.  The easiest one to reason about is de-queuing a message, processing it, and then saving the result to durable storage.  With MSDTC, the above is performed as a single transaction.  There are a number of issues with this approach as outlined in my previous post.

To avoid 2PC we must split the transaction apart and have one for the act of de-queuing the message and one for persisting to durable storage such as a relational database or even a NoSQL solution.  The argument that is typically raised with this approach is that there is a possibility that the message dequeue may occur more than once thus raising the possibility of duplicate messages being processed and the results stored.

[Figure: Nested Transactions]

I have written extensively in my blog on the subject of message idempotency and the guarantees offered by message queues of at-least-once delivery.  To sum up these previous posts, message queues by definition deliver a message at least once.  Cloud-based queues are even more fun because they have no concept of traditional, fully consistent transactions.  One of the easiest ways to handle idempotency is to keep track of the message identifiers for previously processed messages and then to drop messages that have already been handled.
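Tracking message identifiers can be as small as one extra table written in the same local transaction as the business update.  The following is a sketch under assumptions: the `processed_messages` and `balances` tables, the `handle_deposit` function, and the deposit scenario itself are hypothetical, and SQLite stands in for the real store.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_messages (message_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE balances (account TEXT PRIMARY KEY, amount INTEGER NOT NULL)")
conn.execute("INSERT INTO balances VALUES ('acct-1', 0)")
conn.commit()


def handle_deposit(conn, message_id, account, amount):
    """Apply the deposit once; return False when the message is a redelivery."""
    try:
        with conn:  # one local transaction: commit on success, roll back on error
            conn.execute("INSERT INTO processed_messages VALUES (?)", (message_id,))
            conn.execute(
                "UPDATE balances SET amount = amount + ? WHERE account = ?",
                (amount, account),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # primary key violation: this message_id was already recorded


first = handle_deposit(conn, "msg-42", "acct-1", 100)
second = handle_deposit(conn, "msg-42", "acct-1", 100)  # the queue delivers it again
balance = conn.execute("SELECT amount FROM balances WHERE account = 'acct-1'").fetchone()[0]
print(first, second, balance)
```

Because the identifier and the state change commit together, a redelivery after a crash either finds the identifier and is dropped, or finds nothing and is processed from scratch.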

As an interesting aside, sagas are naturally idempotent because they can be implemented as a state machine.  Let’s imagine that we were modeling a message-controlled MP3 player.  The user could dispatch the “PushPlay” command 100 times and we could receive the message 1000 times (due to failure scenarios like network failures, etc.), but we would only transition to the “Playing” state once.  Even though “PushPlay” isn’t necessarily idempotent, the saga makes it so.
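The MP3 player example might look something like this as a state machine.  This is only a sketch: the `PlayerSaga` class, the `transitions` counter, and the “PushStop” command are made up for illustration.

```python
from enum import Enum


class State(Enum):
    STOPPED = "stopped"
    PLAYING = "playing"


class PlayerSaga:
    def __init__(self):
        self.state = State.STOPPED
        self.transitions = 0  # counts real state changes, for demonstration only

    def handle(self, command):
        if command == "PushPlay" and self.state is State.STOPPED:
            self.state = State.PLAYING
            self.transitions += 1
        elif command == "PushStop" and self.state is State.PLAYING:
            self.state = State.STOPPED
            self.transitions += 1
        # Any other command/state pair is a no-op, so redeliveries are harmless.


saga = PlayerSaga()
for _ in range(1000):
    saga.handle("PushPlay")
print(saga.state, saga.transitions)
```

After 1000 deliveries the saga is in the playing state and has transitioned exactly once.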

But what about a more complex scenario?  What if we must receive a message and then publish?  We can handle that by splitting out one more transaction.  Okay, great, we’ve been able to split things apart using separate transactions, but we still have to deal with failure scenarios.  For example, what happens if the message is received and written to durable storage (transactionally), but the outbound message dispatch fails?  These are critical failure scenarios that must be explicitly handled.

[Figure: Three Transactions]

In my applications, once the application state has been written to durable storage, I push the messages to be published onto the wire.  If that fails, it’s pretty simple to use a Circuit Breaker Pattern implementation to retry after a specified interval.  It also really depends on the type of queue that you’re delivering to.  For example, a local queue has a much lower chance of failure than a remote queue because the network has been removed from the picture, at least for delivery to the local queue.
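For the retry-after-an-interval part, a stripped-down circuit breaker could look like the following.  This is a simplified sketch, not a full implementation of the pattern: it trips on the first failure rather than after a threshold, and the `CircuitBreaker` class, its `retry_after` parameter, and the injectable `clock` are all assumptions made for illustration.

```python
import time


class CircuitBreaker:
    def __init__(self, retry_after, clock=time.monotonic):
        self.retry_after = retry_after   # seconds to wait after a failure
        self.clock = clock               # injectable so the example can fake time
        self.opened_at = None            # None means the circuit is closed

    def call(self, send, message):
        """Return True if the message was sent, False if it should be retried later."""
        if self.opened_at is not None and self.clock() - self.opened_at < self.retry_after:
            return False                 # still open: skip the call entirely
        try:
            send(message)
        except Exception:
            self.opened_at = self.clock()  # trip (or re-trip) the breaker
            return False
        self.opened_at = None            # success closes the circuit
        return True


now = [0.0]
attempts = []


def send(message):
    attempts.append(message)
    if len(attempts) == 1:
        raise ConnectionError("queue unavailable")  # first attempt fails


breaker = CircuitBreaker(retry_after=30, clock=lambda: now[0])
results = [breaker.call(send, "OrderCreated")]       # fails, opens the circuit
results.append(breaker.call(send, "OrderCreated"))   # skipped: interval not elapsed
now[0] = 31.0
results.append(breaker.call(send, "OrderCreated"))   # interval elapsed, retry succeeds
print(results, len(attempts))
```

The second call never touches the queue, which is the point: a failing remote queue isn’t hammered while it recovers.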

In any case, if the message fails to dispatch because of power failure, etc., the durable storage has a list of messages it must dispatch when the application restarts.  At startup, I scan that list and dispatch the set of undispatched messages.  I then mark each message as dispatched in the database.

Interestingly enough, the above pattern is the exact one I use for my EventStore project.  A message is received (be it from a queue or even an RPC call) and the resulting work is committed as a unit to some kind of persistent storage.  Once successfully persisted, the EventStore hands off any resulting work in the form of messages to be dispatched to a “dispatcher”.  This dispatcher then pushes the messages onto the queue of your choice or can even make RPC calls on another thread.  Once the message is considered dispatched, it is marked as such against the durable storage.  In the event of a failure scenario the message is never marked as dispatched and, during the next application restart, those messages are pushed onto the wire.
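The store-then-dispatch flow described above can be sketched in a few lines.  To be clear, this is not the EventStore code or its API; the `state` and `outbox` tables, `commit_work`, and `dispatch_pending` are hypothetical names, and SQLite stands in for the durable storage.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE state (key TEXT PRIMARY KEY, value TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE outbox ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " body TEXT NOT NULL,"
    " dispatched INTEGER NOT NULL DEFAULT 0)"
)


def commit_work(conn, key, value, outgoing):
    """Persist the state and the messages it produced in one local transaction."""
    with conn:
        conn.execute("INSERT OR REPLACE INTO state VALUES (?, ?)", (key, value))
        conn.executemany("INSERT INTO outbox (body) VALUES (?)", [(m,) for m in outgoing])


def dispatch_pending(conn, send):
    """Publish every undispatched message, marking each one after it is sent."""
    rows = conn.execute(
        "SELECT id, body FROM outbox WHERE dispatched = 0 ORDER BY id"
    ).fetchall()
    for msg_id, body in rows:
        send(body)  # may raise; the row then stays undispatched for the next run
        with conn:
            conn.execute("UPDATE outbox SET dispatched = 1 WHERE id = ?", (msg_id,))


def failing_send(body):
    raise ConnectionError("queue unavailable")


sent = []
commit_work(conn, "order-1", "created", ["OrderCreated"])
try:
    dispatch_pending(conn, failing_send)
except ConnectionError:
    pass  # nothing was marked as dispatched

dispatch_pending(conn, sent.append)  # e.g. at the next application start
pending = conn.execute("SELECT COUNT(*) FROM outbox WHERE dispatched = 0").fetchone()[0]
print(sent, pending)
```

Note that a crash between sending and marking means the message goes out again on restart, which is at-least-once delivery; the receivers need to be idempotent, as discussed earlier.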

Conclusion

What’s amazing to me about these patterns is how they enable choice.  That’s the bottom line for me.  I want to choose the technologies that fit based upon the requirements of the application—not the requirements of the infrastructure.  Infrastructure is there to serve the application, not the other way around.  All too often we start our application design by taking different components in our infrastructure as a given, little realizing the profound effects and demands that these infrastructure components place upon our application code.  This is one of the major factors behind vendor lock-in. But by switching things around, we can keep our infrastructure flexible with the premise that it exists only to serve the needs of the application.  And when a more effective infrastructure component comes along, we can make the switch without significant effort.

My Beef with MSDTC and Two-Phase Commits

I’m just not a fan of the Microsoft Distributed Transaction Coordinator.  I’ve tweeted and blogged a few times about it, but I’ve never really gone into why.  I should preface my remarks by saying that MSDTC does work: it will facilitate and coordinate distributed transactions using a two-phase commit (2PC) protocol.  I should also say that much of what follows is a rant against 2PC, and MSDTC is a casualty in the argument.  But like most things, it’s about tradeoffs.  Do you really know how much 2PC using MSDTC costs you?  Do you know what you’re giving up when you rely upon it and allow it to become an integral part of your system?

Now the details.  The biggest “beef” I have with MSDTC is the lack of real support among the different products that exist.  To be fair, this isn’t really the fault of MSDTC.  For example, how many queuing solutions (beyond MSMQ) have you found that support not only transaction enlistment (when using TransactionScope), but also promotion to a two-phase commit?  There are literally fewer than a handful of solutions.  If you want to leverage cloud-based queuing systems such as Microsoft Azure Queues or Amazon SQS, sorry, there isn’t any support.  You’ve gotta build it yourself and the model is fundamentally and diametrically opposed to the one used by MSDTC.

But what about database vendors?  Surely there are a number of good RDBMS solutions that support 2PC.  Sure, there are some that support 2PC, but even fewer that have good driver implementations of transaction enlistment and promotion.  In the RDBMS world, you’ve got a few “real” choices for MSDTC support: Microsoft SQL Server, Oracle, IBM DB2, and perhaps a few others.  MySQL?  Nope.  PostgreSQL?  Nope.  Microsoft SQL Azure?  Not a chance.

Another beef that I have is a weird “edge case” that shouldn’t really ever happen except that it can and does and you need to be prepared for it.  There is a small window in a two-phase commit during which one of the “cohorts” may go offline causing the transaction to be “in doubt”.  I don’t know about you, but I like my transactions to be atomic.  It succeeded or failed.  In doubt?  Seriously?  That’s like giving a lock to a thread on some data and then having it corrupt the data when the thread aborts halfway through.

Even among software that does support 2PC on MSDTC there are weird bugs and issues that arise.  NHibernate would leak connections; MySQL would forget it was a cohort in a transaction during a server restart and roll back the transaction, even when other cohorts had already committed; there have even been a few small but critical bugs in RavenDB when participating in a distributed transaction.  2PC is hard.  There are lots of weird conditions and edge cases.  Do you really want to encounter an issue like this?  Making a single resource transactional in its own right isn’t terribly difficult, but when it has to participate and collaborate with other resources things become exponentially more difficult.

So far, most of the issues that we’ve discussed are specific to distributed transactions in general and MSDTC is a helpless victim.  So let’s talk about MSDTC quirks specifically.  First and foremost, when working on a single machine MSDTC configuration and setup is a piece of cake.  Just make sure the service is running and you’re good to go.  But once you choose to *distribute* across multiple machines it’s another story altogether.  There are a number of steps that you must follow and missing one results in cryptic error messages.  Microsoft has even released a few tools to help diagnose issues related to MSDTC configuration.  You also must be sure that each MSDTC instance can authenticate with the others—above and beyond the authentication required to connect to each durable resource (database, message queue, etc.).  This means either running in a Windows domain environment or synchronizing user accounts across machines.

The next issue is that of interoperability.  One of the fallacies of distributed computing is that the network is homogeneous.  Do all of your servers run Windows?  Do you have database servers running on Linux or some other operating system?  Technically speaking, MSDTC does support “XA transactions”, but I’m not convinced that all parties involved in a distributed transaction are on speaking terms with MSDTC.  If you avoid 2PC altogether, this is a non-issue.  You can execute transactions against whatever resource using whatever operating system you choose.

What if your software is written in .NET and you want to run it on Mono?  Good luck.  Distributed transactions are not supported; MSDTC is Windows only.  Without rewriting portions of your software, you can’t move over to Mono.  In other words, reliance upon MSDTC restricts our ability to choose the technologies that best meet our needs.

The last issue is about weighing the costs involved.  How much more “expensive” is a distributed transaction as compared with a simple lightweight transaction?  MSDTC transaction “escalation” can be very expensive in terms of the latency involved because latency *isn’t* zero and because of the number of round trips involved in synchronizing all of the cohorts.  Furthermore, MSDTC incurs significant overhead by merely acting as the intermediary, middleman, or broker required to coordinate everything.

Conclusion

For me, the cost of using MSDTC is unacceptably high and I have found ways to achieve the same end result without paying for the overhead incurred by MSDTC.  As a result, I have a significant amount of flexibility and freedom in all of my technology choices.  This freedom gives me a “swapability” and portability not achievable by someone who is shackled with the awful chains of MSDTC.  Most people use MSDTC by default because it’s “easier”, but be aware of what you’re giving up, because you’re giving up a lot when you use it.

In my next post I will go over the various techniques and methods I have used to avoid a reliance upon distributed transactions.  Most of the techniques are very simple and can be implemented without a lot of overhead thus adding transparency to your system while avoiding the magic black box of MSDTC.

ILMerge Gotcha

Just remember that when two assemblies have each internalized a reference to another assembly using ILMerge, each gets its own “copy” of static variables.  That is to say, MyClass1.SomeStaticInstance is no longer the same object between the two assemblies.  I was kicking against this one for the better part of an hour.  At first I thought it was some quirk with the [ThreadStatic] attribute I was using.  It wasn’t.