Reactive Extensions (Rx) is a set of classes and functions that allow a developer to control data over time. It lets you manipulate event streams as easily as LINQ lets you manipulate collections.
An event stream can be made up of anything. It could be button clicks, key presses, or calls to/from a client or server. One of the classic “Hello, world!” examples for Rx is implementing a search box [101 Rx Samples, Stack Overflow]. This is where the search doesn’t occur until the user has stopped typing for a few hundred milliseconds. Searching on every keystroke, especially against an external database, would bog down the user interface. However, with a couple of Rx function calls the program can aggregate these events, run the search on a background thread, and then return to the UI thread.
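As a rough illustration of the search box idea, here is a minimal sketch that only covers the "wait until the user stops typing" part. It assumes the Rx NuGet package is referenced. The Subject standing in for the text box, the 300 ms pause, and the SearchBoxSketch class are all made up for this example, and the threading steps are left out. A HistoricalScheduler (a virtual clock that ships with Rx) replaces real time so the typing session can be scripted.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SearchBoxSketch
{
    // Returns the search terms that survive throttling for a scripted typing session.
    public static List<string> Run()
    {
        // Virtual clock: time only moves when AdvanceBy is called.
        var scheduler = new HistoricalScheduler();

        // Hypothetical stand-in for the text box's "text changed" event.
        var textChanged = new Subject<string>();
        var searches = new List<string>();

        using (textChanged
            .Throttle(TimeSpan.FromMilliseconds(300), scheduler) // wait for a 300 ms pause
            .DistinctUntilChanged()                               // ignore a repeated term
            .Subscribe(term => searches.Add(term)))               // a real app would search here
        {
            // The user types quickly: no pause is long enough to trigger a search.
            textChanged.OnNext("r");
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));
            textChanged.OnNext("re");
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));
            textChanged.OnNext("rea");

            // The user pauses, so only the latest text is let through.
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(400));

            // More typing followed by another pause.
            textChanged.OnNext("reactive");
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(400));
        }

        return searches;
    }
}
```

Calling SearchBoxSketch.Run() should return two terms, "rea" and "reactive". The intermediate keystrokes never reach the subscriber.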
I like to think of Rx as LINQ but extended out to three dimensions.
The first dimension of Rx is Data. LINQ also has this dimension. Most LINQ functions you are familiar with are also available with Rx event streams. An Rx event stream is defined in the library as an IObservable<T>, which is analogous to the IEnumerable<T> type. IObservable<T> supports a Select method to transform the data to another type and a Where method to filter objects on the stream. Rx is also similar to LINQ in that it follows a fluent, functional programming (FP) style. This FP style allows the code to be very expressive and maintainable.
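To make the comparison with LINQ concrete, here is a small sketch of Where and Select applied to an IObservable<T>. It assumes the Rx NuGet package is referenced. The Subject acting as a key press source and the DataDimensionSketch class are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DataDimensionSketch
{
    // Pushes a few characters through a Where/Select chain and returns what arrives.
    public static List<string> Run()
    {
        // Hypothetical event source; in a real program this could wrap a UI event.
        var keyPresses = new Subject<char>();
        var letters = new List<string>();

        // The same fluent style as LINQ, but over events instead of a collection.
        IObservable<string> upperCaseLetters = keyPresses
            .Where(c => char.IsLetter(c))                       // filter out non-letters
            .Select(c => char.ToUpperInvariant(c).ToString());  // transform char -> string

        using (upperCaseLetters.Subscribe(s => letters.Add(s)))
        {
            foreach (char c in "a1b2c")
            {
                keyPresses.OnNext(c); // simulate one key press
            }
        }

        return letters;
    }
}
```

Calling DataDimensionSketch.Run() should return "A", "B", and "C". The digits are dropped by Where before Select ever sees them.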
The second dimension of Rx is Time. Rx has additional functions, not found in LINQ, to control and aggregate data on the stream over time. Event data can be grouped into time boxes (the Buffer or Sample methods), delayed (the Delay method), or monitored for missing data (the Timeout method). These are just a few examples, and I will cover them in this series of blog posts. Also note that Rx supports schedulers, so production code that works with events over hours can have automated unit tests that execute the same code in milliseconds.
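Here is a minimal sketch of the time dimension, assuming the Rx NuGet package is referenced. It groups events into one-hour time boxes with Buffer and uses a HistoricalScheduler (a virtual clock that ships with Rx) so the "hours" pass instantly. The Subject, the one-hour window, and the TimeDimensionSketch class are illustrative choices rather than anything prescribed by the library.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TimeDimensionSketch
{
    // Batches events into one-hour windows using virtual time.
    public static List<string> Run()
    {
        // Virtual clock: time only moves when AdvanceBy is called.
        var scheduler = new HistoricalScheduler();

        // Hypothetical event source.
        var events = new Subject<string>();
        var batches = new List<string>();

        using (events
            .Buffer(TimeSpan.FromHours(1), scheduler)   // one list of events per hour
            .Where(batch => batch.Count > 0)            // skip hours with no events
            .Select(batch => string.Join(",", batch))   // flatten each batch for display
            .Subscribe(line => batches.Add(line)))
        {
            events.OnNext("a");
            events.OnNext("b");
            scheduler.AdvanceBy(TimeSpan.FromHours(1)); // closes the first window

            events.OnNext("c");
            scheduler.AdvanceBy(TimeSpan.FromHours(1)); // closes the second window

            scheduler.AdvanceBy(TimeSpan.FromHours(1)); // an empty hour, filtered out
        }

        return batches;
    }
}
```

Calling TimeDimensionSketch.Run() should return "a,b" and "c". Three virtual hours elapse, but the method itself finishes almost immediately, which is what makes this style of code practical to unit test.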
The third dimension of Rx is Threading. By default Rx is single threaded. Applying functions like Select or Where will have the resulting event stream handled on the same thread that the events are generated on. However, it is a simple matter of adding an ObserveOn method into the fluent FP method chain to offload the work to a different thread or join back to the UI thread.
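A minimal sketch of the threading dimension follows, assuming the Rx NuGet package is referenced. It places ObserveOn in the method chain so that everything after it runs on a thread other than the caller's. NewThreadScheduler is just one possible target, and the ThreadingDimensionSketch class is a hypothetical name for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ThreadingDimensionSketch
{
    // Returns true when every value was handled on a thread other than the caller's.
    public static bool Run()
    {
        int callerThreadId = Thread.CurrentThread.ManagedThreadId;

        IList<int> handlerThreadIds = Observable.Range(1, 3)
            .ObserveOn(NewThreadScheduler.Default)              // hop to another thread here
            .Select(_ => Thread.CurrentThread.ManagedThreadId)  // runs after the hop
            .ToList()                                           // collect into a single list
            .Wait();                                            // block until the stream completes

        foreach (int id in handlerThreadIds)
        {
            if (id == callerThreadId)
            {
                return false;
            }
        }

        return handlerThreadIds.Count == 3;
    }
}
```

Operators placed before ObserveOn still run wherever the events are produced. Only the part of the chain after it moves. Passing a scheduler or SynchronizationContext tied to the UI in a second ObserveOn call is how results would be brought back to the UI thread, although that needs a UI framework and is not shown here.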
I found the best way to learn Rx is to try to apply it to real-world problems. I’ve found this much more productive than simply reading up on all the available methods and hoping to replace my existing code with these methods. That was the approach I took with LINQ, and for LINQ it was very effective. LINQ can typically be used to take non-LINQ code (such as for loops and if statements) that is already implemented and replace it with LINQ functions and lambdas. Using LINQ functions over conventional programming constructs often cut the amount of code in half.
I’ve found Rx isn’t applied in the same way. Yes, Rx does reduce the lines of code required to solve a problem. However, the code savings are much, much more than half. Typically I find that the problems I am solving with Rx are problems that I wouldn’t even attempt to solve if I were limited to conventional programming constructs.
In this series of blog posts the real-world problem I’ll be looking at is notifying users. There are a number of common patterns that I employ when setting up a notification system. First and foremost, I want to avoid spamming users with every single nugget of information the program is producing. There are three patterns I’ll cover:
- Lossless Notifications, where data from each event is aggregated into a single message
- Unexpected Error Notifications, where duplicate errors are thrown out
- Connection Error Notifications, where connection errors are queued and can be cancelled if the connection self-corrects
Before I get to these notification patterns, I’ll cover the basics of setting up an event stream, subscribing, and a general overview of the test program I’ll be using for the demonstrations. All the code is available on GitHub.
Before we get started, I’ll quickly cover some history and additional resources. The Rx library is owned and maintained by Microsoft. It was originally released around 2010. It targets the 4.0 framework, although there is a version available that targets 3.5 if needed.
- The Rx libraries are available on NuGet: https://www.nuget.org/packages/Rx-Main/
- Microsoft Home Page for Rx: http://msdn.microsoft.com/en-ca/data/gg577609.aspx
- Some examples: http://rxwiki.wikidot.com/101samples
- A free e-book: http://www.introtorx.com/