Monthly Archives: March 2009

DDDD: Pipes (Producers) and Filters (Consumers)

[UPDATE: This article may be better titled as Pipes (Publishers) and Filters (Consumers) because as of a recent message group thread, the concept of “Produce” has been renamed to “Publish”.]

As I really started looking at the interface for IConsume and IPublish (sorry, I can’t shake putting “I” on interfaces), I was slightly perplexed.  In essence, it appears that these two methods could do the same thing: Consume(Message) and Publish(Message) because the method signature is identical except the name.  If they have the same signature, why have a different name or even a separate interface?  It must be something related to the intent of each interface.

So, why do we need both?  Pipes and filters.  They both work with some kind of stream like grep does.  So which is which?  Well, the aggregate root of a domain implements IConsume, which means it’s a filter, because it takes a message and processes it, transforms it, etc.

This means that IPublish is the pipe portion—it takes the message and routes it in a certain direction—perhaps across process or network boundaries (serialization) or perhaps thread boundaries, or whatever.  It may not necessarily interpret the message data itself (which is what a filter does), it just sends the message where it’s supposed to go.
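To make the distinction concrete, here is a minimal sketch of a filter and a pipe. The class names (UppercaseFilter, InProcessPipe) and the snake_case method names are my own stand-ins for illustration, not anything from Greg's code. Both methods accept the same message; only the intent differs.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class Message:
    body: str


class UppercaseFilter:
    """Filter (consumer): interprets the message and keeps the transformed result."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    def consume(self, message: Message) -> None:
        self.seen.append(message.body.upper())


class InProcessPipe:
    """Pipe (publisher): forwards the message without looking at its contents."""

    def __init__(self, target: Callable[[Message], None]) -> None:
        self._target = target

    def publish(self, message: Message) -> None:
        # A real pipe might serialize, enqueue, or hop threads at this point.
        self._target(message)


text_filter = UppercaseFilter()
pipe = InProcessPipe(text_filter.consume)
pipe.publish(Message("hello"))
```

The pipe never reads message.body; swapping it for one that crosses a process boundary would not change the filter at all.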

DDDD: Producers and Consumers

About a year ago, Greg wrote a good article on producers and consumers.  It’s an excellent read but is a little too abstract if you’re brand new to DDDD.  I would strongly recommend reading (at least three times) each of the previous blog posts from Greg in the DDDD series and watching all of the videos as listed in my Getting Started post.

Fortunately, Greg is very active in the Domain-Driven Design Yahoo Group.  Because of this, there are little gems that can be extracted by following his posts.  Before I post the link, I must say that oftentimes message board replies (and the corresponding original questions) don’t have an adequate amount of context to be able to decipher them properly.  [E.g. “What is the marital status of the number five?”]  The reason for this is that there is a mountain of knowledge being brought to bear on the subject and only little flakes come through in a response.

Greg’s blog post was about a year ago, but his technique, per this thread, has evolved slightly since that time—as techniques should over time and experience.

In his original blog post he lists three separate interfaces, IConsume, IProduce, and IPublish.  In the aforementioned thread, it has been consolidated to two—Consumes (notice the missing “I” prefix) and IPublish—but the total number of methods remains the same: three.

The great thing about the IPublish interface is that it’s pretty much a pipes and filters interface which, as he mentions in the thread, allows you to do a lot of really cool things—queuing, serialization (sending through a message channel), changing thread boundaries, etc.

DDDD: Didn’t You Get The Message?

Greg has generously shed some light on a critical aspect of DDDD: How do I get the command (or event) message into the aggregate root?

First a small amount of background.  When a message arrives for consumption, typically a message handler like MapMessageToAggregateRootHandler will be responsible for loading the aggregate from the repository and pushing the message into the aggregate root.  (Note: The messages handled by this handler all have a GUID property representing the aggregate ID which the handler can use to load the aggregate from the repository.)  From a high level, the concept is easy enough.  But what about implementation specifics?  What does the code actually look like to get the message into the aggregate root?

Originally I was thinking that the aggregate root would somehow need to expose a method for every single type of message (command or event) that it could consume.  This could work, but it would make the message handler bloated and tightly coupled to the aggregate root.

Then I thought about having a single HandleMessage(IMessage msg) method on an aggregate root.  This is better (in some respects) because at least the application service is protected from the internals of the domain.  But still it’s a big smelly method!  A single HandleMessage method with a massive switch statement is just the kind of procedural code we’re trying to get away from.

The Elegant and Simple Solution

Greg informed me that his aggregates explicitly implement an interface of:

IConsume<T> where T : Message

Here’s the beauty of the solution.  Your application service would load the aggregate from the repository.  It would then cast the aggregate to IConsume<T> for the type of the incoming message and push the message into the aggregate through the Consume() method of that interface.  Brilliant!

The reason this works so well is:

  1. The message handler is completely decoupled from domain-specific methods.
  2. The aggregate root doesn’t need a single point of entry for all messages.  Each message comes through its own point of entry.
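The shape of this can be sketched roughly as follows. Python has no explicit interface implementation, so this sketch assumes a per-type lookup table on the aggregate as a stand-in for implementing IConsume<T> once per message type. The User aggregate, the RenameUser and DeactivateUser messages, and the dictionary repository are all hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Type
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Message:
    aggregate_id: UUID


@dataclass(frozen=True)
class RenameUser(Message):
    new_name: str


@dataclass(frozen=True)
class DeactivateUser(Message):
    pass


class User:
    """Aggregate root with one entry point per message type."""

    def __init__(self, aggregate_id: UUID, name: str) -> None:
        self.id = aggregate_id
        self.name = name
        self.active = True
        # Stand-in for explicitly implementing IConsume<T> per message type.
        self.consumers: Dict[Type[Message], Callable[..., None]] = {
            RenameUser: self._consume_rename,
            DeactivateUser: self._consume_deactivate,
        }

    def _consume_rename(self, message: RenameUser) -> None:
        self.name = message.new_name

    def _consume_deactivate(self, message: DeactivateUser) -> None:
        self.active = False


class MapMessageToAggregateRootHandler:
    """Loads the aggregate by ID and pushes the message in; knows no domain methods."""

    def __init__(self, repository: Dict[UUID, User]) -> None:
        self._repository = repository

    def handle(self, message: Message) -> None:
        aggregate = self._repository[message.aggregate_id]
        consume = aggregate.consumers.get(type(message))
        if consume is None:
            raise TypeError(f"aggregate does not consume {type(message).__name__}")
        consume(message)


user_id = uuid4()
user = User(user_id, "robert")
handler = MapMessageToAggregateRootHandler({user_id: user})
handler.handle(RenameUser(user_id, "bob"))
handler.handle(DeactivateUser(user_id))
```

The handler contains no switch statement and no reference to rename or deactivate logic; adding a new message type only touches the aggregate.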

DDDD: Eric Evans Interviews Greg Young

[UPDATE: Well this is embarrassing.  This post is starting to get a good amount of traffic and is getting listed on daily link blogs such as Dew Drop.  Unlike the other posts in my DDDD series, this particular post was supposed to serve more as a reminder to me, rather than a polished set of coherent and sequential thoughts.  It probably got ranked fairly high because it’s so keyword dense.]

I have watched the InfoQ video of Eric Evans interviewing Greg Young several times.  What follows below could be considered the “minutes” of the interview.  I’m mostly recording the things that stuck out to me from what both Greg and Eric said.  The thoughts aren’t supposed to be coherent out of context, but as you watch the video you may see these points jump out.

  • When an OrderCreateMessage is received (sounds like a command), the system will locate the symbol for the order from the message.  Then the order will be added to the appropriate order book.  At which point the order book will sit there and wait.
  • [Question to self:] How is the “context” of the original command (its intent) maintained as corresponding events are generated as a result of the command?  ANSWER: Newly generated events are specific to the commands that resulted in their creation (e.g. a RemoveTradedVolumeMessage command generates a VolumeTradedMessage), even though several types of messages may have the same effect on the object.
  • Changes always appear in groupings.
  • The framework underneath the state transition messages allows them to run either a pipeline or a peer-to-peer pub/sub pattern.
  • The domain is broken down into bounded contexts.  Those contexts are asynchronously mapped to each other instead of coming back to the root domain.
  • The decision to use a pipeline vs. a pub/sub model has to do with the percentage of messages a particular bounded context cares about.  If it cares about the majority, use a pipeline.
  • The anti-corruption layer lives on the receiving bounded context.
  • State transitions are part of the core domain—or perhaps a shared kernel.  The receiving bounded context will only ever work with those messages (as transformed by the anti-corruption layer) rather than the domain objects from the other bounded context.
  • Pub/sub works so well because the receiving context has no idea where it’s getting the data from—this seems to imply that in a pipeline, the receiving context does know where it’s getting the data from.
  • Distribution of the aggregates (partitioning) must be governed in a very specific way.  In their scenario it’s to ensure that no two stock symbols live on the same machine in their mirrored cluster.
  • Understanding of the domain is critical to the correct partitioning of the domain across different servers, e.g. keeping stock symbols in different mirrored nodes.
  • Objects are responsible for tracking their own state changes, e.g. “We say to the order: ‘Give me your state changes.’  The order will say, ‘This is what has changed since I’ve been opened.’” 
  • We can take those state changes *from* the repository and push them to a pipeline or directly process them to a database.
  • The repository is still responsible for knowing what to do with state changes.
  • One repository per aggregate root (normal DDD).
  • The call for state changes to the aggregate root causes the root to interrogate all of the children for their state changes, which becomes a single Unit of Work.  [The child entities could potentially use the observer pattern to notify the aggregate root of changes.]
  • “We have found in pretty much everything we have done that your partitioning boundaries are almost always on your aggregate roots.”  This appears to be partitioning of the bounded context across multiple servers such that the same aggregate root doesn’t live on two different servers simultaneously (excepting the mirroring scenario as mentioned above).
  • You can separate data models on the aggregate roots, e.g. one DB instance per aggregate root.
  • DDD Book: Aggregate is a transaction boundary, but Evans has found that aggregate roots also make great boundaries for partitioning AND distribution.
  • [This was a bit confusing for me:] “The other thing that we have noticed with aggregate roots is whether or not something lives in an aggregate root is actually me explicitly saying that it should belong to the same data model as the aggregate root.  Often times we have shared data which is shared between many aggregate roots which we will deliberately put a service over and have the aggregate root actually talk to the service when it needs to.  And the reason that we do this is so that in our back-end data model, we can take that separate piece of data that’s being used in many aggregate roots and actually pull it out into its own data store which is shared between them.  We then use a database per aggregate root or a set of databases which are partitioned on the aggregate root.”
  • The anti-corruption layer receives a message, transforms it and publishes the new, transformed message to its own bounded context.
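The points above about objects tracking their own state changes, and the root interrogating its children, could look something like this sketch. The Order and OrderLine classes and the string-based change records are hypothetical; the interview does not show the real representation.

```python
from typing import List


class OrderLine:
    """Child entity that records its own state changes."""

    def __init__(self, sku: str, quantity: int) -> None:
        self.sku = sku
        self.quantity = quantity
        self._changes: List[str] = []

    def change_quantity(self, quantity: int) -> None:
        self.quantity = quantity
        self._changes.append(f"QuantityChanged({self.sku}, {quantity})")

    def take_state_changes(self) -> List[str]:
        changes, self._changes = self._changes, []
        return changes


class Order:
    """Aggregate root: answers 'give me your state changes' for itself and its children."""

    def __init__(self) -> None:
        self.lines: List[OrderLine] = []
        self._changes: List[str] = []

    def add_line(self, line: OrderLine) -> None:
        self.lines.append(line)
        self._changes.append(f"LineAdded({line.sku})")

    def take_state_changes(self) -> List[str]:
        changes, self._changes = self._changes, []
        for line in self.lines:
            changes.extend(line.take_state_changes())
        return changes  # one batch, suitable for a single unit of work


order = Order()
line = OrderLine("ABC", 1)
order.add_line(line)
line.change_quantity(5)
first_batch = order.take_state_changes()
second_batch = order.take_state_changes()
```

Note that in this simple version the root's own changes come before the children's, so strict chronological ordering across entities is not preserved.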

Ensure Code File Line Lengths

Wouldn’t it be great to ensure that you had consistent line lengths in your code files in Visual Studio?  All versions of Visual Studio since 2003 have the ability to have what’s called a “guide line”:

[Screenshot: a guide line in the Visual Studio editor]

You can even have multiple “guide lines”.

The following command will add two guidelines, one at column 82 and the other at column 98 (zero-based):

REG ADD "HKCU\Software\Microsoft\VisualStudio\9.0\Text Editor" /v Guides /d "RGB(128,0,0) 83, 99"

References

http://blogs.msdn.com/robcamer/archive/2009/03/28/how-to-know-when-lines-are-a-certain-length-in-visual-studio.aspx

http://dotnettipoftheday.org/tips/6B29ED91-DE6C-425F-9E28-95EED8153F0C.aspx?discussion=1

DDDD: The Event Pipeline—Bringing It All Together

There are little nuggets of gold in comments that Greg makes on various groups, such as Yahoo’s DDD group.  One of them is in regards to the event pipeline.

The responsibility of the event pipeline is “to provide an indexed view of the event storage.”  In other words, it’s like the repository pattern, but this time at a much lower level.  Basically we want to make it look like all of the events are in memory.

In Greg’s response to a question on Yahoo Groups, he mentions that he created a custom pipeline implementation using the file system which preserved the message ordering because it had to.  I have been digging for this very response for a few weeks now.  I was trying to determine the actual implementation details of the pipeline.  This understanding of some of the implementation details helps me to reconcile my understanding of where the write-ahead log fits into the picture.  The write-ahead log *is* the pipeline, or at least part of it.

It appears that the pipeline is broken down into two pieces: a local storage (the write-ahead log) and permanent, shared storage, meaning a persistence engine of some type—perhaps CouchDB?  These two persistence mechanisms would most likely implement some type of IPipeline interface.  Then, there would be another object, let’s call it CompositePipeline that would also implement IPipeline, which would contain two IPipeline references—one to the local file system, and the other to the database.
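As a rough sketch of that composite idea: the Pipeline protocol and its single append method are my guess at what an IPipeline might look like, and InMemoryPipeline merely stands in for both the file system and the database.

```python
from typing import List, Protocol


class Pipeline(Protocol):
    """Hypothetical equivalent of IPipeline: something events can be appended to."""

    def append(self, event: str) -> None: ...


class InMemoryPipeline:
    """Stand-in for either the local write-ahead log or the shared store."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def append(self, event: str) -> None:
        self.events.append(event)


class CompositePipeline:
    """Also a Pipeline; holds one reference to local storage and one to shared storage."""

    def __init__(self, local: Pipeline, shared: Pipeline) -> None:
        self._local = local
        self._shared = shared

    def append(self, event: str) -> None:
        self._local.append(event)   # assumed order: local log first
        self._shared.append(event)  # then the permanent, shared store


local_log = InMemoryPipeline()
shared_store = InMemoryPipeline()
pipeline = CompositePipeline(local_log, shared_store)
pipeline.append("UserNameChanged")
```

Because the composite satisfies the same protocol, callers cannot tell whether they are writing to one store or two.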

Here is where it gets interesting.  As soon as a unit of work is complete, several things would happen transactionally.  First, the message(s) sent to the bounded context would be removed from their message channel (if we’re using a service bus) and the events would be serialized to the file system.  There is a slight issue regarding the transactional capabilities of the file system prior to Vista, but that’s beyond the scope of this blog post.

In a completely separate process, we would have an instance of the FileSystemPipeline and a CouchDbPipeline and we would read events from the file system and put them into CouchDB in a transactional manner.  This would be done in a multi-threaded, thread-safe manner.  To completely avoid issues with file locking, we ensure that the instance of the pipeline inside of the bounded context memory space only adds to the file system; it never reads or deletes an event once written.
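The writer/forwarder split might be sketched like this. AppendOnlyLog and Forwarder are hypothetical names, the log lives in memory rather than on disk, and the sketch is single-threaded with no transactions, so it only shows the division of responsibilities.

```python
from typing import List


class AppendOnlyLog:
    """Stand-in for the file-system log; the bounded context only ever appends."""

    def __init__(self) -> None:
        self._events: List[str] = []

    def append(self, event: str) -> None:
        self._events.append(event)

    def read_from(self, offset: int) -> List[str]:
        return list(self._events[offset:])


class Forwarder:
    """Separate reader that copies events, in order, into the shared store."""

    def __init__(self, log: AppendOnlyLog, store: List[str]) -> None:
        self._log = log
        self._store = store
        self._offset = 0  # how many events have already been forwarded

    def run_once(self) -> int:
        pending = self._log.read_from(self._offset)
        self._store.extend(pending)
        self._offset += len(pending)
        return len(pending)


log = AppendOnlyLog()
database: List[str] = []
forwarder = Forwarder(log, database)
log.append("a")
log.append("b")
first_run = forwarder.run_once()
log.append("c")
second_run = forwarder.run_once()
```

Only the forwarder tracks a read position, which is what lets the writer stay append-only.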

What about concurrency?  If the events are temporarily stored in the file system and the system goes offline before writing them to the database, how is the rest of the system made aware of those state changes?

If you’re only running a single instance of a bounded context, then it doesn’t matter.  Your reporting DB and other bounded contexts won’t get the events until the machine comes back online—that’s all part of the “eventually consistent” paradigm.

I am still investigating how best to handle offline scenarios when there are multiple instances of the same bounded context.  In general though, it doesn’t appear to be much different—the rest of the system gets updated when the node comes back online.

One possibility would be to have each event produced have some sort of correlation identifier which tied back to the original command message.  Then, if the locator (content-based router) detected a particular node was offline, it could re-dispatch those same commands to another node for processing.  When the failed node came back online and started persisting to the database the persistence process would detect whether or not the particular correlation identifier was already stored and simply skip storing those events.  I haven’t thought this through all the way, but in general the idea might work.
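A sketch of the skip-if-already-stored idea, assuming each batch of events carries the correlation identifier of the command that produced it. EventStore and its store method are hypothetical names, and like the idea itself this has not been thought through for every failure mode.

```python
from typing import List, Set


class EventStore:
    """Persists each command's events at most once, keyed by correlation ID."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self._seen: Set[str] = set()

    def store(self, correlation_id: str, events: List[str]) -> bool:
        if correlation_id in self._seen:
            return False  # another node already persisted this command's events
        self._seen.add(correlation_id)
        self.events.extend(events)
        return True


store = EventStore()
# The command was re-dispatched to a second node, which persists first.
stored_by_backup = store.store("cmd-42", ["UserNameChanged"])
# The failed node comes back online and replays the same command's events.
stored_by_recovered = store.store("cmd-42", ["UserNameChanged"])
```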

DDDD Locator—Not a Single Point of Failure

In my previous blog post, I elaborated upon Greg’s devTeach talk in which he hints at distributing a bounded context.  I talked about how using what Greg calls a “locator” can facilitate distribution without introducing concurrency problems.

The question was asked in his talk at 46:40, “Is the locator a single point of failure?”  Because his talk was not actually about distributed systems, but rather about CQS and some foundational aspects of DDDD, Greg pulled back from that question and moved on.

Here’s the answer:  No, it’s not a single point of failure.

If you had a single instance of a “locator”, yes, it would be.  But if you had multiple instances, then you’d be okay.  Great, now we reintroduced a whole mess of concurrency issues by having multiple locators.

Enterprise Integration Patterns

Before we dig into solving concurrency between each instance of the locator, we should probably use the appropriate name for the solution.  In Gregor Hohpe’s book, Enterprise Integration Patterns, he talks about a pattern he calls a Content-based Router.  The “locator”, per Greg’s talk, seems to be a content-based router.  The word “locator” to me sounds much more request/response-oriented, while a content-based router sounds much more like its Ethernet counterpart, which forwards messages (or packets) to an endpoint.  Therefore, rather than calling it a “locator”, I will simply refer to it as a content-based router or “router” for short.

Semi-stateful Routing

The content-based router could either be implemented with hard-coded rules (stateless), or it could function in a much more dynamic fashion (stateful).

In Greg’s devTeach talk, he talked about how the router would be aware of CPU time and available memory on each node to which it could forward a command.  In his interview with Eric Evans, he talked about ensuring that no two ticker symbols in his stock trading system would live on the exact same node—thus drastically reducing the probability that multiple node failures would bring down any ticker symbol.

From a performance standpoint, the router makes all of its decisions based upon information available in memory, rather than using any kind of external service call or database.  In this manner, the router would perform as if it was completely stateless, while in fact, it remembers on which node a particular aggregate root lives.
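Here is one way the "semi-stateful" behavior could be sketched. The placement rule (pick the node reporting the most free memory) is an assumption made for illustration, as are the class and method names.

```python
from typing import Dict


class ContentBasedRouter:
    """Decides from in-memory data only, and remembers where each aggregate lives."""

    def __init__(self, free_memory_by_node: Dict[str, int]) -> None:
        self._free_memory = dict(free_memory_by_node)
        self._routes: Dict[str, str] = {}

    def report_free_memory(self, node: str, free_memory: int) -> None:
        self._free_memory[node] = free_memory

    def route(self, aggregate_id: str) -> str:
        node = self._routes.get(aggregate_id)
        if node is None:
            # New aggregate: choose the node with the most free memory.
            # Sorting first makes ties resolve deterministically by name.
            node = max(sorted(self._free_memory), key=self._free_memory.__getitem__)
            self._routes[aggregate_id] = node
        return node


router = ContentBasedRouter({"node-a": 4, "node-b": 8})
first = router.route("MSFT")
router.report_free_memory("node-b", 1)
again = router.route("MSFT")
other = router.route("GOOG")
```

Once an aggregate is placed, later load reports do not move it; only new aggregates see the updated numbers.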

Single Point of Failure

The router has the responsibility of determining the node to which commands will be sent and remembering that information.  By itself, it’s a single point of failure.  To avoid this, we simply apply the same pub/sub model to the router.  Whenever a decision is made regarding routing tables, we publish that information to other instances of the router.  In this manner, we avoid the router being a single point of failure.

Concurrency Revisited

If we have multiple routers, don’t we have to manage concurrency between them?  Yes, if they process messages simultaneously.  But if we only have one instance of the router active at a time and listening for messages, then we don’t have a problem.

How do we ensure that one and only one router is listening for messages to forward?

Stayin’ Alive / Another One Bites The Dust

At this point, we should introduce an infrastructure message, the heartbeat message.  At a specified interval each instance of a bounded context AND each router would publish a heartbeat message to each router saying “I’m alive.”  We can use this message and embed a few other tidbits of information, including available memory or CPU time, etc.

This message serves two purposes.  First, if a particular node misses X heartbeats, we know that the node has failed and we can re-direct the messages to another node.  We then notify all other routers of the same so that they know of the change.  [Aside: We also need to figure out how to deal with messages that have already been sent to the failed node.]

The same goes for the routers themselves.  Only one router is actively forwarding messages while the others are idling, listening to state changes from the active router and heartbeats from the nodes.  When the active router misses X heartbeats, the remaining routers elect a new leader.  (A super-easy formula for leader election is to use the one with highest fixed value, e.g. IP address, or # of CPUs, or # of GB of RAM, etc.)
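A small sketch of missed-heartbeat detection plus the "highest fixed value wins" election. Timestamps are passed in explicitly so the example stays deterministic; HeartbeatMonitor, elect_leader, and the rank values are all hypothetical.

```python
from typing import Dict, List, Optional


class HeartbeatMonitor:
    """Tracks the last heartbeat per member and reports who is still alive."""

    def __init__(self, interval: float, max_missed: int) -> None:
        self._timeout = interval * max_missed
        self._last_seen: Dict[str, float] = {}

    def beat(self, member: str, now: float) -> None:
        self._last_seen[member] = now

    def alive(self, now: float) -> List[str]:
        return sorted(
            member
            for member, seen in self._last_seen.items()
            if now - seen <= self._timeout
        )


def elect_leader(alive_routers: List[str], fixed_value: Dict[str, int]) -> Optional[str]:
    """Pick the live router with the highest fixed value (e.g. a number derived from its IP)."""
    if not alive_routers:
        return None
    return max(alive_routers, key=lambda router: fixed_value[router])


monitor = HeartbeatMonitor(interval=1.0, max_missed=3)
ranks = {"router-1": 20, "router-2": 10}
monitor.beat("router-1", now=0.0)
monitor.beat("router-2", now=0.0)
leader_at_start = elect_leader(monitor.alive(now=0.0), ranks)
# router-1 goes silent; only router-2 keeps sending heartbeats.
monitor.beat("router-2", now=5.0)
leader_after_failure = elect_leader(monitor.alive(now=5.0), ranks)
```

Every router that sees the same heartbeats and the same fixed values reaches the same answer without further coordination, which is what makes this formula easy.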

Once the new leader is elected, it takes over and starts processing messages and forwards them to the appropriate node.  When the failed router comes back online, it sees that another router is actively processing (probably through the heartbeat message from the newly elected primary router) and assumes its place in idle mode.  [Aside: How would it get the list of routes from the primary router?  Or is that information truly necessary?]

When either a new or failed node comes online, it should publish a message to all routers as if it was a brand-new node because it probably doesn’t have any in-memory state any more.  Or even if it did, the routers may have redirected messages elsewhere.

Conclusion

Message-based systems can be far more complex to orchestrate than simple request/reply models, but that complexity is what buys their far greater scalability.  Fortunately this complexity resides outside the domain and is in separate, reusable, infrastructure components so that you can grow into this solution as necessary.

As a final thought, I would recommend that you have a set of routers for each bounded context that needs concurrency.

Concurrency in a DDDD World

Introduction

One of the tough things about DDDD right now is there isn’t a mountain of information available like with other programming topics.  I guess that’s a blessing in disguise because there’s also not a ton of misinformation either.

In traditional N-tier systems and in repository-based, DDD systems, we typically let the database manage all of our state for us.  Then we use different patterns, such as the optimistic offline lock (AKA optimistic concurrency), to ensure concurrency when multiple processes are trying to modify the same information at roughly the same time.

In DDDD things work a little bit differently.  Once you start to scale a bounded context out beyond one server, you have to worry about concurrency—meaning that we ensure that the understanding of the known world is consistent between any instance of the same bounded context.  This is a very challenging problem, but it is one that has been solved in several different ways.  What we need to decide is how best to solve the problem in a distributed, message-based environment.

The Identity Map

As I have blogged about previously, Greg mentions that each bounded context has an identity map and that we can use pub/sub to notify all other instances of the same bounded context of changes in state as a result of processing commands.  This works great, but once we have multiple instances running, we need to be extremely careful lest two instances of the same aggregate process commands which produce conflicting results.

An Example

Let’s suppose there’s a user on eBay who wants to sign up with a certain username (let’s say it’s “bob”) and, for the sake of argument, that it’s available.  Because of the number of users using eBay, it’s a fairly probable situation that he could attempt to register with that username at the same time as someone else.

If the same aggregate was on multiple servers and a request came into both instances simultaneously requesting the username “bob”—one from Bob Martin and the other from Bob Jones—all of a sudden, we have a problem.  Each aggregate has approved the username “bob” and passed a message back to some workflow saying that it’s reserved the name for their respective user.  We’ve got a concurrency issue.

If not addressed properly, we will run into some very nasty temporal bugs that may or may not be discovered for a long time.  What’s worse, the longer the bug exists, the more damage it causes to all related systems.  Usually these types of bugs have a way of corrupting data not just in their own bounded context, but throughout all parts of the extended domain.

The Solution

Rather than randomly having any instance of the same bounded context receive commands that it needs to perform, we instead want to forward them to a single instance.  You might say, “Wait a second!  Why run multiple instances if all commands are forwarded to only one instance?!”  The answer is simple: Partitioning.  We can use a locator (devTeach @ 46:20) to forward the command to the appropriate instance of the bounded context based upon some selection criteria.  In the case of our eBay example—we could partition based upon the first letter of the username, such that all requests for usernames starting with the letter “b” are forwarded to a specific instance of the bounded context for the aggregate root to process.

Continuing with our example, the same aggregate would now service both requests.  It would service one command and then the other.  The first would be approved, the second would be denied.  Usernames starting with other letters would be handled by other instances.  Because we have different instances handling non-overlapping portions of work, we have effectively eliminated concurrency issues for this particular context.  Do you see how we can distribute and scale?
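The eBay example can be sketched as follows. The partition function (first letter modulo instance count) and the class names are assumptions for illustration; the sketch also assumes usernames are non-empty and compared case-insensitively.

```python
from typing import List, Set


class UsernameRegistry:
    """One instance of the bounded context; handles its commands one at a time."""

    def __init__(self) -> None:
        self._taken: Set[str] = set()

    def reserve(self, username: str) -> bool:
        key = username.lower()
        if key in self._taken:
            return False
        self._taken.add(key)
        return True


class Locator:
    """Forwards each request to exactly one instance, chosen by first letter."""

    def __init__(self, instance_count: int) -> None:
        self._instances: List[UsernameRegistry] = [
            UsernameRegistry() for _ in range(instance_count)
        ]

    def instance_for(self, username: str) -> int:
        return ord(username[0].lower()) % len(self._instances)

    def reserve(self, username: str) -> bool:
        return self._instances[self.instance_for(username)].reserve(username)


locator = Locator(instance_count=4)
bob_martin = locator.reserve("bob")  # first request is approved
bob_jones = locator.reserve("bob")   # same partition, so the conflict is seen
alice = locator.reserve("alice")     # different letter, possibly a different instance
```

Both requests for "bob" land on the same instance, so the second one is denied instead of silently succeeding elsewhere.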

The only caveat, per my understanding, is that we need to ensure that each aggregate only performs one unit of work at a time, effectively serializing all work.  Serializing (queuing up work) makes it seem like commands would back up very quickly.  Fortunately, because of the ways that we can architect our DDDD infrastructure, each unit of work does not take long to complete, especially when we don’t have to communicate with a persistence engine directly and virtually all communication is in-process communication.

DDDD and CQS: Getting Started

There are a number of good resources for getting started on Distributed Domain-Driven Design as well as architectural-level Command Query Separation (not to be confused with CQS as a programming paradigm).

Basic Concepts

For starters, you should be familiar with "regular" DDD: Domain-Driven Design, DDD Quickly, and Think DDD.

Then, you’ll probably want to understand about distributed systems because the way that you think about programming in a distributed environment is most definitely not the traditional "request/response" paradigm: Enterprise Integration Patterns, Pattern-Oriented Software Architecture, Volume 4 (Distributed Computing), and Distributed, Event-Based Systems.

From there, you’ll want to read up just a bit on Command Query Separation and SOA on Udi Dahan’s blog as well as some of the processing methods using messaging.

Videos & Screencasts

You may want to allocate a block of time.  I can promise you it is probably the best few hours you’ll ever spend.  Think of this as an investment.  If you spend all day writing code, wouldn’t it be nice to "sharpen the saw" from time to time?

With that block of time there are several videos that are worth watching.  I advocate watching them in the order listed below.  I had to watch each several times to really have things sink in.

devTeach – The best "primer" to DDDD and CQS

European Virtual ALT.NET Screencast – The quality of the video isn’t the best, but it expands upon the original concepts in the devTeach video.

Eric Evans interviews Greg Young

Greg Young discusses State Transitions in DDD

Seattle ALT.NET Meeting

Follow-up

Once you’re done watching those videos, you’ll be able to better understand Greg Young’s blog posts.

In addition, there are a number of other really good resources:

http://www.bjoernrochel.de/2008/11/20/command-query-separation/

http://www.goeleven.com/blog/EntryDetail.aspx?entry=133

http://blog.xebia.com/2008/11/22/qcon-san-francisco-2008-unleash-your-domain/

http://elegantcode.com/2008/04/30/altnet-seattle-takeawayddddresources/

http://www.martinfowler.com/eaaDev/EventSourcing.html

Infinite Loop Event Sourcing

Greg Young talks about using an identity map to help avoid hitting the persistence engine and making it the bottleneck for the entire system.  In a fully distributed scenario, the event pipeline coming out of the repository would publish any events committed to an aggregate to all interested parties.  These interested parties would include a persistence engine subscriber, other bounded contexts, and other instances of the same bounded context.  Other instances of the same bounded context would use an identity map and listen to messages for aggregates that are already held in memory; other messages would be ignored.

When using an identity map there are several things to keep in mind:

  1. The identity map sits between the repository and the persistence engine.
  2. The identity map must only hand out transactionally consistent aggregate roots.
  3. Applying events within the identity map must not cause events to be published.

Do I Need To Sit Between You Two?

The identity map sits "between" the repository and the persistence engine so that it can service requests for a particular aggregate root.  If it’s able to service the request, because it has one in memory, it hands the in-memory instance back.  If not, the repository goes to the persistence engine, loads the values, and stores the reconstituted aggregate root in the identity map.

This step alone takes a huge burden off of the persistence engine.
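In its simplest form, that lookup might be sketched like this. The dictionary-backed persistence engine and the loads counter are stand-ins for illustration; a real repository would reconstitute a proper aggregate rather than a plain dictionary.

```python
from typing import Dict


class Repository:
    """Checks the identity map first and only falls back to persistence on a miss."""

    def __init__(self, persistence: Dict[str, dict]) -> None:
        self._persistence = persistence          # stand-in for the persistence engine
        self._identity_map: Dict[str, dict] = {}
        self.loads = 0                           # how often persistence was hit

    def get(self, aggregate_id: str) -> dict:
        aggregate = self._identity_map.get(aggregate_id)
        if aggregate is None:
            self.loads += 1
            aggregate = dict(self._persistence[aggregate_id])  # "reconstitute"
            self._identity_map[aggregate_id] = aggregate
        return aggregate


repository = Repository({"user-1": {"name": "bob"}})
first = repository.get("user-1")
second = repository.get("user-1")
```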

Transactionally Consistent

Normally an identity map is unique to a unit of work such that each unit of work would have its own identity map holding non-shared reference to an aggregate.  While this statement is true, it’s not entirely complete when dealing with distributed-messaging scenarios.

Specifically, one of the things that the message-based identity map does is listen as a subscriber to all events published by other processes or instances of the same bounded context.  If an event comes in and the aggregate to which the event pertains is in memory, it processes the event against the aggregate.  If the aggregate is not in memory, it simply drops the message.

We have to be very careful to ensure that when a series of events comes in as part of a single transaction, we apply all events before handing back a reference to the aggregate.  If we don’t, we risk passing back the aggregate in an inconsistent, or even invalid, state.

Furthermore, we have to be careful not to apply messages to the aggregate root reference that is currently being processed as part of the unit of work.  It would seem as though this message-based identity map is two layers deep.  There would be a session-level identity map which holds session-level references and a process-level identity map which subscribes to messages from other instances of the same bounded context and applies them when they arrive.

For optimistic concurrency, it would appear that when a unit of work was complete, the aggregate version at the start of the unit of work would be compared with the aggregate version in the process-level identity map.  If the two were the same, the unit of work would commit; otherwise, the transaction would roll back and the unit of work would be re-attempted.

Lastly, it would also appear that the process-level identity map doesn’t hand out the same reference to a specific aggregate root twice.  Instead it would appear that it hands out copies.  The main reason for this is so that multiple units of work could potentially be executing against the same aggregate root simultaneously, with optimistic concurrency protecting each instance from the other.  The other reason is so that, if a unit of work failed, we wouldn’t have to figure out how to roll back the aggregate.  Instead we could just toss out the reference and get a new one.
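Putting the last two paragraphs together, a process-level identity map that hands out copies and compares versions at commit time might look roughly like this. The Account aggregate, the checkout and commit names, and the integer version are my assumptions, and the sketch ignores thread safety.

```python
import copy
from dataclasses import dataclass
from typing import Dict


@dataclass
class Account:
    id: str
    version: int
    name: str


class ProcessLevelIdentityMap:
    """Hands out copies; a commit succeeds only if nobody else committed first."""

    def __init__(self) -> None:
        self._aggregates: Dict[str, Account] = {}

    def add(self, aggregate: Account) -> None:
        self._aggregates[aggregate.id] = copy.deepcopy(aggregate)

    def checkout(self, aggregate_id: str) -> Account:
        return copy.deepcopy(self._aggregates[aggregate_id])

    def commit(self, working_copy: Account, started_at_version: int) -> bool:
        current = self._aggregates[working_copy.id]
        if current.version != started_at_version:
            return False  # caller throws the copy away and retries
        committed = copy.deepcopy(working_copy)
        committed.version = started_at_version + 1
        self._aggregates[committed.id] = committed
        return True


identity_map = ProcessLevelIdentityMap()
identity_map.add(Account("acct-1", version=1, name="bob"))

# Two units of work start from the same version of the same aggregate.
work_a = identity_map.checkout("acct-1")
work_b = identity_map.checkout("acct-1")
work_a.name = "robert"
work_b.name = "bobby"

a_committed = identity_map.commit(work_a, started_at_version=1)
b_committed = identity_map.commit(work_b, started_at_version=1)
final_state = identity_map.checkout("acct-1")
```

The losing unit of work never touched the shared instance, so there is nothing to undo; it simply checks out a fresh copy and tries again.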

Infinite Loop

Lastly, when applying events received to the aggregate from other instances of the same bounded context, we want to ensure that we ignore the events that come out the other side.  This means that, by definition, when I send an aggregate a "UserNameChanged" event, the aggregate produces a "UserNameChanged" event as a result in a 1-1 consume/produce fashion.

We don’t want to publish these events, otherwise, we might inadvertently create an infinite loop between all the other instances of the same bounded context.
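A sketch of the difference between the two paths, assuming an aggregate whose consume method returns the events it produced. The tuple-based event, the function names, and the shared outbox list are hypothetical.

```python
from typing import List, Tuple

Event = Tuple[str, str]  # (event name, new value); a hypothetical shape


class User:
    def __init__(self, name: str) -> None:
        self.name = name

    def consume(self, event: Event) -> List[Event]:
        kind, value = event
        if kind == "UserNameChanged":
            self.name = value
            return [event]  # 1-1 consume/produce
        return []


def handle_local(user: User, event: Event, outbox: List[Event]) -> None:
    """Unit of work in this instance: whatever the aggregate produces gets published."""
    outbox.extend(user.consume(event))


def handle_remote(user: User, event: Event) -> None:
    """Event from another instance: apply it, but drop what comes out the other side."""
    user.consume(event)


outbox: List[Event] = []
local_user = User("robert")
replica_user = User("robert")

handle_local(local_user, ("UserNameChanged", "bob"), outbox)
for published in list(outbox):
    handle_remote(replica_user, published)
```

If handle_remote also appended to the outbox, every instance would republish what it had just received and the event would circulate forever.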