Linq Over Events – Playing with the Rx Framework

As you may have read on numerous MVPs’ blogs already, there’s an ongoing research project at Microsoft’s DevLabs called Reactive Extensions for .Net (Rx), which introduces a push-style dual (equal and opposite) of the traditional pull-style enumeration used by Linq.

Erik Meijer gave a 30-minute introduction to the reactive framework in this video. I highly recommend that you sit through it to help you understand the basics of how Rx works. In summary, Rx introduces two main interfaces:

IObservable<T>, which represents a push-style collection

IObserver<T>, which supports push-style iteration over an observable sequence

Used together, they allow you to write programs which react to inputs from the environment, as opposed to the traditional interactive program which asks the environment for input.

image
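To make the two interfaces a little more concrete, here is a minimal hand-rolled sketch that uses only the IObservable<T> and IObserver<T> definitions that ship in the System namespace from .NET 4 onwards (an assumption, the Rx preview used in this post targets .Net 3.5 SP1 and brings its own copies). NumberSource and RecordingObserver are hypothetical names made up for illustration and are not part of Rx:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-style source: it decides when values flow to its observers.
public sealed class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    // Observers register here; disposing the returned object removes them again.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Push one value to every registered observer.
    public void Push(int value)
    {
        foreach (var observer in observers.ToArray())
            observer.OnNext(value);
    }

    // Tell every observer that no more values will arrive.
    public void Complete()
    {
        foreach (var observer in observers.ToArray())
            observer.OnCompleted();
        observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical observer that records everything it is told.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("got " + value); }
    public void OnError(Exception error) { Log.Add("error " + error.Message); }
    public void OnCompleted() { Log.Add("done"); }
}

public static class PushVersusPullDemo
{
    public static List<string> Run()
    {
        var observer = new RecordingObserver();

        // Pull style: the consumer asks the collection for each value.
        foreach (var n in new[] { 1, 2 })
            observer.Log.Add("pulled " + n);

        // Push style: the consumer registers once and is called back by the source.
        var source = new NumberSource();
        using (source.Subscribe(observer))
        {
            source.Push(1);
            source.Push(2);
            source.Complete();
        }

        return observer.Log;
    }
}
```

Calling PushVersusPullDemo.Run() returns a log in which the "pulled" entries come from the consumer driving the loop, while the "got" and "done" entries come from the source calling back into the observer.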

Drag-and-Drop using Rx

For a quick demo to show how you can implement a Drag-and-Drop feature in WPF with just a few lines of code, see this video on Channel9. Unfortunately, since the video was released, the API has changed somewhat (Observable.Context is gone for example), so below is a revised version which works with the latest API.

To get started, download the Rx extension for .Net 3.5 SP1, start a new WPF application in Visual Studio 2008 and add references to System.CoreEx and System.Reactive. In Window1.xaml, replace the default Grid with:

<Canvas>
    <TextBlock Name="counter" Text="Rx Test"/>
    <Image Name="image"
           Source="[put the path to an image here]"
           Width="100" Height="100" Canvas.Left="0" Canvas.Top="25"/>
</Canvas>

and then in the code-behind, change the constructor to something like this:

public Window1()
{
    InitializeComponent();

    // create a stream which returns a new value every second, starting from -1
    var xs = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(-1);

    // subscribe to the above stream
    xs.ObserveOnDispatcher().Subscribe(value => counter.Text = value.ToString());

    // create event streams for mouse down/up/move using reflection
    var mouseDown = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
                    select evt.EventArgs.GetPosition(this);
    var mouseUp = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseUp")
                  select evt.EventArgs.GetPosition(this);
    var mouseMove = from evt in Observable.FromEvent<MouseEventArgs>(image, "MouseMove")
                    select evt.EventArgs.GetPosition(this);

    // between mouse down and mouse up events
    // keep taking pairs of mouse move events and return the change in X, Y positions
    // from one mouse move event to the next as a new stream
    var q = from start in mouseDown
            from pos in mouseMove.StartWith(start).TakeUntil(mouseUp)
                         .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                              new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
            select pos;

    // subscribe to the stream of position changes and modify the Canvas.Left and Canvas.Top
    // property of the image to achieve drag and drop effect!
    q.ObserveOnDispatcher().Subscribe(value =>
          {
              Canvas.SetLeft(image, Canvas.GetLeft(image) + value.X);
              Canvas.SetTop(image, Canvas.GetTop(image) + value.Y);
          });
}

This code might seem a little confusing, but just imagine how much code you’d have to write to implement a Drag-and-Drop feature using standard C# events and event handlers, and how messy and cluttered that code would look! So there are definitely benefits to be had here, and it’s well worth the time and effort to learn more about Rx :-)

Back to the example, let’s take a closer look at some of the methods used here to help you get a sense of what’s going on.

IObservable<T>.Subscribe

If you have an observable collection, xs:

IObservable<int> xs = …;

You can subscribe to this collection with an IObserver, or alternatively you can use a handler as we did in the Drag-and-Drop example. You can actually provide three different handlers to an IObservable<T> collection:

  • Action<T> valueHandler, which gets called when a new value is received on the collection.
  • Action<Exception> exceptionHandler, which gets called when an exception is caught during the iteration. If none is provided, the original exception is simply re-thrown.
  • Action completionHandler, which gets called when the iteration is finished (in the example, the inner mouseMove stream finishes when mouseUp is received). By default this does nothing.
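The three handlers above can be sketched as follows. This is only an illustration and it assumes the current System.Reactive NuGet package rather than the DevLabs preview used in this post; SubscribeHandlersDemo is a made-up name:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SubscribeHandlersDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // A stream that yields 1 and 2, then fails with an exception.
        IObservable<int> failing = Observable.Range(1, 2)
            .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

        failing.Subscribe(
            value => log.Add("value " + value),          // valueHandler
            error => log.Add("error " + error.Message),  // exceptionHandler
            () => log.Add("completed"));                 // completionHandler (not reached here)

        // A stream that yields 10 and 11, then finishes normally.
        Observable.Range(10, 2).Subscribe(
            value => log.Add("value " + value),
            error => log.Add("error " + error.Message),
            () => log.Add("completed"));

        return log;
    }
}
```

A stream ends with either an error or a completion, never both, so the first subscription logs "error boom" and the second logs "completed".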

In the example, q was the IObservable collection of an anonymous type we created to hold the changes in the X and Y positions between adjacent mouseMove events between a mouseDown and mouseUp event. WPF stipulates that changes to elements must be made on the owning thread, and the way to ensure our handlers run on the owning thread is to first call the ObserveOnDispatcher extension method so observers are asynchronously notified using the current dispatcher.

You should note that the Subscribe extension method actually returns an IDisposable. If you wish to unsubscribe from the observable collection, you need to call Dispose() on this IDisposable:

var s = q.ObserveOnDispatcher().Subscribe(value =>
            {
                Canvas.SetLeft(image, Canvas.GetLeft(image) + value.X);
                Canvas.SetTop(image, Canvas.GetTop(image) + value.Y);
            });
// to unsubscribe
s.Dispose();

And finally, in this video, Wes Dyer outlined the four phases of execution in subscription:

1. Build observable query

var xs = ys.Select(f);

2. Subscribe

var s = xs.Subscribe(h);

3. Value

h(value); // the handler is invoked for each value pushed to xs

4. Unsubscribe

s.Dispose();
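Put together, the four phases might look something like the sketch below. It assumes the current System.Reactive NuGet package, and uses a Subject<int> as a hypothetical stand-in for a real event source so that values can be pushed by hand:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FourPhasesDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();

        // Hypothetical source standing in for an event stream.
        var ys = new Subject<int>();

        // 1. Build observable query: nothing is delivered yet.
        IObservable<int> xs = ys.Select(y => y * 10);

        // Pushed before anyone has subscribed, so nobody sees it.
        ys.OnNext(0);

        // 2. Subscribe: the handler is now attached to the stream.
        IDisposable s = xs.Subscribe(value => received.Add(value));

        // 3. Value: each pushed value flows through the query to the handler.
        ys.OnNext(1);
        ys.OnNext(2);

        // 4. Unsubscribe: later values are no longer delivered.
        s.Dispose();
        ys.OnNext(3);

        return received; // contains 10 and 20
    }
}
```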

IObservable<T>.Zip

The IObservable<T>.Zip extension method you see in the example above takes two streams and a function, and returns a new stream. So if Xs and Ys are two streams of events which need to happen in pairs, we can combine them into a new stream of events, Zs, which tells us when both an X and its corresponding Y have happened:

Zs = Xs.Zip(Ys, (a, b) => a + b)

image

You should note that the order in which Xs and Ys happen does not matter in this case.
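As a rough illustration of that pairing behaviour, the sketch below pushes values into two hand-driven streams in a deliberately uneven order. It assumes the current System.Reactive NuGet package, and the two Subject<int> instances are hypothetical stand-ins for real event sources:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipDemo
{
    public static List<int> Run()
    {
        var sums = new List<int>();

        // Hypothetical hand-driven streams.
        var xs = new Subject<int>();
        var ys = new Subject<int>();

        // Zip pairs the n-th value of xs with the n-th value of ys.
        IObservable<int> zs = xs.Zip(ys, (a, b) => a + b);
        zs.Subscribe(sum => sums.Add(sum));

        xs.OnNext(1);   // waits for the first value of ys
        ys.OnNext(10);  // first pair is complete: 1 + 10
        ys.OnNext(20);  // waits for the second value of xs
        ys.OnNext(30);  // waits for the third value of xs
        xs.OnNext(2);   // second pair is complete: 2 + 20
        xs.OnNext(3);   // third pair is complete: 3 + 30

        return sums;    // contains 11, 22 and 33
    }
}
```

Whichever stream gets ahead, its values are held back until the matching value arrives on the other stream.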

So back to the drag-and-drop example above, we used the Zip extension method to combine two streams:

mm, which translates to mouseMove.StartWith(start).TakeUntil(mouseUp)

mm.Skip(1), which translates to mouseMove.StartWith(start).TakeUntil(mouseUp).Skip(1)

and returns a new stream of values:

var q = from start in mouseDown
        from pos in mouseMove.StartWith(start).TakeUntil(mouseUp)
                     .Let(mm => mm.Zip(mm.Skip(1), (prev, cur) =>
                          new { X = cur.X - prev.X, Y = cur.Y - prev.Y }))
        select pos;
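The same pairing trick can be tried outside WPF. The sketch below is a simplified stand-in rather than the code above: it assumes the current System.Reactive NuGet package, replaces the mouse events with a hypothetical Subject of (X, Y) tuples, and names the stream in a local variable instead of using Let:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PairwiseDeltaDemo
{
    public static List<(double X, double Y)> Run()
    {
        var deltas = new List<(double X, double Y)>();

        // Hypothetical stand-in for the stream of mouse positions.
        var mm = new Subject<(double X, double Y)>();

        // Pair each position with the one that follows it and keep the difference.
        var changes = mm.Zip(mm.Skip(1), (prev, cur) =>
            (X: cur.X - prev.X, Y: cur.Y - prev.Y));

        changes.Subscribe(delta => deltas.Add(delta));

        mm.OnNext((0, 0));  // start position, no pair yet
        mm.OnNext((3, 4));  // change from (0, 0) is (3, 4)
        mm.OnNext((5, 1));  // change from (3, 4) is (2, -3)

        return deltas;
    }
}
```

Adding up the changes gives the total movement from the first position to the last, which is why applying each change to Canvas.Left and Canvas.Top makes the image follow the mouse.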

References:

Rx podcasts on Channel9