The series so far:

  1. Hello World example
  2. Beyond Hello World

Within a workflow, not all activities have to be performed sequentially. In fact, to increase throughput and/or reduce the overall time required to finish a workflow, you might want to perform several activities in parallel, provided that they have no inter-dependencies and can be performed independently.

In this post we’re going to see how we can use the SWF extensions library to parallelize activities by scheduling several activity tasks at a single step and then aggregating their results into a single input to the next activity in the workflow.

[image]

The parallelized activities receive their input from either:

  1. the workflow execution’s input if this is the first step of the workflow, or
  2. the result of the preceding activity/child workflow in the workflow

As of now, the library requires you to specify a ‘reducer’, which is responsible for aggregating the results of the parallel activities into a single string that is returned as the result of the step in the workflow. The reducer function (currently of signature Dictionary<int, string> -> string) comes with some caveats, as you will see in the example. I’ll look to address these oddities and clean up the API in future versions of the library, so please bear with me for now.

The aggregate result of these parallel activities can then be passed along as the input to the subsequent activity, as per the example in my previous post.
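
To make the shape of a reducer concrete, here is a minimal sketch of one that adds up integer results. The name sumReducer and the sample values are made up for illustration; only the Dictionary<int, string> -> string signature comes from the description above.

```fsharp
open System.Collections.Generic

// Hypothetical reducer: adds up the integer results of the parallel activities.
// Each value is the string form of one activity's result, keyed by the
// zero-based index of that activity in the input array.
let sumReducer (results : Dictionary<int, string>) : string =
    results.Values
    |> Seq.sumBy (fun (value : string) -> int value)
    |> string

// Simulate three completed activities (made-up values)
let sample = Dictionary<int, string>()
sample.[0] <- "12"
sample.[1] <- "7"
sample.[2] <- "30"

printfn "%s" (sumReducer sample)   // prints 49
```

Because the reducer only sees strings, any parsing (here, int) is its own responsibility.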

Example: Count HTML element types

Suppose that, given a URL, you want to count the number of different HTML elements (e.g. <div>, <span>, …) the returned HTML contains. The counting of each element type is independent and can be carried out in parallel. For good measure, we can add an echo activity before and after the count activities so that we can print the input URL and the results to the screen. You end up with a workflow that looks something like this:

[image: workflow diagram]

The implementation of this workflow is as follows:


#r "bin/Release/AWSSDK.dll"
#r "bin/Release/SWF.Extensions.Core.dll"

open Amazon.SimpleWorkflow
open Amazon.SimpleWorkflow.Extensions

open System.Collections.Generic
open System.Net

// prints the input to the console and passes it through unchanged
let echo str = printfn "%s" str; str

// a function to count the number of occurrences of a pattern inside the HTML returned
// by the specified URL address
let countMatches (pattern : string) (address : string) =
    // 'use' disposes the WebClient once the function returns
    use webClient = new WebClient()
    let html = webClient.DownloadString address

    seq { 0..html.Length - pattern.Length }
    |> Seq.map (fun i -> html.Substring(i, pattern.Length))
    |> Seq.filter ((=) pattern)
    |> Seq.length

let echoActivity = Activity(
                        "echo", "echo input", echo,
                        taskHeartbeatTimeout       = 60, 
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10, 
                        taskScheduleToCloseTimeout = 20)

let countDivs = Activity<string, int>(
                        "count_divs", "count the number of <div> elements", 
                        countMatches "<div",
                        taskHeartbeatTimeout       = 60, 
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10, 
                        taskScheduleToCloseTimeout = 20)

let countScripts = Activity<string, int>(
                        "count_scripts", "count the number of <script> elements", 
                        countMatches "<script",
                        taskHeartbeatTimeout       = 60, 
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10, 
                        taskScheduleToCloseTimeout = 20)

let countSpans = Activity<string, int>(
                        "count_spans", "count the number of <span> elements", 
                        countMatches "<span",
                        taskHeartbeatTimeout       = 60, 
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10, 
                        taskScheduleToCloseTimeout = 20)

// the activities to schedule in parallel, upcast to the common ISchedulable interface
let countActivities = [| countDivs      :> ISchedulable
                         countScripts   :> ISchedulable
                         countSpans     :> ISchedulable |]

// aggregates the results, keyed by the index of each activity in countActivities
let countReducer (results : Dictionary<int, string>) =
    sprintf "Divs : %d\nScripts : %d\nSpans : %d\n" (int results.[0]) (int results.[1]) (int results.[2])

let countElementsWorkflow = 
    Workflow(domain = "theburningmonk.com", name = "count_html_elements", 
             description = "this workflow counts", 
             version = "1")
    ++> echoActivity
    ++> (countActivities, countReducer)
    ++> echoActivity

let awsKey      = "PUT-YOUR-AWS-KEY-HERE"
let awsSecret   = "PUT-YOUR-AWS-SECRET-HERE"
let client = new AmazonSimpleWorkflowClient(awsKey, awsSecret)

countElementsWorkflow.Start(client)

Thanks to Tomas Petricek’s FSharp.Formatting project I’m now able to provide code snippets with intellisense! Tomas, you rock!

Running the above example and starting a workflow execution with the input

http://www.bing.com

outputs the following to the console:

[image: console output]

If you take a look at the history of events below, the decision task following the completion of stage 0 (the first echo activity) was completed with the following state (tracked in the Execution Context property):

{"CurrentStageNumber":1,"NumberOfActions":3,"Results":{}}

Without going into too much detail on the inner workings of the generated decider, this JSON-serialized state tells us that the workflow has moved into stage 1, which has a total of 3 actions, each represented by an activity task that counts a particular type of HTML element.

[image: workflow execution event history]

Switching to the Activities tab, you can see that 3 activities were completed at stage index 1 of the count_html_elements workflow, judging by the Activity ID and Version of the 3 activities:

[image: Activities tab]

Caveats

Looking at the example code, a couple of questions jump out straight away:

Q. What is the ISchedulable interface?

A. The ISchedulable interface represents anything that can be scheduled as part of a workflow, i.e. an activity or a child workflow. Both IActivity and IWorkflow inherit from it, though in all the examples so far we’ve only worked directly against the concrete implementation types for these two interfaces.
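
The idea can be sketched with simplified stand-ins. The interface below reuses the ISchedulable name and its Name, Description and MaxAttempts members, but it is redefined locally so the snippet runs on its own. DemoActivity and DemoWorkflow are hypothetical types, and the IActivity/IWorkflow layer is left out for brevity.

```fsharp
// Simplified stand-in, NOT the library's definition
type ISchedulable =
    abstract member Name        : string
    abstract member Description : string
    abstract member MaxAttempts : int

// Hypothetical activity type
type DemoActivity(name : string) =
    interface ISchedulable with
        member this.Name        = name
        member this.Description = "demo activity"
        member this.MaxAttempts = 1

// Hypothetical child workflow type
type DemoWorkflow(name : string) =
    interface ISchedulable with
        member this.Name        = name
        member this.Description = "demo child workflow"
        member this.MaxAttempts = 1

// Activities and child workflows can share one array once upcast
let stage : ISchedulable [] =
    [| DemoActivity("count_divs") :> ISchedulable
       DemoWorkflow("child_flow") :> ISchedulable |]

stage |> Array.iteri (fun i s -> printfn "%d: %s (%s)" i s.Name s.Description)
```

The explicit upcast (:>) is needed for the same reason as in the example above: F# does not implicitly convert array elements to a shared interface type.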

 

Q. Why does the reducer take a Dictionary<int, string>?

A. As far as the reducer is concerned, it probably doesn’t need to. The main reason I’ve used a dictionary is that when only intermediate results are available (e.g. 2 out of 3 parallel activities have completed), I wanted to be able to show the current set of results in the Execution Context for the workflow execution (see screenshot above). Because not all the results are back at that point, I needed to be able to show each result against its originating activity, hence a dictionary where the key is the zero-based index of the activity in the input array and the value is the string representation of the result.
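
A small sketch shows why keying by index helps while results are still arriving. The describeProgress helper is hypothetical and not part of the library; it only illustrates that a missing key identifies exactly which activity is still outstanding.

```fsharp
open System.Collections.Generic

// Hypothetical helper: reports the status of each parallel activity,
// given the total number of actions and the results received so far.
let describeProgress (totalActions : int) (results : Dictionary<int, string>) =
    [ for i in 0 .. totalActions - 1 ->
        match results.TryGetValue i with
        | true, value -> sprintf "activity %d: %s" i value
        | _           -> sprintf "activity %d: pending" i ]

// Two out of three activities have completed (made-up values)
let partialResults = Dictionary<int, string>()
partialResults.[0] <- "35"
partialResults.[2] <- "4"

describeProgress 3 partialResults |> List.iter (printfn "%s")
// prints:
// activity 0: 35
// activity 1: pending
// activity 2: 4
```

With a plain list of results, the gap left by activity 1 could not be represented without padding or extra bookkeeping.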

 

Q. Why, then, are the Dictionary’s values strings when the scheduled activity can be generic and return arbitrary types?

A. Because not all the activities have to return the same type.

Under the hood, the generic Activity<TInput, TOutput> marshals data to and from JSON strings using the ServiceStack.Text JSON serializer, and the Activity type is just a special case where both TInput and TOutput are strings.

When the results are recorded and retrieved via SWF, they’re already in string format. Although it’s possible to inspect the originating activity’s generic type parameters to work out the returned type, catering for different return types would require the dictionary to be a Dictionary<int, object> instead, which is not any better.
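
As an illustration, a reducer over activities with different return types simply parses each string according to what it expects at that index. The sketch below assumes a hypothetical stage where activity 0 returns an int and activity 1 returns a bool, and that their string forms are the plain literals "42" and "true"; mixedReducer is a made-up name.

```fsharp
open System
open System.Collections.Generic

// Hypothetical reducer for a stage whose activities return different types.
// Assumption: index 0 holds an int and index 1 holds a bool, both as strings.
let mixedReducer (results : Dictionary<int, string>) : string =
    let count   = Int32.Parse(results.[0])
    let isValid = Boolean.Parse(results.[1])
    sprintf "count=%d valid=%b" count isValid

let mixed = Dictionary<int, string>()
mixed.[0] <- "42"
mixed.[1] <- "true"

printfn "%s" (mixedReducer mixed)   // prints count=42 valid=true
```

More complex result types would arrive as JSON documents and would need a JSON deserializer rather than Parse calls.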

 

Q. So what if I want to return anything other than a string from the reducer function?

A. For now, you can use the ServiceStack.Text JSON serializer (for better compatibility) to serialize the return value to a string yourself. I’ll add support for the library to do this automatically in the version 1.1.0 release. It slipped my mind at the time, sorry…
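
The workaround amounts to a reducer whose string result happens to be JSON. To keep the sketch below free of external references, the JSON is built by hand with sprintf as a stand-in for the ServiceStack.Text serializer; jsonReducer and the field names are made up for illustration.

```fsharp
open System.Collections.Generic

// Hypothetical reducer that returns a JSON document instead of display text.
// The JSON is hand-rolled here for illustration only; a real serializer
// would also take care of escaping and nested types.
let jsonReducer (results : Dictionary<int, string>) : string =
    sprintf "{\"Divs\":%d,\"Scripts\":%d,\"Spans\":%d}"
            (int results.[0]) (int results.[1]) (int results.[2])

let counts = Dictionary<int, string>()
counts.[0] <- "1"
counts.[1] <- "2"
counts.[2] <- "3"

printfn "%s" (jsonReducer counts)   // prints {"Divs":1,"Scripts":2,"Spans":3}
```

An activity downstream that takes a matching generic input type could then receive the counts as structured data rather than preformatted text.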

 

Next

In the next post, I’ll show you how you can add child workflows into the mix. As I’ve mentioned above, workflows also implement the ISchedulable interface and can be scheduled into the workflow in the same way as activities.

To find out the latest announcements and updates on the Amazon.SimpleWorkflow.Extensions project, please follow the official Twitter account @swf_extensions. As always, your feedback and comments on the project will be much appreciated!
