Reactive Extensions for JavaScript – Causing side effects with Do

I wrote previously about how you can set up multiple observable sequences, subscribe to them with multiple observers, and so create a many-to-many relationship between them.

Whilst this is a very flexible model with a clear separation of responsibilities, it often takes more work to set up and is more than you need for the task at hand. For instance, if you’re receiving a steady stream of inputs and want to log the arrival of each new input as well as perform some aggregation on them, you don’t necessarily have to create two subscribers for the input; you can instead make use of the Rx.Observable.Do function.

Like the Rx.Observable.Subscribe function, the Do function can take either an observer or up to three function objects to handle the onNext, onError and onCompleted events, in that order. Unlike with the Rx.Observable.Select function, the return value of the handler is ignored, so Do won’t let you transform the input stream; it’s intended purely for causing side effects.
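To make the difference between Do and Select concrete, here is a minimal sketch that doesn't use RxJS at all. The makeObservable and fromArray helpers, and the Do, Select and Subscribe methods on them, are hypothetical stand-ins written purely for illustration; they only mimic the behaviour described above and are not how the library is implemented.

```javascript
// Hypothetical miniature observable, NOT the RxJS implementation.
// `subscribe` is a function that pushes notifications into an observer object.
function makeObservable(subscribe) {
    return {
        // Run a side effect for each notification, then forward it unchanged.
        Do: function (onNext, onError, onCompleted) {
            return makeObservable(function (observer) {
                subscribe({
                    onNext: function (x) { onNext(x); observer.onNext(x); },
                    onError: function (e) {
                        if (onError) { onError(e); }
                        observer.onError(e);
                    },
                    onCompleted: function () {
                        if (onCompleted) { onCompleted(); }
                        observer.onCompleted();
                    }
                });
            });
        },
        // Replace each value with whatever the selector returns.
        Select: function (selector) {
            return makeObservable(function (observer) {
                subscribe({
                    onNext: function (x) { observer.onNext(selector(x)); },
                    onError: function (e) { observer.onError(e); },
                    onCompleted: function () { observer.onCompleted(); }
                });
            });
        },
        Subscribe: function (onNext, onError, onCompleted) {
            subscribe({
                onNext: onNext,
                onError: onError || function () {},
                onCompleted: onCompleted || function () {}
            });
        }
    };
}

// Hypothetical helper: emits the array's items synchronously, then completes.
function fromArray(values) {
    return makeObservable(function (observer) {
        values.forEach(function (v) { observer.onNext(v); });
        observer.onCompleted();
    });
}

var seenByDo = [], afterDo = [], afterSelect = [];

fromArray([1, 2, 3])
    .Do(function (n) { seenByDo.push(n); return n * 100; }) // return value is discarded
    .Subscribe(function (n) { afterDo.push(n); });

fromArray([1, 2, 3])
    .Select(function (n) { return n * 100; })               // return value replaces the item
    .Subscribe(function (n) { afterSelect.push(n); });

console.log(seenByDo, afterDo, afterSelect);
```

In this sketch the subscriber downstream of Do still receives the original values, whereas the subscriber downstream of Select receives the transformed ones.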

I’ve put together a quick demo (see below) to illustrate the use of the Do function in conjunction with other common RxJS functions such as Select and Where. For this demo we just need two <span> elements, one to show the running total, the other to show a log message every time a new value is received:

    <body>
        <div id="wrapper">
            <p>Sum of squares of odd numbers received : <span id="sum"></span></p>
            <p><span id="log"></span></p>
        </div>
    </body>

And the JavaScript to go along with it:

   1: <script type="text/javascript" src="js/jquery/jquery-1.4.4.min.js"></script>
   2: <script type="text/javascript" src="js/rxjs/rx.js"></script>
   3:
   4: <script type="text/javascript">
   5:     $(function () {
   6:         var logSpan = $("#log"), sumSpan = $("#sum");
   7:
   8:         // create an observable that yields ten numbers at 1-second intervals,
   9:         // with a 13% chance of an exception being thrown at each step
  10:         var observable = Rx.Observable.GenerateWithTime(
  11:             1,                                  // initial state
  12:             function (x) { return x <= 10; },   // condition
  13:             function (x) {                      // iterator
  14:                 var prob = Math.random();
  15:                 if (prob < 0.13) {
  16:                     throw "Better luck next time!";
  17:                 }
  18:
  19:                 return x + 1;
  20:             },
  21:             function (x) { return x; },         // select
  22:             function (x) {                      // interval
  23:                 return x === 0 ? 0 : 1000;
  24:             });
  25:
  26:         var sum = 0;
  27:
  28:         observable.Do(function (n) {    // onNext
  29:             logSpan.html("Received new input: " + n);
  30:         }, function (err) {             // onError
  31:             logSpan.html("Error: " + err);
  32:         }, function () {                // onCompleted
  33:             logSpan.html("No more inputs");
  34:         }).Where(function (n) {         // filter the input sequence
  35:             return n % 2 !== 0;         // odd numbers only
  36:         }).Select(function (n) {        // transform the input sequence
  37:             return n * n;
  38:         }).Subscribe(function (n) {
  39:             sum += n;                   // add the new input to the running total
  40:             sumSpan.html(sum);          // show the new running total
  41:         });
  42:     });
  43: </script>

A couple of things to note here:

  • line 13 – the iterator deliberately gives the observable sequence a random chance of throwing an exception, which invokes the onError handler passed to the Do function on line 30
  • line 28 – the Do function updates the HTML content of the log <span> element every time it receives a new value
  • line 32 – shows a different log message once the observable sequence has been exhausted
  • line 34 – applies a filter to the observable sequence for all subsequent functions
  • line 36 – only odd numbers are fed to this handler
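The flow described in these notes can also be traced without a browser. The sketch below is a rough, synchronous imitation under stated assumptions: Sequence, lift and numbers are hypothetical helpers rather than part of RxJS, the random failure is replaced by a shouldFail callback so that runs are repeatable, and log messages are collected in an array instead of overwriting a <span>.

```javascript
// Hypothetical stand-in for an observable sequence; not the RxJS implementation.
function Sequence(subscribe) {
    this.subscribe = subscribe;
}

// Helper: derive a Sequence with a custom onNext; errors and completion pass through.
Sequence.prototype.lift = function (makeOnNext) {
    var source = this;
    return new Sequence(function (observer) {
        source.subscribe({
            onNext: makeOnNext(observer),
            onError: function (e) { observer.onError(e); },
            onCompleted: function () { observer.onCompleted(); }
        });
    });
};

// Side effects only: every notification is forwarded unchanged.
Sequence.prototype.Do = function (onNext, onError, onCompleted) {
    var source = this;
    return new Sequence(function (observer) {
        source.subscribe({
            onNext: function (x) { onNext(x); observer.onNext(x); },
            onError: function (e) { onError(e); observer.onError(e); },
            onCompleted: function () { onCompleted(); observer.onCompleted(); }
        });
    });
};

Sequence.prototype.Where = function (predicate) {
    return this.lift(function (observer) {
        return function (x) { if (predicate(x)) { observer.onNext(x); } };
    });
};

Sequence.prototype.Select = function (selector) {
    return this.lift(function (observer) {
        return function (x) { observer.onNext(selector(x)); };
    });
};

Sequence.prototype.Subscribe = function (onNext) {
    this.subscribe({
        onNext: onNext,
        onError: function () {},     // like the demo, no error handler is supplied here
        onCompleted: function () {}
    });
};

// Emits 1..10 synchronously; shouldFail(x) stands in for the demo's Math.random() check.
function numbers(shouldFail) {
    return new Sequence(function (observer) {
        for (var x = 1; x <= 10; x++) {
            observer.onNext(x);
            if (shouldFail(x)) {
                observer.onError("Better luck next time!");
                return;
            }
        }
        observer.onCompleted();
    });
}

function run(shouldFail) {
    var log = [], sum = 0;
    numbers(shouldFail)
        .Do(function (n) { log.push("Received new input: " + n); },
            function (err) { log.push("Error: " + err); },
            function () { log.push("No more inputs"); })
        .Where(function (n) { return n % 2 !== 0; })   // odd numbers only
        .Select(function (n) { return n * n; })        // square them
        .Subscribe(function (n) { sum += n; });        // running total
    return { log: log, sum: sum };
}

var cleanRun = run(function () { return false; });        // never fails
var failedRun = run(function (x) { return x === 4; });    // fails after the fourth value

console.log(cleanRun.sum, cleanRun.log[cleanRun.log.length - 1]);
console.log(failedRun.sum, failedRun.log[failedRun.log.length - 1]);
```

In the clean run the Do handler logs all ten inputs plus the completion message, while the subscriber only ever sees the squares of the five odd numbers, giving a total of 165. In the failing run the total stops at 10 (1 + 9) and the last log entry is the error message.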

Demo

If you didn’t get the outcome you were hoping for (an error or a clean run, depending on which scenario you’re trying to test), just refresh the page and try again; hopefully you’ll have better luck the second time around!