Reactive Extensions for Javascript – Observable vs ConnectableObservable

Yan Cui

I help clients go faster for less using serverless technologies.

This article is brought to you by

Don’t reinvent the patterns. Catalyst gives you consistent APIs for messaging, data, and workflow with key microservice patterns like circuit-breakers and retries for free.

Try the Catalyst beta

For those of you who are familiar with Reactive Extensions you should know all about observables already, but did you know that there’s another kind of observable sequence – Rx.ConnectableObservable.

The difference between the two types of observable sequences is well explained here, in short, a connectable observable sequence allows you to share the same source sequence of values with multiple subscribers whilst the normal observable sequence gives each subscriber its own sequence of values. Whilst in most cases this difference doesn’t have any practical impacts as each subscribers are given the same values in the same order, however, consider this observable sequence of random numbers between 0 and 1000:

   1: var maxNumber = 1000;

   2: var observableSource = Rx.Observable.GenerateWithTime(

   3:     Math.random(),                                      // initial state

   4:     function (x) { return true; },                      // condition

   5:     function (x) { return Math.random(); },             // iterator     

   6:     function (x) { return parseInt(x * maxNumber); },   // select

   7:     function (x) { return 1000 });                      // interval

As you can see, each time the iterator is invoked it’ll generate a different value, hence subscribers will receive a different value each time (see demo below):

   1: // first subscriber

   2: observableSource.Subscribe(function (n) {

   3:     sub1Span.html(n);

   4: });

   5:  

   6: // second subscriber

   7: observableSource.Subscribe(function (n) {

   8:     sub2Span.html(n);

   9: });

Instead, if you want to ensure that all the subscribers receive the same values, your best bet is to ‘publish‘ the source:

   1: // create a connectable observable from the source

   2: var connectableObservable = observableSource.Publish();

which returns you a connectable observable that you can then attach subscribers to:

   1: // connected subscriber 1

   2: connectableObservable.Subscribe(function (n) {

   3:     connSub1Span.html(n);

   4: });

   5:  

   6: // connected subscriber 2

   7: connectableObservable.Subscribe(function (n) {

   8:     connSub2Span.html(n);

   9: });

and once you ‘connect‘ to the underlying source, the subscribers will start receiving values from the stream:

   1: connectableObservable.Connect();

Demo

Whenever you’re ready, here are 3 ways I can help you:

  1. Production-Ready Serverless: Join 20+ AWS Heroes & Community Builders and 1000+ other students in levelling up your serverless game. This is your one-stop shop for quickly levelling up your serverless skills.
  2. I help clients launch product ideas, improve their development processes and upskill their teams. If you’d like to work together, then let’s get in touch.
  3. Join my community on Discord, ask questions, and join the discussion on all things AWS and Serverless.

Leave a Comment

Your email address will not be published. Required fields are marked *