Linq Over Events — Playing with the Rx Framework

As you may have read about on numer­ous MVP’s blogs already, there’s an ongo­ing research project at Microsoft’s DevLabs called Reac­tive Exten­sions for .Net (Rx) which intro­duces a push-style dual­i­ty (equal and oppo­site) of the tra­di­tion­al pull-style enu­mer­a­tion using Linq.

Erik Mei­jer gave a 30 mins intro­duc­tion to the reac­tive frame­work in this video, I high­ly rec­om­mend that you sit through this to help you under­stand the basics of how Rx works. In sum­ma­ry, Rx intro­duces two main inter­faces:

IObservable<T>, which rep­re­sents a push-style col­lec­tion

IObserver<T>, which sup­ports push-style iter­a­tion over an observ­able sequence

Used togeth­er, they allow you to write pro­grams which reacts to inputs from the envi­ron­ment as opposed to the tra­di­tion­al inter­ac­tive pro­gram which asks the envi­ron­ment for input.

image

Drag-and-Drop using Rx

For a quick demo to show how you can imple­ment a Drag-and-Drop fea­ture in WPF with just a few lines of code, see this video on Channel9. Unfor­tu­nate­ly, since the video was released, the API has changed some­what (Observable.Context is gone for exam­ple), so below is a revised ver­sion which works with the lat­est API.

To get start­ed, down­load the Rx exten­sion for .Net 3.5 SP1 and start a new WPF appli­ca­tion in Visu­al Stu­dio 2008, add ref­er­ences to System.CoreEx and System.Reactive. In Windows1.xaml, replace the default Grid with:

<Canvas>
    <TextBlock Name="counter" Text="Rx Test"/>
    <Image Name="image"
           Source=[put the path to an image here]
           Width="100" Height="100" Canvas.Left="0" Canvas.Top="25"/>
</Canvas>

and then in the code-behind, change the con­struc­tor to some­thing like this:

public Window1()
{
    InitializeComponent();

    // create a stream which returns a new value every second, starting from -1
    var xs = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(-1);

    // subscribe to the above stream
    xs.ObserveOnDispatcher().Subscribe(value => counter.Text = value.ToString());

    // create event streams for mouse down/up/move using reflection
    var mouseDown = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
                    select evt.EventArgs.GetPosition(this);
    var mouseUp = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseUp")
                  select evt.EventArgs.GetPosition(this);
    var mouseMove = from evt in Observable.FromEvent<MouseEventArgs>(image, "MouseMove")
                    select evt.EventArgs.GetPosition(this);

    // between mouse down and mouse up events
    // keep taking pairs of mouse move events and return the change in X, Y positions
    // from one mouse move event to the next as a new stream
    var q = from start in mouseDown
            from pos in mouseMove.StartWith(start).TakeUntil(mouseUp)
                         .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                              new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
            select pos;

    // subscribe to the stream of position changes and modify the Canvas.Left and Canvas.Top
    // property of the image to achieve drag and drop effect!
    q.ObserveOnDispatcher().Subscribe(value =>
          {
              Canvas.SetLeft(image, Canvas.GetLeft(image) + value.X);
              Canvas.SetTop(image, Canvas.GetTop(image) + value.Y);
          });
}

This code might seem a lit­tle con­fus­ing, but just imag­ine how much code you’d have to write oth­er­wise to imple­ment Drag-and-Drop fea­ture using the stan­dard C# events and event han­dlers, and how messy and clut­tered that code will will look if you did! So there’s def­i­nite­ly ben­e­fits to be had here, and there­fore well worth the time and effort to learn more about the Rx :-)

Back to the exam­ple, let’s look a clos­er look at some of the meth­ods used here to help you get a sense of what’s going on.

IObservable<T>.Subscribe

If you have an observ­able col­lec­tion, xs:

IObservable<int> xs = …;

You can sub­scribe to this col­lec­tion with an IOb­serv­er, or alter­na­tive­ly you can use a han­dler as we did in the Drag-and-Drop exam­ple. You can actu­al­ly pro­vide three dif­fer­ent han­dlers to an IObservable<T> col­lec­tion:

  • Action<T> val­ue­Han­dler, which gets called when a new val­ue is received on the col­lec­tion.
  • Action<Exception> excep­tion­Han­dler, which gets called when an excep­tion is caught dur­ing the iter­a­tion, if none is pro­vid­ed, then the orig­i­nal excep­tion is sim­ply re-thrown.
  • Action com­ple­tion­Han­dler, which gets called when the iter­a­tion is fin­ished (when mouse­Up is received in the exam­ple), by default this does noth­ing.

In the exam­ple, q was the IOb­serv­able col­lec­tion of an anony­mous type we cre­at­ed to hold the changes in the X and Y posi­tions between adja­cent mouse­Move events between a mouse­Down and mouse­Up event. WPF stip­u­lates that changes to ele­ments must be made on the own­ing thread, and the way to ensure our han­dlers run on the own­ing thread is to first call the ObserveOnDis­patch­er exten­sion method so observers are asyn­chro­nous­ly noti­fied using the cur­rent dis­patch­er.

You should note that the Sub­scribe exten­sion method actu­al­ly returns an IDis­pos­able. If you wish to unsub­scribe from the observ­able col­lec­tion, you need to call Dis­pose() on this IDis­pos­able:

var s = q.ObserveOnDispatcher().Subscribe(value =>
            {
                Canvas.SetLeft(image, Canvas.GetLeft(image) + value.X);
                Canvas.SetTop(image, Canvas.GetTop(image) + value.Y);
            });
// to unsubscribe
s.Dispose();

And final­ly, in this video, Wes Dyer out­lined the four phas­es of exe­cu­tion in sub­scrip­tion:

1. Build observ­able query

var xs = ys.Select(f);

2. Sub­scribe

var s = xs.Subscribe(h);

3. Val­ue

h();

4. Unsub­scribe

s.Dispose();

IObservable<T>.Zip

The IObservable<T>.Zip exten­sion method you see in the exam­ple above takes two streams and a func­tion and returns a new stream. So let Xs and Ys be two streams of events which need to hap­pen in pairs we can com­bine them into a new stream of events, Zs, which tells us when both Xs and its cor­re­spond­ing Ys has hap­pened:

Zs = Xs.Zip(Ys, (a, b) -> a + b)

image

You should note that the order in which Xs and Ys hap­pen does not mat­ter in this case.

So back to the drag-and-drop exam­ple above, we used the Zip exten­sion method to com­bine two streams:

- mm, which trans­lates to mouseMove.StartWith(start).TakeUntil(mouseUp)

- mm.Skip(1), which trans­lates to mouseMove.StartWith(start).TakeUntil(mouseUp).Skip(1)

and returns a new stream of val­ues:

var q = from start in mouseDown
        from pos in mouseMove.StartWith(start).TakeUntil(mouseUp)
                     .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                          new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
        select pos;

References:

Rx podcasts on Channel9