Reactive Extensions for JavaScript – Multiple observers for multiple observables

One of the great things about the Reactive Extensions for JavaScript is that you can easily create a many-to-many relationship between observable sequences of values and the observers that handle the arrival of new values.

You should have a read of these couple of posts first:

To take it a step further than the brief code snippets shown in the blog posts by Matthew Podwysocki above, here's a quick demo of how you can create multiple observable sequences of values and have multiple observers observe them.

First, some simple HTML to create a few spans:

<body>
    <div id="wrapper">
        <span id="message" class="block"></span>
        <span id="error" class="block"></span>
        <span id="complete" class="block"></span>

        <span id="message-2" class="block"></span>
    </div>
</body>

And the JavaScript:

   1: <script type="text/javascript" src="js/jquery/jquery-1.4.4.min.js"></script>
   2: <script type="text/javascript" src="js/rxjs/rx.js"></script>
   3: <script type="text/javascript" src="js/rxjs/rx.jQuery.js"></script>
   4:
   5: <script type="text/javascript">
   6:     $(function () {
   7:         var wrapperElement = $("#wrapper"),
   8:             messageElement = $("#message"),
   9:             messageElement2 = $("#message-2"),
  10:             errorElement = $("#error"),
  11:             completeElement = $("#complete");
  12:
  13:         /********** 1ST OBSERVER (EXPLICIT) ************/
  14:         var observer = Rx.Observer.Create(
  15:             function (next) { // when the next value arrives
  16:                 messageElement.html("Next: " + next);
  17:             },
  18:             function (err) { // on error
  19:                 errorElement.html("Error: " + err);
  20:             },
  21:             function () { // on complete
  22:                 completeElement.html("Complete!");
  23:             });
  24:
  25:         // an observable that starts from 0 and increments by 1 per second
  26:         var counter = Rx.Observable.GenerateWithTime(
  27:             0,                                  // initial state
  28:             function (x) { return true; },      // condition (never stops)
  29:             function (x) { return x + 1; },     // iterator
  30:             function (x) { return x; },         // select
  31:             function (x) { return x === 0 ? 0 : 1000; }); // delay: none for the first value, 1s after that
  32:
  33:         // an observable sequence which just throws an exception
  34:         var except = Rx.Observable.Throw("FAIL!");
  35:
  36:         // an observable sequence which yields a single (undefined) value and then completes
  37:         var complete = Rx.Observable.Return();
  38:
  39:         // get reference to the disposable objects to unsubscribe later
  40:         var counterSub = counter.Subscribe(observer);
  41:         var exceptSub = except.Subscribe(observer);
  42:         var completeSub = complete.Subscribe(observer);
  43:
  44:         /********** 2ND OBSERVER (IMPLICIT) ************/
  45:         var counterSub2 = counter.Subscribe(function (next) {
  46:             messageElement2.html(next);
  47:         });
  48:
  49:         // unsubscribe the first observer's counter subscription after 10 seconds
  50:         setTimeout(function () { counterSub.Dispose(); }, 10000);
  51:     });
  52: </script>

The key thing to note here is that there are two ways to create an observer: either explicitly, using the Rx.Observer.Create function, or implicitly, by passing a callback to one of the overloaded Subscribe functions on an observable sequence, which creates the observer for you.
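To make the difference between the two styles a bit more concrete, here is a minimal sketch in plain JavaScript of how a subscribe function could accept either a ready-made observer or a bare callback. This is not how the Rx library is actually implemented, and createObserver, fromArray and the lower-case subscribe are hypothetical names made up for this illustration.

```javascript
// Simplified stand-in for illustration only; this is NOT the Rx library.
// createObserver, fromArray and subscribe are hypothetical names.

// Build an observer object from up to three callbacks, filling in no-ops.
function createObserver(onNext, onError, onCompleted) {
    return {
        onNext: onNext || function () {},
        onError: onError || function () {},
        onCompleted: onCompleted || function () {}
    };
}

// A tiny synchronous observable that pushes each array item to a subscriber.
function fromArray(items) {
    return {
        subscribe: function (observerOrOnNext, onError, onCompleted) {
            // Accept either a ready-made observer or bare callbacks.
            var observer = typeof observerOrOnNext === "function"
                ? createObserver(observerOrOnNext, onError, onCompleted)
                : observerOrOnNext;
            items.forEach(function (item) { observer.onNext(item); });
            observer.onCompleted();
        }
    };
}

var log = [];
var numbers = fromArray([1, 2, 3]);

// Explicit observer: built up front, reusable across several observables.
var explicit = createObserver(
    function (x) { log.push("explicit next " + x); },
    function (e) { log.push("explicit error " + e); },
    function () { log.push("explicit complete"); });
numbers.subscribe(explicit);

// Implicit observer: created from a bare function inside subscribe.
numbers.subscribe(function (x) { log.push("implicit next " + x); });

console.log(log.join("\n"));
```

The explicit form pays off when the same observer needs to watch several sequences, as the first observer does in the demo code; the implicit form is handier for a one-off handler.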

When you call Subscribe on an observable, you get back a disposable object, which you can then use to unsubscribe that observer from an ongoing observable sequence (see the setTimeout call on line 50 in the code above and the demo below).
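The disposable idea can be sketched in a few lines of plain JavaScript. Again, this is a simplified stand-in rather than the Rx library itself: createSource, emit and the lower-case dispose are hypothetical names, and for brevity the sketch models a single shared source that values are pushed into by hand instead of a timer-driven sequence.

```javascript
// Simplified stand-in for illustration only; this is NOT the Rx library.
// createSource, emit and dispose are hypothetical names.
function createSource() {
    var observers = [];
    return {
        subscribe: function (onNext) {
            observers.push(onNext);
            var disposed = false;
            // The returned object is the only handle needed to unsubscribe.
            return {
                dispose: function () {
                    if (disposed) { return; }   // disposing twice is harmless
                    disposed = true;
                    observers.splice(observers.indexOf(onNext), 1);
                }
            };
        },
        emit: function (value) {
            // Copy the list so a dispose during delivery cannot skip anyone.
            observers.slice().forEach(function (onNext) { onNext(value); });
        }
    };
}

var source = createSource();
var first = [];
var second = [];

var firstSub = source.subscribe(function (x) { first.push(x); });
source.subscribe(function (x) { second.push(x); });

source.emit(0);
source.emit(1);
firstSub.dispose();   // only the first observer stops listening
source.emit(2);

console.log(first);   // [0, 1]
console.log(second);  // [0, 1, 2]
```

Disposing one subscription leaves the other untouched, which is the same behaviour the demo relies on when the first observer stops receiving counter values after 10 seconds while the second carries on.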

Demo