stream_ext – version 0.3.0 is out

I have just published version 0.3.0 of stream_ext, my attempt to port the Rx APIs to Dart. This version adds a number of methods to the existing set, each described below.

 

amb

StreamExt.amb has the following signature:

Stream amb(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method propagates values from whichever stream produces a value first.

Any errors received from either stream before the first value arrives are ignored. Once a stream has produced the first value, its values and errors are propagated through the output stream.

The output stream will complete if:

  • neither stream produced a value before completing
  • the propagated stream has completed
  • the closeOnError flag is set to true and an error is received in the propagated stream
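
To make the rules above concrete, here is a minimal sketch of the same behaviour written against dart:async only. It is not how stream_ext implements amb: the name ambSketch is hypothetical, the sync flag is left out, and it uses current null-safe Dart syntax rather than the syntax of the signature above.

```dart
import 'dart:async';

// Illustrative sketch only; not stream_ext's actual implementation.
Stream<T> ambSketch<T>(Stream<T> stream1, Stream<T> stream2,
    {bool closeOnError = false}) {
  final controller = StreamController<T>();
  final subs = List<StreamSubscription<T>?>.filled(2, null);
  int? winner; // index of the stream that produced the first value
  var doneCount = 0;

  void listenTo(int index, Stream<T> stream) {
    subs[index] = stream.listen((value) {
      if (winner == null) {
        winner = index;
        subs[1 - index]?.cancel(); // stop listening to the losing stream
      }
      controller.add(value);
    }, onError: (Object error, StackTrace stack) {
      if (winner != index) return; // errors before the first value are ignored
      controller.addError(error, stack);
      if (closeOnError) {
        subs[index]?.cancel();
        controller.close();
      }
    }, onDone: () {
      doneCount++;
      // Complete when the winner completes, or when neither produced a value.
      if (winner == index || doneCount == 2) controller.close();
    });
  }

  controller.onListen = () {
    listenTo(0, stream1);
    listenTo(1, stream2);
  };
  controller.onCancel = () async {
    for (final sub in subs) {
      await sub?.cancel();
    }
  };
  return controller.stream;
}
```

Passing a stream that emits after a delay together with one that emits immediately should yield only the values of the immediate one, since the slower subscription is cancelled as soon as the first value arrives.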

 

Marble Diagram

 

Live demo here.

More information on the Observable.amb extension method from the Rx API here.

 

log

StreamExt.log has the following signature:

void log(Stream input, [ String prefix, void log(Object msg) ])

This helper method provides an easy way to log when new values and errors are received and when the stream is done.

You can see example usage here.
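
As a rough idea of what such a helper involves, the sketch below subscribes to a stream and reports values, errors, and completion through a pluggable logger. The name logSketch and the message format are assumptions made for illustration; the messages stream_ext actually writes may look different.

```dart
import 'dart:async';

// Illustrative sketch only; the message format here is made up.
void logSketch<T>(Stream<T> input,
    [String prefix = '', void Function(Object msg)? log]) {
  // Fall back to print when no custom logger is supplied.
  void write(Object msg) => (log ?? print)(msg);
  final tag = prefix.isEmpty ? '' : '$prefix ';

  input.listen(
    (value) => write('${tag}value: $value'),
    onError: (Object error) => write('${tag}error: $error'),
    onDone: () => write('${tag}done'),
  );
}
```

Because the sketch calls listen itself, a single-subscription stream passed to it cannot be listened to again elsewhere; a broadcast stream avoids that restriction.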

 

onErrorResumeNext

StreamExt.onErrorResumeNext has the following signature:

Stream onErrorResumeNext(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method continues one stream with another, regardless of whether the first stream completes gracefully or because of an error.

The output stream will complete if:

  • both input streams have completed (if stream 2 completes before stream 1 then the output stream is completed when stream 1 completes)
  • the closeOnError flag is set to true and an error is received in the continuation stream
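
The continuation idea can be sketched with dart:async alone. This simplified version makes two assumptions that may not match stream_ext: it subscribes to the second stream only at the moment of the switch, and it swallows the error that triggers the switch (as Rx does). The name onErrorResumeNextSketch is hypothetical and the sync flag is omitted.

```dart
import 'dart:async';

// Illustrative sketch only; not stream_ext's actual implementation.
Stream<T> onErrorResumeNextSketch<T>(Stream<T> stream1, Stream<T> stream2,
    {bool closeOnError = false}) {
  final controller = StreamController<T>();
  StreamSubscription<T>? active;

  // Switch to the continuation stream.
  void resume() {
    active?.cancel();
    active = stream2.listen(controller.add,
        onError: (Object error, StackTrace stack) {
      controller.addError(error, stack);
      if (closeOnError) {
        active?.cancel();
        controller.close();
      }
    }, onDone: controller.close);
  }

  controller.onListen = () {
    // Both an error and a graceful completion trigger the switch.
    active = stream1.listen(controller.add,
        onError: (Object _, StackTrace __) => resume(), onDone: resume);
  };
  controller.onCancel = () => active?.cancel();
  return controller.stream;
}
```

With a first stream that yields 1 and then throws, and a second stream of 10 and 20, this sketch would emit 1, 10, 20 and then complete.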

 

Marble Diagram

 

Live demo here.

More information on the Observable.onErrorResumeNext extension method from the Rx API here.


 

switchFrom

StreamExt.switchFrom has the following signature:

Stream switchFrom(Stream<Stream> inputs, { bool closeOnError : false, bool sync : false })

This method transforms a stream of streams into a stream that produces values only from the most recent stream.

The output stream will complete if:

  • the input stream has completed and the last stream has completed
  • the closeOnError flag is set to true and an error is received in the active stream
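
A minimal sketch of the switching behaviour, using only dart:async, might look like the following. The name switchFromSketch is hypothetical, the sync flag is omitted, and forwarding errors from the outer stream is an assumption rather than documented stream_ext behaviour.

```dart
import 'dart:async';

// Illustrative sketch only; not stream_ext's actual implementation.
Stream<T> switchFromSketch<T>(Stream<Stream<T>> inputs,
    {bool closeOnError = false}) {
  final controller = StreamController<T>();
  StreamSubscription<Stream<T>>? outer;
  StreamSubscription<T>? inner; // subscription to the most recent stream
  var outerDone = false;

  controller.onListen = () {
    outer = inputs.listen((stream) {
      inner?.cancel(); // a newer stream replaces the active one
      inner = stream.listen(controller.add,
          onError: (Object error, StackTrace stack) {
        controller.addError(error, stack);
        if (closeOnError) {
          outer?.cancel();
          inner?.cancel();
          controller.close();
        }
      }, onDone: () {
        inner = null;
        if (outerDone) controller.close();
      });
    }, onError: controller.addError, onDone: () {
      outerDone = true;
      if (inner == null) controller.close();
    });
  };
  controller.onCancel = () async {
    await outer?.cancel();
    await inner?.cancel();
  };
  return controller.stream;
}
```

Cancelling the previous inner subscription is what guarantees that values from an older stream stop appearing once a newer stream has arrived.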

 

Marble Diagram

 

Live demo here.

More information on the Observable.Switch extension method from the Rx API here.

 

timeOut

StreamExt.timeOut has the following signature:

Stream timeOut(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })

This method allows you to terminate a stream with a TimeoutError if the specified duration elapses between values.

The output stream will complete if:

  • the input stream has completed
  • the specified duration between input values has elapsed
  • the closeOnError flag is set to true and an error is received
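
Similar behaviour can be approximated with the Stream.timeout method built into dart:async, as sketched below. By default Stream.timeout adds a TimeoutException and keeps the stream open, so the sketch supplies an onTimeout callback that closes the sink as well. The name timeOutSketch is hypothetical, TimeoutException stands in for the TimeoutError mentioned above, and the closeOnError and sync flags are not handled.

```dart
import 'dart:async';

// Illustrative sketch only; not stream_ext's actual implementation.
Stream<T> timeOutSketch<T>(Stream<T> input, Duration duration) {
  return input.timeout(duration, onTimeout: (sink) {
    // Report the timeout, then terminate the output stream.
    sink.addError(
        TimeoutException('No value received within $duration', duration));
    sink.close();
  });
}
```

The timer restarts every time the input produces an event, so a stream that keeps emitting faster than the given duration never times out.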

 

Marble Diagram

 

Live demo here.

More information on the Observable.Timeout extension method from the Rx API here.

 

timeOutAt

StreamExt.timeOutAt has the following signature:

Stream timeOutAt(Stream input, DateTime dueTime, { bool closeOnError : false, bool sync : false })

This method allows you to terminate a stream with a TimeoutError at the specified dueTime.

The output stream will complete if:

  • the input stream has completed
  • the specified dueTime has passed
  • the closeOnError flag is set to true and an error is received
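
Unlike a timeout between values, a fixed due time needs only a single timer that is never reset. The sketch below illustrates this with dart:async; the name timeOutAtSketch is hypothetical, TimeoutException stands in for the TimeoutError mentioned above, and the closeOnError and sync flags are not handled.

```dart
import 'dart:async';

// Illustrative sketch only; not stream_ext's actual implementation.
Stream<T> timeOutAtSketch<T>(Stream<T> input, DateTime dueTime) {
  final controller = StreamController<T>();
  StreamSubscription<T>? sub;
  Timer? timer;

  controller.onListen = () {
    var remaining = dueTime.difference(DateTime.now());
    if (remaining.isNegative) remaining = Duration.zero; // already overdue

    sub = input.listen(controller.add, onError: controller.addError,
        onDone: () {
      timer?.cancel();
      controller.close();
    });

    // One timer for the whole stream; incoming values do not reset it.
    timer = Timer(remaining, () {
      sub?.cancel();
      controller.addError(TimeoutException('Due time $dueTime reached'));
      controller.close();
    });
  };
  controller.onCancel = () {
    timer?.cancel();
    return sub?.cancel();
  };
  return controller.stream;
}
```

Values that arrive before the due time pass through unchanged; once the timer fires, the input subscription is cancelled and the output ends with the timeout error.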

 

Marble Diagram

 

Live demo here.

 

Links

Source Code

API Reference

Getting Started guide

Wiki (with live demo for each method)

Intro to Rx