Notification Patterns in Rx

Reactive Extensions
2014-08-30

Reactive Extensions (Rx) is a set of classes and functions that lets a developer control data over time. With it, event streams can be manipulated as easily as collections can be manipulated with LINQ.

An event stream can be made up of anything. It could be button clicks, key presses, or calls to/from a client or server. One of the classic “Hello, world!” examples for Rx is implementing a search box [101 Rx Samples, Stack Overflow], where the search doesn’t occur until the user has stopped typing for a few hundred milliseconds. Searching on every keystroke, especially against an external database, would bog down the user interface. However, with a couple of Rx function calls, the program can aggregate these events, run the search on a background thread, then return to the UI thread.
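To make the search box idea concrete, here is a minimal sketch, assuming the System.Reactive NuGet package. The names SearchBoxSketch, keystrokes, search, background, and ui are hypothetical placeholders, not part of Rx. Note that the Rx.NET operator that waits for a pause in the stream is called Throttle.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SearchBoxSketch
{
    // keystrokes: hypothetical stream carrying the text box contents after each key press.
    // search:     hypothetical lookup, for example a database query.
    // background: scheduler on which the pause timer (and therefore the search) runs.
    // ui:         scheduler that delivers results back to the UI thread.
    public static IObservable<string[]> Results(
        IObservable<string> keystrokes,
        Func<string, string[]> search,
        IScheduler background,
        IScheduler ui)
    {
        return keystrokes
            // Only let a value through once typing has paused for 300 ms.
            .Throttle(TimeSpan.FromMilliseconds(300), background)
            // Skip the search if the text is the same as the last query.
            .DistinctUntilChanged()
            // Runs on the background scheduler, where Throttle emitted the value.
            .Select(search)
            // Hand the results to the UI scheduler.
            .ObserveOn(ui);
    }
}
```

In a desktop application the two schedulers might be a thread pool scheduler and a scheduler bound to the UI thread. Passing them in as parameters also allows a virtual-time scheduler to be substituted in tests.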

I like to think of Rx as LINQ but extended out to three dimensions.

The first dimension of Rx is Data. LINQ also has this dimension. Most LINQ functions you are familiar with are also available with Rx event streams. An Rx event stream is defined in the library as an IObservable<T>, which is analogous to the IEnumerable<T> type. IObservable<T> supports a Select method to transform the data to another type and a Where method to filter objects on the stream. Rx is also similar to LINQ in that it follows a fluent, functional programming (FP) style. This FP style allows the code to be very expressive and maintainable.
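The parallel between the two interfaces can be sketched as follows, assuming the System.Reactive NuGet package. DataDimensionSketch and EvenLabels are hypothetical names chosen for illustration. The same query is written once against a collection and once against an event stream.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class DataDimensionSketch
{
    // LINQ over a collection: keep the even numbers, then turn them into labels.
    public static IEnumerable<string> EvenLabels(IEnumerable<int> numbers)
    {
        return numbers
            .Where(n => n % 2 == 0)
            .Select(n => "even: " + n);
    }

    // The same query over an event stream. Values are pushed through
    // Where and Select as they arrive instead of being pulled by a loop.
    public static IObservable<string> EvenLabels(IObservable<int> numbers)
    {
        return numbers
            .Where(n => n % 2 == 0)
            .Select(n => "even: " + n);
    }
}
```

The query bodies are identical. Only the consumer differs: a foreach loop pulls from the IEnumerable<T> version, while the IObservable<T> version is consumed by calling Subscribe with a callback.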

The second dimension of Rx is Time. Rx has additional functions, not found in LINQ, to control and aggregate data on the stream over time. Event data can be grouped into time boxes (the Buffer method), sampled at intervals (the Sample method), delayed (the Delay method), or monitored for missing data (the Timeout method). These are just a few examples, and I will cover them in this series of blog posts. Also note that Rx supports schedulers, so production code that works with events over hours can have automated unit tests that execute this code in milliseconds.
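As a rough illustration of the scheduler point, the sketch below batches events into one-hour buffers and then drives three hours of virtual time. It assumes the System.Reactive NuGet package and uses its HistoricalScheduler as the virtual clock. The Microsoft.Reactive.Testing package offers a TestScheduler for the same purpose. TimeDimensionSketch, HourlyBatches, and Demo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class TimeDimensionSketch
{
    // Collects events into one-hour batches. Taking the scheduler as a
    // parameter lets a test replace the wall clock with virtual time.
    public static IObservable<IList<T>> HourlyBatches<T>(
        IObservable<T> events, IScheduler scheduler)
    {
        return events.Buffer(TimeSpan.FromHours(1), scheduler);
    }

    // Simulates three hours of events and returns the size of each batch.
    public static List<int> Demo()
    {
        var clock = new HistoricalScheduler();
        var sizes = new List<int>();

        // One event every 25 minutes of virtual time.
        var ticks = Observable.Interval(TimeSpan.FromMinutes(25), clock);

        using (HourlyBatches(ticks, clock).Subscribe(batch => sizes.Add(batch.Count)))
        {
            // Moves the virtual clock forward; no real waiting takes place.
            clock.AdvanceBy(TimeSpan.FromHours(3));
        }

        return sizes;
    }
}
```

In production code the scheduler argument would typically be one of the real schedulers, so the same HourlyBatches method runs against actual time without modification.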

The third dimension of Rx is Threading. By default, Rx is single-threaded. Applying functions like Select or Where will have the resulting event stream handled on the same thread that the events are generated on. However, it is a simple matter of injecting the ObserveOn method into the fluent FP method chain to offload the work to a different thread or to return it to another thread, such as the UI thread.
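The effect of ObserveOn can be observed with a small experiment, sketched here assuming the System.Reactive NuGet package. ThreadingSketch and Run are hypothetical names. The method records which thread each stage of the chain runs on.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ThreadingSketch
{
    // Returns three managed thread ids:
    // [0] the calling thread,
    // [1] the thread that ran the Select placed before ObserveOn,
    // [2] the thread that ran the Select placed after ObserveOn.
    public static int[] Run()
    {
        int callerThread = Thread.CurrentThread.ManagedThreadId;
        int upstreamThread = 0;

        int downstreamThread = Observable.Range(1, 3)
            // No scheduler has been introduced yet, so this runs on the
            // thread that subscribes, which here is the calling thread.
            .Select(n =>
            {
                upstreamThread = Thread.CurrentThread.ManagedThreadId;
                return n * 10;
            })
            // Everything below this line is delivered on a separate thread.
            .ObserveOn(NewThreadScheduler.Default)
            .Select(n => Thread.CurrentThread.ManagedThreadId)
            // Wait blocks until the stream completes and returns its last value.
            .Wait();

        return new[] { callerThread, upstreamThread, downstreamThread };
    }
}
```

NewThreadScheduler is used only to make the thread switch easy to see. In a UI application the argument to ObserveOn would more likely be a scheduler tied to the UI thread.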

I found the best way to learn Rx is to try to apply it to real-world problems. I’ve found this much more productive than simply reading up on all the available methods and hoping to replace my existing code with these methods. The latter was the approach I took with LINQ, and for LINQ it was very effective. LINQ can typically be used to take non-LINQ code (such as for loops and if statements) that is already implemented and replace it with LINQ functions and lambdas. Using LINQ functions over conventional programming constructs often yielded half the amount of code.

I’ve found Rx isn’t applied in the same way. Yes, Rx does reduce the lines of code required to solve a problem. However, the code savings are much, much more than half. Typically, I find that the problems I am solving with Rx are problems that I wouldn’t even attempt to solve if I were limited to conventional programming constructs.

In this series of blog posts, the real-world problem I’ll be looking at is notifying users. There are a number of common patterns that I employ when setting up a notification system. First and foremost, I want to avoid spamming users with every single nugget of information the program is producing. There are three patterns I’ll cover:

Before I get to these notification patterns, I’ll cover the basics of setting up an event stream, subscribing, and a general overview of the test program I’ll be using for the demonstrations. All the code is available on GitHub.

Before we get started, I’ll quickly state some history and additional resources. The Rx library is owned and maintained by Microsoft. It was originally released around 2010. It targets the .NET Framework 4.0, although there is a version available that targets 3.5 if needed.