Linq Over Events – Playing with the Rx Framework

Yan Cui

As you may have read on numerous MVPs’ blogs already, there’s an ongoing research project at Microsoft’s DevLabs called Reactive Extensions for .Net (Rx), which introduces a push-style dual (equal and opposite) of the traditional pull-style enumeration used by Linq.

Erik Meijer gave a 30-minute introduction to the reactive framework in this video, and I highly recommend that you sit through it to understand the basics of how Rx works. In summary, Rx introduces two main interfaces:

IObservable<T>, which represents a push-style collection

IObserver<T>, which supports push-style iteration over an observable sequence

Used together, they allow you to write programs which react to inputs from the environment, as opposed to the traditional interactive program which asks the environment for input.
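To make the two interfaces concrete, here is a minimal sketch that implements both by hand. It assumes .NET 4 or later, where IObservable<T> and IObserver<T> live in the System namespace. FixedSource, RecordingObserver and InterfacesDemo are hypothetical names made up for illustration; they are not part of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-style source: pushes a fixed set of values to each subscriber.
public sealed class FixedSource : IObservable<int>
{
    private readonly int[] _values;

    public FixedSource(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values)
        {
            observer.OnNext(value);   // the source pushes; the observer never pulls
        }
        observer.OnCompleted();       // signal the end of the sequence
        return new NoOp();            // everything was already pushed, nothing to cancel
    }

    private sealed class NoOp : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("next " + value); }
    public void OnError(Exception error) { Log.Add("error " + error.Message); }
    public void OnCompleted() { Log.Add("done"); }
}

public static class InterfacesDemo
{
    public static List<string> Run()
    {
        var observer = new RecordingObserver();
        new FixedSource(1, 2, 3).Subscribe(observer);
        return observer.Log;   // next 1, next 2, next 3, done
    }
}
```

In practice you rarely implement these interfaces yourself; Rx supplies the sources and operators, and you only pass in handlers.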

Drag-and-Drop using Rx

For a quick demo of how you can implement a Drag-and-Drop feature in WPF with just a few lines of code, see this video on Channel9. Unfortunately, the API has changed somewhat since the video was released (Observable.Context is gone, for example), so below is a revised version which works with the latest API.

To get started, download the Rx extension for .Net 3.5 SP1, start a new WPF application in Visual Studio 2008, and add references to System.CoreEx and System.Reactive. In Window1.xaml, replace the default Grid with:

<Canvas>
    <TextBlock Name="counter" Text="Rx Test"/>
    <Image Name="image"
           Source="[put the path to an image here]"
           Width="100" Height="100" Canvas.Left="0" Canvas.Top="25"/>
</Canvas>

and then in the code-behind, change the constructor to something like this:

public Window1()
{
    InitializeComponent();

    // create a stream which returns a new value every second, starting from -1
    var xs = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(-1);

    // subscribe to the above stream
    xs.ObserveOnDispatcher().Subscribe(value => counter.Text = value.ToString());

    // create event streams for mouse down/up/move using reflection
    var mouseDown = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
                    select evt.EventArgs.GetPosition(this);
    var mouseUp = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseUp")
                  select evt.EventArgs.GetPosition(this);
    var mouseMove = from evt in Observable.FromEvent<MouseEventArgs>(image, "MouseMove")
                    select evt.EventArgs.GetPosition(this);

    // between mouse down and mouse up events
    // keep taking pairs of mouse move events and return the change in X, Y positions
    // from one mouse move event to the next as a new stream
    var q = from start in mouseDown
            from pos in mouseMove.StartWith(start).TakeUntil(mouseUp)
                         .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                              new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
            select pos;

    // subscribe to the stream of position changes and modify the Canvas.Left and Canvas.Top
    // property of the image to achieve drag and drop effect!
    q.ObserveOnDispatcher().Subscribe(value =>
          {
              Canvas.SetLeft(image, Canvas.GetLeft(image) + value.X);
              Canvas.SetTop(image, Canvas.GetTop(image) + value.Y);
          });
}

This code might seem a little confusing, but just imagine how much code you’d otherwise have to write to implement a Drag-and-Drop feature using standard C# events and event handlers, and how messy and cluttered that code would look! So there are definitely benefits to be had here, and it is well worth the time and effort to learn more about Rx :-)

Back to the example, let’s take a closer look at some of the methods used here to help you get a sense of what’s going on.

IObservable<T>.Subscribe

If you have an observable collection, xs:

IObservable<int> xs = …;

You can subscribe to this collection with an IObserver, or alternatively you can use a handler as we did in the Drag-and-Drop example. In fact, you can provide up to three different handlers when subscribing to an IObservable<T> collection:

  • Action<T> valueHandler, which gets called when a new value is received on the collection.
  • Action<Exception> exceptionHandler, which gets called when an exception is caught during the iteration. If none is provided, the original exception is simply re-thrown.
  • Action completionHandler, which gets called when the iteration is finished (in the example, the inner mouseMove sequence finishes when mouseUp is received). By default this does nothing.
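As a rough illustration of the three handlers, the sketch below subscribes to a sequence and records which handler fires. It assumes the current System.Reactive NuGet package, whose API differs in places from the DevLabs preview used in this post; HandlersDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class HandlersDemo
{
    // Subscribes to xs with all three handlers and returns what was recorded.
    public static List<string> Run(IObservable<int> xs)
    {
        var log = new List<string>();

        xs.Subscribe(
            value => log.Add("value " + value),    // valueHandler
            ex => log.Add("error " + ex.Message),  // exceptionHandler
            () => log.Add("completed"));           // completionHandler

        return log;
    }

    // A sequence that ends normally: value 1, value 2, value 3, completed
    public static List<string> RunCompleted()
    {
        return Run(Observable.Range(1, 3));
    }

    // A sequence that fails straight away: error boom
    public static List<string> RunFailed()
    {
        return Run(Observable.Throw<int>(new InvalidOperationException("boom")));
    }
}
```

A sequence ends with either an error or a completion, never both, so at most one of the last two handlers runs per subscription.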

In the example, q is an IObservable collection of an anonymous type that holds the change in X and Y position between adjacent mouseMove events, from a mouseDown event until the next mouseUp event. WPF stipulates that changes to elements must be made on the owning thread, so to ensure our handlers run on that thread we first call the ObserveOnDispatcher extension method, which notifies observers asynchronously using the current dispatcher.

You should note that the Subscribe extension method actually returns an IDisposable. If you wish to unsubscribe from the observable collection, you need to call Dispose() on this IDisposable:

var s = q.ObserveOnDispatcher().Subscribe(value =>
            {
                Canvas.SetLeft(image, Canvas.GetLeft(image) + value.X);
                Canvas.SetTop(image, Canvas.GetTop(image) + value.Y);
            });
// to unsubscribe
s.Dispose();

And finally, in this video, Wes Dyer outlined the four phases of execution in a subscription:

1. Build observable query

var xs = ys.Select(f);

2. Subscribe

var s = xs.Subscribe(h);

3. Value

h(value);

4. Unsubscribe

s.Dispose();
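The four phases can be traced end to end in a small sketch. It assumes the current System.Reactive NuGet package and uses a Subject<int> as a stand-in source that we can push values into by hand; FourPhasesDemo and the choice of f (multiply by 10) are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FourPhasesDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();
        var ys = new Subject<int>();               // stand-in source we can push into
        Func<int, int> f = y => y * 10;            // hypothetical projection
        Action<int> h = value => seen.Add(value);  // the value handler

        var xs = ys.Select(f);     // 1. build observable query (nothing runs yet)
        var s = xs.Subscribe(h);   // 2. subscribe
        ys.OnNext(1);              // 3. value: h(10)
        ys.OnNext(2);              //    value: h(20)
        s.Dispose();               // 4. unsubscribe
        ys.OnNext(3);              // pushed after Dispose, so h is not called

        return seen;               // 10, 20
    }
}
```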

IObservable<T>.Zip

The IObservable<T>.Zip extension method you see in the example above takes two streams and a function, and returns a new stream. If Xs and Ys are two streams of events which need to happen in pairs, we can combine them into a new stream of events, Zs, which tells us when both a value from Xs and its corresponding value from Ys have happened:

var Zs = Xs.Zip(Ys, (a, b) => a + b);

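You should note that the order in which the values of Xs and Ys arrive does not matter in this case: Zip pairs the nth value of Xs with the nth value of Ys, and produces the combined value as soon as both halves of the pair are available.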
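The pairing behaviour is easy to check with two hand-driven streams. This is a sketch assuming the current System.Reactive NuGet package; ZipDemo and the values pushed are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipDemo
{
    public static List<int> Run()
    {
        var sums = new List<int>();
        var xs = new Subject<int>();
        var ys = new Subject<int>();

        var zs = xs.Zip(ys, (a, b) => a + b);

        using (zs.Subscribe(z => sums.Add(z)))
        {
            ys.OnNext(10);  // Ys arrives first: held until the first X shows up
            xs.OnNext(1);   // first pair complete: 1 + 10 = 11
            xs.OnNext(2);   // held until the second Y shows up
            xs.OnNext(3);   // held as well
            ys.OnNext(20);  // second pair complete: 2 + 20 = 22
        }

        return sums;        // 11, 22 (the 3 never found a partner)
    }
}
```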

So back to the drag-and-drop example above, we used the Zip extension method to combine two streams:

mm, which translates to mouseMove.StartWith(start).TakeUntil(mouseUp)

mm.Skip(1), which translates to mouseMove.StartWith(start).TakeUntil(mouseUp).Skip(1)

and return a new stream of position changes:

var q = from start in mouseDown
        from pos in mouseMove.StartWith(start).TakeUntil(mouseUp)
                     .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                          new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
        select pos;
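To see what the inner part of this query produces, here is a sketch of a single drag with the mouse events replaced by hand-driven subjects. It assumes the current System.Reactive NuGet package and a C# compiler with tuple support. PairwiseDemo, the bool-typed mouseUp stand-in and the coordinates are all hypothetical, and a local variable takes the place of Let.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PairwiseDemo
{
    public static List<(double X, double Y)> Run()
    {
        var deltas = new List<(double X, double Y)>();
        var mouseMove = new Subject<(double X, double Y)>();  // stand-in for MouseMove positions
        var mouseUp = new Subject<bool>();                     // stand-in for MouseUp
        var start = (X: 0.0, Y: 0.0);                          // position at mouse down

        // mm: the start position followed by every move, until the button is released
        var mm = mouseMove.StartWith(start).TakeUntil(mouseUp);

        // pair each position with the one that follows it and keep the difference
        var q = mm.Zip(mm.Skip(1), (prev, cur) => (X: cur.X - prev.X, Y: cur.Y - prev.Y));

        using (q.Subscribe(d => deltas.Add(d)))
        {
            mouseMove.OnNext((3.0, 4.0));  // delta from (0, 0): (3, 4)
            mouseMove.OnNext((5.0, 1.0));  // delta from (3, 4): (2, -3)
            mouseUp.OnNext(true);          // ends the drag
            mouseMove.OnNext((9.0, 9.0));  // after mouse up, so ignored
        }

        return deltas;                     // (3, 4), (2, -3)
    }
}
```

Because each delta is relative to the previous position rather than to the start, the handler can simply add it to the current Canvas.Left and Canvas.Top.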

References:

Rx podcasts on Channel9
