For those of you who are famil­iar with Reac­tive Exten­sions you should know all about observ­ables already, but did you know that there’s another kind of observ­able sequence – Rx.ConnectableObservable.

The dif­fer­ence between the two types of observ­able sequences is well explained here, in short, a con­nectable observ­able sequence allows you to share the same source sequence of val­ues with mul­ti­ple sub­scribers whilst the nor­mal observ­able sequence gives each sub­scriber its own sequence of val­ues. Whilst in most cases this dif­fer­ence doesn’t have any prac­ti­cal impacts as each sub­scribers are given the same val­ues in the same order, how­ever, con­sider this observ­able sequence of ran­dom num­bers between 0 and 1000:

   1: var maxNumber = 1000;

   2: var observableSource = Rx.Observable.GenerateWithTime(

   3:     Math.random(),                                      // initial state

   4:     function (x) { return true; },                      // condition

   5:     function (x) { return Math.random(); },             // iterator     

   6:     function (x) { return parseInt(x * maxNumber); },   // select

   7:     function (x) { return 1000 });                      // interval

As you can see, each time the iter­a­tor is invoked it’ll gen­er­ate a dif­fer­ent value, hence sub­scribers will receive a dif­fer­ent value each time (see demo below):

   1: // first subscriber

   2: observableSource.Subscribe(function (n) {

   3:     sub1Span.html(n);

   4: });

   5:  

   6: // second subscriber

   7: observableSource.Subscribe(function (n) {

   8:     sub2Span.html(n);

   9: });

Instead, if you want to ensure that all the sub­scribers receive the same val­ues, your best bet is to ‘pub­lish’ the source:

   1: // create a connectable observable from the source

   2: var connectableObservable = observableSource.Publish();

which returns you a con­nectable observ­able that you can then attach sub­scribers to:

   1: // connected subscriber 1

   2: connectableObservable.Subscribe(function (n) {

   3:     connSub1Span.html(n);

   4: });

   5:  

   6: // connected subscriber 2

   7: connectableObservable.Subscribe(function (n) {

   8:     connSub2Span.html(n);

   9: });

and once you ‘con­nect’ to the under­ly­ing source, the sub­scribers will start receiv­ing val­ues from the stream:

   1: connectableObservable.Connect();

Demo

Share

I wrote pre­vi­ously about how you can set up mul­ti­ple observ­able sequences and sub­scribe to them with mul­ti­ple observers and cre­ate a many-to-many rela­tion­ship between them.

Whilst this is a very flex­i­ble model with a clear sep­a­ra­tion of respon­si­bil­i­ties, often it requires more work to set up and is more than what you need for the task at hand. For instance, if you’re receiv­ing a steady stream of inputs and want to log the arrival of new inputs as well as per­form­ing some aggre­ga­tion on them, you don’t nec­es­sar­ily have to cre­ate two sub­scribers for the input but instead make use of the Rx.Observable.Do function.

Like the Rx.Observable.Subscribe func­tion, the Do func­tion can take a sub­scriber, or two up to three func­tion objects to han­dle the onNext, onError and onCom­pleted events, in that order. Unlike the Rx.Observable.Select func­tion, it doesn’t have a return value and there­fore won’t allow you to trans­form the input stream, it’s intended purely for caus­ing side effects.

I’ve put together a quick demo (see below) to illus­trate the use of the Do func­tion in con­junc­tion with other com­mon RxJS func­tions such as Select and Where. For this demo we just need two <span> ele­ments, one to show the run­ning total, the other to show a log mes­sage every time a new value is received:

   1: <body>

   2:     <div id="wrapper">

   3:         <p>Sum of squares of odd numbers received : <span id="sum"></span></p>

   4:         <p><span id="log"></span></p>

   5:     </div>

   6: </body>

And the Javascript to go along with it:

   1: <script type="text/javascript" src="js/jquery/jquery-1.4.4.min.js"></script>

   2: <script type="text/javascript" src="js/rxjs/rx.js"></script>

   3:  

   4: <script type="text/javascript">

   5:     $(function () {

   6:         var logSpan = $("#log"), sumSpan = $("#sum");

   7:  

   8:         // create an observable which gives ten numbers in total at 1 second 

   9:         // interval with a 13% chance of exception being thrown at each interval

  10:         var observable = Rx.Observable.GenerateWithTime(

  11:             1,                                  // initial state

  12:             function (x) { return x <= 10; },   // condition

  13:             function (x) {                      // iterator

  14:                 var prob = Math.random();

  15:                 if (prob < 0.13) {

  16:                     throw "Better luck next time!";

  17:                 }

  18:  

  19:                 return x + 1;

  20:             },

  21:             function (x) { return x; },         // select

  22:             function (x) {                      // interval

  23:                 return x === 0 ? 0 : 1000

  24:             });

  25:  

  26:         var sum = 0;

  27:  

  28:         observable.Do(function (n) {    // onNext

  29:             logSpan.html("Received new input: " + n);

  30:         }, function (err) {             // onError

  31:             logSpan.html("Error: " + err);

  32:         }, function () {               // onCompleted

  33:             logSpan.html("No more inputs");

  34:         }).Where(function (n) {         // filter the input sequence

  35:             return n % 2 != 0;          // odd numbers only

  36:         }).Select(function (n) {        // transform the input sequence

  37:             return n * n;

  38:         }).Subscribe(function (n) {

  39:             sum += n;                   // add the new input to the running total

  40:             sumSpan.html(sum);          // show the new running total

  41:         });

  42:     });

  43: </script>

Cou­ple of things to note here:

  • line 13 – this is a delib­er­ate attempt to give the observ­able sequence a ran­dom chance of except­ing to invoke the onError han­dler spec­i­fied in the Do func­tion on line 30
  • line 28 – the Do func­tion updates the HTML con­tent in the log <span> ele­ment every time it receives a new value
  • line 32 – show a dif­fer­ent log mes­sage when we have exhausted the observ­able sequence
  • line 34 – apply a fil­ter on the observ­able sequence for all sub­se­quent functions
  • line 36 – only odd num­bers are fed to this handler

Demo

If you are lucky enough (or unlucky enough depend­ing on which sce­nario you’re try­ing to test) just refresh the page and try again and hope­fully you have bet­ter luck the sec­ond time around!

Share

Matthew Pod­wysocki posted a cou­ple of very good arti­cles on RxJS on codebetter.com and amongst them was a sim­ple demo to do a look up on wikipedia using their open API:

Intro­duc­tion to the Reac­tive Exten­sions for JavaScript – Wikipedia Lookup

Unfor­tu­nately, there wasn’t a live demo you can play around with and see it work, and since the arti­cle was posted things might have changed and doing cross-domain HTTP requests are no longer a straight­for­ward affair (at least not when it comes to Chrome and Fire­fox). If you had tried to piece together all the bits of code snip­pets from the arti­cle you might find that it only works in IE but not in Chrome or Firefox.

So I decided to put together a work­ing demo, and with that, let’s start with the HTML from the orig­i­nal example:

   1: <body>

   2:     <div id="wrapper">

   3:         <input id="search-input" size="100" type="text" placeholder="Enter Search Phrase" />

   4:         <ul id="results" />

   5:         <p id="error" />

   6:     </div>

   7: </body>

And the Javascript which had been slightly mod­i­fied from those shown in the article:

   1: <script type="text/javascript">

   2:     function searchWiki(term) {

   3:         // note the callback=? clause at the end, which is required to make 

   4:         // cross-domain request possible in chrome and firefox

   5:         var url = "http://en.wikipedia.org/w/api.php?action=opensearch&search=" + 

   6:                     term + "&format=json&callback=?";

   7:  

   8:         // note that I'm using getJSONAsObservable instead of XMLHttpRequest

   9:         return $.getJSONAsObservable(url)

  10:                 .Select(function (result) {

  11:                     if (result.data && result.data.length == 2) {

  12:                         return result.data[1];

  13:                     }

  14:                     else {

  15:                         return [];

  16:                     }

  17:                 });

  18:     }

  19:  

  20:     $(function () {

  21:         var searcher = $("#search-input");

  22:  

  23:         var searchTerms = 

  24:             searcher.toObservable("keyup")     // hook up the keyup event on the search box

  25:                     .Throttle(250)             // ignore values entered within 250ms of each other

  26:                     .Select(function () {      // return what's in the search box

  27:                         return searcher.val();

  28:                     });

  29:  

  30:         var searchResults = searchTerms.Select(function (term) { return searchWiki(term); }).Switch();

  31:  

  32:         searchResults.Subscribe(function (data) {

  33:             $("#results").empty();

  34:             $.each(data, function (_, value) {

  35:                 $("#results").append("<li>" + value + "</li>");

  36:             })

  37:         }, function (errr) {

  38:             $("#error").html(error);

  39:         });

  40:     });

  41: </script>

Key things to note here:

  • line 6, at the end of the url I added &call­back=? which is the stan­dard way to spec­ify a JSONP call­back with jQuery’s getJ­SON function.
  • line 9, I used getJ­SONAsOb­serv­able, which is the same as getJ­SON but returns the val­ues as an observ­able sequence

Demo

Share

One of the great things about the Reac­tive Exten­sions for Javascript is that you can eas­ily cre­ate a many-to-many rela­tion­ship between observ­able sequences of val­ues and observers who han­dles the arrival of new values.

You should have a read of these cou­ple of posts first:

To take it a step fur­ther from the brief code snip­pets shown in the blog posts by Matthew Pod­wysocki above, here’s a quick demo on how you can cre­ate mul­ti­ple observ­able sequences of val­ues and have mul­ti­ple observers observe them.

So first, a sim­ple set of HTML to cre­ate a cou­ple of spans:

   1: <body>

   2:     <div id="wrapper">

   3:         <span id="message" class="block"></span>

   4:         <span id="error" class="block"></span>

   5:         <span id="complete" class="block"></span>

   6:  

   7:         <span id="message-2" class="block"></span>

   8:     </div>

   9: </body>

And the Javascript:

   1: <script type="text/javascript" src="js/jquery/jquery-1.4.4.min.js"></script>

   2: <script type="text/javascript" src="js/rxjs/rx.js"></script>

   3: <script type="text/javascript" src="js/rxjs/rx.jQuery.js"></script>

   4:  

   5: <script type="text/javascript">

   6:     $(function () {

   7:         var wrapperElement = $("#wrapper"),

   8:             messageElement = $("#message"),

   9:             messageElement2 = $("#message-2"),

  10:             errorElement = $("#error"),

  11:             completeEelement = $("#complete");

  12:  

  13:         /********** 1ST OBSERVER (EXPLICIT) ************/

  14:         var observer = Rx.Observer.Create(

  15:             function (next) { // when next value is fired

  16:                 messageElement.html("Next: " + next);

  17:             },

  18:             function (err) { // on error

  19:                 errorElement.html("Error: " + err);

  20:             },

  21:             function () { // on complete

  22:                 completeEelement.html("Complete!");

  23:             });

  24:  

  25:         // an observable that starts from 0 and increments by 1 per second

  26:         var counter = Rx.Observable.GenerateWithTime(

  27:             0,                                  // initial state

  28:             function (x) { return true; },      // condition

  29:             function (x) { return x + 1; },     // iterator

  30:             function (x) { return x },          // select

  31:             function (x) { return x === 0 ? 0 : 1000 }); // time (1s) between each value

  32:  

  33:         // an observable sequence which just throws an exception

  34:         var except = Rx.Observable.Throw("FAIL!");

  35:  

  36:         // an observable sequence which completes straight away

  37:         var complete = Rx.Observable.Return();

  38:  

  39:         // get reference to the disposable objects to unsubscribe later

  40:         var counterSub = counter.Subscribe(observer);

  41:         var exceptSub = except.Subscribe(observer);

  42:         var completeSub = complete.Subscribe(observer);

  43:  

  44:         /********** 2ND OBSERVER (IMPLICIT) ************/

  45:         var counterSub2 = counter.Subscribe(function (next) {

  46:             messageElement2.html(next);

  47:         });

  48:  

  49:         // unsubscribe the first observer's counter subscription after 10 seconds

  50:         setTimeout(function () { counterSub.Dispose() }, 10000);

  51:     });

  52: </script>

The key things to note here are the two ways you can cre­ate an observer, either explic­itly using the Rx.Observer.Create func­tion or cre­at­ing one dynam­i­cally using one of the over­loaded Sub­scribe func­tions on an observ­able sequence.

When you call Sub­scribe on an observ­able, you get back a dis­pos­able object which can then be used to unsub­scribe an observer from an ongo­ing observ­able sequence (see line 50 in the code above and the demo below).

Demo

Share

Been a while since I’ve been mean­ing to check out Reac­tive Exten­sions for Javascript, I have been using the .Net ver­sion for a while now and blogged about some of the cool things you can do with the .Net version.

Although there are many sources where you can learn about it from they don’t have a struc­tured guide to help you get started and some of the info/demo I found were pretty out-dated and not in line with the lat­est API. Among the arti­cles I’ve read so far I found the series of arti­cles by Matthew Pod­wysocki on codebetter.com to be most use­ful and def­i­nitely worth read­ing through if you want get up to speed with RXJS.

Any­ways, back to the point of the post, I just wanted to show you how to achieve a sim­ple drag and drop effect using RX, with sup­port for clip­ping so you can’t drag the out­side the par­ent container.

Here’s the basic HTML for the demo and with some sim­ple CSS how it looks:

   1: <body>

   2:     <div id="wrapper">

   3:         <div id="draggable"></div>

   4:     </div>

   5: </body>

image

The javascript code for the demo is simple:

   1: <script type="text/javascript" src="js/jquery/jquery-1.4.4.min.js"></script>

   2: <script type="text/javascript" src="js/rxjs/rx.js"></script>

   3: <script type="text/javascript" src="js/rxjs/rx.jQuery.js"></script>

   4:  

   5: <script type="text/javascript">

   6:     $(function () {

   7:         var _doc = $(document), _draggable = $("#draggable"), _wrapper = $("#wrapper");

   8:  

   9:         // boundary so that the draggable content can't move outside of the its container

  10:         var _parent = _draggable.parent(),

  11:             _parentOffset = _parent.offset(),

  12:             _minLeft = _parentOffset.left,

  13:             _minTop = _parentOffset.top,

  14:             _maxLeft = _minLeft + _parent.width() - _draggable.outerWidth(),

  15:             _maxTop = _minTop + _parent.height() - _draggable.outerHeight();

  16:  

  17:         // get the stream of events from the mousedown, mousemove and mouseup events

  18:         var mouseDown = _draggable.toObservable("mousedown"),

  19:             mouseMove = _doc.toObservable("mousemove"),

  20:             mouseUp = _doc.toObservable("mouseup");

  21:  

  22:         // get the changes in the X and Y direction as the mouse moves

  23:         var mouseMoves = mouseMove.Skip(1).Zip(mouseMove, function (left, right) {

  24:             return {

  25:                 xChange: left.clientX - right.clientX,

  26:                 yChange: left.clientY - right.clientY

  27:             };

  28:         });

  29:  

  30:         // for each mouse down event, get all the subsequent changes in the clientX and

  31:         // clientY values resulting from the mouse move events until mouse up event occurs

  32:         var mouseDrags = mouseDown.SelectMany(function (md) {

  33:             return mouseMoves.TakeUntil(mouseUp);

  34:         });

  35:  

  36:         mouseDrags.Subscribe(function (mouseEvent) {

  37:             var oldOffset = _draggable.offset();

  38:  

  39:             // change the left and top

  40:             _draggable.css({

  41:                 left: Math.min(Math.max(oldOffset.left + mouseEvent.xChange, _minLeft), _maxLeft),

  42:                 top: Math.min(Math.max(oldOffset.top + mouseEvent.yChange, _minTop), _maxTop)

  43:             });

  44:         });

  45:     });

  46: </script>

As you can see, this exam­ple uses jQuery and included in the RXJS pack­age is a rx.jQuery.js file which con­tains a num­ber of exten­sion meth­ods to use with jQuery such as the toOb­serv­able func­tion which returns a stream of observ­able val­ues. If this sounds unfa­mil­iar to you I strongly rec­om­mend you to read through the first cou­ple of arti­cles by Matthew Pod­wysocki I linked above.

On line 23, I’m cre­ated a com­pos­ite event stream by pair­ing mouse move events with their sub­se­quent mouse move event to work out the change in the X and Y direc­tion. To visu­alise the two event streams and the resul­tant mouse­Moves stream:

image

Then on line 36, I’ve sub­scribed an event han­dler for val­ues pushed down to us through the mouse­Moves stream. This event han­dler applies the X and Y changes to the <div> element’s left and top off­set but caps them so that it can only be moved inside its container.

Demo

Part­ing thoughts

I used jQuery to demon­strate RXJS’s exten­sion func­tions for jQuery but you don’t need jQuery in order to use RXJS, in the rx.html.js script file there is a very use­ful Rx.Observable.FromHTMLEvent func­tion which lets you get an observ­able stream of events from any HTML events includ­ing IE-specific events such as ‘oncut’.

Share