stream_ext – version 0.3.0 is out

I have just published version 0.3.0 of stream_ext, my attempt to port the Rx APIs to Dart. This version adds the following methods to the existing set:

 

amb

StreamExt.amb has the following signature:

Stream amb(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method propagates values from whichever stream produces a value first.

This method will ignore any errors received from either stream until the first value is received. The stream which reacts first with a value will have its values and errors propagated through the output stream.

The output stream will complete if:

  • neither stream produced a value before completing
  • the propagated stream has completed
  • the closeOnError flag is set to true and an error is received in the propagated stream
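To make these rules concrete, here is a minimal sketch of the same idea written against plain dart:async. It is not the stream_ext implementation: the name ambSketch is made up for this example, and the closeOnError and sync options are left out.

```dart
import 'dart:async';

/// Illustrative only: forwards events from whichever input emits a value first.
Stream<T> ambSketch<T>(Stream<T> stream1, Stream<T> stream2) {
  final controller = StreamController<T>();
  StreamSubscription<T>? sub1, sub2;
  StreamSubscription<T>? winner;
  var doneCount = 0;

  StreamSubscription<T> listenTo(
      Stream<T> source, StreamSubscription<T>? Function() other) {
    late final StreamSubscription<T> self;
    self = source.listen(
      (value) {
        if (winner == null) {
          // First value seen: this stream wins, the other is dropped.
          winner = self;
          other()?.cancel();
        }
        if (identical(winner, self)) controller.add(value);
      },
      onError: (Object error, StackTrace stackTrace) {
        // Errors are ignored until a winner has been chosen.
        if (identical(winner, self)) controller.addError(error, stackTrace);
      },
      onDone: () {
        doneCount++;
        final bothDoneWithoutValue = winner == null && doneCount == 2;
        if (identical(winner, self) || bothDoneWithoutValue) {
          controller.close();
        }
      },
    );
    return self;
  }

  controller.onListen = () {
    sub1 = listenTo(stream1, () => sub2);
    sub2 = listenTo(stream2, () => sub1);
  };
  controller.onCancel = () async {
    await sub1?.cancel();
    await sub2?.cancel();
  };
  return controller.stream;
}

Future<void> main() async {
  final slow = Stream<String>.fromFuture(
      Future.delayed(const Duration(milliseconds: 50), () => 'slow'));
  final fast = Stream<String>.fromIterable(['fast-1', 'fast-2']);
  print(await ambSketch(slow, fast).toList()); // [fast-1, fast-2]
}
```

Here the second stream emits first, so the delayed value from the first stream never reaches the output.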

 

Marble Diagram

 

Live demo here.

More information on the Observable.amb extension method from Rx API here.

 

log

StreamExt.log has the following signature:

void log(Stream input, [ String prefix, void log(Object msg) ])

This helper method provides an easy way to log when new values and errors are received and when the stream is done.

You can see example usage here.

 

onErrorResumeNext

StreamExt.onErrorResumeNext has the following signature:

Stream onErrorResumeNext(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method continues one stream with another, regardless of whether the first stream completes gracefully or due to an error.

The output stream will complete if:

  • both input streams have completed (if stream 2 completes before stream 1 then the output stream is completed when stream 1 completes)
  • closeOnError flag is set to true and an error is received in the continuation stream
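The core behaviour can be approximated with an async* generator in plain Dart. This is only a sketch under a few assumptions: onErrorResumeNextSketch and failing are names invented for the example, the closeOnError and sync options are omitted, and the second stream is not subscribed to until the first one has finished.

```dart
/// Illustrative only: continue with [stream2] once [stream1] ends,
/// whether it ended normally or with an error.
Stream<T> onErrorResumeNextSketch<T>(
    Stream<T> stream1, Stream<T> stream2) async* {
  try {
    await for (final value in stream1) {
      yield value;
    }
  } catch (_) {
    // The first stream ended with an error: swallow it and move on.
  }
  yield* stream2;
}

/// Hypothetical input that emits two values and then fails.
Stream<int> failing() async* {
  yield 1;
  yield 2;
  throw StateError('boom');
}

Future<void> main() async {
  final output =
      onErrorResumeNextSketch(failing(), Stream.fromIterable([3, 4]));
  print(await output.toList()); // [1, 2, 3, 4]
}
```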

 

Marble Diagram

 

Live demo here.

More information on the Observable.onErrorResumeNext extension method from the Rx API here.

 

switchFrom

StreamExt.switchFrom has the following signature:

Stream switchFrom(Stream<Stream> inputs, { bool closeOnError : false, bool sync : false })

This method transforms a stream of streams into a stream that produces values only from the most recent stream.

The output stream will complete if:

  • the input stream has completed and the last stream has completed
  • closeOnError flag is set to true and an error is received in the active stream
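A rough sketch of the switching behaviour, using only dart:async, might look like the following. It is not the stream_ext source: switchSketch and pump are hypothetical names, and the closeOnError and sync options are not modelled.

```dart
import 'dart:async';

/// Illustrative only: forwards events from the most recent inner stream.
Stream<T> switchSketch<T>(Stream<Stream<T>> inputs) {
  final controller = StreamController<T>();
  StreamSubscription<Stream<T>>? outer;
  StreamSubscription<T>? inner;
  var outerDone = false;

  controller.onListen = () {
    outer = inputs.listen(
      (stream) {
        // A newer stream has arrived: drop the previous one.
        inner?.cancel();
        inner = stream.listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            inner = null;
            if (outerDone) controller.close();
          },
        );
      },
      onError: controller.addError,
      onDone: () {
        outerDone = true;
        if (inner == null) controller.close();
      },
    );
  };
  controller.onCancel = () async {
    await inner?.cancel();
    await outer?.cancel();
  };
  return controller.stream;
}

/// Lets pending stream events be delivered before the next step.
Future<void> pump() => Future<void>.delayed(Duration.zero);

Future<void> main() async {
  final outer = StreamController<Stream<String>>();
  final a = StreamController<String>();
  final b = StreamController<String>();
  final result = switchSketch(outer.stream).toList();

  outer.add(a.stream);
  await pump();
  a.add('a1');
  await pump();
  outer.add(b.stream); // switch to b; a is no longer active
  await pump();
  a.add('a2'); // dropped, because a has been cancelled
  b.add('b1');
  await pump();
  outer.close();
  b.close();
  print(await result); // [a1, b1]
}
```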

 

Marble Diagram

 

Live demo here.

More information on the Observable.Switch extension method from the Rx API here.

 

timeOut

StreamExt.timeOut has the following signature:

Stream timeOut(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })

This method allows you to terminate a stream with a TimeoutError if the specified duration elapses between values.

The output stream will complete if:

  • the input stream has completed
  • the specified duration between input values has elapsed
  • closeOnError flag is set to true and an error is received
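Dart's built-in Stream.timeout can be used to get similar behaviour. The sketch below is an approximation rather than the stream_ext implementation: timeOutSketch and slowAfterFirst are made-up names, and it raises dart:async's TimeoutException, which I am assuming is a reasonable stand-in for the TimeoutError mentioned above.

```dart
import 'dart:async';

/// Illustrative only: emit an error and close the output if no value
/// arrives within [duration] of the previous one.
Stream<T> timeOutSketch<T>(Stream<T> input, Duration duration) {
  return input.timeout(duration, onTimeout: (sink) {
    sink.addError(TimeoutException('No value within $duration', duration));
    sink.close(); // closing the sink ends the output stream
  });
}

/// Hypothetical input: one value straight away, then a long pause.
Stream<int> slowAfterFirst() async* {
  yield 1;
  await Future<void>.delayed(const Duration(milliseconds: 300));
  yield 2;
}

Future<void> main() async {
  final events = <String>[];
  final done = Completer<void>();
  timeOutSketch(slowAfterFirst(), const Duration(milliseconds: 50)).listen(
    (value) => events.add('value $value'),
    onError: (Object error) => events.add('error ${error.runtimeType}'),
    onDone: done.complete,
  );
  await done.future;
  print(events); // [value 1, error TimeoutException]
}
```

Without the explicit sink.close() call, Stream.timeout would report the error but keep the stream open.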

 

Marble Diagram

 

Live demo here.

More information on the Observable.Timeout extension method from the Rx API here.

 

timeOutAt

StreamExt.timeOutAt has the following signature:

Stream timeOutAt(Stream input, DateTime dueTime, { bool closeOnError : false, bool sync : false })

This method allows you to terminate a stream with a TimeoutError at the specified dueTime.

The output stream will complete if:

  • the input stream has completed
  • the specified dueTime has passed
  • closeOnError flag is set to true and an error is received

 

Marble Diagram

 

Live demo here.

 

Links

Source Code

API Reference

Getting Started guide

Wiki (with live demo for each method)

Intro to Rx

stream_ext – version 0.2.0 is out

Lately I’ve been making steady progress in porting Rx APIs over to Dart with stream_ext, and with the release of version 0.2.0 a few more Rx methods have been added to the existing set of buffer, combineLatest, delay, max, merge, min, scan, sum, throttle, window and zip.

 

average

StreamExt.average has the following signature:

Future average(Stream input, { num map (dynamic elem), bool closeOnError : false, bool sync : false })

This method returns the average of the input values as a Future which completes when the input stream is done.

This method uses the supplied map function to convert each input value into a num. If a map function is not specified then the identity function is used instead.

If closeOnError flag is set to true, then any error in the map function or from the input stream will complete the Future with the error. Otherwise, any errors will be swallowed and excluded from the final average.
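As a rough illustration, the closeOnError: true behaviour can be sketched in a few lines of plain Dart. The name averageSketch is invented for this example, and returning NaN for an empty stream is my own assumption rather than something stream_ext documents.

```dart
/// Illustrative only: average of the mapped input values. Any error from
/// the input or from [map] completes the returned Future with that error.
Future<double> averageSketch<T>(Stream<T> input,
    {num Function(T elem)? map}) async {
  var sum = 0.0;
  var count = 0;
  await for (final elem in input) {
    // Fall back to treating the element itself as a num.
    sum += map != null ? map(elem) : (elem as num);
    count++;
  }
  return count == 0 ? double.nan : sum / count;
}

Future<void> main() async {
  // Average of 1, 2, 3 and 4.
  print(await averageSketch(Stream.fromIterable([1, 2, 3, 4]))); // 2.5

  // Average word length, using a map function.
  final words = Stream.fromIterable(['a', 'bbb']);
  print(await averageSketch(words, map: (String s) => s.length));
}
```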

 

Live demo here.

 

concat

StreamExt.concat has the following signature:

Stream concat(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method concatenates two streams together: when the first stream completes, the second stream is subscribed to. Until the first stream is done, any values and errors from the second stream are ignored.

The concatenated stream will complete if:

  • both input streams have completed (if stream 2 completes before stream 1 then the concatenated stream is completed when stream 1 completes)
  • closeOnError flag is set to true and an error is received in the active input stream

marble diagram

 

Live demo here.

 

repeat

StreamExt.repeat has the following signature:

Stream repeat(Stream input, { int repeatCount, bool closeOnError : false, bool sync : false })

This method allows you to repeat the input stream the specified number of times. If repeatCount is not set, then the input stream will be repeated indefinitely. The repeated stream respects both the order and timing of values from the input stream.

The done value is not delivered when the input stream completes, but only after the input stream has been repeated the required number of times.

The output stream will complete if:

  • the input stream has been repeated the required number of times
  • the closeOnError flag is set to true and an error has been received

marble diagram

 

Live demo here.

 

sample

StreamExt.sample has the following signature:

Stream sample(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })

This method creates a new stream by taking the last value from the input stream once per specified duration.

The sampled stream will complete if:

  • the input stream has completed and any sampled message has been delivered
  • the closeOnError flag is set to true and an error has been received

marble diagram

 

Live demo here.

 

startWith

StreamExt.startWith has the following signature:

Stream startWith(Stream input, Iterable values, { bool closeOnError : false, bool sync : false })

This method allows you to prefix values to a stream. The supplied values are delivered as soon as the listener subscribes, before the listener receives any values from the input stream.

The output stream will complete if:

  • the input stream has completed
  • closeOnError flag is set to true and an error is received
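The idea is simple enough to sketch with an async* generator. This is not the stream_ext implementation: startWithSketch is a made-up name, the closeOnError and sync options are omitted, and the input stream is only subscribed to after the prefix values have been delivered.

```dart
/// Illustrative only: emit [values] first, then everything from [input].
Stream<T> startWithSketch<T>(Stream<T> input, Iterable<T> values) async* {
  for (final value in values) {
    yield value;
  }
  yield* input;
}

Future<void> main() async {
  final output = startWithSketch(Stream.fromIterable([3, 4]), [1, 2]);
  print(await output.toList()); // [1, 2, 3, 4]
}
```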

marble diagram

 

Live demo here.

 

Once again, I hope you find these methods useful, and please don’t hesitate to send me any questions or feedback you have about the project.

 

Links

Source Code

API Reference

Getting Started guide

Wiki (with live demo for each method)

Intro to Rx

Run Taotie Run – new Here Be Monsters mini-game made with Dart and StageXL

Using StageXL and Dart, I built another mini-game themed around our MMORPG Here Be Monsters this week. The game follows a pack of Taotie monsters, a type of spirit monster created when a ghost with immense hunger possesses a Chinese pot. Taotie originates from Chinese folklore and is one of many monsters that you encounter in the world of Here Be Monsters, as seen in the in-game almanac page on Taotie:

Taotie's almanac page 

 

Run Taotie Run

As the player, you assume the role of the leader of this pack of travelling Taotie who are looking for anything and everything to eat (I’m not making this up, according to the folklore they eat just about anything!) when they run into a Starium shower (i.e. asteroids falling from the sky).

Screenshot 

Being a pot, you will crack and die if you get hit by an asteroid, so the premise of the game is for you to lead your Taotie minions through the asteroids and survive for as long as you can.

To make things slightly more interesting and to encourage players to take more calculated risks, how fast you accumulate points towards your score is based on how you play. If you choose to hide around the edges you might have a better chance of surviving longer, but your score also increases at a much slower rate than if you spend more time around the centre of the screen, where your reaction time will be more thoroughly tested by the incoming asteroids:

Hint 

Whilst self-preservation is of the highest priority (if you get hit directly, it’s game over), to discourage you from using your minions as mere cannon fodder the rate at which you accumulate score is also directly proportional to the number of minions you have. So keep your minions alive and you’ll pick up points a lot faster as a reward!

What’s more, Cherries will occasionally appear on the screen for a brief moment, and collecting them before they disappear will yield a handsome 10000 bonus points towards your score!

Screenshot Screenshot

 

 

stream_ext

To create the desired effect of the minions following your every move but always being several steps behind, I used the Zip function from Rx’s set of APIs, which I had begun porting over to Dart with a small project called stream_ext, available on pub.dartlang.org here.

It’s also worth mentioning that the Rx APIs were studied and taken into account in the design of Dart’s Stream type, which already contains the most commonly used Rx APIs such as fold, map and where, so my porting effort is strictly limited to the more ‘exotic’ functions such as zip, merge, combineLatest and delay, just to name a few.

Using zip made it very easy to create a stream of mouse move events which is some steps behind the source (the mouse move events emitted whenever you move the mouse), see the highlighted lines 137-138 below:

image
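For readers who cannot see the screenshot, the trick can be sketched in plain Dart roughly as follows. This is not the game's code nor the stream_ext implementation: zipSketch and trailBehind are hypothetical names, the zip here is deliberately simplified (it closes as soon as either input is done), and the source is assumed to be a broadcast stream because it is listened to twice.

```dart
import 'dart:async';
import 'dart:collection';

/// Illustrative only: pairs up the n-th values of [a] and [b].
Stream<R> zipSketch<A, B, R>(
    Stream<A> a, Stream<B> b, R Function(A, B) zipper) {
  final controller = StreamController<R>();
  final queueA = Queue<A>();
  final queueB = Queue<B>();
  StreamSubscription<A>? subA;
  StreamSubscription<B>? subB;

  void emitPairs() {
    while (queueA.isNotEmpty && queueB.isNotEmpty) {
      controller.add(zipper(queueA.removeFirst(), queueB.removeFirst()));
    }
  }

  void cancelInputs() {
    subA?.cancel();
    subB?.cancel();
  }

  void finish() {
    cancelInputs();
    controller.close();
  }

  controller.onListen = () {
    subA = a.listen((value) {
      queueA.add(value);
      emitPairs();
    }, onError: controller.addError, onDone: finish);
    subB = b.listen((value) {
      queueB.add(value);
      emitPairs();
    }, onError: controller.addError, onDone: finish);
  };
  controller.onCancel = cancelInputs;
  return controller.stream;
}

/// Zipping a stream with a copy of itself that skips the first [steps]
/// values yields each value only once the value [steps] later has arrived.
Stream<T> trailBehind<T>(Stream<T> source, int steps) =>
    zipSketch<T, T, T>(source, source.skip(steps), (older, _) => older);

Future<void> main() async {
  final leader = StreamController<int>.broadcast();
  final minion = trailBehind(leader.stream, 2).toList();
  for (final position in [1, 2, 3, 4, 5]) {
    leader.add(position);
    await Future<void>.delayed(Duration.zero); // let the event be delivered
  }
  leader.close();
  print(await minion); // [1, 2, 3]
}
```

Each minion position is only released when the leader has moved two more steps, which is what produces the trailing effect.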

 

Please give the game a go and let me know what you think and any suggestions you have on how to make it more fun!

 

Links

Play the Game

Source Code

stream_ext

Rx.zip

stream_ext – bringing more Rx APIs to Dart

Over the last week or so, I’ve been playing around with the Streams API in Dart, which has been (in part at least) based on the Rx API. It’s easy to see the parallels between the two sets of APIs, and you can find most of the core Rx APIs on Dart’s Stream type already. There were, however, a few notable absentees which I have often found useful, such as merge, zip, delay and window.

With these absentees in mind I started a small Dart project called stream_ext (for stream extensions) to port them over, and here are some details of the first couple of functions I have done along with some live demo examples.

 

merge

The merge function merges two streams into a single unified output stream.

stream_ext_merge

You can try a live demo use of the merge function here, with the source code here.

 

combineLatest

The combineLatest function merges two streams into one by using the supplied selector function whenever one of the streams produces an event.

stream_ext_combineLatest

You can try a live demo use of the combineLatest function here, with the source code here.

 

delay

The delay function creates a new stream whose events are directly sourced from the input stream but each delivered after the specified duration.

stream_ext_delay

You can try a live demo of the delay function (a port of the ‘Time flies like an array’ example) here, with the source code here.

 

throttle

The throttle function creates a new stream based on events produced by the specified input. After forwarding an event from the input stream, it ignores any subsequent events until the flow of new events has paused for the specified duration, at which point the last event produced by the input stream is delivered.

stream_ext_throttle

You can try a live demo use of the throttle function here, with the source code here.

 

zip

The zip function combines two streams into one by pairing up their elements: each time both streams have a value available, the pair is passed to the supplied zipper function to produce a new event.

stream_ext_zip

You can try a live demo use of the zip function here, with the source code here.

 

It’s still early days and there are quite a few more Rx functions that I think will be useful to people working with Dart. I hope you find these extension functions useful, and please don’t hesitate to get in touch and leave some feedback on the project!