
I’ve had a few occasions recently to use Reactive Extensions in production for message processing. Once you finally bend your brain around Rx, you start seeing uses for it everywhere you look. It really is a LINQ-style awakening moment when you suddenly see all the places you should be using it.

I’m going to walk through a scenario we had and explain where we used Reactive Extensions to tame some of our messaging bottlenecks. In our scenario we were funnelling workflow tracking messages into a sort of log table in a SQL database, but there should be some mileage here anywhere you are raising asynchronous messages and want to optimise the throughput.

I will note that we did not need strict guaranteed delivery; that is to say, it was not absolutely critical that every message reach the database. I’m not saying this approach would not work for a scenario that needs it, just that we have not tested it under those conditions.

Original Implementation

In our enterprise workflow system we can have any number of workflow definitions running under different web applications scaled out over many web farm nodes. Each of these workflow web applications could have any number of running instances at any one time. Each workflow emits tracking records which log the progress through each activity in the workflow, any exceptions, and so on. The thing about processing workflow tracking records is that you need to get them out of the way as quickly as possible, as you do not want to adversely affect the performance of the workflow instances themselves.

If we were to just process the messages synchronously in the workflow tracking participant (the plugin class that handles tracking messages as they are raised) then we would see the following type of message pipeline where each message is handled as a single unit and written to the database one at a time.

[Figure: RxMessageProcessing1. Synchronous pipeline, one message written to the database at a time.]

The problem with this approach is that many instances of many workflows, on many web applications across many server nodes, would all be sending messages to this single service. Even if that service is load balanced, peak loads can cause a lot of contention on it, which backs up onto the workflows themselves, degrading overall performance and affecting the user experience. The secret to absorbing some of this peak load is to implement some sort of “Load Levelling”, where the load is normalised over a period of time.

[Figure: LoadLevelling. Peak load smoothed out over a longer period of time.]

The messages are essentially throttled so that only so many are processed at a time; processing continues long after the load has subsided. We decided to put these messages onto our own messaging infrastructure, which is basically an MSMQ queue; another Windows service process picks up these messages later and funnels them into the database. Another way to do this would be Azure Service Bus Queues, which are often promoted for exactly these load-levelling characteristics. Initially this approach was fine: the messages would be placed asynchronously on a queue and later processed one at a time and written to the tracking database table. Each message was processed as a single transaction.
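For illustration, the hand-off looks roughly like this; the queue path and the TrackingRecord type are stand-ins, since our real infrastructure wraps System.Messaging behind its own abstraction.

using System.Messaging;

// Minimal sketch of the queue hand-off (names are illustrative).
var queue = new MessageQueue(@".\private$\workflowTracking");
queue.Formatter = new XmlMessageFormatter(new[] { typeof(TrackingRecord) });

// Fire-and-forget from the tracking participant; the listener service
// drains the queue and writes to the database later.
queue.Send(record);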

[Figure: RxMessageProcessing2. Messages placed on an MSMQ queue, then processed one at a time by a listener service.]

The side effect of this approach is that the load is moved to another process. Our message listener at the time was single threaded, so each message would be processed fully before the next message was picked up off the queue. This resulted in the levelling that I described above.

This worked well, until we met a client who pushed the limits of what we had tested and managed to create quite a lot of work for the message processor. In fact they created so much work that messages were expiring on the queue: the single-threaded pipeline could not process them before their TTL elapsed. We needed to add more threads to this sucker, but a thread per message could be excessive; creating threads is not free. We could have just used the Task Parallel Library, but as we will see, with Rx we can easily extend the behaviour of the message pipeline as required.

Using Rx to Batch a Message Pipeline

I knew I wanted to use Reactive Extensions to compose the message pipeline. The first task was to convert the pipeline code from an event-based model to an Rx Observable so I could start. For this, the easiest thing was to simply use the FromEventPattern method on Observable.

trackingRecords =
    Observable.FromEventPattern<MessageEventArgs>(
        ev => messageSource.MessageReceived += ev,
        ev => messageSource.MessageReceived -= ev)
    .Select(e => e.EventArgs.Record); // unwrap the tracking record (property name assumed)

The next thing was to figure out how to process more of the messages at a time so that they did not expire on the queue. This is where the Rx Buffer operator comes in: pass it a timeout period and a batch size, and incoming messages will be batched up into collections. Each collection closes on receiving 500 items or after a 1 second window, whichever happens first; once a collection is emitted, a new one starts buffering.

activityTrackSubscription = trackingRecords
    .Buffer(TimeSpan.FromSeconds(1), 500) // close each batch at 1 second or 500 records, whichever comes first
    .Where(items => items.Any())          // skip the empty batches emitted during quiet periods
    .Subscribe(records => {
        repository.RecordWorkflowTrack(records); // write the whole batch in one call
    });

In the diagram below you can see that we move from a stream of messages to a stream of message collections. Each collection is then handled on its own thread while the next batch continues to aggregate.

Then I handed this collection over to our ORM (NHibernate) to write in a single transaction to the database instead of multiple individual transactions. Wicked! Just a few lines of code and already we have a batch that is constructed from a stream of messages and handled as a batch command rather than one at a time.

[Figure: RxMessageProcessing3. The stream of individual messages buffered into a stream of batches.]
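As for the database write itself, here is a plausible shape for RecordWorkflowTrack; a sketch assuming an NHibernate ISessionFactory field, not our exact repository code.

// Sketch only: one transaction per batch rather than one per message.
public void RecordWorkflowTrack(IList<TrackingRecord> records)
{
    using (var session = sessionFactory.OpenSession())
    using (var tx = session.BeginTransaction())
    {
        foreach (var record in records)
        {
            session.Save(record); // still one INSERT per record, as we found out later
        }
        tx.Commit(); // but only one transaction per batch
    }
}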

Think about what the batching would have involved if you wanted to write it yourself. You would have had to keep a collection of messages and a timestamp in some sort of state fields, copy the message collection, clear the fields, and add the threading code to spin up a new thread to process the batch. Not impossible, but a pain all the same.
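For comparison, a rough sketch of that hand-rolled version might look like this (the TrackingRecord and ITrackingRepository types are stand-ins); Rx’s Buffer replaces all of this bookkeeping:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class ManualBatcher
{
    private readonly object gate = new object();
    private List<TrackingRecord> pending = new List<TrackingRecord>();
    private readonly Timer timer;
    private readonly ITrackingRepository repository;

    public ManualBatcher(ITrackingRepository repository)
    {
        this.repository = repository;
        // Flush whatever has accumulated once a second.
        timer = new Timer(_ => Flush(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
    }

    public void OnMessageReceived(TrackingRecord record)
    {
        lock (gate)
        {
            pending.Add(record);
            if (pending.Count < 500) return;
        }
        Flush(); // size threshold reached
    }

    private void Flush()
    {
        List<TrackingRecord> batch;
        lock (gate)
        {
            if (pending.Count == 0) return;
            batch = pending;
            pending = new List<TrackingRecord>(); // start a fresh buffer
        }
        Task.Run(() => repository.RecordWorkflowTrack(batch)); // hand the batch to another thread
    }
}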

This helped us survive for a while, but then we started to get reports of similarly high CPU activity, this time on the message listener and SQL Server. After looking into it we discovered two things. The first was that the ORM wasn’t actually batching the inserts; sure, they were in a single transaction, but each was an individual INSERT statement. Problem solved with SQL bulk copy.
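The fix is only a few lines with SqlBulkCopy; a minimal sketch, assuming the batch has already been projected into a DataTable (the table name and variable names are illustrative):

using System.Data.SqlClient;

using (var bulkCopy = new SqlBulkCopy(connectionString))
{
    bulkCopy.DestinationTableName = "WorkflowTrackingLog";
    bulkCopy.WriteToServer(recordsAsDataTable); // one bulk operation for the whole batch
}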

The second issue was actually due to the second usage of this event stream. Apart from inserting each message into a log table, we also updated a summary table which held a single row per workflow instance. This row held the last action date and other summary information that was included in each tracking message of a certain type. These messages were still being processed one at a time, and for each one we would load the row into memory, apply the changes through a mapper and update the row, taking locks all the while. You see, when a workflow executes there is a pulse of activity until it idles, and during this pulse we may emit many tracking records, so processing them one at a time can be expensive.

To solve this we needed to group incoming records according to the instance id of the workflow and make the changes in memory before writing to the database. Effectively we changed the approach from making many consecutive changes in the database to making those changes in memory first and writing only the final result to the database.

Batching a Message Pipeline According to Message Content

So for this task I used the GroupBy operator. I always found it difficult to understand what the group operator was doing. I wondered, “How can it group events into a dictionary of messages when it doesn’t know what the groups (keys) will be?”. Here is the trick: where the LINQ GroupBy function groups items in a collection into a keyed collection of collections (a dictionary), the Rx GroupBy function groups a stream into a keyed observable of observables. Who said this was hard?
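In type terms, a minimal sketch (assuming the workflow instance id is a Guid) looks like this:

// GroupBy turns the flat stream into a stream of keyed sub-streams.
IObservable<IGroupedObservable<Guid, TrackingRecord>> groups =
    trackingRecords.GroupBy(record => record.InstanceId);

// Each group is itself an observable; Key identifies the workflow instance.
groups.Subscribe(group =>
    Console.WriteLine("New workflow instance seen: " + group.Key));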

Once we have our grouped observables we can buffer each group to achieve a batch in the same way we did before. The result of this is an observable stream that produces events where each event is a collection of messages for the same workflow instance.

[Figure: RxMessageProcessing4. Messages grouped by instance id, then buffered into per-instance batches.]

This time, though, instead of a bulk insert we are going to map each message onto the same instance of our persisted row. In other words, for each batch we load the single workflow summary row once, run it through our mapper for each message in the batch, and then persist the result to the database.

workflowSummarySubscription = trackingRecords
    .Where(record => IsSummaryInformation(record))  // only the summary-bearing messages
    .GroupBy(record => record.InstanceId)           // one sub-stream per workflow instance
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(1), 500)) // batch within each group
    .Subscribe(records => {
        var row = GetRow(records.First().InstanceId); // load the summary row once per batch
        foreach (var record in records) {
            UpdateRowFromRecord(record, row);         // fold each message into the row in memory
        }
        SaveRow(row);                                 // single write per batch
    });

The result of this last set of changes is that we greatly improved the message pipeline throughput, from ~50 messages/sec to 2700 messages/sec before messages start backing up on the queue. The recoverability of the pipeline under peak load also improved, such that the backlog was cleared in a matter of seconds.

Summary

If you have an asynchronous message pipeline, consider consuming it with Rx. Optimising the stream becomes trivial after that, as you can add context back into a stream of individual messages to create batches optimised for your consumption case.

Update

Jacob Reimers asked the following question on Twitter:

@jjrdk: @petegoo Cool blog post. Had similar issue at work recently. How would you deal with group streams that you need to close?

The trick here is to change the GroupBy to a GroupByUntil with a throttle. This completes each group after a period of inactivity (say 30 seconds); if more activity happens for that group later, the group is recreated.

workflowSummarySubscription = trackingRecords
    .Where(record => IsSummaryInformation(record))
    .GroupByUntil(record => record.InstanceId,
                  g => g.Throttle(TimeSpan.FromSeconds(30)).Take(1)) // close the group after 30s of silence
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(1), 500))
    .Subscribe(records => {
        var row = GetRow(records.First().InstanceId);
        foreach (var record in records) {
            UpdateRowFromRecord(record, row);
        }
        SaveRow(row);
    });