Yan Cui
I have just published version 0.3.0 of stream_ext, my attempt to port the Rx APIs to Dart. This version adds several new methods to the existing set:
- average
- buffer
- combineLatest
- concat
- delay
- max
- merge
- min
- repeat
- sample
- scan
- startWith
- sum
- throttle
- window
- zip
amb
StreamExt.amb has the following signature:
Stream amb(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })
This method propagates values from whichever stream produces a value first.
Errors from either stream are ignored until the first value is received. After that, the values and errors of the stream that reacted first are propagated through the output stream.
The output stream will complete if:
- neither stream produced a value before completing
- the propagated stream has completed
- the closeOnError flag is set to true and an error is received in the propagated stream
Live demo here.
More information on the Observable.amb extension method from the Rx API here.
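To make the rules above concrete, here is a minimal sketch of the same behaviour built only on dart:async. It is not the stream_ext implementation: the name ambSketch is hypothetical, the sync flag is left out, and it assumes that events from the stream that lost the race are simply dropped.

```dart
import 'dart:async';

/// Hypothetical sketch: forwards events only from whichever of the two
/// streams produces a value first.
Stream<T> ambSketch<T>(Stream<T> stream1, Stream<T> stream2,
    {bool closeOnError = false}) {
  final controller = StreamController<T>();
  final subs = <StreamSubscription<T>>[];
  int? winner; // 0 or 1 once a stream has produced the first value
  var doneCount = 0;

  void finish() {
    for (final sub in subs) {
      sub.cancel();
    }
    controller.close();
  }

  void attach(Stream<T> source, int index) {
    subs.add(source.listen(
      (value) {
        winner ??= index; // the first value decides the winner
        if (winner == index) controller.add(value);
      },
      onError: (Object error, StackTrace stack) {
        // Errors are ignored before a winner exists, and from the loser.
        if (winner != index) return;
        controller.addError(error, stack);
        if (closeOnError) finish();
      },
      onDone: () {
        doneCount++;
        // Complete when the winner completes, or when both streams
        // completed without ever producing a value.
        if (winner == index || (winner == null && doneCount == 2)) finish();
      },
    ));
  }

  controller.onListen = () {
    attach(stream1, 0);
    attach(stream2, 1);
  };
  controller.onCancel = () {
    for (final sub in subs) {
      sub.cancel();
    }
  };
  return controller.stream;
}
```

Passing a stream that emits after a delay together with one that emits straight away should yield only the values of the second stream.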
log
StreamExt.log has the following signature:
void log(Stream input, [ String prefix, void log(Object msg) ])
This helper method provides an easy way to log when new values and errors are received and when the stream is done.
You can see example usage here.
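For a rough idea of what such a helper does, here is a minimal sketch using only dart:async. The name logSketch, the default prefix and the message format are all my assumptions for illustration, not what stream_ext prints.

```dart
import 'dart:async';

/// Hypothetical sketch: subscribes to [input] and reports every value,
/// every error and the completion of the stream through [log].
void logSketch<T>(Stream<T> input,
    [String prefix = 'stream', void Function(Object msg) log = print]) {
  input.listen(
    (value) => log('$prefix value: $value'),
    onError: (Object error) => log('$prefix error: $error'),
    onDone: () => log('$prefix done'),
  );
}
```

Supplying your own log function instead of print makes it easy to redirect the messages, for example into a list while testing.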
onErrorResumeNext
StreamExt.onErrorResumeNext has the following signature:
Stream onErrorResumeNext(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })
This method continues one stream with another, regardless of whether the first stream completes gracefully or terminates with an error.
The output stream will complete if:
- both input streams have completed (if stream 2 completes before stream 1 then the output stream is completed when stream 1 completes)
- the closeOnError flag is set to true and an error is received in the continuation stream
Live demo here.
More information on the Observable.onErrorResumeNext extension method from the Rx API here.
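The continuation behaviour can be sketched with dart:async alone. This is an illustration rather than the stream_ext code: onErrorResumeNextSketch is a hypothetical name, the sync flag is omitted, and I assume the second stream is only subscribed to once the first has finished or failed.

```dart
import 'dart:async';

/// Hypothetical sketch: forwards [stream1], then carries on with [stream2]
/// whether [stream1] ended normally or with an error.
Stream<T> onErrorResumeNextSketch<T>(Stream<T> stream1, Stream<T> stream2,
    {bool closeOnError = false}) {
  final controller = StreamController<T>();
  StreamSubscription<T>? active;

  void resume() {
    // Drop the first stream (its error, if any, is swallowed) and
    // continue with the second one.
    active?.cancel();
    active = stream2.listen(
      controller.add,
      onError: (Object error, StackTrace stack) {
        controller.addError(error, stack);
        if (closeOnError) {
          active?.cancel();
          controller.close();
        }
      },
      onDone: controller.close,
    );
  }

  controller.onListen = () {
    active = stream1.listen(
      controller.add,
      onError: (Object error) => resume(),
      onDone: resume,
    );
  };
  controller.onCancel = () => active?.cancel();
  return controller.stream;
}
```

With a first stream that fails immediately, the output should contain only the values of the second stream and no error.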
switchFrom
StreamExt.switchFrom has the following signature:
Stream switchFrom(Stream<Stream> inputs, { bool closeOnError : false, bool sync : false })
This method transforms a stream of streams into a stream that produces values only from the most recent stream.
The output stream will complete if:
- the input stream has completed and the last stream has completed
- the closeOnError flag is set to true and an error is received in the active stream
Live demo here.
More information on the Observable.Switch extension method from the Rx API here.
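Here is a minimal sketch of the switching behaviour described above, again using only dart:async. The name switchFromSketch is hypothetical and the sync flag is omitted. Forwarding errors from the outer stream is my assumption, since the description only covers errors in the active stream.

```dart
import 'dart:async';

/// Hypothetical sketch: always listens to the most recent inner stream and
/// drops the previous one as soon as a new stream arrives.
Stream<T> switchFromSketch<T>(Stream<Stream<T>> inputs,
    {bool closeOnError = false}) {
  final controller = StreamController<T>();
  StreamSubscription<Stream<T>>? outer;
  StreamSubscription<T>? inner;
  var outerDone = false;

  void finish() {
    outer?.cancel();
    inner?.cancel();
    controller.close();
  }

  controller.onListen = () {
    outer = inputs.listen(
      (stream) {
        inner?.cancel(); // stop following the previous stream
        inner = stream.listen(
          controller.add,
          onError: (Object error, StackTrace stack) {
            controller.addError(error, stack);
            if (closeOnError) finish();
          },
          onDone: () {
            inner = null;
            // The last stream has completed after the input stream did.
            if (outerDone) finish();
          },
        );
      },
      onError: controller.addError, // assumption: outer errors are forwarded
      onDone: () {
        outerDone = true;
        if (inner == null) finish();
      },
    );
  };
  controller.onCancel = () {
    outer?.cancel();
    inner?.cancel();
  };
  return controller.stream;
}
```

If the first inner stream is slow and a second one arrives before it emits, only the values of the second stream should reach the output.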
timeOut
StreamExt.timeOut has the following signature:
Stream timeOut(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })
This method allows you to terminate a stream with a TimeoutError if the specified duration elapses between values.
The output stream will complete if:
- the input stream has completed
- the specified duration between input values has elapsed
- the closeOnError flag is set to true and an error is received
Live demo here.
More information on the Observable.Timeout extension method from the Rx API here.
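Dart's own Stream.timeout can approximate this behaviour, so a sketch needs very little code. Note the substitutions: timeOutSketch is a hypothetical name, it raises dart:async's TimeoutException rather than the TimeoutError mentioned above, and the closeOnError and sync flags are left out.

```dart
import 'dart:async';

/// Hypothetical sketch: emits a TimeoutException and closes the output
/// when no event arrives from [input] within [duration].
Stream<T> timeOutSketch<T>(Stream<T> input, Duration duration) {
  return input.timeout(duration, onTimeout: (EventSink<T> sink) {
    sink.addError(TimeoutException('No value received in time', duration));
    sink.close(); // without this the output stream would stay open
  });
}
```

By default Stream.timeout only adds the error and keeps the stream open, which is why the sketch closes the sink explicitly.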
timeOutAt
StreamExt.timeOutAt has the following signature:
Stream timeOutAt(Stream input, DateTime dueTime, { bool closeOnError : false, bool sync : false })
This method allows you to terminate a stream with a TimeoutError at the specified dueTime.
The output stream will complete if:
- the input stream has completed
- the specified dueTime has elapsed
- the closeOnError flag is set to true and an error is received
Live demo here.
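Unlike timeOut, the deadline here is absolute, so a sketch needs a single Timer rather than one that resets on every value. As before this is only an illustration on top of dart:async: timeOutAtSketch is a hypothetical name, it uses TimeoutException in place of TimeoutError, and the closeOnError and sync flags are omitted.

```dart
import 'dart:async';

/// Hypothetical sketch: forwards [input] until [dueTime], then emits a
/// TimeoutException and closes the output stream.
Stream<T> timeOutAtSketch<T>(Stream<T> input, DateTime dueTime) {
  final controller = StreamController<T>();
  StreamSubscription<T>? sub;
  Timer? timer;

  void finish() {
    timer?.cancel();
    sub?.cancel();
    controller.close();
  }

  controller.onListen = () {
    // A due time in the past fires the timer as soon as possible.
    final remaining = dueTime.difference(DateTime.now());
    timer = Timer(remaining.isNegative ? Duration.zero : remaining, () {
      controller.addError(TimeoutException('Due time $dueTime reached'));
      finish();
    });
    sub = input.listen(controller.add,
        onError: controller.addError, onDone: finish);
  };
  controller.onCancel = () {
    timer?.cancel();
    sub?.cancel();
  };
  return controller.stream;
}
```

A periodic stream passed to this sketch keeps producing values until the due time, after which the output ends with the timeout error.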
Links
Wiki (with live demo for each method)