stream_ext – bringing more Rx APIs to Dart

Yan Cui

Over the last week or so, I’ve been playing around with the Streams API in Dart, which is based (in part at least) on the Rx API. It’s easy to see the parallels between the two sets of APIs, and you can already find most of the core Rx APIs on Dart’s Stream type. There were, however, a few notable absentees that I have often found useful, such as merge, zip, delay and window.

With these absentees in mind, I started a small Dart project called stream_ext (for stream extensions) to port them over. Here are some details of the first few functions I have done, along with some live demos.

 

merge

The merge function merges two streams into a single unified output stream.

stream_ext_merge

You can try a live demo of the merge function here, with the source code here.
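To make the idea concrete, here is a minimal sketch of how a two-way merge could be written on top of dart:async, assuming a recent Dart SDK. The name mergeTwo is a hypothetical helper made up for this illustration; it is not the stream_ext implementation or its API.

```dart
import 'dart:async';

// Hypothetical helper for illustration only; not the stream_ext source.
Stream<T> mergeTwo<T>(Stream<T> first, Stream<T> second) {
  final controller = StreamController<T>();
  final subscriptions = <StreamSubscription<T>>[];
  var openSources = 2;

  // Close the output only once both inputs have finished.
  void onSourceDone() {
    openSources--;
    if (openSources == 0) controller.close();
  }

  // Subscribe lazily, when someone starts listening to the merged stream.
  controller.onListen = () {
    for (final source in [first, second]) {
      subscriptions.add(source.listen(
        controller.add,
        onError: controller.addError,
        onDone: onSourceDone,
      ));
    }
  };

  // Stop listening to both inputs if the consumer cancels.
  controller.onCancel = () async {
    for (final subscription in subscriptions) {
      await subscription.cancel();
    }
  };

  return controller.stream;
}
```

Events from either input are forwarded as they arrive, so the relative order of events across the two inputs depends on timing, while the order within each input is preserved.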

 

combineLatest

The combineLatest function merges two streams into one by applying the supplied selector function to the latest value from each stream whenever either of the streams produces an event.

stream_ext_combineLatest

You can try a live demo of the combineLatest function here, with the source code here.
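A rough sketch of the same idea using only dart:async might look like the following. The name combineLatestTwo is hypothetical, and the sketch assumes (as Rx does) that nothing is emitted until both inputs have produced at least one event; stream_ext itself may behave differently.

```dart
import 'dart:async';

// Hypothetical helper for illustration only; not the stream_ext source.
Stream<R> combineLatestTwo<A, B, R>(
    Stream<A> first, Stream<B> second, R Function(A, B) selector) {
  final controller = StreamController<R>();
  final subscriptions = <StreamSubscription<dynamic>>[];
  late A latestA;
  late B latestB;
  var hasA = false;
  var hasB = false;
  var openSources = 2;

  // Assumption: emit only once both inputs have produced a value.
  void emit() {
    if (hasA && hasB) controller.add(selector(latestA, latestB));
  }

  // Close the output once both inputs have finished.
  void onSourceDone() {
    openSources--;
    if (openSources == 0) controller.close();
  }

  controller.onListen = () {
    subscriptions.add(first.listen((a) {
      latestA = a;
      hasA = true;
      emit();
    }, onError: controller.addError, onDone: onSourceDone));
    subscriptions.add(second.listen((b) {
      latestB = b;
      hasB = true;
      emit();
    }, onError: controller.addError, onDone: onSourceDone));
  };

  controller.onCancel = () async {
    for (final subscription in subscriptions) {
      await subscription.cancel();
    }
  };

  return controller.stream;
}
```

A typical use would be combining two independent inputs, such as the latest mouse position and the latest key press, into a single value each time either one changes.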

 

delay

The delay function creates a new stream whose events are sourced directly from the input stream, but each event is delivered only after the specified duration has elapsed.

stream_ext_delay

You can try a live demo of the delay function (a port of the ‘Time flies like an array’ example) here, with the source code here.
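One way to sketch this behaviour with dart:async is to start a Timer for every incoming event. The name delayEach is hypothetical and this is not the stream_ext source; the sketch also assumes that errors are passed through immediately and that the output closes only after the last delayed event has been delivered.

```dart
import 'dart:async';

// Hypothetical helper for illustration only; not the stream_ext source.
Stream<T> delayEach<T>(Stream<T> source, Duration duration) {
  final controller = StreamController<T>();
  final timers = <Timer>{};
  var sourceDone = false;
  StreamSubscription<T>? subscription;

  // Close only when the input has finished and no event is still waiting.
  void closeIfDrained() {
    if (sourceDone && timers.isEmpty) controller.close();
  }

  controller.onListen = () {
    subscription = source.listen(
      (event) {
        // Each event gets its own timer, so every event is shifted
        // by the same duration.
        late final Timer timer;
        timer = Timer(duration, () {
          timers.remove(timer);
          controller.add(event);
          closeIfDrained();
        });
        timers.add(timer);
      },
      // Assumption: errors are forwarded without delay.
      onError: controller.addError,
      onDone: () {
        sourceDone = true;
        closeIfDrained();
      },
    );
  };

  // On cancel, drop any events that have not been delivered yet.
  controller.onCancel = () async {
    for (final timer in timers) {
      timer.cancel();
    }
    timers.clear();
    await subscription?.cancel();
  };

  return controller.stream;
}
```

Using a timer per event, rather than something like asyncMap with a delayed future, keeps the spacing between events the same as in the input instead of adding the delay between consecutive events.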

 

throttle

The throttle function creates a new stream based on events produced by the specified input stream. After forwarding an event from the input stream, it ignores any subsequent events until the flow of new events has paused for the specified duration, at which point the last event produced by the input stream is delivered.

stream_ext_throttle

You can try a live demo of the throttle function here, with the source code here.
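The description above leaves some room for interpretation, so the following is only one possible reading of it, sketched with dart:async. The name throttleSketch is hypothetical and this is not the stream_ext source. It assumes that the first event is forwarded straight away, that the quiet-period timer restarts on every event, and that the next event after a quiet period is again forwarded immediately.

```dart
import 'dart:async';

// Hypothetical helper for illustration only; not the stream_ext source.
Stream<T> throttleSketch<T>(Stream<T> source, Duration duration) {
  final controller = StreamController<T>();
  Timer? quietTimer;
  late T lastSuppressed;
  var hasSuppressed = false;
  var sourceDone = false;
  StreamSubscription<T>? subscription;

  // Runs once no event has arrived for the whole duration.
  void onQuiet() {
    quietTimer = null;
    if (hasSuppressed) {
      hasSuppressed = false;
      controller.add(lastSuppressed);
    }
    if (sourceDone) controller.close();
  }

  controller.onListen = () {
    subscription = source.listen(
      (event) {
        final timer = quietTimer;
        if (timer == null) {
          // Not currently throttling: forward immediately.
          controller.add(event);
        } else {
          // Throttling: remember only the most recent event.
          timer.cancel();
          lastSuppressed = event;
          hasSuppressed = true;
        }
        // Every event restarts the quiet period.
        quietTimer = Timer(duration, onQuiet);
      },
      onError: controller.addError,
      onDone: () {
        sourceDone = true;
        // If a quiet period is pending, onQuiet will close the stream.
        if (quietTimer == null) controller.close();
      },
    );
  };

  controller.onCancel = () async {
    quietTimer?.cancel();
    await subscription?.cancel();
  };

  return controller.stream;
}
```

With this reading, a burst of events 1, 2, 3 followed by silence yields 1 straight away and 3 once the quiet period has elapsed, which suits inputs such as keystrokes in a search box.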

 

zip

The zip function combines two streams into one by pairing up their elements: each time a value is available from both streams, it produces a new event from the pair using the supplied zipper function.

stream_ext_zip

You can try a live demo of the zip function here, with the source code here.
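As an illustration, a pairwise zip can be sketched with dart:async and a pair of queues. The name zipTwo is hypothetical and this is not the stream_ext source. The sketch assumes Rx-style semantics: unmatched values are buffered until a partner arrives, and the output closes as soon as one input has finished with nothing left to pair.

```dart
import 'dart:async';
import 'dart:collection';

// Hypothetical helper for illustration only; not the stream_ext source.
Stream<R> zipTwo<A, B, R>(
    Stream<A> first, Stream<B> second, R Function(A, B) zipper) {
  final controller = StreamController<R>();
  final subscriptions = <StreamSubscription<dynamic>>[];
  final queueA = Queue<A>();
  final queueB = Queue<B>();
  var doneA = false;
  var doneB = false;

  void drain() {
    if (controller.isClosed) return;
    // Emit one output event for every complete pair.
    while (queueA.isNotEmpty && queueB.isNotEmpty) {
      controller.add(zipper(queueA.removeFirst(), queueB.removeFirst()));
    }
    // No further pair is possible once a finished input has run dry.
    if ((doneA && queueA.isEmpty) || (doneB && queueB.isEmpty)) {
      controller.close();
    }
  }

  controller.onListen = () {
    subscriptions.add(first.listen((a) {
      queueA.add(a);
      drain();
    }, onError: controller.addError, onDone: () {
      doneA = true;
      drain();
    }));
    subscriptions.add(second.listen((b) {
      queueB.add(b);
      drain();
    }, onError: controller.addError, onDone: () {
      doneB = true;
      drain();
    }));
  };

  // Also runs after the output closes, releasing the other input.
  controller.onCancel = () async {
    for (final subscription in subscriptions) {
      await subscription.cancel();
    }
  };

  return controller.stream;
}
```

Unlike combineLatest, each input value is used at most once here, so the output has as many events as the shorter of the two inputs.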

 

It’s still early days, and there are quite a few more Rx functions that I think will be useful to people working with Dart. I hope you find these extension functions useful, and please don’t hesitate to get in touch and leave some feedback on the project!

4 thoughts on “stream_ext – bringing more Rx API to the Dart”

  1. DUDE, thank you for merge and zip, omg! Especially merge.

    … now that I have your code working, what’s a use case you can think of for StreamController.addStream? I don’t get it’s usage; I thought it’d work like your merge, but obviously it doesn’t.

  2. Thanks! Glad to hear you find it useful.

    My understanding was that StreamController.addStream should work similar to merge but it didn’t behave in the way I expected (http://stackoverflow.com/q/18647371/55074). I also needed zip and a few of the other Rx functions for a mini-game I was working on, hence why I created the stream_ext library.

  3. Rad. Yeah, I had the same challenges; I assumed addStream was analogous to merge, but required TONS of setup code/futures just to merge streams whereas in Bacon/RX, you just call merge and you’re done.

    Thanks again!

  4. Pingback: Year in Review, 2013 | theburningmonk.com
