Introduction to AWS SimpleWorkflow Extensions Part 3 – Parallelizing activities

The series so far:

  1. Hello World example
  2. Beyond Hello World

Within a workflow, not all activities have to be performed sequentially. In fact, to increase throughput and/or reduce the overall time required to finish a workflow, you might want to perform several activities in parallel, provided that they have no inter-dependencies and can be performed independently.

In this post we’re going to see how we can use the SWF extensions library to parallelize activities by scheduling several activity tasks in a single step and then aggregating their results into a single input for the next activity in the workflow.

[Figure: several activities scheduled in parallel within a single workflow step]

The parallelized activities receive their input from either:

  1. the workflow execution’s input, if this is the first step of the workflow, or
  2. the result of the preceding activity/child workflow in the workflow

As of now, the library requires you to specify a ‘reducer’, which is responsible for aggregating the results of the parallel activities into a single string that is returned as the result of that step in the workflow. There are some caveats to this reducer function (currently of signature Dictionary<int, string> -> string), as you will see in the example. I’ll look to address these oddities and clean up the API in future versions of the library, so please bear with me for now.
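
To make that signature concrete, here is a minimal sketch of a generic reducer of type Dictionary<int, string> -> string. It assumes, as the Caveats section explains, that each key is the zero-based index of an activity in the array scheduled at that step and each value is that activity’s result as a string. The name joinResults is hypothetical and is not part of the library.

```fsharp
open System.Collections.Generic

// Hypothetical generic reducer with the Dictionary<int, string> -> string shape.
// Assumption: keys are the zero-based indices of the parallel activities and
// values are their results, already serialized as strings.
let joinResults (results : Dictionary<int, string>) : string =
    results
    |> Seq.sortBy (fun kvp -> kvp.Key)                 // order by activity index
    |> Seq.map (fun kvp -> sprintf "%d : %s" kvp.Key kvp.Value)
    |> String.concat "\n"

// Usage: entries are added out of order, the output is ordered by index.
let sample = Dictionary<int, string>()
sample.[2] <- "7"
sample.[0] <- "42"
sample.[1] <- "3"
printfn "%s" (joinResults sample)
```

Sorting by key keeps the output stable regardless of the order in which results were added to the dictionary.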

The aggregate result of these parallel activities can then be passed along as the input to the subsequent activity, as per the example in my previous post.
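
The data flow of a parallel step can be simulated locally to see how the pieces fit together. The sketch below is an in-process illustration only. It does not show how the library executes activities, which run as SWF activity tasks. The names runParallelStep, lengthOf, wordCount, reducer and nextActivity are all hypothetical. Every activity receives the same input, the results are keyed by index, and the reducer’s output becomes the input of the next step.

```fsharp
open System
open System.Collections.Generic

// A local, in-process simulation of one parallel step (NOT the library's
// implementation): same input to every activity, results keyed by index,
// reducer output returned so it can feed the next step.
let runParallelStep (activities : (string -> string) [])
                    (reducer : Dictionary<int, string> -> string)
                    (input : string) : string =
    let results =
        activities
        |> Array.map (fun activity -> async { return activity input })
        |> Async.Parallel                      // results come back in array order
        |> Async.RunSynchronously
    let keyed = Dictionary<int, string>()
    results |> Array.iteri (fun i result -> keyed.[i] <- result)
    reducer keyed

// Hypothetical activities and reducer, for illustration only.
let lengthOf (s : string) = string s.Length
let wordCount (s : string) =
    string (s.Split([| ' ' |], StringSplitOptions.RemoveEmptyEntries).Length)
let reducer (results : Dictionary<int, string>) =
    sprintf "Length : %s, Words : %s" results.[0] results.[1]
let nextActivity (s : string) = s.ToUpperInvariant()

let output =
    "hello parallel world"
    |> runParallelStep [| lengthOf; wordCount |] reducer
    |> nextActivity
printfn "%s" output   // prints "LENGTH : 20, WORDS : 3"
```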

Example: Count HTML element types

Suppose that, given a URL, you want to count the number of different HTML elements (e.g. <div>, <span>, …) the returned HTML contains. Counting each element type is independent of the others, so the counts can be carried out in parallel. For nicety, we can add an echo activity before and after the count activities so that we can print the input URL and the results to the screen. You will end up with a workflow that looks something like this:

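[Figure: the count_html_elements workflow: echo, then count_divs, count_scripts and count_spans in parallel, then echo]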

The implementation of this workflow is as follows:


```fsharp
#r "bin/Release/AWSSDK.dll"
#r "bin/Release/SWF.Extensions.Core.dll"

open Amazon.SimpleWorkflow
open Amazon.SimpleWorkflow.Extensions

open System.Collections.Generic
open System.Net

// print the input to the console and pass it through unchanged
let echo str = printfn "%s" str; str

// a function to count the number of occurrences of a pattern inside the HTML returned
// by the specified URL address (a simple, case-sensitive substring count)
let countMatches (pattern : string) (address : string) =
    use webClient = new WebClient()
    let html = webClient.DownloadString address

    seq { 0..html.Length - pattern.Length }
    |> Seq.map (fun i -> html.Substring(i, pattern.Length))
    |> Seq.filter ((=) pattern)
    |> Seq.length

let echoActivity = Activity(
                        "echo", "echo input", echo,
                        taskHeartbeatTimeout       = 60,
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10,
                        taskScheduleToCloseTimeout = 20)

// the three count activities all take the URL as input and are independent of each other
let countDivs = Activity<string, int>(
                        "count_divs", "count the number of <div> elements",
                        countMatches "<div",
                        taskHeartbeatTimeout       = 60,
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10,
                        taskScheduleToCloseTimeout = 20)

let countScripts = Activity<string, int>(
                        "count_scripts", "count the number of <script> elements",
                        countMatches "<script",
                        taskHeartbeatTimeout       = 60,
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10,
                        taskScheduleToCloseTimeout = 20)

let countSpans = Activity<string, int>(
                        "count_spans", "count the number of <span> elements",
                        countMatches "<span",
                        taskHeartbeatTimeout       = 60,
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10,
                        taskScheduleToCloseTimeout = 20)

let countActivities = [| countDivs      :> ISchedulable
                         countScripts   :> ISchedulable
                         countSpans     :> ISchedulable |]

// results are keyed by the zero-based index of each activity in countActivities,
// and each value is that activity's int result serialized as a string
let countReducer (results : Dictionary<int, string>) =
    sprintf "Divs : %d\nScripts : %d\nSpans : %d\n" (int results.[0]) (int results.[1]) (int results.[2])

// echo the URL, count the elements in parallel, then echo the aggregated counts
let countElementsWorkflow =
    Workflow(domain = "theburningmonk.com", name = "count_html_elements",
             description = "this workflow counts",
             version = "1")
    ++> echoActivity
    ++> (countActivities, countReducer)
    ++> echoActivity

// replace these with your own AWS credentials
let awsKey      = "PUT-YOUR-AWS-KEY-HERE"
let awsSecret   = "PUT-YOUR-AWS-SECRET-HERE"
let client = new AmazonSimpleWorkflowClient(awsKey, awsSecret)

countElementsWorkflow.Start(client)
```
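
The counting logic can be checked without a network connection by separating it from the download. The sketch below assumes a hypothetical countMatchesIn that takes the HTML text directly. It uses the same sliding-window comparison as countMatches above, so it shares that function’s limitations. The match is case-sensitive, so <DIV is not counted. It is also purely textual, so a <div inside a comment or a script string would be counted.

```fsharp
// Hypothetical offline variant of countMatches: the same sliding-window
// substring count, but over an HTML string instead of a downloaded page.
let countMatchesIn (pattern : string) (html : string) =
    seq { 0 .. html.Length - pattern.Length }   // every start position of a window
    |> Seq.map (fun i -> html.Substring(i, pattern.Length))
    |> Seq.filter ((=) pattern)
    |> Seq.length

let sampleHtml =
    "<html><body><div><span>a</span></div><div><script></script></div><DIV></DIV></body></html>"

// Each count is independent of the others, which is what makes them parallelizable.
let divs    = countMatchesIn "<div" sampleHtml      // the upper-case <DIV> is not counted
let scripts = countMatchesIn "<script" sampleHtml
let spans   = countMatchesIn "<span" sampleHtml
printfn "Divs : %d\nScripts : %d\nSpans : %d" divs scripts spans
```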

Thanks to Tomas Petricek’s FSharp.Formatting project I’m now able to provide code snippets with IntelliSense-style tooltips! Tomas, you rock!

Running the above example and starting a workflow execution with the input

http://www.bing.com

outputs the following to the console:

[Screenshot: console output of the workflow execution]

If you look at the history of events below, you can see that the decision task following the completion of stage 0 (the first echo activity) was completed with the following state, which is tracked in the Execution Context property:

{"CurrentStageNumber":1,"NumberOfActions":3,"Results":{}}

Without going into too much detail on the inner workings of the generated decider, this JSON-serialized state tells us that the workflow has moved into stage 1, which has a total of 3 actions, each represented by an activity task that counts a particular type of HTML element.

[Screenshot: the workflow execution’s history of events]
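
To picture how that execution context evolves as the parallel activities finish, here is a sketch that models it as an F# record whose fields mirror the JSON above. StageState, recordResult and isStageComplete are hypothetical and are not the generated decider’s actual code. The sketch assumes that each completed activity adds its result under its index, and that the stage is complete once there are as many results as actions.

```fsharp
// Hypothetical model of the decider state shown in the execution context JSON.
type StageState =
    { CurrentStageNumber : int
      NumberOfActions    : int
      Results            : Map<int, string> }   // activity index -> result string

// Record the result of the activity at the given index within the current stage.
let recordResult (index : int) (result : string) (state : StageState) =
    { state with Results = state.Results |> Map.add index result }

// The stage is finished once every parallel action has reported a result.
let isStageComplete (state : StageState) =
    state.Results.Count = state.NumberOfActions

// Stage 1 starts with three count activities and no results yet.
let initial  = { CurrentStageNumber = 1; NumberOfActions = 3; Results = Map.empty }
let partial  = initial |> recordResult 0 "42" |> recordResult 2 "7"
let complete = partial |> recordResult 1 "3"

printfn "%b %b" (isStageComplete partial) (isStageComplete complete)   // false true
```

Keying intermediate results by index is what lets a partially completed stage show which activity each result came from.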

Switching to the Activities tab, you can see that 3 activities were completed at stage index 1 of the count_html_elements workflow, judging by the Activity ID and Version of the 3 activities:

[Screenshot: the Activities tab showing the three count activities]

Caveats

Looking at the example code, a couple of questions jump out straight away:

Q. What is the ISchedulable interface?

A. The ISchedulable interface represents anything that can be scheduled as part of a workflow, i.e. an activity or a child workflow. Both IActivity and IWorkflow inherit from it, although in all the examples so far we’ve only worked directly with the concrete implementation types of these two interfaces.
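
The :> ISchedulable upcasts in countActivities follow a general F# rule. Without a type annotation, all elements of an array literal must be convertible to the type of the first element, and F# does not look for a common supertype. Different implementations can therefore share an array only once they are explicitly upcast to the interface. The sketch below reproduces this with a hypothetical local interface, IScheduled, whose members mirror those listed for ISchedulable (Name, Description and MaxAttempts). FakeActivity and FakeWorkflow are likewise made up, and the library is not referenced.

```fsharp
// Hypothetical stand-in for the library's ISchedulable interface, mirroring
// the members listed for it (Name, Description, MaxAttempts).
type IScheduled =
    abstract member Name        : string
    abstract member Description : string
    abstract member MaxAttempts : int

// Two hypothetical kinds of schedulable work, like an activity and a child workflow.
type FakeActivity(name : string) =
    interface IScheduled with
        member this.Name        = name
        member this.Description = "activity " + name
        member this.MaxAttempts = 3

type FakeWorkflow(name : string) =
    interface IScheduled with
        member this.Name        = name
        member this.Description = "child workflow " + name
        member this.MaxAttempts = 1

// The explicit upcasts give the array the interface element type, so different
// implementations can be scheduled side by side in the same step.
let step = [| FakeActivity("count_divs")   :> IScheduled
              FakeWorkflow("child_counts") :> IScheduled |]

step |> Array.iter (fun s -> printfn "%s (%s)" s.Name s.Description)
```

Removing either :> IScheduled makes the array literal fail to type-check, because FakeActivity and FakeWorkflow are unrelated types.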

Q. Why does the reducer take a Dictionary<int, string>?

A. As far as the reducer is concerned, it probably doesn’t need to be a dictionary. The main reason I’ve used one is that, when intermediate results are available (e.g. 2 out of 3 parallel activities have completed), I wanted to be able to show the current set of results in the Execution Context of the workflow execution (see screenshot above). Since not all the results are back yet, each result has to be shown against the activity it came from. Hence a dictionary, where the key is the zero-based index of the activity in the input array and the value is the string representation of its result.

Q. Why, then, are the Dictionary’s values strings when the scheduled activities can be generic and return arbitrary types?

A. Because not all the activities have to return the same type.

Under the hood, the generic Activity<TInput, TOutput> marshals data to and from JSON strings using the ServiceStack.Text JSON serializer, and the Activity type is just a special case where both TInput and TOutput are strings.

When results are recorded and retrieved via SWF, they’re already in string format. Although it’s possible to inspect the originating activity’s generic type parameters to work out the returned type, catering for different return types would require the dictionary to be a Dictionary<int, object> instead, which is no better.
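
Here is a rough sketch of why the reducer sees strings. A typed activity function can be wrapped into a string -> string function by deserializing its input and serializing its output. The wrap helper below is hypothetical. It takes the (de)serializers as parameters, and plain int/string conversions stand in for the ServiceStack.Text JSON serializer that the library actually uses.

```fsharp
// Hypothetical helper: turn a typed function into the string -> string shape
// that travels through SWF, given functions to decode input and encode output.
let wrap (deserialize : string -> 'TInput)
         (serialize   : 'TOutput -> string)
         (processor   : 'TInput -> 'TOutput) : string -> string =
    deserialize >> processor >> serialize

// A typed "activity" of type int -> int; its result reaches the reducer as a string.
let twice (n : int) = n * 2
let wrapped = wrap int string twice

let raw   = wrapped "21"   // "42": a string, not an int
let typed = int raw        // converted back, as countReducer does with int results.[0]
printfn "%s %d" raw typed
```

This is also why countReducer in the example calls int on each dictionary value before formatting it.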

Q. So what if I want to return anything other than a string from the reducer function?

A. For now, you can serialize the return value to a string yourself, using the ServiceStack.Text JSON serializer for better compatibility. I’ll add support for the library to do this automatically in the 1.1.0 release. It slipped my mind at the time, sorry…
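
Until then, a reducer can build a structured value and serialize it itself. This sketch is hedged in two ways. First, the ElementCounts record and countReducerJson are hypothetical. Second, it uses System.Text.Json only so the sketch runs on a stock .NET SDK. As suggested above, ServiceStack.Text is the better choice in practice, for compatibility with how the library deserializes activity inputs.

```fsharp
open System.Collections.Generic
open System.Text.Json

// Hypothetical structured result for the count_html_elements step.
type ElementCounts = { Divs : int; Scripts : int; Spans : int }

// Same input as countReducer in the example, but returns JSON instead of a
// human-readable summary. System.Text.Json is a stand-in for ServiceStack.Text.
let countReducerJson (results : Dictionary<int, string>) : string =
    let counts =
        { Divs = int results.[0]; Scripts = int results.[1]; Spans = int results.[2] }
    JsonSerializer.Serialize(counts)

let results = Dictionary<int, string>()
results.[0] <- "12"
results.[1] <- "5"
results.[2] <- "30"
printfn "%s" (countReducerJson results)
```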

Next

In the next post, I’ll show you how you can add child workflows into the mix. As I’ve mentioned above, workflows also implement the ISchedulable interface and can be scheduled into a workflow in the same way as activities.

To find out the latest announcements and updates on the Amazon.SimpleWorkflow.Extensions project, please follow the official Twitter account @swf_extensions. As always, your feedback and comments on the project will be much appreciated!