There are two key types to understand when working with Rx, and a subset of auxiliary types that will help you to learn Rx more effectively. The IObserver<T> and IObservable<T> form the fundamental building blocks for Rx, while implementations of ISubject<TSource, TResult> reduce the learning curve for developers new to Rx.

Many are familiar with LINQ and its many popular forms like LINQ to Objects, LINQ to SQL & LINQ to XML. Each of these common implementations allows you query data at rest; Rx offers the ability to query data in motion. Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as multicast delegates or events (which are usually multicast delegates). Multicast delegates are not ideal however as they exhibit the following less desirable features;

Rx looks to solve these problems. Here I will introduce you to the building blocks and some basic types that make up Rx.


IObservable<T> is one of the two new core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft is so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a streaming sequence of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.

// defines a provider for push-based notification.
public interface IObservable<out T>;
    // notifies the provider that an observer is to receive notifications.
    IDisposable Subscribe(IObserver<T> observer);

.NET already has the concept of Streams with the type and sub types of System.IO.Stream. The System.IO.Stream implementations are commonly used to stream data (generally bytes) to or from an I/O device like a file, network or block of memory. System.IO.Stream implementations can have both the ability to read and write, and sometimes the ability to seek (i.e. fast forward through a stream or move backwards). When I refer to an instance of IObservable<T> as a stream, it does not exhibit the seek or write functionality that streams do. This is a fundamental difference preventing Rx being built on top of the System.IO.Stream paradigm. Rx does however have the concept of forward streaming (push), disposing (closing) and completing (eof). Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding. These features are also not an appropriate fit for the existing System.IO.Stream types.

Some others refer to instances of IObservable<T> as Observable Collections, which I find hard to understand. While the observable part makes sense to me, I do not find them like collections at all. You generally cannot sort, insert or remove items from an IObservable<T> instance like I would expect you can with a collection. Collections generally have some sort of backing store like an internal array. The values from an IObservable<T> source are not usually pre-materialized as you would expect from a normal collection. There is also a type in WPF/Silverlight called an ObservableCollection<T> that does exhibit collection-like behavior, and is very well suited to this description.

In fact IObservable<T> integrates very well with ObservableCollection<T> instances. So to save on any confusion we will refer to instances of IObservable<T> as sequences. While instances of IEnumerable<T> are also sequences, we will adopt the convention that they are sequences of data at rest, and IObservable<T> instances are sequences of data in motion.


IObserver<T> is the other one of the two core interfaces for working with Rx. It too has made it into the BCL as of .NET 4.0. Don't worry if you are not on .NET 4.0 yet as the Rx team have included these two interfaces in a separate assembly for .NET 3.5 and Silverlight users. IObservable<T> is meant to be the "functional dual of IEnumerable<T>".If you want to know what that last statement means, then enjoy the hours of videos on Channel9 where they discuss the mathematical purity of the types. For everyone else it means that where an IEnumerable<T> can effectively yield three things (the next value, an exception or the end of the sequence), so too can IObservable<T> via IObserver<T>'s three methods OnNext(T), OnError(Exception) and OnCompleted().

// provides a mechanism for receiving push-based notifications.
public interface IObserver<in T>
    // provides the observer with new data.
    void OnNext(T value);
    // notifies the observer that the provider has experienced an error condition.
    void OnError(Exception error);
    // notifies the observer that the provider has finished sending push-based notifications.
    void OnCompleted();

Rx has an implicit contract that must be followed. An implementation of IObserver<T> may have zero or more calls to OnNext(T) followed optionally by a call to either OnError(Exception) or OnCompleted(). This protocol ensures that if a sequence terminates, it is always terminated by an OnError(Exception), or an OnCompleted(). This protocol does not however demand that an OnNext(T), OnError(Exception) or OnCompleted() ever be called. This enables to concept of empty and infinite sequences. We will look into this more later.

Interestingly, while you will be exposed to the IObservable<T> interface frequently if you work with Rx, in general you will not need to be concerned with IObserver<T>. This is due to Rx providing anonymous implementations via methods like Subscribe.

Implementing IObserver<T> and IObservable<T>

It is quite easy to implement each interface. If we wanted to create an observer that printed values to the console it would be as easy as this.

public class MyConsoleObserver<T> : IObserver<T>;
     public void OnNext(T value)
          Console.WriteLine("Received value {0}", value);

     public void OnError(Exception error)
          Console.WriteLine("Sequence faulted with {0}", error);

     public void OnCompleted()
          Console.WriteLine("Sequence terminated");

Implementing an observable sequence is a little bit harder. An overly simplified implementation that returned a sequence of numbers could look like this.

public class MySequenceOfNumbers : IObservable<int>
    public IDisposable Subscribe(IObserver<int> observer)
        return Disposable.Empty;

We can tie these two implementations together to get the following output

var numbers = new MySequenceOfNumbers();
var observer = new MyConsoleObserver<int>();


Received value 1
Received value 2
Received value 3
Sequence terminated

The problem we have here is that this is not really reactive at all. This implementation is blocking, so we may as well use an IEnumerable<T> implementation like a List<T> or an array.

This problem of implementing the interfaces should not concern us too much. You will find that when you use Rx, you do not have the need to actually implement these interfaces, Rx provides all of the implementations you need out of the box. Let's have a look at the simple ones.

    I like to think of the <em>IObserver&lt;T&gt;</em> and the <em>IObservable&lt;T&gt;</em>
    as the 'reader' and 'writer' or, 'consumer' and 'publisher' interfaces. If you were
    to create your own implementation of <em>IObservable&lt;T&gt;</em> you may find
    that while you want to publicly expose the IObservable characteristics you still
    need to be able to publish items to the subscribers, throw errors and notify when
    the sequence is complete. Why that sounds just like the methods defined in <em>IObserver&lt;T&gt;</em>!
    While it may seem odd to have one type implementing both interfaces, it does make
    life easy. This is what <a title="Using Rx Subjects - MSDN" href="http://msdn.microsoft.com/en-us/library/hh242969(v=VS.103).aspx">
        subjects</a> can do for you. <a title="Subject(Of T) - MSDN" href="http://msdn.microsoft.com/en-us/library/hh229173(v=VS.103).aspx">
            <em>Subject&lt;T&gt;</em></a> is the most basic of the subjects. Effectively
    you can expose your <em>Subject&lt;T&gt;</em> behind a method that returns <em>IObservable&lt;T&gt;</em>
    but internally you can use the <em>OnNext</em>, <em>OnError</em> and <em>OnCompleted</em>
    methods to control the sequence.
    In this very basic example, I create a subject, subscribe to that subject and then
    publish values to the sequence (by calling <code>subject.OnNext(T)</code>).
<pre class="csharpcode">
  static void Main(string[] args)
    var subject = new Subject&lt;string&gt;();


  //Takes an IObservable&lt;string&gt; as its parameter. 
  //Subject&lt;string&gt; implements this interface.
  static void WriteSequenceToConsole(IObservable&lt;string&gt; sequence)
    //The next two lines are equivalent.
    Note that the <code>WriteSequenceToConsole</code> method takes an <em>IObservable&lt;string&gt;</em>
    as it only wants access to the subscribe method. Hang on, doesn't the <em>Subscribe</em>
    method need an <em>IObserver&lt;string&gt;</em> as an argument? Surely <em>Console.WriteLine</em>
    does not match that interface. Well it doesn't, but the Rx team supply me with an
    Extension Method to <em>IObservable&lt;T&gt;</em> that just takes an <a title="Action(Of T) Delegate - MSDN"
    The action will be executed every time an item is published. There are <a title="ObservableExtensions class - MSDN"
        other overloads to the Subscribe extension method</a> that allows you to pass
    combinations of delegates to be invoked for <em>OnNext</em>, <em>OnCompleted</em>
    and <em>OnError</em>. This effectively means I don't need to implement <em>IObserver&lt;T&gt;</em>.
    As you can see, <em>Subject&lt;T&gt;</em> could be quite useful for getting started
    in Rx programming. <em>Subject&lt;T&gt;</em> however, is a basic implementation.
    There are three siblings to <em>Subject&lt;T&gt;</em> that offer subtly different
    implementations which can drastically change the way your program runs.

    <a title="ReplaySubject(Of T) - MSDN" href="http://msdn.microsoft.com/en-us/library/hh211810(v=VS.103).aspx">
        <em>ReplaySubject&lt;T&gt;</em></a> provides the feature of caching values and
    then replaying them for any late subscriptions. Consider this example where we have
    moved our first publication to occur before our subscription
<pre class="csharpcode">
    static void Main(string[] args)
      var subject = new Subject&lt;string&gt;();


    The result of this would be that 'b' and 'c' would be written to the console, but
    'a' ignored. If we were to make the minor change to make subject a <em>ReplaySubject&lt;T&gt;</em>
    we would see all publications again.
<pre class="csharpcode">
    var subject = new ReplaySubject&lt;string&gt;();


    This can be very handy for eliminating race conditions. Be warned though, the default
    constructor of the <em>ReplaySubject&lt;T&gt;</em> will create an instance that
    caches every value published to it. In many scenarios this could create unnecessary
    memory pressure on the application. <em>ReplaySubject&lt;T&gt;</em> allows you to
    specify simple cache expiry settings that can alleviate this memory issue. One option
    is that you can specify the size of the buffer in the cache. In this example we
    create the <em>ReplaySubject&lt;T&gt;</em> with a buffer size of 2, and so only
    get the last two values published prior to our subscription:
<pre class="csharpcode">
    public void ReplaySubjectBufferExample()
        var bufferSize = 2;
        var subject = new ReplaySubject&lt;string&gt;(bufferSize);

    Here the output would show that the value 'a' had been dropped from the cache, but
    values 'b' and 'c' were still valid. The value 'd' was published after we subscribed
    so it is also written to the console.
<div class="output">
    <div class="line">Output:</div>
    <div class="line">b</div>
    <div class="line">c</div>
    <div class="line">d</div>
    Another option for preventing the endless caching of values by the <em>ReplaySubject&lt;T&gt;</em>,
    is to provide a window for the cache. In this example, instead of creating a <em>ReplaySubject&lt;T&gt;</em>
    with a buffer size, we specify a window of time that the cached values are valid
<pre class="csharpcode">
    public void ReplaySubjectWindowExample()
        var window = TimeSpan.FromMilliseconds(150);
        var subject = new ReplaySubject&lt;string&gt;(window);

    In the above example the window was specified as 150 milliseconds. Values are published
    100 milliseconds apart. Once we have subscribed to the subject, the first value
    is 200ms old and as such has expired and been removed from the cache.
<div class="output">
    <div class="line">Output:</div>
    <div class="line">x</div>
    <div class="line">y</div>
    <div class="line">z</div>
    <a title="BehaviorSubject(Of T) - MSDN" href="http://msdn.microsoft.com/en-us/library/hh211949(v=VS.103).aspx">
        <em>BehaviorSubject&lt;T&gt;</em></a> is similar to <em>ReplaySubject&lt;T&gt;</em>
    except it only remembers the last publication. <em>BehaviorSubject&lt;T&gt;</em>
    also requires you to provide it a default value of <em>T</em>. This means that all
    subscribers will receive a value immediately (unless it is already completed).
    In this example the value 'a' is written to the console:
<pre class="csharpcode">
    public void BehaviorSubjectExample()
        //Need to provide a default value.
        var subject = new BehaviorSubject&lt;string&gt;(&quot;a&quot;);
    In this example the value 'b' is written to the console, but not 'a'.
<pre class="csharpcode">
    public void BehaviorSubjectExample2()
      var subject = new BehaviorSubject&lt;string&gt;(&quot;a&quot;);
    In this example the values 'b', 'c' &amp; 'd' are all written to the console, but
    again not 'a'
<pre class="csharpcode">
    public void BehaviorSubjectExample3()
      var subject = new BehaviorSubject&lt;string&gt;(&quot;a&quot;);

    Finally in this example, no values will be published as the sequence has completed.
    Nothing is written to the console.
<pre class="csharpcode">
    public void BehaviorSubjectCompletedExample()
      var subject = new BehaviorSubject&lt;string&gt;(&quot;a&quot;);
    That note that there is a difference between a <em>ReplaySubject&lt;T&gt;</em> with a buffer size
    of one (commonly called a 'replay one subject') and a <em>BehaviorSubject&lt;T&gt;</em>.
    A <em>BehaviorSubject&lt;T&gt;</em> requires an initial value. With the assumption
    that neither subjects have completed, then you can be sure that the <em>BehaviorSubject&lt;T&gt;</em>
    will have a value. You cannot be certain with the <em>ReplaySubject&lt;T&gt;</em> however. With this
    in mind, it is unusual to ever complete a <em>BehaviorSubject&lt;T&gt;</em>. Another difference is
    that a replay-one-subject will still cache its value once it has been completed.
    So subscribing to a completed <em>BehaviorSubject&lt;T&gt;</em> we can be sure to not receive any
    values, but with a <em>ReplaySubject&lt;T&gt;</em> it is possible.
    <em>BehaviorSubject&lt;T&gt;</em>s are often associated with class <a href="http://msdn.microsoft.com/en-us/library/65zdfbdt(v=vs.71).aspx">
        properties</a>. As they always have a value and can provide change notifications,
    they could be candidates for backing fields to properties.
    <a title="AsyncSubject(Of T) - MSDN" href="http://msdn.microsoft.com/en-us/library/hh229363(v=VS.103).aspx">
        <em>AsyncSubject&lt;T&gt;</em></a> is similar to the Replay and Behavior subjects
    in the way that it caches values, however it will only store the last value, and
    only publish it when the sequence is completed. The general usage of the <em>AsyncSubject&lt;T&gt;</em>
    is to only ever publish one value then immediately complete. This means that is
    becomes quite comparable to <em>Task&lt;T&gt;</em>.
    In this example no values will be published as the sequence never completes. No
    values will be written to the console.
<pre class="csharpcode">
    static void Main(string[] args)
      var subject = new AsyncSubject&lt;string&gt;();
    In this example we invoke the <em>OnCompleted</em> method so the last value 'c'
    is written to the console:
<pre class="csharpcode">
    static void Main(string[] args)
      var subject = new AsyncSubject&lt;string&gt;();

<h2>Implicit contracts</h2>
    There are implicit contacts that need to be upheld when working with Rx as mentioned
    above. The key one is that once a sequence is completed, no more activity can
    happen on that sequence. A sequence can be completed in one of two ways, either
    by <em>OnCompleted()</em> or by <em>OnError(Exception)</em>.
    The four subjects described in this chapter all cater for this implicit contract by
    ignoring any attempts to publish values, errors or completions once the sequence
    has already terminated.
    Here we see an attempt to publish the value 'c' on a completed sequence. Only values
    'a' and 'b' are written to the console.
<pre class="csharpcode">
    public void SubjectInvalidUsageExample()
        var subject = new Subject&lt;string&gt;();


<h2>ISubject interfaces</h2>
    While each of the four subjects described in this chapter implement the <em>IObservable&lt;T&gt;</em>
    and <em>IObserver&lt;T&gt;</em> interfaces, they do so via another set of interfaces:
<pre class="csharpcode">
    //Represents an object that is both an observable sequence as well as an observer.
    public interface ISubject&lt;in TSource, out TResult&gt; 
        : IObserver&lt;TSource&gt;, IObservable&lt;TResult&gt;
    As all the subjects mentioned here have the same type for both <em>TSource</em>
    and <em>TResult</em>, they implement this interface which is the superset of all
    the previous interfaces:
<pre class="csharpcode">
    //Represents an object that is both an observable sequence as well as an observer.
    public interface ISubject&lt;T&gt; : ISubject&lt;T, T&gt;, IObserver&lt;T&gt;, IObservable&lt;T&gt;
    These interfaces are not widely used, but prove useful as the subjects do not share
    a common base class. We will see the subject interfaces used later when we discover
    <a href="14_HotAndColdObservables.html">hot and cold observables</a>.
<h2>Subject factory</h2>
    Finally it is worth making you aware that you can also create a subject via a factory
    method. Considering that a subject combines the <em>IObservable&lt;T&gt;</em> and
    <em>IObserver&lt;T&gt;</em> interfaces, it seems sensible that there should be a
    factory that allows you to combine them yourself. The <em>Subject.Create(IObserver&lt;TSource&gt;,
        IObservable&lt;TResult&gt;)</em> factory method provides just this.
<pre class="csharpcode">
    //Creates a subject from the specified observer used to publish messages to the subject
    //  and observable used to subscribe to messages sent from the subject
    public static ISubject&lt;TSource, TResult&gt; Create&lt;TSource, TResult&gt;(
        IObserver&lt;TSource&gt; observer, 
        IObservable&lt;TResult&gt; observable)
    Subjects provide a convenient way to poke around Rx, however they are not recommended
    for day to day use. An explanation is in the <a href="18_UsageGuidelines.html">Usage
        Guidelines</a> in the appendix. Instead of using subjects, favor the factory
    methods we will look at in <a href="04_CreatingObservableSequences.html">Part 2</a>.
    The fundamental types <em>IObserver&lt;T&gt;</em> and <em>IObservable&lt;T&gt;</em>
    and the auxiliary subject types create a base from which to build your Rx knowledge.
    It is important to understand these simple types and their implicit contracts. In
    production code you may find that you rarely use the <em>IObserver&lt;T&gt;</em>
    interface and subject types, but understanding them and how they fit into the Rx
    eco-system is still important. The <em>IObservable&lt;T&gt;</em> interface is the
    dominant type that you will be exposed to for representing a sequence of data in
    motion, and therefore will comprise the core concern for most of your work with Rx and most
    of this book.
