Monthly Archives: April 2010

Message Idempotency Patterns and Disk Usage

One of the main concerns that may be expressed in reviewing my previous blog post about idempotent messaging patterns is that of disk usage.  The concern is about retaining every. single. message we ever publish.  That’s a lot of messages.  That’s a lot of disk space.

Before we dig into one plausible solution, let’s first look at a few mitigating factors.  First, disk space is cheap.  Really cheap.  We’re moving into the sub-$100 for 2 TBs of disk space.  Utilizing different disk-saving techniques such as efficient (and even custom) serialization and compression we can easily store more messages than a lot of systems ever dream of sending.  By further breaking down our code into business services and business components, we have logically separated and can thus physically separate our message storage across multiple physical nodes.

Next, for those that plan on or have implemented solutions utilizing Greg Young’s flavor of event sourcing, you’re already keeping your messages around.  The messages that you store in the event log are the same ones being dispatched on commit.  When utilizing event sourcing we have no choice but to keep all messages because that is what we use to rebuild application state.

Okay, now that we have looked at reasons why we may want to keep all of our dispatched messages around, let’s consider the opposite.  How can we get rid of messages as quickly as possible?  How can we free up our own resources?  How can we avoid our disks accidentally running out of space (and bringing down the server) because our message logs consumed all available space?

In looking over the various idempotent messaging patterns one thing stands out.  The reason for keeping messages around is for failure scenarios.  In the situation where we are re-handling a message because our process died while handling it, but only after it got far enough to transform some application state, we want to be able to redispatch the messages that should have been sent had the original set of transactions been fully successful.

Because the idempotency patterns thus far have only been addressing failure conditions, how can we address the situation where there were no failures?  If a message was successfully handled and dispatched a set of resultant messages, how can we avoid the needless redispatching or even the reprocessing of a known duplicate message without storing every single message we’ve ever dispatched?

There are two parts to the proposed solution which can be applied to varying degrees depending upon how aggressive you’d like to be with your space-saving constraints.  The first is to purge successfully dispatched messages and the second is to purge the record that a message was ever processed in the first place.

Because the message log contains messages that we would redispatch upon handling a duplicate message, we only need to keep those messages around so long as we are not confident that the source message was fully processed.  In other words, once we have successfully published the results of processing, we have at-least-once delivery guarantees from our messaging infrastructure.  Because of this guarantee we don’t need to keep these messages around anymore.  They only clutter up our message log and occupy disk space.

When a message has been successfully processed we need to somehow indicate to our application-level idempotency middleware that there is no longer a need to keep messages related to the source message.  While we may purge the log, we will want to keep the application-level identifier of the message that was processed.  We keep this to know if we should even bother reprocessing the message should we receive it again.  Keeping a list of identifiers is extremely cheap as compared to storing the contents of every dispatched message indefinitely.
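To make the idea concrete, here is a minimal sketch in Python (not NServiceBus code) of a log that drops the dispatched payloads but keeps the identifier.  The `MessageLog` class, its method names, and the in-memory dictionaries are all assumptions for illustration; a real log would live in a durable store.

```python
from dataclasses import dataclass, field


@dataclass
class MessageLog:
    """In-memory stand-in for the message log (hypothetical structure)."""

    # source message id -> messages dispatched as a result of handling it
    dispatched: dict = field(default_factory=dict)
    # ids of source messages whose results were fully published
    completed_ids: set = field(default_factory=set)

    def record(self, source_id, outgoing):
        self.dispatched[source_id] = list(outgoing)

    def mark_published(self, source_id):
        # Drop the bulky payloads, keep only the cheap identifier.
        self.dispatched.pop(source_id, None)
        self.completed_ids.add(source_id)

    def is_known(self, source_id):
        return source_id in self.dispatched or source_id in self.completed_ids


log = MessageLog()
log.record("order-123", ["OrderAccepted", "InvoiceRequested"])
log.mark_published("order-123")
log.mark_published("order-123")  # repeating the purge changes nothing
```

After the purge the log still recognizes `order-123` as a duplicate, but it no longer holds any of the messages that were dispatched for it.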

Our application can indicate that a purge of the message log should occur in any number of different ways.  One possible way might include using a separate application thread that receives notification when all resulting messages have been published.  Once this other thread receives notification, it could then purge the contents of the message log for that message while retaining the identifier of the message that was processed.

Instead of using a different thread, we could also utilize the messaging infrastructure itself.  We could tack on a “MessagesSuccessfullyDispatched” message that was sent (or maybe even published) to a receiver/subscriber queue to purge the message log.  Note that MessagesSuccessfullyDispatched is naturally idempotent because processing that message multiple times results in the same value.  Because it is naturally idempotent, we need not worry about applying the same patterns to the handling of this special infrastructure message.

Another possibility would be to have a completely separate process polling the message log database to see which messages could be purged because they have been successfully processed.

The last part of the solution is to have some kind of out-of-process cleanup that would occasionally wake up and purge the message log of the fact that a particular message was ever received in the first place by purging the identifiers of all messages received from the log.  This purging could be done on a scheduled basis.  The only question then becomes, how long do we wait before purging old message identifiers from the log?  This can best be answered by understanding how long duplicate messages may be re-dispatched throughout the system.  In many cases a few hours or days may be sufficient, but to be extremely safe, we could set the threshold to be something like a few weeks or perhaps even a month.  This could even potentially be tuned on a per-message-type level.  It all depends upon the idempotency threshold of each message type along with the space constraints and requirements of the business service upon its operational environment.
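A scheduled cleanup along these lines might look like the following Python sketch.  The message type names, the retention windows, and the shape of the `processed` dictionary are made up for illustration; the right thresholds depend entirely on your own system.

```python
from datetime import datetime, timedelta

# Hypothetical per-message-type retention windows.
RETENTION = {
    "PlaceOrder": timedelta(days=30),
    "UpdateQuote": timedelta(hours=6),
}
DEFAULT_RETENTION = timedelta(days=14)


def purge_expired_ids(processed, now):
    """Return only the entries still inside their retention window.

    `processed` maps message id -> (message type, time processed).
    """
    kept = {}
    for message_id, (message_type, processed_at) in processed.items():
        window = RETENTION.get(message_type, DEFAULT_RETENTION)
        if now - processed_at <= window:
            kept[message_id] = (message_type, processed_at)
    return kept


now = datetime(2010, 4, 30)
processed = {
    "a": ("PlaceOrder", datetime(2010, 4, 10)),   # 20 days old, kept
    "b": ("UpdateQuote", datetime(2010, 4, 29)),  # 1 day old, purged
    "c": ("Unknown", datetime(2010, 4, 1)),       # 29 days old, purged
}
remaining = purge_expired_ids(processed, now)
```

Only `a` survives: it is the one identifier still inside the window configured for its message type.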

Messaging: At-Least-Once Delivery

One of the guarantees behind messaging is guaranteed delivery.  Most of us don’t give any additional thought to the subject.  Until we get into production.  While messages may be guaranteed deliverable, have we ever considered that a message may be delivered more than once?

When I first started seriously looking at and investigating messaging solutions a number of years ago one of the things that baffled me was how to ensure exactly-once delivery of messages.  Virtually all messaging solutions provided at-least-once delivery.  How could I prevent messages from being delivered more than once?  And what about failure and failover of the message queuing infrastructure itself?  Some of the cheaper or open source message infrastructure solutions that I investigated had some sort of quasi-failover, usually depending upon a shared drive as a single point of failure OR some asynchronous gossip protocol to let other peers know which messages had been delivered successfully.

Now we live in a more cloudy environment, with cloud-based infrastructure starting to occupy the minds of development and operations teams alike.  Many of the messaging solutions such as Windows Azure Queues and Amazon’s SQS continue to offer guaranteed delivery, but like their traditional, non-cloud counterparts, they only offer it in the form of at-least-once delivery.

In the end we have no way to prevent the infrastructure from delivering a message more than once.  What can we do?  As I have outlined in a previous post, we can apply a few idempotency patterns to make our application robust against the prospects of multiple delivery of messages.

One of the primary reasons, if not the primary reason, that messaging systems offer at-least-once delivery rather than exactly-once delivery is that we run into limitations as expressed in the CAP theorem—all “nodes” cannot possibly have a globally consistent snapshot and be available and partition tolerant.  There is no unfailing “global brain”.

Idempotency Patterns

In several of my previous posts I discussed how to avoid the overhead of a two-phase commit while still being able to maintain an application-level transactional consistency between two resources, such as a message queue and durable data store, e.g. a database.  When considering how to implement idempotency there are a number of factors to consider.  Let us examine each of these in turn.

First, is the message inherently idempotent?  That is, does it contain an operation that carries a fixed value at a point in time, such as a stock quote, rather than a transformational instruction such as “add $10 to balance”?  If the message is naturally idempotent, then much of the following discussion can be ignored because reprocessing the message multiple times results in the same value, e.g. f(x) = f(f(x)).
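The difference is easy to see in a few lines of Python.  The function names and the dictionary used as application state here are purely illustrative.

```python
def set_quote(state, price):
    """Naturally idempotent: the message carries the final value."""
    return {**state, "quote": price}


def add_to_balance(state, amount):
    """Not idempotent: the message carries a transformation."""
    return {**state, "balance": state["balance"] + amount}


state = {"quote": 0, "balance": 100}

once = set_quote(state, 42)
twice = set_quote(once, 42)  # same state as `once`

deposit_once = add_to_balance(state, 10)
deposit_twice = add_to_balance(deposit_once, 10)  # balance drifts further
```

Handling the quote message twice leaves the state unchanged, while handling the deposit twice credits the account twice.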

If the message is not intrinsically idempotent, we will want to take steps to mitigate or resolve the effects of processing the same message multiple times.  In other words, we want artificial idempotency.  In this case, we need to consider the computational overhead of reprocessing a particular message.  Understanding this cost can be important because it helps us determine if we must proactively or reactively handle application-level message idempotency.

As mentioned in my previous post, one requirement for creating idempotency in messages that aren’t idempotent is to establish a message store.  This message store is keyed off of the application-level identifier of the message being processed.  It contains a list of messages that have been dispatched as a result of processing the message being received.  Hereafter we shall simply call this a message store.

When the computational overhead of reprocessing a message is high compared to the cost of querying to determine whether it has already been processed, and/or the probability of a message being received more than once is high, we will want to proactively filter messages that have already been processed.  Remember that the reasons for receiving the same message twice can be varied and range from unexpected process termination to other applications (including our own) redispatching messages.

When proactively filtering messages, we simply query our message store to determine if the message being received has already been processed.  If so, redispatch the messages loaded from the query and discontinue processing the current message.  Note that the way we store messages in our data store very much depends upon the transactional characteristics of the data store itself.  The specifics are outlined in my previous post.
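As a rough Python sketch of the proactive approach, assume the message store is a plain dictionary and that `process` and `dispatch` are callables supplied by the application.  All of these names are hypothetical, and the transactional details of the real store are deliberately left out.

```python
def handle_proactively(message_id, payload, store, process, dispatch):
    """Check the message store first and only process unseen messages.

    `store` maps message id -> list of messages dispatched for it.
    Returns True if the message was processed, False if it was a duplicate.
    """
    if message_id in store:
        # Duplicate: skip the expensive work and resend the saved results.
        for outgoing in store[message_id]:
            dispatch(outgoing)
        return False

    outgoing_messages = process(payload)
    store[message_id] = outgoing_messages
    for outgoing in outgoing_messages:
        dispatch(outgoing)
    return True


store, sent, runs = {}, [], []


def process(payload):
    runs.append(payload)
    return [f"{payload}-handled"]


handle_proactively("msg-1", "ship", store, process, sent.append)
handle_proactively("msg-1", "ship", store, process, sent.append)  # duplicate
```

The expensive `process` call runs once, yet the resulting message is dispatched on both deliveries.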

When we reactively filter a message we know that the computational overhead of processing is very low and/or the probability of a duplicate message is relatively low.  In this scenario, we simply reprocess as normal.  Then, when we attempt to save any to-be-dispatched-messages resulting from processing to our data store, we catch unique key violations (or similar exceptions) thrown by our data store.  If we catch an exception, we abandon our current unit of work and redispatch the messages saved to the data store.
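Here is one way the reactive approach could be sketched in Python, using SQLite purely as a convenient stand-in for the data store.  The `message_store` table, its columns, and the `handle_reactively` function are assumptions for illustration; the unique key violation surfaces as `sqlite3.IntegrityError`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE message_store ("
    " source_id TEXT, seq INTEGER, body TEXT,"
    " PRIMARY KEY (source_id, seq))"
)


def handle_reactively(conn, message_id, payload, process, dispatch):
    outgoing = process(payload)  # cheap, so we simply do the work again
    try:
        with conn:  # one transaction: commits on success, rolls back on error
            conn.executemany(
                "INSERT INTO message_store (source_id, seq, body)"
                " VALUES (?, ?, ?)",
                [(message_id, i, body) for i, body in enumerate(outgoing)],
            )
    except sqlite3.IntegrityError:
        # Duplicate: abandon this unit of work and resend what was saved.
        rows = conn.execute(
            "SELECT body FROM message_store WHERE source_id = ? ORDER BY seq",
            (message_id,),
        ).fetchall()
        outgoing = [row[0] for row in rows]
    for body in outgoing:
        dispatch(body)


sent = []
handle_reactively(conn, "msg-1", "x", lambda p: [f"{p}-result"], sent.append)
handle_reactively(conn, "msg-1", "x", lambda p: [f"{p}-again"], sent.append)
```

The second call does the work again, hits the primary key, and falls back to the messages saved by the first call.  Note that this sketch only detects duplicates for messages that produced at least one outgoing message; a real store would likely record the source identifier on its own as well.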

The above scenarios are all fine and good, but what about if we aren’t operating against a transactional resource?  What if we are performing an operation against a web service or some other RPC-based call using a non-reliable protocol?  How can we know if the operation was successful?  This requires a little bit more work but still falls within the same general set of patterns outlined herein.  Generally we will want to follow the proactive approach discussed above but with a few notable exceptions.

The main theme surrounding this last “pattern” is that we cannot truly wrap all work in a unit and submit it as a batch because we are operating against a non-transactional resource.  One possible solution would simply be to query the non-transactional resource to determine the state for a given message.  One problem is the latency involved when querying remote resources such as web services over HTTP.  Other considerations may include that some HTTP calls incur a fee on a per-call basis making it prohibitively expensive to perform a status-checking query for every single message. Lastly, some resources may not even support a querying operation.

At this point, we probably should disambiguate the meaning of non-transactional resource a little bit more.  Although the resource may be transactional in the sense that it did or did not perform an operation, it’s effectively all or nothing for only a single call, such as a web service.  Furthermore, our application may not even know that the remote invocation succeeded or failed because the response never made it back to our application.  Or, our process may have terminated after the “request” packet had been sent across the wire but before any “response” packet was received.

Further, just because we’re working with a non-transactional resource, doesn’t mean that we can’t effectively simulate transactional behavior by recording the application status to our own durable resource.

In the cases where the resource supports queries, we can implement a few techniques to avoid excessive and sometimes expensive calls.  The first technique is to record the status of the call at each step to a durable resource.  For example, when we are about to call a web service we record that fact.  When we receive the results of a web service call, we also log that fact.  Finally when we are about to dispatch a message containing the results of the web service call, we log that message to our message store.  As we receive a message from our own queue and process it, we must check the log to determine what work, if any, has been done for this particular message.  If we have previously logged that we called the web service but no corresponding log entry for the results of the web service call was recorded, we can then incur the additional overhead of querying the resource to see what happened and then taking appropriate action.
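The step-by-step logging described above could be sketched in Python roughly as follows.  The `step_log` dictionary stands in for a durable log, and `call_service` and `query_service` are hypothetical callables wrapping the remote resource; none of this reflects a real FedEx, UPS, or payment API.

```python
def call_with_step_log(message_id, step_log, call_service, query_service):
    """Call a remote service, consulting our own log before repeating work.

    `step_log` maps message id -> {"called": bool, "result": value or None}.
    `query_service` returns None when the service has no record of the call.
    """
    entry = step_log.get(message_id)

    if entry is None:
        # First attempt: log the intent before making the call.
        step_log[message_id] = {"called": True, "result": None}
        result = call_service(message_id)
    elif entry["result"] is not None:
        # The call finished earlier; reuse the logged result.
        return entry["result"]
    else:
        # We logged the call but never saw a result: ask the service.
        result = query_service(message_id)
        if result is None:
            result = call_service(message_id)

    step_log[message_id]["result"] = result
    return result


calls = []


def call_service(message_id):
    calls.append(message_id)
    return f"label-for-{message_id}"


# Simulate a crash after the call was logged but before the result was.
step_log = {"order-123": {"called": True, "result": None}}
recovered = call_with_step_log(
    "order-123", step_log, call_service, lambda message_id: "label-already-made"
)
fresh = call_with_step_log(
    "order-456", step_log, call_service, lambda message_id: None
)
```

The expensive query only happens for `order-123`, the one message whose log shows a call without a result; the fresh message goes straight to the service.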

Let’s look at the above for something like calling a shipping web service from FedEx or UPS or perhaps processing a credit card transaction.  If our application has logged that we called the web service but there is no log entry containing the results, we can query the web service and ask what has been done for credit card ABC or order 123.  Depending upon the results we can take appropriate action such as authorizing a credit card if the transaction was never received or picking up where the previous attempt failed.

The last “pattern” that merits our consideration is when the resource doesn’t support query operations.  This is where things get a little bit sticky because we have no way to know if the work was really performed.  The canonical example is sending an email by connecting to an SMTP server.  Like the previous scenario of calling FedEx or UPS, we can perform virtually all of the same steps.  The key difference is that we are unable to query to determine the state of the resource involved.  In this case, we may have to accept a small level of inconsistency for a very specific failure condition.

The inconsistency being referred to is the possibility of an email being sent twice (continuing with the SMTP example).  If our process terminates after us logging that we are going to call the SMTP server (and perhaps after the SMTP server was called), but before we are able to log that the SMTP server was called, we have absolutely no way to know what work has been done.  This being the case we have no choice but to perform the operation again.  At the same time, is it really that bad if an end user gets an email twice?

Conclusion

I hope you will find some of these patterns useful when building applications on top of the guaranteed at-least-once delivery provided by messaging infrastructure.

Granted, these are only brief summaries of some of the various “patterns” for making messages idempotent.  The examples are a little bit arbitrary and contrived and we might easily poke holes in them, but the principles still hold.

Extending NServiceBus: Avoiding Two-Phase Commits

When using non-transactional queues processing a message is trivially easy—just handle the message and write to your data store.  If the process dies and we lose a message, that’s okay.  But when using a transactional queue things can become more difficult because we want our infrastructure to ensure consistency between our message queues and application database.  I have already outlined the trouble of using two-phase commits in a previous post.

What if we were able to handle the inconsistency between our message queue and application data store at an application rather than infrastructure level?  What if we embraced receiving duplicate messages?  What if our application had the ability to make message processing idempotent, despite the message not being idempotent?  What if we were able to pick and choose the messages that become idempotent on a per-message type basis?

A Typical Example

Let’s go through a typical message processing example to see how this might work.

When everything is working properly we:

  1. Receive a message from the queue.
  2. Process the message.
  3. Save any changed application state to our data store and call Bus.Publish.
  4. Publish resulting messages.
  5. Transactionally remove the message from the message queue.

But what about failure conditions?  This is exactly the type of thing that transactions are designed to handle.  How can we maintain application integrity without 2PC?  Let’s consider a few failure conditions.

When the database transaction fails:

  1. Receive a message from the queue.
  2. Process the message.
  3. Write the changes to the database.
  4. Fail to commit, e.g. power outage, process restart, etc.

When the process comes back online, the message is still on the queue because the receive was never completed, so the process is going to pick up right where it left off and the application and message queue will be in a consistent state.

Now let’s examine a failure condition where the database commits but resulting messages failed to publish because of a power or hardware failure.

  1. Receive the message from the queue.
  2. Process the message.
  3. Save application state and call Bus.Publish.
  4. Commit application state (inside of a separate transaction).
  5. Power failure.

Herein lies the problem.  When the process comes back up, it’s going to reprocess the message…but it’s already been processed.  How do we handle that?

The Solution

What if we were to keep a log of all received messages (at least their identifiers) along with all bus activity (Publish, Send, Return, Reply) associated with the processing of that message?  What if we stored that list as part of our application state during the commit operation?  Doing so would allow us to check if a message was already processed.  If it was already processed, we could get the associated messages that resulted from processing and redispatch them.  It may be worth noting that event sourcing already stores all of the changes that resulted from processing a message.  In that scenario you’re already saving the data, now it’s just a small initial check before processing.
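A minimal Python sketch of that idea follows, with SQLite standing in for the application database.  The table names, the `handle_deposit` function, and the deposit scenario are all invented for illustration and are not taken from IdempotentConsumer; the point is only that the state change, the message identifier, and the outgoing messages commit in one local transaction.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER);
    CREATE TABLE processed (message_id TEXT PRIMARY KEY);
    CREATE TABLE outgoing (message_id TEXT, body TEXT);
    INSERT INTO accounts VALUES ('acct-1', 100);
""")


def handle_deposit(conn, message_id, account_id, amount):
    """Return the messages to dispatch for this (possibly duplicate) message."""
    seen = conn.execute(
        "SELECT 1 FROM processed WHERE message_id = ?", (message_id,)
    ).fetchone()
    if not seen:
        with conn:  # state, identifier, and bus activity commit together
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                (amount, account_id),
            )
            conn.execute("INSERT INTO processed VALUES (?)", (message_id,))
            conn.execute(
                "INSERT INTO outgoing VALUES (?, ?)",
                (message_id, f"Deposited {amount} into {account_id}"),
            )
    rows = conn.execute(
        "SELECT body FROM outgoing WHERE message_id = ?", (message_id,)
    ).fetchall()
    return [row[0] for row in rows]


first = handle_deposit(conn, "m1", "acct-1", 10)
second = handle_deposit(conn, "m1", "acct-1", 10)  # redelivery after a crash
balance = conn.execute("SELECT balance FROM accounts").fetchone()[0]
```

Both calls return the same messages to dispatch, but the balance is only changed once.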

What I have outlined above is actually a well-understood pattern and some message queue providers even provide database hooks to facilitate message de-duplication.  Pat Helland who worked at Amazon and Microsoft wrote a really good paper on this exact same topic a few years back.  One critical difference between what Pat proposes vs. what the message queue providers do is that Pat always works on local copies of data.  In other words, the boundary of each transaction is against a single machine or tightly bound cluster of machines.  This rings true to me because it is harmonious with SOA principles.

I recently created a proof of concept around these very same principles and I call it “IdempotentConsumer”.  It’s an NServiceBus message handler along with associated application services that facilitate de-duplication of a message before handing it off.  In my proof of concept I am relying on two concepts—that each logical business message can be uniquely identified by a business identifier specific to the message and that each message is addressed to a single business entity—like an aggregate root.  Plug-ins could be written to work with alternative storage engines.

UPDATE: IdempotentConsumer has been updated to utilize more optimistic styles of concurrency.  Rather than checking each message that arrives before handing it off to the application, we instead assume that messages are not duplicate and handle any optimistic concurrency exceptions raised by our data store.

NoSQL

While the above works great when you have a resource that supports transactional integrity in the form of TransactionScope, it becomes more difficult when working with NoSQL storage engines, such as CouchDB, Cassandra, Riak, etc.

In a relational DB we can perform multiple writes to various tables and have all writes be transactional.  In NoSQL the boundary of a transaction is a single write.  Multiple writes means multiple transactions.  Granted, CouchDB does support batching, but you lose some consistency checks when doing a batch write to multiple documents simultaneously, such as the ability to raise an optimistic concurrency exception if you’re not writing using the latest version of a particular document.

So how do we write the list of published messages resulting from the message currently being processed and maintain transactional integrity with the changes in application state?  Easy.

Write the list of messages to be published as a result of processing the incoming message.  Once that transaction is complete, write the associated document containing the changes in application state, but include the ID of the message being processed *inside* the document.  When we query to obtain the list of messages we do a server-side or client-side MapReduce of sorts to effectively “join” the list of published messages with the associated parent message ID found in the application state document/row.  If the application state contains the ID found in the list of messages, redispatch the stored messages and we’re done.  If it doesn’t, delete the list of published messages: they were never published because a failure occurred prior to writing the application state record.
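The “join” can be sketched client-side in Python as follows.  Plain dictionaries stand in for the document store, and the `source_message_ids` key and the `recover_or_discard` function are hypothetical names; how the parent message ID is actually stored will vary per document type and storage engine.

```python
def recover_or_discard(message_id, published_lists, state_documents):
    """Decide what to do with a stored list of to-be-published messages.

    `published_lists` maps message id -> list of messages (written first).
    `state_documents` is an iterable of dicts, each carrying the ids of the
    messages that produced it under the key "source_message_ids".
    Returns the messages to redispatch, or None if there is nothing to replay.
    """
    stored = published_lists.get(message_id)
    if stored is None:
        return None  # never seen: process the message normally

    applied = any(
        message_id in doc.get("source_message_ids", [])
        for doc in state_documents
    )
    if applied:
        return stored  # state was written: redispatch the stored messages

    # The state write never happened, so the stored list is an orphan.
    del published_lists[message_id]
    return None


published = {"m1": ["OrderShipped"], "m2": ["OrderCancelled"]}
documents = [{"id": "order-1", "source_message_ids": ["m1"]}]

replay = recover_or_discard("m1", published, documents)  # state was saved
orphan = recover_or_discard("m2", published, documents)  # state write failed
```

For `m1` the stored messages are replayed; for `m2` the orphaned list is deleted and the message is treated as never processed.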

Because we are doing this on a per-message type, we can customize the way we query each business object—we can have a standard way of querying most and then customize the way we query specific ones depending upon how they store the parent message ID.  We can vary the way we query based upon the document storage structure or even storage engine.

Conclusion

The biggest advantage of what I have elaborated is that we can effectively simulate a two-phase commit among multiple resources without the overhead of a 2PC.  This allows us to use storage engines whose drivers don’t support two-phase, e.g. MySQL, PostgreSQL, or virtually all NoSQL implementations.  Furthermore the receipt and publishing of a message is still transactional, it’s just a separate transaction from the storage of application state.  As I mentioned previously those that do event sourcing effectively get this for free, but a small amount of application service code must be written to facilitate it.

One big argument against the above is that we’re doing lots of unneeded queries to determine if we should even bother to process the message.  Yes, that’s true, but we should only ever be querying a local storage (truly local or inside of our cluster). [UPDATE: The previous argument is nullified by using optimistic concurrency as noted in the previous UPDATE above.]  Next, we don’t need to implement this for every message type.  You are free to make your messages inherently idempotent.  You are also free to use safe, slow two-phase commit if the situation requires or operational environment allows.

These same patterns can be applied even when using non-transactional resources, such as sending an email.  The only quirk here is that there may be a one in a million scenario where we publish the same email twice.  But in that case, it’s not really a big deal if someone gets duplicate emails.

NServiceBus Distributed Transaction Woes

In the industry there is a general trend away from locking, two-phase commits.  In theory, quorum and Paxos-style commits ensure consistency across disparate resources, but why not avoid the issue altogether?  The fundamental idea behind each of these commit protocols is to ensure application consistency at an infrastructure level.  Rather than relying on low-level protocols to ensure consistency, why not embrace inconsistency and handle it explicitly within our application?  In this way we can avoid anything but simple, “single phase” commits against a single transactional resource such as a database or message queue.

Okay, hold on, stop the press.  Don’t we have to wrap our transaction around both the message queue and database?  The general rule for NServiceBus is…yes.  But as we can see, there are problems and difficulties getting NServiceBus to play nicely with two-phase, distributed transactions.  To be fair, it’s not actually NServiceBus, it’s subpar database drivers and configuration overhead related to MSDTC.

A while back we spent some time trying to get NServiceBus to play nicely in a two-phase distributed transaction against various databases.  We wanted to try out databases other than SQL Server and Oracle, both of which have excellent drivers, tools, paid and community support, but would require us to mortgage our children and grandchildren.  With that in mind we tried out MySQL and PostgreSQL.  We ran each database server in both Windows and Linux, both locally and remotely, and we tested various drivers, both the “official” drivers as well as available third-party free and paid drivers related to each database.  We created a test harness to exercise each driver to flush out specific failure conditions we might find in production.

Our test harness worked with each driver as a simple IDbConnection.  We had the driver connect to the database locally (on Windows), perform some work and commit.  We also had it do the same against a remote database server (both on Windows and Linux).  We had the test harness perform simulated failures like disabling the network in the middle of a transaction and watching the result.  We also had it send shutdown and kill commands to each DB instance at various points in the commit process to verify the outcome.  We shut down and killed MSDTC at various points and watched the result.  We performed distributed transactions against variations of the same database, the same vendor DB on a different machine, different drivers against the same database, or finally a different DB driver and DB vendor altogether.  At each stage we created successful scenarios and then variations on failure scenarios from the process dying during each stage of the transaction, e.g. pre/post “PREPARE” and pre/post “COMMIT”.

If it sounds like a lot of tests, it is.  We ran each permutation of the test with each driver against each local database and each remote database (Windows and Linux).  We ran each outside of a transaction, inside a transaction, and inside a distributed transaction.  Then we looked at the results.

The only drivers that consistently performed in a distributed transaction were the official Microsoft drivers running against SQL Server.  Big surprise.  [Note: We didn’t run these tests against Oracle.]  While each driver held its own and performed consistently in a single transaction (using both TransactionScope and connection.BeginTransaction()), it was the distributed transactions that gave us trouble.

The official MySQL .NET driver doesn’t support distributed transactions.  A 3rd party MySQL driver did support distributed transactions, but enlisted itself as a “volatile” resource in the distributed transaction.  Further MySQL actually rolls back *after* a PREPARE if the connection is lost.  Bad. Bad. Bad.

The various PostgreSQL drivers had other issues.  One driver properly enlisted in the DTC but couldn’t handle rollbacks of a failed distributed transaction.  One cohort would vote “no” and the PostgreSQL driver wouldn’t roll back the PREPARED transaction, which would create an exclusive lock on the associated row and block SELECTs against the table.  All in all, it was a little troubling.

We didn’t inspect Firebird which after SQLite is probably the only other open source DB we’d consider.  There are several considerations in our choice for a relational storage engine.  The first is NHibernate support, the second is community support/size/activity.  If we don’t have a degree of traction from the thriving community, we won’t touch it.

But there is hope.  First and foremost, the above issues are only manifest when working against a transactional queue.  If the queue isn’t transactional, then we don’t have this problem.  Second, if we absolutely positively required a distributed, two-phase commit, we can use SQL Server.

If we design our services along business boundaries according to Udi’s SOA principles, we can have multiple instances of SQL Express running on multiple boxes.  Each DB within each Express instance is limited to 4GB of storage.  That’s actually quite a bit of storage.  If we had a single autonomous component exceed 4GB, we could easily fork out the cash for a proper DB license because the AC would have justified its need.

At the same time, what if we were able to avoid 2PC or any style of distributed commit altogether?  What if we wanted to use an alternative storage engine, like CouchDB, Redis, Cassandra or Riak?  All of these troubles lead us down a different path, which is the subject of my next post: “Extending NServiceBus: Avoiding Two-Phase Commits”.

NServiceBus Message Modules (IMessageModule)

When handling messages in NServiceBus it may become necessary to consume the same message multiple times but in different ways.  For example, suppose you wanted to update separate yet related tables in the database.  You could throw all of the business logic into a single message handler and be done with it.  The only problem is that this message handler would be performing multiple actions and the intention behind the handler would be obscured.  Instead, create multiple, distinct message handlers and have each perform one complete action related to the message, e.g. updating a table and publishing a message.  In this way it becomes much easier to reason about each handler independently.

Logical vs. Physical Messages

When NServiceBus receives a message from a message queue, it receives what is known as a TransportMessage.  The TransportMessage can be viewed as a message envelope of sorts.  It holds multiple business/application messages and gives some metadata about its return address, correlation identifier, deliverability guarantees, and time to be received.

NServiceBus receives a TransportMessage from the queue and deserializes it, typically using the default XmlSerializer.  Deserializing this physical message yields a collection of logical business/application messages.  Each logical message is then dispatched to the list of registered handlers.

But what if we wanted to wrap all of the handlers that we’re executing?  What if we wanted to have some behavior occur prior to the handlers receiving messages and after all handler code was complete?  We need message modules.

Using Message Modules

At the most basic level a message module is a .NET class that implements the IMessageModule interface which is found in the NServiceBus.dll assembly.  This interface exposes three parameterless methods: HandleBeginMessage(), HandleEndMessage(), and HandleError().  These methods are invoked exactly once per physical TransportMessage received.  In a typical scenario the only methods invoked will be HandleBeginMessage() and HandleEndMessage().

Message modules are designed to wrap the receipt of a physical message.  They allow you to set values and invoke behavior during the initial stages of receiving a message and to perform any cleanup after all handlers have executed.  Lastly, they allow you to invoke additional behavior in failure scenarios.

The canonical example of implementing a message module is when working with NHibernate’s ISession and ISessionFactory.  During HandleBeginMessage(), we create a new session and bind that session to the thread, e.g.

CurrentSessionContext.Bind(sessionFactory.OpenSession());

This allows each message handler executing on the same thread to obtain the same instance of ISession and thus participate in the same unit of work when interacting with a database.  When all logical messages have been handled NServiceBus will invoke the module’s HandleEndMessage() method, where we can unbind the current session from the thread and perform additional cleanup as necessary.

Message Module Behavior

There are a few things to be aware of when implementing a message module.  The first is that message modules are global or singleton scoped.  When the bus is started it requests the set of configured message modules from the configured IoC container.  This set of modules is then held and stored for the duration of the bus lifetime.

Because a message module is singleton scoped, care must be taken not to store message-specific state directly in the module.  Instead, this information should be stored on the thread performing the processing by decorating the associated static member variable with the ThreadStatic attribute.  In addition, NServiceBus reuses threads.  This means that all values stored on the thread using the ThreadStatic attribute must be cleared and reinitialized at the start of processing a new message.  Failure to reset all state within a module may lead to subtle bugs in your infrastructure code.
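A minimal sketch of that advice follows.  The IMessageModule interface here is a local stand-in that mirrors the three methods described above so the sample compiles on its own (the real interface ships in NServiceBus.dll), and ProcessingNotesModule is a hypothetical module invented purely for illustration.

```csharp
using System;
using System.Collections.Generic;

// Stand-in mirroring the three methods described above; in a real project
// this interface comes from NServiceBus.dll.
public interface IMessageModule
{
    void HandleBeginMessage();
    void HandleEndMessage();
    void HandleError();
}

// Hypothetical module that collects notes while one physical message is processed.
public class ProcessingNotesModule : IMessageModule
{
    // One slot per thread.  A field initializer on a [ThreadStatic] field runs
    // only once, for the first thread that touches the type, so the list is
    // created in HandleBeginMessage rather than here.
    [ThreadStatic]
    private static List<string> notes;

    public static int Count
    {
        get { return notes == null ? 0 : notes.Count; }
    }

    public static void Add(string note)
    {
        if (notes == null)
            throw new InvalidOperationException("No message is being processed on this thread.");
        notes.Add(note);
    }

    public void HandleBeginMessage()
    {
        // Threads are reused, so discard whatever the previous message left behind.
        notes = new List<string>();
    }

    // Nothing is cleared here because HandleError may still run afterwards.
    public void HandleEndMessage() { }

    public void HandleError() { }
}
```

The state is reset in HandleBeginMessage() rather than in HandleEndMessage() because, as discussed below, HandleEndMessage() is not guaranteed to be the last call for a message.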

Multiple message modules can be registered to execute.  Simply have each implement IMessageModule.  NServiceBus will automatically scan all assemblies in the runtime directory and register any classes that implement IMessageModule.  Generally, the ordering of message modules should be unimportant to your application code.  Nonetheless, you may explicitly control the ordering of the message modules by registering each with your IoC container explicitly and in the order that best fits your requirements.  Message modules are always invoked from first to last.  This means that when HandleBeginMessage() is called, it is called on the first registered module then on the second registered module and so on.  This same ordering—first to last—is maintained when invoking HandleEndMessage() and HandleError().

When a message module is registered with the bus, at least two of the three methods will always be invoked—HandleBeginMessage() and HandleEndMessage().  In error or failure conditions these two methods will be invoked and then HandleError() will be invoked.  This means that there isn’t a “Finally()” or “Dispose()” method for each module.  That is, there’s no way to be sure that HandleEndMessage() is the last method to be invoked because it may be followed by HandleError() in failure conditions.
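The ordering rules in the last two paragraphs can be modeled in a few lines.  This is a simplified model written from the behavior described here, not NServiceBus source code; IMessageModule is again a local stand-in, and RecordingModule and ModuleOrderModel are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Stand-in mirroring the three methods described above.
public interface IMessageModule
{
    void HandleBeginMessage();
    void HandleEndMessage();
    void HandleError();
}

// Records each call so the order can be inspected afterwards.
public class RecordingModule : IMessageModule
{
    private readonly string name;
    private readonly List<string> log;

    public RecordingModule(string name, List<string> log)
    {
        this.name = name;
        this.log = log;
    }

    public void HandleBeginMessage() { log.Add(name + ".Begin"); }
    public void HandleEndMessage() { log.Add(name + ".End"); }
    public void HandleError() { log.Add(name + ".Error"); }
}

// Simplified model of the invocation order described in the text.
public static class ModuleOrderModel
{
    public static void Receive(IList<IMessageModule> modules, Action runHandlers)
    {
        // First to last on the way in.
        foreach (var module in modules) module.HandleBeginMessage();

        Exception failure = null;
        try { runHandlers(); }
        catch (Exception ex) { failure = ex; }

        // HandleEndMessage runs whether or not a handler failed, first to last.
        foreach (var module in modules) module.HandleEndMessage();

        // Only on failure, and only after every HandleEndMessage, first to last.
        if (failure != null)
            foreach (var module in modules) module.HandleError();
    }
}
```

With two modules named A and B and a handler that throws, the log reads A.Begin, B.Begin, A.End, B.End, A.Error, B.Error.  At the time A.End is recorded, nothing tells the module whether A.Error is still to come.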

Conclusion

Despite a few minor quirks in the current implementation, message modules serve a valuable purpose inside of an application and can be leveraged when the situation dictates their use, just like any other tool in the expansive NServiceBus toolbelt.

Extending NServiceBus: Thread-Specific Message Modules

NServiceBus message modules are global objects that are invoked at various stages of processing for a physical message, e.g. during receipt, when processing is complete, and upon any error condition.  Their typical use case is to set up thread-specific values, such as outbound message headers, or to start up a new instance of ISession and store it on the thread.

Message modules, while incredibly powerful, are somewhat troublesome.  First, because they’re shared, we have to remember to avoid storing any state about message processing unless we store it on the thread by decorating it with the ThreadStatic attribute.

One other quirk of message modules is that they execute from first to last for HandleBeginMessage, HandleEndMessage, and HandleError.  In some situations, such as deterministic resource disposal, it would be nice to have them invoked from first to last for HandleBeginMessage and then last to first for HandleEndMessage/HandleError—in an inner-to-outer style.

The biggest challenge relates to not having a “final”-type method.  While message modules do expose HandleEndMessage and HandleError, we have no way of knowing which will be called last.  True, HandleError will always be called after HandleEndMessage, but we have no way to determine if HandleError will be called once HandleEndMessage is invoked.  Note that HandleEndMessage is always called, even in error conditions.

A Typical Example

We have a situation that requires setting up an alternate TransactionScope outside of the normal TransactionScope associated with the processing of the message.  Essentially we want to perform a “new TransactionScope(TransactionScopeOption.RequiresNew)”.  We can do this in a message module without difficulty.  The problem comes when we’re ready to Complete() the scope.  Where do we call Complete()?  NServiceBus is responsible for the transaction surrounding the processing of the message from the queue and is responsible for committing that transaction, but what about our transaction?  Our transaction is not enlisted in the ambient transaction which means that we are responsible for calling Complete() explicitly.

If we called Complete() from HandleEndMessage, how can we be sure that HandleError will not be called?  If we did call Complete() and HandleError was called, it would be too late to roll back.

One workaround is to create a base message handler from which our message handler derives.  This works great if you only have a single message handler per message (which is a best practice), but suppose we needed to have two handlers process the message?  We would be required to implement a pre-handler and a post-handler for our example above.  While it works, it is less than ideal.

The Solution

What if there was something kind of like a message module but kind of like a message handler, something in between?  What if we had something whose lifetime was specific to the processing of a physical message?  What if it was invoked at critical points before and after all message handlers?  What if it was thread specific so we wouldn’t have to worry about ThreadStatic attributes?

I created a proof of concept around these principles and am calling it a “MessageSink”.  A MessageSink exposes four critical points during the processing of a physical message:

  • Initialize: Before message processing.
  • (Message handlers run here)
  • Success: After all handlers have completed successfully.
  • Failure: If any handler has thrown an exception.
  • Dispose: After success or failure, when we’re “cleaning up the sink”.

One other cool thing about MessageSinks is that, unlike message modules, they are called in order for Initialize and then in reverse order for Success/Failure/Dispose.  This means that resources can be committed and cleaned up in an inner-to-outer manner.
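The proof-of-concept code itself is not reproduced in this post, so the following is only a guess at what such a contract might look like, based on the four points listed above.  IMessageSink, SeparateTransactionSink, and MessageSinkRunner are hypothetical names; the runner stands in for whatever the wrapping transport does around the handlers.  TransactionScope is the real System.Transactions type from the earlier example.

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical contract: one instance lives for exactly one physical message.
public interface IMessageSink : IDisposable
{
    void Initialize();   // before any handler runs
    void Success();      // all handlers completed
    void Failure();      // a handler threw
}

// The separate transaction from the example above, expressed as a sink.
public sealed class SeparateTransactionSink : IMessageSink
{
    // An instance field is enough; no ThreadStatic is needed because the
    // sink is not shared between messages.
    private TransactionScope scope;

    public void Initialize()
    {
        scope = new TransactionScope(TransactionScopeOption.RequiresNew);
    }

    public void Success()
    {
        scope.Complete();   // safe: Failure cannot follow Success
    }

    public void Failure()
    {
        // Nothing to do; disposing without Complete() rolls back.
    }

    public void Dispose()
    {
        if (scope != null)
        {
            scope.Dispose();
            scope = null;
        }
    }
}

// Hypothetical driver showing the call order around the handlers.
public static class MessageSinkRunner
{
    public static void Process(IList<IMessageSink> sinks, Action runHandlers)
    {
        var started = new List<IMessageSink>();
        try
        {
            // First to last on the way in.
            foreach (var sink in sinks)
            {
                sink.Initialize();
                started.Add(sink);
            }

            runHandlers();

            // Last to first on the way out.
            for (int i = started.Count - 1; i >= 0; i--) started[i].Success();
        }
        catch
        {
            for (int i = started.Count - 1; i >= 0; i--) started[i].Failure();
            throw;
        }
        finally
        {
            // Always runs, after either Success or Failure.
            for (int i = started.Count - 1; i >= 0; i--) started[i].Dispose();
        }
    }
}
```

Because Dispose is guaranteed to be the final call, the question of where to call Complete() goes away: Success completes the scope, and Dispose either commits or rolls back.  Reverse-order disposal also matters for TransactionScope specifically, since nested scopes are expected to be disposed innermost first.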

Rather than modifying the NSB source code to implement this custom behavior, I found that by implementing a custom ITransport which wraps the actual transport, usually MsmqTransport, I was able to create the above behavior.

I don’t yet have a convenient way to get this into NSB per the regular configuration syntax.  I’m planning on creating a small proxy for IContainer which detects resolution of ITransport and then wraps the actual ITransport received from the container with the “MessageSinkTransport” and then returns the wrapped ITransport to NSB.

Conclusion

While NServiceBus message modules are incredibly capable, there are certain scenarios where they do not adequately cover our requirements.  By leveraging message sinks we can more predictably gain access into the message processing pipeline to better respond to the various events which occur during such processing.

Extending NServiceBus: Per Unit of Work IoC Container

NServiceBus is great.  That’s really all there is to it.  The framework is incredibly flexible such that it can be extended without too much effort.  But there are a few small points of friction that we’ve encountered.  This article shows how we’ve been able to address one such point.

Choose Your IoC Container

Because NServiceBus allows us to choose an IoC container rather than forcing us to use a predefined one, we gain a lot of flexibility.  At the same time NServiceBus must play to the lowest common denominator of IoC containers and thus has not been able to take advantage of all of the developments across the dependency injection world.

While most containers support “inner” or “nested” containers, some do not.  Typically these inner containers are used to facilitate a unit of work, such that all resources resolved during the unit of work are shared.  We happen to be using a container that does support nested scoping.  Unfortunately NServiceBus does not leverage this extremely useful capability.

Furthermore, more and more containers are handling the disposal of resources explicitly upon disposal of the inner or nested container.  While some containers require you to call IoC.Release(someResourceHere), others perform that work for you when you call Dispose() on the nested container.

Both of these capabilities combined (nested containers along with deterministic disposal of resources created by the nested container) alleviate much of the development burden related to tracking and cleanup of resources.

In NServiceBus the logical unit of work is a message handler while the physical unit of work is the receipt of a “TransportMessage” which may contain one or more logical messages (business or application messages).  This means that the physical handling or processing of a TransportMessage is the perfect place to leverage our nested container unit of work pattern.

This is where things get difficult.  NServiceBus exposes no direct facility or capability to create a nested container upon receipt of a TransportMessage.  While it does support the concept of message modules through implementing the IMessageModule interface, modules are not well suited to this task because they are singleton scoped and contain global, rather than unit of work-specific, behavior.

A Typical Example

The best example of this problem is demonstrated with NHibernate’s ISession.  ISession is our unit of work for all database-related activity.  If we have multiple message handlers handling the same message during the same physical/TransportMessage receive, we want all of them to share the same ISession.  We want a single database connection rather than multiple for handling a message.  Conversely, we do not want to share the session with other handlers which may be concurrently processing separate physical messages on separate threads.  The current workaround is to inject ISessionFactory into your handlers directly and then call GetCurrentSession() to get the session for the thread.  While this does work, it is less than ideal and represents a point of friction.

The same goes for all dependencies.  While ISession is really easy because ISessionFactory exposes thread-bound instances, what about other dependencies that do not have similar capabilities?  A few IoC containers offer thread-static caching, but if we’re using one without that capability baked into the container, we’ve got to write our own.

The Solution

What if all handlers that processed a single message could share dependencies?  What if all of those dependencies were deterministically and reliably disposed at the end of processing for each physical message (TransportMessage)?  What if we could leverage our container’s ability to create nested or inner containers?

I created a proof of concept around these very principles and I have done some exploratory testing to show that it works.  Rather than modifying the NSB source code to create an extension point, I simply leverage the facilities of our IoC container.  The code currently works against Autofac 1.4, but could be ported to any popular container that supports nested containers, e.g. StructureMap, Castle, Ninject, etc.
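The idea of a per-message nested scope can be illustrated without any particular container.  The sketch below is a toy written for this purpose, not Autofac and not the proof-of-concept code: UnitOfWorkScope and FakeSession are hypothetical names, and a real nested container does considerably more.  It shows only the two properties that matter here: instances are shared within one scope, and everything the scope created is disposed with it.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a nested container created once per physical message.
public sealed class UnitOfWorkScope : IDisposable
{
    private readonly IDictionary<Type, Func<object>> factories;
    private readonly Dictionary<Type, object> instances = new Dictionary<Type, object>();
    private readonly Stack<IDisposable> disposables = new Stack<IDisposable>();

    public UnitOfWorkScope(IDictionary<Type, Func<object>> factories)
    {
        this.factories = factories;
    }

    // Every handler resolving T inside this scope receives the same instance.
    public T Resolve<T>()
    {
        object instance;
        if (!instances.TryGetValue(typeof(T), out instance))
        {
            instance = factories[typeof(T)]();
            instances[typeof(T)] = instance;

            // Track anything disposable so the scope can clean it up later.
            var disposable = instance as IDisposable;
            if (disposable != null) disposables.Push(disposable);
        }
        return (T)instance;
    }

    // Disposes what the scope created, most recently created first.
    public void Dispose()
    {
        while (disposables.Count > 0) disposables.Pop().Dispose();
        instances.Clear();
    }
}

// Hypothetical stand-in for a resource such as ISession.
public sealed class FakeSession : IDisposable
{
    public bool Disposed { get; private set; }

    public void Dispose()
    {
        Disposed = true;
    }
}
```

Opening one such scope when a TransportMessage arrives and disposing it when processing ends would give two handlers of the same message the same FakeSession, while a message processed concurrently on another thread gets its own scope and therefore its own instance.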

To utilize the sample, you must first compile the code in VS2008 and then add the associated assembly, NServiceBus.MessageSinks.AutofacConfiguration, to your project.  At that point you change your NSB startup configuration to say:

var builder = new ContainerBuilder();

builder.Register(new MessageSinkConfigurationModule());

Conclusion

Ideally there will be extension points added to NServiceBus which will allow us to utilize nested containers during the message handling portion, but in the meantime we’re able to leverage additional IoC container capabilities without touching any NServiceBus code.

I will be posting more about this and related solutions all touching on NServiceBus over the next few days.