DDDD: The Event Pipeline—Bringing It All Together

There are little nuggets of gold in comments that Greg makes on various groups, such as Yahoo's DDD group. One of them is regarding the event pipeline.

The responsibility of the event pipeline is "to provide an indexed view of the event storage." In other words, it's like the repository pattern, but this time at a much lower level. Basically we want to make it look like all of the events are in memory.
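As a rough illustration (hypothetical names, not Greg's actual interface), the contract might be as simple as "give me the events for this stream, in order":

```java
import java.util.List;

// Hypothetical sketch: the pipeline presents event storage as an
// indexed, in-order view, much as a repository presents aggregates.
interface IndexedEventView {
    List<Object> eventsFor(String streamId); // all events for one stream, in order
}
```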

In Greg's response to a question on Yahoo Groups, he mentions that he created a custom pipeline implementation using the file system, which preserved message ordering because it had to. I have been digging for this very response for a few weeks now, trying to determine the actual implementation details of the pipeline. Understanding those details helps me reconcile where the write-ahead log fits into the picture: the write-ahead log *is* the pipeline, or at least part of it.

It appears that the pipeline is broken down into two pieces: local storage (the write-ahead log) and permanent, shared storage, meaning a persistence engine of some type (perhaps CouchDB?). These two persistence mechanisms would most likely implement some kind of IPipeline interface. Then there would be another object, let's call it CompositePipeline, that would also implement IPipeline and would contain two IPipeline references: one to the local file system and the other to the database.
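Here's a sketch of that composition in Java. These are hypothetical types based on the arrangement described above, not Greg's actual implementation:

```java
// Hypothetical sketch of the composition described above; none of these
// are Greg's actual types.
interface IPipeline {
    void publish(Iterable<Object> events);
}

// Local storage: an append-only write-ahead log on the file system.
class FileSystemPipeline implements IPipeline {
    public void publish(Iterable<Object> events) {
        // serialize each event and append it to the local log file
    }
}

// Permanent, shared storage (perhaps CouchDB).
class CouchDbPipeline implements IPipeline {
    public void publish(Iterable<Object> events) {
        // write each event as a document to the database
    }
}

// Holds both IPipeline references and fans each publish out to them.
class CompositePipeline implements IPipeline {
    private final IPipeline local;   // the file system
    private final IPipeline shared;  // the database

    CompositePipeline(IPipeline local, IPipeline shared) {
        this.local = local;
        this.shared = shared;
    }

    public void publish(Iterable<Object> events) {
        local.publish(events);
        shared.publish(events);
    }
}
```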

Here is where it gets interesting. As soon as a unit of work is complete, several things would happen transactionally. First, the message(s) sent to the bounded context would be removed from their message channel (if we're using a service bus) and the events would be serialized to the file system. There is a slight issue regarding the transactional capabilities of the file system prior to Vista, but that's beyond the scope of this blog post.
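A rough sketch of the write-ahead-log append (hypothetical names; the transactional coordination with the message channel is glossed over here, since that's exactly the hard part on a non-transactional file system):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.List;

// Hypothetical sketch: on unit-of-work completion, append the serialized
// events to the local write-ahead log. In a real system this append and
// the removal of the incoming message from its channel would need to be
// coordinated transactionally.
class WriteAheadLog {
    private final Path logFile;

    WriteAheadLog(Path logFile) {
        this.logFile = logFile;
    }

    void append(List<String> serializedEvents) throws IOException {
        for (String event : serializedEvents) {
            // append-only: create the file if missing, otherwise append
            Files.write(logFile,
                        (event + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
    }
}
```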

In a completely separate process, we would have an instance of the FileSystemPipeline and a CouchDbPipeline, and we would read events from the file system and put them into CouchDB in a transactional manner. This would happen in a multi-threaded, thread-safe way. To avoid issues with file locking entirely, we would ensure that the pipeline instance inside the bounded context's memory space only appends to the file system; it never reads or deletes an event once written.
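That forwarding process might look something like this (again, hypothetical names; the transactional read-cursor bookkeeping is reduced to an interface):

```java
import java.util.List;

// Hypothetical sketch of the separate forwarding process: it drains
// batches of events from the local write-ahead log and writes them to
// permanent storage. Because the bounded context's writer is append-only
// and this reader runs in its own process, the two never fight over locks.
class LogForwarder implements Runnable {
    interface LogReader {
        List<String> readNextBatch();           // events not yet forwarded
        void markForwarded(List<String> batch); // advance the read cursor
    }

    interface PermanentStore {
        void save(List<String> batch); // e.g. a bulk insert into CouchDB
    }

    private final LogReader log;
    private final PermanentStore store;

    LogForwarder(LogReader log, PermanentStore store) {
        this.log = log;
        this.store = store;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            List<String> batch = log.readNextBatch();
            if (batch.isEmpty()) {
                try {
                    Thread.sleep(100); // nothing to forward yet
                } catch (InterruptedException e) {
                    return;
                }
                continue;
            }
            store.save(batch);        // must succeed before we advance
            log.markForwarded(batch); // only then is the batch "done"
        }
    }
}
```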

What about concurrency? If the events are temporarily stored in the file system and the system goes offline before writing them to the database, how is the rest of the system made aware of those state changes?

If you're only running a single instance of a bounded context, then it doesn't matter. Your reporting DB and other bounded contexts won't get the events until the machine comes back online—that's all part of the "eventually consistent" paradigm.

I am still investigating how best to handle offline scenarios when there are multiple instances of the same bounded context. In general though, it doesn't appear to be much different—the rest of the system gets updated when the node comes back online.

One possibility would be to stamp each event produced with some sort of correlation identifier that ties back to the original command message. Then, if the locator (a content-based router) detected that a particular node was offline, it could re-dispatch those same commands to another node for processing. When the failed node came back online and started persisting to the database, the persistence process would detect whether the particular correlation identifier was already stored and simply skip those events. I haven't thought this through all the way, but in general the idea might work.
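The idempotent-write half of that idea is easy to sketch (hypothetical names; in practice the "already stored?" check would be a query against the database rather than an in-memory set):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of idempotent persistence keyed by correlation ID.
// When the recovered node resumes forwarding, any batch whose correlation
// identifier was already persisted (because another node reprocessed the
// same command) is simply skipped.
class IdempotentEventStore {
    private final Set<String> storedCorrelationIds = new HashSet<>();

    void persist(String correlationId, List<String> events) {
        // in practice this check is a lookup against permanent storage,
        // not an in-memory set
        if (!storedCorrelationIds.add(correlationId)) {
            return; // duplicate: these events were already stored
        }
        // ... write the events to the database here ...
    }
}
```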