My adventure with Elm @ NCrafts

Yan Cui - MY ADVENTURE WITH ELM from NCRAFTS Conferences on Vimeo.

with accompanying slides here:

stream_ext – version 0.3.0 is out

I have just published version 0.3.0 of stream_ext, my attempt to port the Rx APIs to Dart. In this version I have added a number of additional methods to the existing set of:

 

amb

StreamExt.amb has the following signature:

Stream amb(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

this method propagates values from the stream that reacts first with a value.

This method will ignore any errors received from either stream until the first value is received. The stream which reacts first with a value will have its values and errors propagated through the output stream.

The output stream will complete if:

  • neither stream produced a value before completing
  • the propagated stream has completed
  • the closeOnError flag is set to true and an error is received in the propagated stream

 

Marble Diagram

 

Live demo here.

More information on the Observable.amb extension method from Rx API here.

 

log

StreamExt.log has the following signature:

void log(Stream input, [ String prefix, void log(Object msg) ])

this helper method provide an easy way to log when new values and errors are received and when the stream is done.

You can see example usage here.

 

onErrorResumeNext

StreamExt.onErrorResumeNext has the following signature:

Stream onErrorResumeNext(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

this method allows the continuation of a stream with another regardless of whether the first stream completes gracefully or due to an error.

The output stream will complete if:

  • both input streams have completed (if stream 2 completes before stream 1 then the output stream is completed when stream 1 completes)
  • closeOnError flag is set to true and an error is received in the continuation stream

 

Marble Diagram

 

Live demo here.

More information on the Observable.onErrorResumeNext extension method from the Rx API here.

 

switchFrom

StreamExt.switchFrom has the following signature:

Stream switchFrom(Stream<Stream> inputs, { bool closeOnError : false, bool sync : false })

this method transforms a stream of streams into a stream producing values only from the most recent stream.

The output stream will complete if:

  • the input stream has completed and the last stream has completed
  • closeOnError flag is set to true and an error is received in the active stream

 

Marble Diagram

 

Live demo here.

More information on the Observable.Switch extension method from the Rx API here.

 

timeOut

StreamExt.timeOut has the following signature:

Stream timeOut(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })

this method allows you to terminate a stream with a TimeoutError if the specified duration between values elapsed.

The output stream will complete if:

  • the input stream has completed
  • the specified duration between input values has elapsed
  • closeOnError flag is set to true and an error is received

 

Marble Diagram

 

Live demo here.

More information on the Observable.Timeout extension method from the Rx API here.

 

timeOutAt

StreamExt.timeOutAt has the following signature:

Stream timeOutAt(Stream input, DateTime dueTime, { bool closeOnError : false, bool sync : false })

this method allows you to terminate a stream with a TimeoutError at the specified dueTime.

The output stream will complete if:

  • the input stream has completed
  • the specified dueTime has elapsed
  • closeOnError flag is set to true and an error is received

 

Marble Diagram

 

Live demo here.

 

Links

Source Code

API Ref­er­ence

Get­ting Started guide

Wiki (with live demo for each method)

Intro to Rx

stream_ext – version 0.2.0 is out

Lately I’ve been making steady progress in porting over Rx APIs over to Dart with stream_ext, and with the release of version 0.2.0 a few more Rx methods have been added to the existing set of buffer, combineLatest, delay, max, merge, min, scan, sum, throttle, window and zip.

 

average

StreamExt.average has the following signature:

Future average(Stream input, { num map (dynamic elem), bool closeOnError : false, bool sync : false })

this method returns the average of the input values as a Future which completes when the input stream is done.

This method uses the supplied map function to convert each input value into a num. If a map function is not specified then the identity function is used instead.

If closeOnError flag is set to true, then any error in the map function or from the input stream will complete the Future with the error. Otherwise, any errors will be swallowed and excluded from the final average.

 

Live demo here.

 

concat

StreamExt.concat has the following signature:

Stream concat(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

this method concatenates two streams together, when the first stream completes the second stream is subscribed to. Until the first stream is done any values and errors from the second stream is ignored.

The concatenated stream will complete if:

  • both input streams have completed (if stream 2 completes before stream 1 then the concatenated stream is completed when stream 1 completes)
  • closeOnError flag is set to true and an error is received in the active input stream

marble diagram

 

Live demo here.

 

repeat

StreamExt.repeat has the following signature:

Stream repeat(Stream input, { int repeatCount, bool closeOnError : false, bool sync : false })

this method allows you to repeat the input stream for the specified number of times. If repeatCount is not set, then the input stream will be repeated indefinitely. The repeated stream respects both the order and timing of values from the input stream.

The done value is not delivered when the input stream completes, but only after the input stream has been repeated the required number of times.

The output stream will complete if:

  • the input stream has been repeated the required number of times
  • the closeOnError flag is set to true and an error has been received

marble diagram

 

Live demo here.

 

sample

StreamExt.sample has the following signature:

Stream sample(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })

this method creates a new stream by taking the last value from the input stream for every specified duration.

The sampled stream will complete if:

  • the input stream has completed and any sampled message has been delivered
  • the closeOnError flag is set to true and an error has been received

marble diagram

 

Live demo here.

 

startWith

StreamExt.startWith has the following signature:

Stream startWith(Stream input, Iterable values, { bool closeOnError : false, bool sync : false })

this method allows you to prefix values to a stream. The supplied values are delivered as soon as the listener is subscribed before the listener receives values from the input stream.

The output stream will complete if:

  • the input stream has completed
  • closeOnError flag is set to true and an error is received

marble diagram

 

Live demo here.

 

Once again, I hope you find these methods useful to you and please don’t hesitate to send me any questions or feedbacks you have about the project.

 

Links

Source Code

API Reference

Getting Started guide

Wiki (with live demo for each method)

Intro to Rx

stream_ext – bringing more Rx API to the Dart

Over the last week or so, I’ve been looking at and playing around with the Streams API in Dart, which has been (in part at least) based on the Rx API, and it’s easy to see the parallels between the two sets of APIs and you can find most of the core Rx APIs on Dart’s Stream type already, but there were a few notable absentees which I have often found useful, such as merge, zip, delay and window.

With these absentees in mind I started a small Dart project called stream_ext (for stream extensions) to port them over, and here are some details of the first couple of functions I have done along with some live demo examples.

 

merge

The merge function merges two streams into a single unified output stream.

stream_ext_merge

You can try a live demo use of the merge function here, with the source code here.

 

combineLatest

The combineLatest function merges two streams into one by using the supplied selector function whenever one of the streams produces an event.

stream_ext_combineLatest

You can try a live demo use of the combineLatest function here, with the source code here.

 

delay

The delay function creates a new stream whose events are directly sourced from the input stream but each delivered after the specified duration.

stream_ext_delay

You can try a live demo (a port of the ‘Time flies like an array’ example) use of the delay function here, with the source code here.

 

throttle

The throttle function creates a new stream based on events produced by the specified input, upon forwarding an event from the input stream it’ll ignore any subsequent events produced by the input stream until the flow of new events has paused for the specified duration, after which the last event produced by the input stream is then delivered.

stream_ext_throttle

You can try a live demo use of the throttle function here, with the source code here.

 

zip

The zip function combines two streams into one by combining their elements in a pairwise fashion, combining the latest value on each stream and producing a new event using the supplied zipper function.

stream_ext_zip

You can try a live demo use of the zip function here, with the source code here.

 

It’s still early days and there are quite a few more useful Rx function I think will be useful to people working with Dart, and I hope you find these extension functions useful and please don’t hesitate to get in touch and leave some feedbacks on the project!

Reactive Extensions for Javascript – Observable vs ConnectableObservable

For those of you who are familiar with Reactive Extensions you should know all about observables already, but did you know that there’s another kind of observable sequence – Rx.ConnectableObservable.

The difference between the two types of observable sequences is well explained here, in short, a connectable observable sequence allows you to share the same source sequence of values with multiple subscribers whilst the normal observable sequence gives each subscriber its own sequence of values. Whilst in most cases this difference doesn’t have any practical impacts as each subscribers are given the same values in the same order, however, consider this observable sequence of random numbers between 0 and 1000:

   1: var maxNumber = 1000;

   2: var observableSource = Rx.Observable.GenerateWithTime(

   3:     Math.random(),                                      // initial state

   4:     function (x) { return true; },                      // condition

   5:     function (x) { return Math.random(); },             // iterator     

   6:     function (x) { return parseInt(x * maxNumber); },   // select

   7:     function (x) { return 1000 });                      // interval

As you can see, each time the iterator is invoked it’ll generate a different value, hence subscribers will receive a different value each time (see demo below):

   1: // first subscriber

   2: observableSource.Subscribe(function (n) {

   3:     sub1Span.html(n);

   4: });

   5:  

   6: // second subscriber

   7: observableSource.Subscribe(function (n) {

   8:     sub2Span.html(n);

   9: });

Instead, if you want to ensure that all the subscribers receive the same values, your best bet is to ‘publish‘ the source:

   1: // create a connectable observable from the source

   2: var connectableObservable = observableSource.Publish();

which returns you a connectable observable that you can then attach subscribers to:

   1: // connected subscriber 1

   2: connectableObservable.Subscribe(function (n) {

   3:     connSub1Span.html(n);

   4: });

   5:  

   6: // connected subscriber 2

   7: connectableObservable.Subscribe(function (n) {

   8:     connSub2Span.html(n);

   9: });

and once you ‘connect‘ to the underlying source, the subscribers will start receiving values from the stream:

   1: connectableObservable.Connect();

Demo