Rx framework – IObservable<T>.Catch and IObservable<T>.OnErrorResumeNext

Like IObservable<T>.Concat, the IObservable<T>.Catch extension method concatenates an observable collection with another.

However, unlike the Concat method where the subscription to the second observable collection happens after the first had completed, in the Catch method the subscription to the second collection happens after and only after the first had excepted!

Which begs a very good question:

What if I want to concatenate an observable collection with another regardless of whether the first succeeds or not?

Well, the answer to that question is IObservable<T>.OnErrorResumeNext, which always subscribes to the second observable collection at the end of the first.

image

Like so many other merge-type extension methods in Rx, there are two flavours to choose from when using the Catch or OnErrorResumeNext methods. Either as extension method:


var zs = xs.Catch(ys);

var zs2 = xs.OnErrorResumeNext(ys);

or as static methods which give you the option to merge more than two observable collections:


var zs = Observable.Catch(xs, ys, us, vs);

var zs2 = Observable.OnErrorResumeNext(xs, ys, us, vs);

Rx framework – IObservable<T>.Concat

Yet another extension method to combine two observable collections, this time we have IObservable<T>.Concat which is very similar to IObservable<T>.Merge, but crucially, when you concatenate one observable collection to another, the subscription to the second observable collection happens after the first had completed! Here’s a quick illustration of how the two methods differ:

image

As you’ve probably guessed already, the Concat method requires the observable collections to be merged to be of the same type.

Again, you can either invoke it as an extension method:

var zs = xs.Concat(ys);

or you can invoke it as static method on more than two observable collections:

var zs = Observable.Concat(xs, ys, us, vs);

Rx framework – IObservable<T>.CombineLatest

The IObservable<T>.CombineLatest extension method is very similar to IObservable<T>.Zip and IObservable<T>.Merge in that it combines two observable collections and returns a new one.

Unlike IObservable<T>.Merge, IObservable<T>.CombineLatest does not require the merged observable collections to be of the same type.

Like IObservable<T>.Zip, IObservable<T>.CombineLatest combines ‘pairs‘ of values from the two observable collections, but unlike Zip when a new value becomes available on one collection it does not wait till a new value to be available on the other collection, instead it takes whatever the latest value is from the other collection (provided there is one):

image

Again, like the Merge method, you can either invoke CombineLatest as an extension method:

var zs = xs.CombineLatest(ys, (a, b) => a + b);

or you can invoke it as static method:

var zs = Observable.CombineLatest(xs, ys, (a, b) => a + b);

Rx framework – IObservable<T>.Merge

In my previous post I used the IObservable<T>.Zip extension method in the Drag-and-Drop example, which takes two observable collections and runs a function over them to return a new observable collection of potentially a different type.

But what if you just want to simply merge the two observable collections into one stream? Well, Rx API has another extension method called Merge. To use it you can either invoke it as extension method:

var zs = xs.Merge(ys);

or you can invoke it as static method on more than two observable collections:

var zs = Observable.Merge(xs, ys, us, vs);

To use the Merge method, all the observable collections being merged need to be of the same type.

There are two things you should keep in mind:

1. OnCompleted is fired on the composite observable collection (zs) at the point the last OnCompleted is fired on any of the merged observable collections:

image

2. OnError is fired on the composite observable collection (zs) at the point the first OnError is fired on any of the merged observable collections, any subsequent values on the merged observable collection will not make it onto the composite collection:

image

Linq Over Events – Playing with the Rx Framework

As you may have read about on numerous MVP’s blogs already, there’s an ongoing research project at Microsoft’s DevLabs called Reactive Extensions for .Net (Rx) which introduces a push-style duality (equal and opposite) of the traditional pull-style enumeration using Linq.

Erik Meijer gave a 30 mins introduction to the reactive framework in this video, I highly recommend that you sit through this to help you understand the basics of how Rx works. In summary, Rx introduces two main interfaces:

IObservable<T>, which represents a push-style collection

IObserver<T>, which supports push-style iteration over an observable sequence

Used together, they allow you to write programs which reacts to inputs from the environment as opposed to the traditional interactive program which asks the environment for input.

image

Drag-and-Drop using Rx

For a quick demo to show how you can implement a Drag-and-Drop feature in WPF with just a few lines of code, see this video on Channel9. Unfortunately, since the video was released, the API has changed somewhat (Observable.Context is gone for example), so below is a revised version which works with the latest API.

To get started, download the Rx extension for .Net 3.5 SP1 and start a new WPF application in Visual Studio 2008, add references to System.CoreEx and System.Reactive. In Windows1.xaml, replace the default Grid with:

<Canvas>
    <TextBlock Name="counter" Text="Rx Test"/>
    <Image Name="image"
           Source=[put the path to an image here]
           Width="100" Height="100" Canvas.Left="0" Canvas.Top="25"/>
</Canvas>

and then in the code-behind, change the constructor to something like this:

public Window1()
{
    InitializeComponent();

    // create a stream which returns a new value every second, starting from -1
    var xs = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(-1);

    // subscribe to the above stream
    xs.ObserveOnDispatcher().Subscribe(value => counter.Text = value.ToString());

    // create event streams for mouse down/up/move using reflection
    var mouseDown = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
                    select evt.EventArgs.GetPosition(this);
    var mouseUp = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseUp")
                  select evt.EventArgs.GetPosition(this);
    var mouseMove = from evt in Observable.FromEvent<MouseEventArgs>(image, "MouseMove")
                    select evt.EventArgs.GetPosition(this);

    // between mouse down and mouse up events
    // keep taking pairs of mouse move events and return the change in X, Y positions
    // from one mouse move event to the next as a new stream
    var q = from start in mouseDown
            from pos in mouseMove.StartWith(start).TakeUntil(mouseUp)
                         .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                              new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
            select pos;

    // subscribe to the stream of position changes and modify the Canvas.Left and Canvas.Top
    // property of the image to achieve drag and drop effect!
    q.ObserveOnDispatcher().Subscribe(value =>
          {
              Canvas.SetLeft(image, Canvas.GetLeft(image) + value.X);
              Canvas.SetTop(image, Canvas.GetTop(image) + value.Y);
          });
}

This code might seem a little confusing, but just imagine how much code you’d have to write otherwise to implement Drag-and-Drop feature using the standard C# events and event handlers, and how messy and cluttered that code will will look if you did! So there’s definitely benefits to be had here, and therefore well worth the time and effort to learn more about the Rx :-)

Back to the example, let’s look a closer look at some of the methods used here to help you get a sense of what’s going on.

IObservable<T>.Subscribe

If you have an observable collection, xs:

IObservable<int> xs = …;

You can subscribe to this collection with an IObserver, or alternatively you can use a handler as we did in the Drag-and-Drop example. You can actually provide three different handlers to an IObservable<T> collection:

  • Action<T> valueHandler, which gets called when a new value is received on the collection.
  • Action<Exception> exceptionHandler, which gets called when an exception is caught during the iteration, if none is provided, then the original exception is simply re-thrown.
  • Action completionHandler, which gets called when the iteration is finished (when mouseUp is received in the example), by default this does nothing.

In the example, q was the IObservable collection of an anonymous type we created to hold the changes in the X and Y positions between adjacent mouseMove events between a mouseDown and mouseUp event. WPF stipulates that changes to elements must be made on the owning thread, and the way to ensure our handlers run on the owning thread is to first call the ObserveOnDispatcher extension method so observers are asynchronously notified using the current dispatcher.

You should note that the Subscribe extension method actually returns an IDisposable. If you wish to unsubscribe from the observable collection, you need to call Dispose() on this IDisposable:

var s = q.ObserveOnDispatcher().Subscribe(value =>
            {
                Canvas.SetLeft(image, Canvas.GetLeft(image) + value.X);
                Canvas.SetTop(image, Canvas.GetTop(image) + value.Y);
            });
// to unsubscribe
s.Dispose();

And finally, in this video, Wes Dyer outlined the four phases of execution in subscription:

1. Build observable query

var xs = ys.Select(f);

2. Subscribe

var s = xs.Subscribe(h);

3. Value

h();

4. Unsubscribe

s.Dispose();

IObservable<T>.Zip

The IObservable<T>.Zip extension method you see in the example above takes two streams and a function and returns a new stream. So let Xs and Ys be two streams of events which need to happen in pairs we can combine them into a new stream of events, Zs, which tells us when both Xs and its corresponding Ys has happened:

Zs = Xs.Zip(Ys, (a, b) -> a + b)

image

You should note that the order in which Xs and Ys happen does not matter in this case.

So back to the drag-and-drop example above, we used the Zip extension method to combine two streams:

mm, which translates to mouseMove.StartWith(start).TakeUntil(mouseUp)

mm.Skip(1), which translates to mouseMove.StartWith(start).TakeUntil(mouseUp).Skip(1)

and returns a new stream of values:

var q = from start in mouseDown
        from pos in mouseMove.StartWith(start).TakeUntil(mouseUp)
                     .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                          new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
        select pos;

References:

Rx podcasts on Channel9