When using non-transactional queues, processing a message is trivially easy: just handle the message and write to your data store. If the process dies and we lose a message, that's okay. With a transactional queue things become more difficult, because we want our infrastructure to ensure consistency between our message queues and application database. I have already outlined the trouble with two-phase commits (2PC) in a previous post.
What if we were able to handle the inconsistency between our message queue and application data store at the application level rather than the infrastructure level? What if we embraced receiving duplicate messages? What if our application could make message processing idempotent even when the message itself is not? What if we were able to pick and choose which messages became idempotent on a per-message-type basis?
A Typical Example
Let's go through a typical message processing example to see how this might work.
When everything is working properly we:
- Receive a message from the queue.
- Process the message.
- Save any changed application state to our data store and call Bus.Publish.
- Publish resulting messages.
- Transactionally remove the message from the message queue.
But what about failure conditions? This is exactly the type of thing that transactions are designed to handle. How can we maintain application integrity without 2PC? Let's consider a few failure conditions.
When the database transaction fails:
- Receive a message from the queue.
- Process the message.
- Write the changes to the database.
- Fail to commit, e.g. power outage, process restart, etc.
When the process comes back online, it's going to pick up right where it left off. The uncommitted database changes were rolled back and the message was never removed from the queue, so the message is simply delivered again and the application and message queue remain in a consistent state.
Now let's examine a failure condition where the database commits but the resulting messages fail to publish because of a power or hardware failure.
- Receive the message from the queue.
- Process the message.
- Save application state and call Bus.Publish.
- Commit application state (inside of a separate transaction).
- Power failure.
Herein lies the problem. When the process comes back up, it's going to reprocess the message…but it's already been processed. How do we handle that?
The Solution
What if we were to keep a log of all received messages (at least their identifiers) along with all bus activity (Publish, Send, Return, Reply) associated with the processing of each message? What if we stored that log as part of our application state during the commit operation? Doing so would allow us to check whether a message was already processed. If it was, we could retrieve the messages that resulted from processing it and redispatch them. It may be worth noting that event sourcing already stores all of the changes that resulted from processing a message. In that scenario you're already saving the data, so all that remains is a small initial check before processing.
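To make the idea concrete, here is a minimal sketch in Python using an in-memory SQLite database. It is not the IdempotentConsumer code. The table names, the message shape, and the `publish` callback are all assumptions made up for illustration. The point is only that the application state and the log of outgoing messages are committed in the same local transaction.

```python
import json
import sqlite3


def open_store():
    """Create a hypothetical local store: application state plus a message log."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
    db.execute("CREATE TABLE processed (message_id TEXT PRIMARY KEY, outgoing TEXT NOT NULL)")
    return db


def handle_deposit(db, message, publish):
    """Process a deposit once; on a duplicate, redispatch the stored messages.

    Returns True if the message was processed, False if it was a duplicate.
    """
    row = db.execute(
        "SELECT outgoing FROM processed WHERE message_id = ?", (message["id"],)
    ).fetchone()
    if row is not None:
        # Already processed: do not touch state, just redispatch what was logged.
        for outgoing in json.loads(row[0]):
            publish(outgoing)
        return False

    outgoing = [
        {"type": "DepositRecorded", "account": message["account"], "amount": message["amount"]}
    ]
    with db:  # one local transaction: state change and message log commit together
        db.execute("INSERT OR IGNORE INTO accounts (id, balance) VALUES (?, 0)", (message["account"],))
        db.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            (message["amount"], message["account"]),
        )
        db.execute(
            "INSERT INTO processed (message_id, outgoing) VALUES (?, ?)",
            (message["id"], json.dumps(outgoing)),
        )
    for m in outgoing:
        publish(m)
    return True
```

If the process dies after the commit but before `publish` runs, the redelivered message takes the duplicate branch and the logged messages are sent without the deposit being applied a second time.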
What I have outlined above is actually a well-understood pattern, and some message queue providers even provide database hooks to facilitate message de-duplication. Pat Helland, who worked at Amazon and Microsoft, wrote a really good paper on this exact topic a few years back. One critical difference between what Pat proposes and what the message queue providers do is that Pat always works on local copies of data. In other words, the boundary of each transaction is a single machine or a tightly bound cluster of machines. This rings true to me because it is harmonious with SOA principles.
I recently created a proof of concept around these very same principles and I call it "IdempotentConsumer". It's an NServiceBus message handler along with associated application services that facilitate de-duplication of a message before handing it off. In my proof of concept I am relying on two assumptions: that each logical business message can be uniquely identified by a business identifier specific to the message, and that each message is addressed to a single business entity, such as an aggregate root. Plug-ins could be written to work with alternative storage engines.
UPDATE: IdempotentConsumer has been updated to use a more optimistic style of concurrency. Rather than checking each message that arrives before handing it off to the application, we instead assume that messages are not duplicates and handle any optimistic concurrency exceptions raised by our data store.
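A rough sketch of that optimistic style, again in Python with SQLite rather than the actual IdempotentConsumer code: there is no up-front lookup, and a duplicate is detected only when the commit fails. Here a primary-key violation on a hypothetical `processed_log` table stands in for the optimistic concurrency exception a real data store would raise. All table, function, and message names are assumptions for illustration.

```python
import json
import sqlite3


def open_optimistic_store():
    """Hypothetical local store; the primary key on message_id detects duplicates."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE balances (id TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
    db.execute("CREATE TABLE processed_log (message_id TEXT PRIMARY KEY, outgoing TEXT NOT NULL)")
    return db


def handle_optimistically(db, message, publish):
    """Assume the message is new; fall back to redispatch if the store objects.

    Returns "processed" or "duplicate".
    """
    outgoing = [
        {"type": "DepositRecorded", "account": message["account"], "amount": message["amount"]}
    ]
    try:
        with db:  # commits on success, rolls back everything on an exception
            db.execute("INSERT OR IGNORE INTO balances (id, balance) VALUES (?, 0)", (message["account"],))
            db.execute(
                "UPDATE balances SET balance = balance + ? WHERE id = ?",
                (message["amount"], message["account"]),
            )
            # Raises sqlite3.IntegrityError if this message ID was already logged.
            db.execute(
                "INSERT INTO processed_log (message_id, outgoing) VALUES (?, ?)",
                (message["id"], json.dumps(outgoing)),
            )
        result = "processed"
    except sqlite3.IntegrityError:
        # The balance update above was rolled back; reuse the originally logged messages.
        row = db.execute(
            "SELECT outgoing FROM processed_log WHERE message_id = ?", (message["id"],)
        ).fetchone()
        outgoing = json.loads(row[0])
        result = "duplicate"
    for m in outgoing:
        publish(m)
    return result
```

In the common case of no duplicates this saves one query per message, and the extra read happens only on the rare conflict.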
NoSQL
While the above works great when you have a resource that supports transactional integrity in the form of TransactionScope, it becomes more difficult when working with NoSQL storage engines, such as CouchDB, Cassandra, Riak, etc.
In a relational DB we can perform multiple writes to various tables and have all writes be transactional. In NoSQL the boundary of a transaction is a single write. Multiple writes mean multiple transactions. Granted, CouchDB does support batching, but you lose some consistency checks when doing a batch write to multiple documents simultaneously, such as the ability to raise an optimistic concurrency exception if you're not writing using the latest version of a particular document.
So how do we write the list of published messages resulting from the message currently being processed and maintain transactional integrity with the changes in application state? Easy.
First, write the list of messages to be published as a result of processing the incoming message. Once that transaction is complete, write the associated document containing the changes in application state, but include the ID of the message being processed *inside* the document. When we query to obtain the list of messages, we do a server-side or client-side MapReduce of sorts to effectively "join" the list of published messages with the associated parent message ID found in the application state document/row. If the application state contains the ID found in the list of messages, redispatch the stored messages and we're done. If it doesn't, delete the list of published messages. They were never published, because a failure occurred prior to writing the application state record.
Because we are doing this on a per-message-type basis, we can customize the way we query each business object. We can have a standard way of querying most of them and then customize the way we query specific ones, depending upon how they store the parent message ID. We can vary the way we query based upon the document storage structure or even the storage engine.
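The two-write sequence can be sketched as follows. This is only an illustration: `DocumentStore` is a hypothetical in-memory stand-in for a store in which each write is its own transaction, the key naming and document fields are invented, and the "join" is done client-side with two reads rather than with MapReduce.

```python
class DocumentStore:
    """Hypothetical in-memory store; each put/delete is its own 'transaction'."""

    def __init__(self):
        self.docs = {}

    def put(self, key, doc):
        self.docs[key] = doc

    def get(self, key):
        return self.docs.get(key)

    def delete(self, key):
        self.docs.pop(key, None)


def process(store, message, publish, crash_before_state=False):
    """Two independent writes: the outgoing list first, then the state document."""
    log_key = "outgoing:" + message["id"]
    state_key = "account:" + message["account"]
    log = store.get(log_key)
    state = store.get(state_key) or {"balance": 0, "processed_ids": []}

    if log is not None:
        if message["id"] in state["processed_ids"]:
            # Both writes landed earlier: redispatch the stored messages and stop.
            for m in log["messages"]:
                publish(m)
            return "redispatched"
        # The state write never happened, so the list is an orphan: delete it.
        store.delete(log_key)

    outgoing = [{"type": "DepositRecorded", "account": message["account"]}]
    store.put(log_key, {"messages": outgoing})  # write 1: messages to publish
    if crash_before_state:
        raise RuntimeError("simulated failure between the two writes")
    store.put(state_key, {  # write 2: new state, carrying the parent message ID
        "balance": state["balance"] + message["amount"],
        "processed_ids": state["processed_ids"] + [message["id"]],
    })
    for m in outgoing:
        publish(m)
    return "processed"
```

Calling `process` with `crash_before_state=True` leaves an orphaned list behind. The next delivery of the same message finds no matching ID in the state document, deletes the orphan, and processes the message normally.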
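One way to picture the per-type customization is a small registry of lookup functions, sketched here in Python. The message type `OrderShipped` and the document fields `processed_ids`, `events`, and `caused_by` are hypothetical. They only illustrate that different documents may record the parent message ID in different places.

```python
def ids_from_list(doc):
    """Standard layout (assumed): the document keeps a flat list of message IDs."""
    return set(doc.get("processed_ids", []))


def ids_from_events(doc):
    """Custom layout (assumed): each stored event records the message that caused it."""
    return {event["caused_by"] for event in doc.get("events", [])}


DEFAULT_LOOKUP = ids_from_list

# Only message types that need a non-standard query are registered here.
LOOKUPS = {
    "OrderShipped": ids_from_events,
}


def already_applied(message_type, message_id, state_doc):
    """Return True if the state document shows this message was already applied."""
    lookup = LOOKUPS.get(message_type, DEFAULT_LOOKUP)
    return message_id in lookup(state_doc)
```

Most message types fall through to the default lookup, and only the exceptions need their own function.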
Conclusion
The biggest advantage of what I have described is that we can effectively simulate a two-phase commit among multiple resources without the overhead of 2PC. This allows us to use storage engines whose drivers don't support two-phase commit, e.g. MySQL, PostgreSQL, or virtually all NoSQL implementations. Furthermore, the receipt and publishing of a message is still transactional. It's just a separate transaction from the storage of application state. As I mentioned previously, those that do event sourcing effectively get this for free, but a small amount of application service code must be written to facilitate it.
One big argument against the above is that we're doing lots of unneeded queries to determine if we should even bother to process the message. Yes, that's true, but we should only ever be querying local storage (truly local or inside of our cluster). [UPDATE: The previous argument is nullified by using optimistic concurrency as noted in the previous UPDATE above.] Next, we don't need to implement this for every message type. You are free to make your messages inherently idempotent. You are also free to use safe, slow two-phase commit if the situation requires it or the operational environment allows it.
These same patterns can be applied even when using non-transactional resources, such as sending an email. The only quirk here is that there may be a one-in-a-million scenario where we send the same email twice. But in that case, it's not really a big deal if someone gets duplicate emails.