stream_ext – version 0.2.0 is out

Lately I’ve been making steady progress in porting Rx APIs over to Dart with stream_ext, and with the release of version 0.2.0 a few more Rx methods have been added to the existing set of buffer, combineLatest, delay, max, merge, min, scan, sum, throttle, window and zip.

 

average

StreamExt.average has the following signature:

Future average(Stream input, { num map (dynamic elem), bool closeOnError : false, bool sync : false })

This method returns the average of the input values as a Future, which completes when the input stream is done.

This method uses the supplied map function to convert each input value into a num. If a map function is not specified then the identity function is used instead.

If the closeOnError flag is set to true, then any error in the map function or from the input stream will complete the Future with that error. Otherwise, errors are swallowed and the offending values are excluded from the final average.
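To make the behaviour a bit more concrete, here's a minimal sketch of the same idea written against dart:async only. It is not how stream_ext implements average: averageOf is a hypothetical helper, it only covers the closeOnError: false case for errors thrown while mapping (an error from the input stream itself would still end the await for loop), and returning NaN for an empty stream is my own assumption.

```dart
import 'dart:async';

/// Hypothetical helper (not part of stream_ext): averages the values of
/// [input], skipping any element whose conversion to num throws.
Future<double> averageOf<T>(Stream<T> input, {num Function(T)? map}) async {
  var sum = 0.0;
  var count = 0;
  await for (final elem in input) {
    try {
      // Without a map function, fall back to the identity (elem must be a num).
      final value = map != null ? map(elem) : elem as num;
      sum += value;
      count++;
    } catch (_) {
      // Comparable to closeOnError: false, the failing element is excluded.
    }
  }
  // Assumption: an empty (or all-failing) stream averages to NaN.
  return count == 0 ? double.nan : sum / count;
}
```

For example, averageOf(Stream.fromIterable([1, 2, 3, 4])) completes with 2.5 once the stream is done.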

 

Live demo here.

 

concat

StreamExt.concat has the following signature:

Stream concat(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method concatenates two streams: when the first stream completes, the second stream is subscribed to. Until the first stream is done, any values and errors from the second stream are ignored.

The concatenated stream will complete if:

  • both input streams have completed (if stream 2 completes before stream 1 then the concatenated stream is completed when stream 1 completes)
  • the closeOnError flag is set to true and an error is received in the active input stream
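The subscribe-to-the-second-only-after-the-first-is-done idea can be sketched with an async* generator. This is an illustration using dart:async only, not the stream_ext implementation, and concatSketch is a hypothetical name. One caveat: in this sketch a second stream that buffers events until it is listened to will have its values held back rather than dropped, so the "ignored" behaviour described above only shows up with broadcast streams.

```dart
import 'dart:async';

/// Hypothetical helper (not part of stream_ext): emits everything from
/// [first], and only then listens to [second].
Stream<T> concatSketch<T>(Stream<T> first, Stream<T> second) async* {
  // second is not listened to until first has completed.
  yield* first;
  yield* second;
}
```

Listening to concatSketch(Stream.fromIterable([1, 2]), Stream.fromIterable([3, 4])) yields 1, 2, 3, 4 in that order.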

marble diagram

 

Live demo here.

 

repeat

StreamExt.repeat has the following signature:

Stream repeat(Stream input, { int repeatCount, bool closeOnError : false, bool sync : false })

This method allows you to repeat the input stream a specified number of times. If repeatCount is not set, then the input stream will be repeated indefinitely. The repeated stream respects both the order and timing of values from the input stream.

The done event is not delivered when the input stream completes, but only after the input stream has been repeated the required number of times.

The output stream will complete if:

  • the input stream has been repeated the required number of times
  • the closeOnError flag is set to true and an error has been received
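Since a Dart stream generally can't be listened to twice, one way to get this behaviour is to record each value together with the gap since the previous one, then replay the recording. The sketch below does that with dart:async only. It is an assumption about how such an operator could work rather than stream_ext's actual code; repeatSketch and its replays parameter are hypothetical, and I'm assuming the count refers to replays after the original pass.

```dart
import 'dart:async';

/// Hypothetical helper (not part of stream_ext): forwards [input] once, then
/// replays the recorded values [replays] more times with the same gaps.
Stream<T> repeatSketch<T>(Stream<T> input, int replays) async* {
  // Each entry pairs a value with the time elapsed since the previous value.
  final recorded = <MapEntry<Duration, T>>[];
  final clock = Stopwatch()..start();
  var last = Duration.zero;

  await for (final value in input) {
    final now = clock.elapsed;
    recorded.add(MapEntry(now - last, value));
    last = now;
    yield value; // first pass goes straight through
  }

  // The stream only completes after the last replay has finished.
  for (var i = 0; i < replays; i++) {
    for (final entry in recorded) {
      await Future<void>.delayed(entry.key);
      yield entry.value;
    }
  }
}
```

With replays set to 2, an input of 1, 2 comes out as 1, 2, 1, 2, 1, 2. The sketch ignores errors and the gap between the last value and the done event.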

marble diagram

 

Live demo here.

 

sample

StreamExt.sample has the following signature:

Stream sample(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })

This method creates a new stream by emitting, once per specified duration, the last value received from the input stream.

The sampled stream will complete if:

  • the input stream has completed and any sampled message has been delivered
  • the closeOnError flag is set to true and an error has been received
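Here's a rough sketch of sampling built from a StreamController and a periodic Timer. It's meant to illustrate the idea rather than mirror the stream_ext source: sampleSketch is a hypothetical name, errors are simply forwarded, pause/resume is not handled, and flushing a pending value immediately when the input completes (rather than waiting for the next tick) is my assumption.

```dart
import 'dart:async';

/// Hypothetical helper (not part of stream_ext): every [period], emits the
/// most recent value from [input] if a new one arrived since the last tick.
Stream<T> sampleSketch<T>(Stream<T> input, Duration period) {
  late StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;
  T? latest;
  var hasValue = false;

  // Emits the pending value, if any, and marks it as delivered.
  void flush() {
    if (hasValue) {
      controller.add(latest as T);
      hasValue = false;
    }
  }

  controller = StreamController<T>(
    onListen: () {
      timer = Timer.periodic(period, (_) => flush());
      subscription = input.listen(
        (value) {
          latest = value; // newer values overwrite older ones
          hasValue = true;
        },
        onError: controller.addError,
        onDone: () {
          timer?.cancel();
          flush(); // assumption: deliver the pending sample before closing
          controller.close();
        },
      );
    },
    onCancel: () {
      timer?.cancel();
      return subscription?.cancel();
    },
  );
  return controller.stream;
}
```

If the input emits 1, 2, 3 in quick succession and then completes, only 3 makes it to the sampled stream.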

marble diagram

 

Live demo here.

 

startWith

StreamExt.startWith has the following signature:

Stream startWith(Stream input, Iterable values, { bool closeOnError : false, bool sync : false })

This method allows you to prefix values to a stream. The supplied values are delivered as soon as a listener subscribes, before the listener receives any values from the input stream.

The output stream will complete if:

  • the input stream has completed
  • the closeOnError flag is set to true and an error is received
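As with concat, the prefixing behaviour is easy to sketch with an async* generator. Again this uses dart:async only and is not the stream_ext implementation; startWithSketch is a hypothetical name, and the input stream here isn't listened to until the prefix values have been emitted.

```dart
import 'dart:async';

/// Hypothetical helper (not part of stream_ext): emits [values] first, then
/// everything from [input].
Stream<T> startWithSketch<T>(Stream<T> input, Iterable<T> values) async* {
  // The generator body runs once a listener subscribes, so the prefix
  // values are the first thing that listener sees.
  for (final value in values) {
    yield value;
  }
  yield* input;
}
```

So startWithSketch(Stream.fromIterable([2, 3]), [0, 1]) yields 0, 1, 2, 3 and completes when the input completes.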

marble diagram

 

Live demo here.

 

Once again, I hope you find these methods useful, and please don’t hesitate to send me any questions or feedback you have about the project.

 

Links

Source Code

API Reference

Getting Started guide

Wiki (with live demo for each method)

Intro to Rx