Extending NServiceBus: Thread-Specific Message Modules

NServiceBus message modules are global objects that are invoked at various stages of processing for a physical message, e.g. during receipt, when processing is complete, and upon any error condition. Their typical use case is to set up thread-specific values, such as setting outbound message headers or opening a new ISession and storing it on the thread.

Message modules, while incredibly powerful, are somewhat troublesome. First, because a single module instance is shared, we have to remember to avoid storing any state about message processing in the module itself. Any such state has to live on the thread, in a static field decorated with the ThreadStatic attribute.
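As a minimal sketch of that thread-static pattern: the interface below is a local stand-in that mirrors the three module methods named in this post, so the example compiles without NServiceBus, and CorrelationModule with its Guid value is purely hypothetical.

```csharp
using System;

// Local stand-in mirroring the three module methods discussed in this post.
// In a real endpoint the class would implement NServiceBus's IMessageModule.
public interface IMessageModuleSketch
{
    void HandleBeginMessage();
    void HandleEndMessage();
    void HandleError();
}

// Hypothetical module. One instance is shared by every worker thread,
// so per-message state must not live in instance fields.
public class CorrelationModule : IMessageModuleSketch
{
    // [ThreadStatic] gives each thread its own copy of this static field.
    // Assign it in HandleBeginMessage rather than with a field initializer,
    // because an initializer would only run for the first thread.
    [ThreadStatic]
    private static Guid? current;

    public static Guid? Current
    {
        get { return current; }
    }

    public void HandleBeginMessage()
    {
        current = Guid.NewGuid();
    }

    public void HandleEndMessage()
    {
        current = null;
    }

    public void HandleError()
    {
        current = null;
    }
}
```

A value set by HandleBeginMessage on one worker thread is invisible to every other thread, which is what keeps the shared module safe.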

One other quirk of message modules is that they execute from first to last for HandleBeginMessage, HandleEndMessage, and HandleError. In some situations, such as deterministic resource disposal, it would be nice to have them invoked from first to last for HandleBeginMessage and then last to first for HandleEndMessage/HandleError—in an inner-to-outer style.

The biggest challenge relates to not having a "final"-type method. While message modules do expose HandleEndMessage and HandleError, we have no way of knowing which will be called last. True, HandleError will always be called after HandleEndMessage, but at the moment HandleEndMessage is invoked we have no way to determine whether HandleError will follow. Note that HandleEndMessage is always called, even in error conditions.

A Typical Example

We have a situation that requires setting up an alternate TransactionScope outside of the normal TransactionScope associated with the processing of the message. Essentially, we want to perform a "new TransactionScope(TransactionScopeOption.RequiresNew)". We can do this in a message module without difficulty. The problem comes when we're ready to Complete() the scope. Where do we call Complete()? NServiceBus owns the transaction surrounding the processing of the message from the queue and is responsible for committing it, but what about our transaction? Our transaction is not enlisted in the ambient transaction, which means that we are responsible for calling Complete() explicitly.

If we called Complete() from HandleEndMessage, how can we be sure that HandleError will not be called? If we did call Complete() and HandleError was called, it would be too late to roll back.
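To make the dilemma concrete, here is a rough sketch of a module that completes its own scope in HandleEndMessage. The class name and the Committed flag are hypothetical, and the class is left free-standing so it compiles without NServiceBus. In a real endpoint it would implement IMessageModule.

```csharp
using System;
using System.Transactions;

// Hypothetical module illustrating the problem, not a recommended design.
public class IndependentScopeModule
{
    // Kept on the thread because the module instance itself is shared.
    [ThreadStatic]
    private static TransactionScope scope;

    [ThreadStatic]
    private static bool committed;

    public static bool Committed
    {
        get { return committed; }
    }

    public void HandleBeginMessage()
    {
        committed = false;
        // RequiresNew starts a transaction independent of the ambient one
        // that surrounds message processing.
        scope = new TransactionScope(TransactionScopeOption.RequiresNew);
    }

    public void HandleEndMessage()
    {
        // Premature: we cannot know yet whether HandleError will follow.
        scope.Complete();
        scope.Dispose(); // the independent transaction commits here
        scope = null;
        committed = true;
    }

    public void HandleError()
    {
        // Too late: if HandleEndMessage already ran, the work is committed
        // and there is nothing left to roll back.
        if (scope != null)
        {
            scope.Dispose(); // disposing without Complete() rolls back
            scope = null;
        }
    }
}
```

Moving Complete() out of HandleEndMessage does not help either, because on the success path nothing else is ever called.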

One workaround is to create a base message handler from which our message handler derives. This works great if you only have a single message handler per message (which is a best practice), but suppose we needed to have two handlers process the message? We would be required to implement a pre-handler and a post-handler for our example above. While it works, it is less than ideal.

The Solution

What if there was something kind of like a message module but kind of like a message handler, something in between? What if we had something whose lifetime was specific to the processing of a physical message? What if it was invoked at critical points before and after all message handlers? What if it was thread-specific, so we wouldn't have to worry about ThreadStatic attributes?

I created a proof of concept around these principles and am calling it a "MessageSink". A MessageSink exposes four critical points during the processing of a physical message:

  • Initialize: Before message processing.
  • (Message handlers run here)
  • Success: After all handlers have completed successfully.
  • Failure: If any handler has thrown an exception.
  • Dispose: After success or failure, when we're "cleaning up the sink".

One other cool thing about MessageSinks is that, unlike message modules, they are called in order for Initialize and then in reverse order for Success/Failure/Dispose. This means that resources can be committed and cleaned up in an inner-to-outer manner.
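The lifecycle and ordering described above can be sketched roughly as follows. The post names the four points but not their signatures, so the IMessageSink shape, the SinkPipeline runner, and the RecordingSink helper are all assumptions made for illustration, not the proof-of-concept code itself.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape for a sink. Dispose comes from IDisposable.
public interface IMessageSink : IDisposable
{
    void Initialize();
    void Success();
    void Failure();
}

public static class SinkPipeline
{
    // Runs the sinks around the handlers for one physical message.
    public static void Process(IList<IMessageSink> sinks, Action handlers)
    {
        // A stack enumerates most-recently-pushed first, which gives the
        // reverse (inner-to-outer) order for Success, Failure and Dispose.
        var initialized = new Stack<IMessageSink>();
        try
        {
            try
            {
                foreach (var sink in sinks)
                {
                    sink.Initialize();
                    initialized.Push(sink);
                }
                handlers();
            }
            catch
            {
                foreach (var sink in initialized) sink.Failure();
                throw; // let the caller roll back the message
            }
            foreach (var sink in initialized) sink.Success();
        }
        finally
        {
            foreach (var sink in initialized) sink.Dispose();
        }
    }
}

// Hypothetical sink that records each call, used only to show the ordering.
public class RecordingSink : IMessageSink
{
    private readonly string name;
    private readonly List<string> log;

    public RecordingSink(string name, List<string> log)
    {
        this.name = name;
        this.log = log;
    }

    public void Initialize() { log.Add(name + ".Initialize"); }
    public void Success() { log.Add(name + ".Success"); }
    public void Failure() { log.Add(name + ".Failure"); }
    public void Dispose() { log.Add(name + ".Dispose"); }
}
```

With two recording sinks A and B and handlers that succeed, the calls arrive as A.Initialize, B.Initialize, B.Success, A.Success, B.Dispose, A.Dispose. Because exactly one of Success or Failure runs before Dispose, a sink that owns a TransactionScope always knows whether to call Complete().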

Rather than modifying the NSB source code to implement this custom behavior, I found that by implementing a custom ITransport which wraps the actual transport, usually MsmqTransport, I was able to create the above behavior.

I don't yet have a convenient way to plug this into NSB through the regular configuration syntax. I'm planning on creating a small proxy for IContainer which detects resolution of ITransport, wraps the actual ITransport received from the container in the "MessageSinkTransport", and then returns the wrapped ITransport to NSB.

Conclusion

While NServiceBus message modules are incredibly capable, there are certain scenarios where they do not adequately cover our requirements. By leveraging message sinks, we get more predictable hooks into the message processing pipeline and can respond reliably to success, failure, and cleanup for each physical message.