stream_ext – version 0.2.0 is out

Lately I’ve been making steady progress porting Rx APIs over to Dart with stream_ext, and with the release of version 0.2.0 a few more Rx methods have been added to the existing set of buffer, combineLatest, delay, max, merge, min, scan, sum, throttle, window and zip.

 

average

StreamExt.average has the following signature:

Future average(Stream input, { num map (dynamic elem), bool closeOnError : false, bool sync : false })

This method returns the average of the input values as a Future which completes when the input stream is done.

This method uses the supplied map function to convert each input value into a num. If a map function is not specified then the identity function is used instead.

If the closeOnError flag is set to true, then any error in the map function or from the input stream will complete the Future with the error. Otherwise, any errors will be swallowed and excluded from the final average.
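To make the behaviour concrete, here is a minimal sketch of the semantics described above, written against plain dart:async in current null-safe Dart. It is not the stream_ext implementation: averageOf is a hypothetical name, the sync flag is not modelled, and returning NaN for an empty input is my assumption.

```dart
import 'dart:async';

/// Hypothetical stand-in for the behaviour described above (not stream_ext's code).
Future<double> averageOf<T>(Stream<T> input,
    {num Function(T elem)? map, bool closeOnError = false}) {
  // Fall back to the identity conversion when no map function is given.
  final num Function(T) convert = map ?? ((T elem) => elem as num);
  final completer = Completer<double>();
  num sum = 0;
  var count = 0;
  late StreamSubscription<T> sub;

  void onError(Object error, StackTrace stack) {
    if (!closeOnError) return; // swallow the error; the value is simply excluded
    if (!completer.isCompleted) completer.completeError(error, stack);
    sub.cancel();
  }

  sub = input.listen((elem) {
    try {
      sum += convert(elem);
      count++;
    } catch (error, stack) {
      // Errors thrown by the map function are treated like stream errors.
      onError(error, stack);
    }
  }, onError: onError, onDone: () {
    // An empty input yields NaN here (0 / 0); that choice is an assumption.
    if (!completer.isCompleted) completer.complete(sum / count);
  });
  return completer.future;
}

Future<void> main() async {
  final words = Stream.fromIterable(['a', 'bbb', 'cc']);
  // Average word length, using map to turn each String into a num.
  print(await averageOf(words, map: (String w) => w.length));
}
```

With closeOnError left at false, a value that the map function cannot convert is skipped and does not count towards the average.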

 

Live demo here.

 

concat

StreamExt.concat has the following signature:

Stream concat(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method concatenates two streams: when the first stream completes, the second stream is subscribed to. Until the first stream is done, any values and errors from the second stream are ignored.

The concatenated stream will complete if:

  • both input streams have completed (if stream 2 completes before stream 1, the concatenated stream completes when stream 1 completes)
  • the closeOnError flag is set to true and an error is received in the active input stream
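The rules above can be sketched with a StreamController from dart:async. This is an illustration under assumptions, not the library's code: concatStreams is a hypothetical name, the sync flag is not modelled, and I am assuming that errors from the active stream are forwarded to the listener (and additionally close the output when closeOnError is true).

```dart
import 'dart:async';

/// Hypothetical sketch of the concat behaviour described above.
Stream<T> concatStreams<T>(Stream<T> first, Stream<T> second,
    {bool closeOnError = false}) {
  late StreamController<T> controller;
  StreamSubscription<T>? active; // subscription to whichever input is active

  void onError(Object error, StackTrace stack) {
    controller.addError(error, stack); // assumption: errors are forwarded
    if (closeOnError) {
      active?.cancel();
      controller.close();
    }
  }

  controller = StreamController<T>(
    onListen: () {
      active = first.listen(controller.add, onError: onError, onDone: () {
        // The second stream is only subscribed to once the first is done.
        active = second.listen(controller.add,
            onError: onError, onDone: controller.close);
      });
    },
    onCancel: () => active?.cancel(),
  );
  return controller.stream;
}

Future<void> main() async {
  final joined = concatStreams(
      Stream.fromIterable([1, 2]), Stream.fromIterable([3, 4]));
  print(await joined.toList());
}
```

Because the second stream is subscribed to lazily, anything a broadcast stream emits before the first stream finishes never reaches the output, which matches the "ignored" wording above.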

marble diagram

 

Live demo here.

 

repeat

StreamExt.repeat has the following signature:

Stream repeat(Stream input, { int repeatCount, bool closeOnError : false, bool sync : false })

This method repeats the input stream the specified number of times. If repeatCount is not set, then the input stream will be repeated indefinitely. The repeated stream respects both the order and timing of values from the input stream.

The done value is not delivered when the input stream completes, but only after the input stream has been repeated the required number of times.

The output stream will complete if:

  • the input stream has been repeated the required number of times
  • the closeOnError flag is set to true and an error has been received
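One way to picture this is to record each value together with the gap that preceded it, then replay the recording. The sketch below does that with an async generator. It rests on several assumptions of mine: repeatStream is a hypothetical name, repeatCount is read as the number of replays after the original pass, the sync flag is not modelled, and an input error simply ends the output (roughly what closeOnError: true describes).

```dart
import 'dart:async';

/// Hypothetical sketch: forwards [input] once, then replays it
/// [repeatCount] more times (forever when null), preserving the gaps.
Stream<T> repeatStream<T>(Stream<T> input, {int? repeatCount}) async* {
  final recorded = <MapEntry<Duration, T>>[]; // gap before value -> value
  final clock = Stopwatch()..start();

  // First pass: forward values as they arrive and record their timing.
  await for (final value in input) {
    recorded.add(MapEntry(clock.elapsed, value));
    clock.reset();
    yield value;
  }
  final tail = clock.elapsed; // gap between the last value and done
  if (recorded.isEmpty) return; // nothing to replay

  // Replays: done is only delivered once this loop finishes.
  for (var i = 0; repeatCount == null || i < repeatCount; i++) {
    for (final entry in recorded) {
      await Future<void>.delayed(entry.key);
      yield entry.value;
    }
    await Future<void>.delayed(tail);
  }
}

Future<void> main() async {
  final repeated =
      repeatStream(Stream.fromIterable(['a', 'b']), repeatCount: 1);
  print(await repeated.toList());
}
```

Buffering is needed because a single-subscription Dart stream cannot be listened to a second time, so the replay has to come from recorded values.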

marble diagram

 

Live demo here.


 

sample

StreamExt.sample has the following signature:

Stream sample(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })

This method creates a new stream by emitting the most recent value from the input stream once per specified duration.

The sampled stream will complete if:

  • the input stream has completed and any sampled message has been delivered
  • the closeOnError flag is set to true and an error has been received
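As a rough illustration, sampling can be built from a periodic Timer that emits the latest value seen since the previous tick. The names here (sampleStream, flush) are hypothetical, closeOnError and sync are not modelled, and flushing a pending value immediately when the input completes is my reading of the first bullet above, not something confirmed by the library.

```dart
import 'dart:async';

/// Hypothetical sketch of the sample behaviour described above.
Stream<T> sampleStream<T>(Stream<T> input, Duration duration) {
  late StreamController<T> controller;
  StreamSubscription<T>? sub;
  Timer? timer;
  T? latest;
  var hasValue = false; // true when a value arrived since the last emission

  void flush() {
    if (hasValue) {
      controller.add(latest as T);
      hasValue = false;
    }
  }

  controller = StreamController<T>(
    onListen: () {
      // Emit the most recent value, if any, once per duration.
      timer = Timer.periodic(duration, (_) => flush());
      sub = input.listen((value) {
        latest = value;
        hasValue = true;
      }, onError: controller.addError, onDone: () {
        timer?.cancel();
        flush(); // assumption: deliver a pending sample before completing
        controller.close();
      });
    },
    onCancel: () {
      timer?.cancel();
      return sub?.cancel();
    },
  );
  return controller.stream;
}

Future<void> main() async {
  final ticks = Stream.periodic(
      const Duration(milliseconds: 10), (int i) => i).take(20);
  // Prints roughly one value per 50 ms window; exact values depend on timing.
  await sampleStream(ticks, const Duration(milliseconds: 50)).forEach(print);
}
```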

marble diagram

 

Live demo here.

 

startWith

StreamExt.startWith has the following signature:

Stream startWith(Stream input, Iterable values, { bool closeOnError : false, bool sync : false })

This method lets you prefix values to a stream. The supplied values are delivered as soon as the listener subscribes, before it receives any values from the input stream.

The output stream will complete if:

  • the input stream has completed
  • the closeOnError flag is set to true and an error is received
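The prefixing behaviour is small enough to sketch with an async generator. As before, this is only an illustration in plain dart:async: startWithValues is a hypothetical name, and the closeOnError and sync flags are not modelled.

```dart
import 'dart:async';

/// Hypothetical sketch: emits [values] first, then everything from [input].
Stream<T> startWithValues<T>(Stream<T> input, Iterable<T> values) async* {
  for (final value in values) {
    yield value; // prefix values are delivered once a listener subscribes
  }
  yield* input; // then the input stream's own events, until it is done
}

Future<void> main() async {
  final prefixed = startWithValues(Stream.fromIterable([3, 4]), [1, 2]);
  print(await prefixed.toList());
}
```

Note that this sketch only subscribes to the input after the prefix has been emitted, so a broadcast input could lose events that fire in between.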

marble diagram

 

Live demo here.

 

Once again, I hope you find these methods useful, and please don’t hesitate to send me any questions or feedback you have about the project.

 

Links

Source Code

API Reference

Getting Started guide

Wiki (with live demo for each method)

Intro to Rx