Introduction to AWS SimpleWorkflow Extensions Part 3 – Parallelizing activities

The series so far:

  1. Hello World example
  2. Beyond Hello World

 

Within a workflow, not all activities have to be performed sequentially. In fact, to increase throughput and/or reduce the overall time required to finish a workflow, you might want to perform several activities in parallel, provided that they don’t depend on one another.

In this post we’re going to see how to use the SWF extensions library to parallelize activities by scheduling several activity tasks in a single step, and then aggregate their results into a single input for the next activity in the workflow.

[image: parallel activities whose results are combined by a reducer into the input of the next activity]

The parallelized activities receive their input from either:

  1. the workflow execution’s input if this is the first step of the workflow, or
  2. the result of the preceding activity/child workflow in the workflow

As of now, the library requires you to specify a ‘reducer’, which aggregates the results of the parallel activities into a single string that becomes the result of that step in the workflow. The reducer currently has the signature Dictionary<int, string> -> string, and it comes with a few caveats, as you will see in the example. I’ll look to address these oddities and clean up the API in future versions of the library, so please bear with me for now.
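To make the reducer’s shape concrete, here is a minimal sketch of a function with that Dictionary<int, string> -> string signature. The name joinInOrder is hypothetical and not part of the library. The keys are the zero-based positions of the parallel activities and the values are their string results; sorting by key means the output does not depend on the dictionary’s enumeration order.

```fsharp
open System.Collections.Generic

// Hypothetical reducer with the Dictionary<int, string> -> string shape:
// key = zero-based index of the parallel activity, value = its string result.
let joinInOrder (results : Dictionary<int, string>) : string =
    results
    |> Seq.sortBy (fun kvp -> kvp.Key)   // order by activity index, not by completion order
    |> Seq.map (fun kvp -> kvp.Value)
    |> String.concat ","

let example = Dictionary<int, string>()
example.[2] <- "c"   // parallel activities can complete in any order
example.[0] <- "a"
example.[1] <- "b"
printfn "%s" (joinInOrder example)   // prints "a,b,c"
```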

The aggregate result of these parallel activities can then be passed along as the input to the subsequent activity as per the example in my previous post.

Example: Count HTML element types

Suppose that, given a URL, you want to count the occurrences of different HTML elements (e.g. <div>, <span>, …) in the returned HTML. Counting each element type is independent of the others, so the counts can be carried out in parallel. As a nicety, we can add an echo activity before and after the count activities so that the input URL and the results are printed to the screen. You will end up with a workflow that looks something like this:

[image: workflow diagram: echo → count_divs, count_scripts and count_spans in parallel → echo]

The implementation of this workflow is as follows:


#r "bin/Release/AWSSDK.dll"
#r "bin/Release/SWF.Extensions.Core.dll"

open Amazon.SimpleWorkflow
open Amazon.SimpleWorkflow.Extensions

open System.Collections.Generic
open System.Net

// print the input and pass it through unchanged
let echo str = printfn "%s" str; str

// a function to count the number of occurrences of a pattern inside the HTML returned
// by the specified URL address
let countMatches (pattern : string) (address : string) =
    use webClient = new WebClient()   // dispose the client once the page is downloaded
    let html = webClient.DownloadString address

    seq { 0..html.Length - pattern.Length }
    |> Seq.map (fun i -> html.Substring(i, pattern.Length))
    |> Seq.filter ((=) pattern)
    |> Seq.length

let echoActivity = Activity(
                        "echo", "echo input", echo,
                        taskHeartbeatTimeout       = 60, 
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10, 
                        taskScheduleToCloseTimeout = 20)

let countDivs = Activity<string, int>(
                        "count_divs", "count the number of <div> elements", 
                        countMatches "<div",
                        taskHeartbeatTimeout       = 60, 
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10, 
                        taskScheduleToCloseTimeout = 20)

let countScripts = Activity<string, int>(
                        "count_scripts", "count the number of <script> elements", 
                        countMatches "<script",
                        taskHeartbeatTimeout       = 60, 
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10, 
                        taskScheduleToCloseTimeout = 20)

let countSpans = Activity<string, int>(
                        "count_spans", "count the number of <span> elements", 
                        countMatches "<span",
                        taskHeartbeatTimeout       = 60, 
                        taskScheduleToStartTimeout = 10,
                        taskStartToCloseTimeout    = 10, 
                        taskScheduleToCloseTimeout = 20)

// the activities to run in parallel; each is upcast so the array has one element type
let countActivities = [| countDivs      :> ISchedulable
                         countScripts   :> ISchedulable
                         countSpans     :> ISchedulable |]

// results are keyed by each activity's zero-based index in countActivities
let countReducer (results : Dictionary<int, string>) =
    sprintf "Divs : %d\nScripts : %d\nSpans : %d\n" (int results.[0]) (int results.[1]) (int results.[2])

// stage 0: echo, stage 1: the three counts in parallel, stage 2: echo the reduced result
let countElementsWorkflow = 
    Workflow(domain = "theburningmonk.com", name = "count_html_elements", 
             description = "this workflow counts", 
             version = "1")
    ++> echoActivity
    ++> (countActivities, countReducer)
    ++> echoActivity

let awsKey      = "PUT-YOUR-AWS-KEY-HERE"
let awsSecret   = "PUT-YOUR-AWS-SECRET-HERE"
let client = new AmazonSimpleWorkflowClient(awsKey, awsSecret)

countElementsWorkflow.Start(client)
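Because countMatches downloads the page and the workflow needs SWF, the fan-out/fan-in above is awkward to try out locally. The following is a minimal offline sketch, not part of the library: countMatchesIn is a hypothetical variant that takes the HTML string directly, the three counts run concurrently with Async.Parallel rather than as SWF activity tasks, and each result is keyed by its zero-based index in a Dictionary<int, string> before being handed to the same countReducer. The HTML snippet is made up for illustration.

```fsharp
open System.Collections.Generic

// Hypothetical offline variant of countMatches: takes the HTML itself instead of a URL
let countMatchesIn (pattern : string) (html : string) =
    seq { 0 .. html.Length - pattern.Length }
    |> Seq.filter (fun i -> html.Substring(i, pattern.Length) = pattern)
    |> Seq.length

// same reducer as in the workflow above
let countReducer (results : Dictionary<int, string>) =
    sprintf "Divs : %d\nScripts : %d\nSpans : %d\n" (int results.[0]) (int results.[1]) (int results.[2])

// made-up HTML for illustration
let html = "<div><span>a</span><div><script></script></div></div>"
let patterns = [| "<div"; "<script"; "<span" |]

// fan out: run the independent counts concurrently (locally, not via SWF)
let counts =
    patterns
    |> Array.map (fun p -> async { return countMatchesIn p html })
    |> Async.Parallel
    |> Async.RunSynchronously

// fan in: key each result by its zero-based index, as the reducer expects
let results = Dictionary<int, string>()
counts |> Array.iteri (fun i c -> results.[i] <- string c)

printf "%s" (countReducer results)
```

Async.Parallel returns results in the same order as its inputs, which is what lets the index-keyed dictionary line up with the patterns array.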


Thanks to Tomas Petricek’s FSharp.Formatting project I’m now able to provide code snippets with intellisense! Tomas, you rock!

 

Running the above example and starting a workflow execution with the input

http://www.bing.com

outputs the following to the console:

[image: console output]

If you take a look at the history of events below, the decision task following the completion of stage 0 (the first echo activity) was completed with the state (tracked in the Execution Context property):

{"CurrentStageNumber":1,"NumberOfActions":3,"Results":{}}

Without going into too much detail on the inner workings of the generated decider, this JSON-serialized state tells us that the workflow has moved into stage 1, which has a total of 3 actions, each represented by an activity task that counts a particular type of HTML element.

[image: workflow execution history of events]

Switching to the Activities tab, you can tell from the Activity ID and Version of each activity that 3 activities were completed at stage index 1 of the count_html_elements workflow:

[image: Activities tab]
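The Execution Context state shown earlier ({"CurrentStageNumber":1,"NumberOfActions":3,"Results":{}}) can be read as a small record. The sketch below is a hypothetical model built only from those three field names; it is not the library’s actual decider type. It records each result against its zero-based index and treats a stage as complete once every parallel action has reported back, which is an assumption about when the decider moves on to the next stage.

```fsharp
// Hypothetical mirror of the decider state in the Execution Context;
// the field names come from the JSON, the real library type may differ.
type StageState =
    { CurrentStageNumber : int
      NumberOfActions    : int
      Results            : Map<int, string> }

// record one activity's result against its zero-based index within the stage
let recordResult (index : int) (result : string) (state : StageState) =
    { state with Results = state.Results |> Map.add index result }

// assumption: the stage is complete once every parallel action has a result
let isStageComplete (state : StageState) =
    state.Results.Count = state.NumberOfActions

let initial = { CurrentStageNumber = 1; NumberOfActions = 3; Results = Map.empty }

// illustrative values: count_divs (index 0) and count_spans (index 2) have finished
let afterTwo =
    initial
    |> recordResult 0 "12"
    |> recordResult 2 "4"

printfn "%b" (isStageComplete afterTwo)   // count_scripts (index 1) is still outstanding
```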

Caveats

Looking at the example code, a couple of questions jump out straight away:

Q. What is the ISchedulable interface?

A. The ISchedulable interface represents anything that can be scheduled as part of a workflow, i.e. an activity or a child workflow. Both IActivity and IWorkflow inherit from it, although in all the examples so far we’ve only worked directly with the concrete implementation types of these two interfaces.
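The idea can be sketched with a local stand-in. ISchedulableSketch below mirrors the three members the library’s ISchedulable exposes (Name, Description and MaxAttempts), while ActivitySketch and WorkflowSketch are hypothetical types, not the library’s. Upcasting each element explicitly gives the array a single element type, which is what the `:> ISchedulable` casts in countActivities do, and it is why an activity and a child workflow can sit side by side in one parallel step.

```fsharp
// Hypothetical stand-in mirroring the members of the library's ISchedulable
type ISchedulableSketch =
    abstract member Name        : string
    abstract member Description : string
    abstract member MaxAttempts : int

// hypothetical activity type that can be scheduled
type ActivitySketch(name : string, description : string) =
    interface ISchedulableSketch with
        member _.Name        = name
        member _.Description = description
        member _.MaxAttempts = 1

// hypothetical child-workflow type that can be scheduled the same way
type WorkflowSketch(name : string, description : string) =
    interface ISchedulableSketch with
        member _.Name        = name
        member _.Description = description
        member _.MaxAttempts = 1

// explicit upcasts give the array one element type, as in countActivities
let stage : ISchedulableSketch[] =
    [| ActivitySketch("count_divs", "count <div> elements") :> ISchedulableSketch
       WorkflowSketch("child_workflow", "a child workflow")  :> ISchedulableSketch |]

stage |> Array.iter (fun s -> printfn "%s: %s" s.Name s.Description)
```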

 

Q. Why does the reducer take a Dictionary<int, string>?

A. As far as the reducer is concerned, it probably doesn’t need to be. The main reason I’ve used a dictionary here is that when intermediate results are available (e.g. 2 out of 3 parallel activities have completed), I wanted to be able to show the current set of results in the Execution Context for the workflow execution (see the screenshot above). Because not all the results are back yet, I needed to show each result against its originating activity, hence a dictionary whose key is the zero-based index of the activity in the input array and whose value is the string representation of the result.
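Here is a minimal sketch of why index keys help while a stage is still in progress: each available result can be shown against its originating activity, and missing indices show up as pending. The describePartial function and the activity name list are hypothetical illustrations, not library code, and the result values are made up.

```fsharp
open System.Collections.Generic

// activity names in the same order as the parallel activities array
let activityNames = [| "count_divs"; "count_scripts"; "count_spans" |]

// Hypothetical: show each available result against its originating activity
let describePartial (results : Dictionary<int, string>) =
    activityNames
    |> Array.mapi (fun i name ->
        match results.TryGetValue i with
        | true, value -> sprintf "%s = %s" name value
        | _           -> sprintf "%s = (pending)" name)
    |> String.concat "\n"

// illustrative values: 2 out of 3 activities have completed
let inProgress = Dictionary<int, string>()
inProgress.[0] <- "12"
inProgress.[2] <- "4"
printfn "%s" (describePartial inProgress)
```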

 

Q. Why, then, are the Dictionary’s values strings when the scheduled activities can be generic and return arbitrary types?

A. Because not all the activities have to return the same type.

Under the hood the generic Activity<TInput, TOutput> marshals data to and from JSON strings using ServiceStack.Text JSON serializer, and the Activity type is just a special case where both TInput and TOutput are strings.

When the results are recorded and retrieved via SWF, they’re already in string format. Although it’s possible to inspect the originating activity’s generic type parameters to work out the returned type, catering for different return types would mean using a Dictionary<int, object> instead, which is no better.
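The string marshalling can be sketched as a wrapper that turns a typed processor into a string -> string function. This is a minimal sketch under stated assumptions: asStringProcessor is a hypothetical name, and System.Text.Json is swapped in for ServiceStack.Text only so the snippet runs without extra packages; the library itself uses ServiceStack.Text and its output format may differ.

```fsharp
open System.Text.Json

// Hypothetical wrapper: deserialize the JSON input, run the typed processor,
// serialize the typed output back to a JSON string.
// System.Text.Json stands in for ServiceStack.Text here.
let asStringProcessor (processor : 'TIn -> 'TOut) : string -> string =
    fun input ->
        let typedInput = JsonSerializer.Deserialize<'TIn>(input)
        JsonSerializer.Serialize<'TOut>(processor typedInput)

// a processor returning int (like countDivs) becomes string -> string
let lengthAsString = asStringProcessor (fun (s : string) -> s.Length)

// JSON string "hello" in, JSON number 5 out
printfn "%s" (lengthAsString "\"hello\"")
```

Once every result is a string like this, the reducer can receive them all in one Dictionary<int, string> regardless of each activity’s TOutput.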

 

Q. So what if I want to return anything other than a string from the reducer function?

A. For now, you can serialize the return value to a string yourself, ideally with the ServiceStack.Text JSON serializer for better compatibility. I’ll add support for the library to do this automatically in the version 1.1.0 release. It slipped my mind at the time, sorry…
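As a minimal sketch of that workaround, the reducer below builds a record and serializes it to a JSON string itself. ElementCounts and countReducerJson are hypothetical names, and System.Text.Json is swapped in for ServiceStack.Text only so the example runs without extra packages; for compatibility with the library you would use the serializer it uses. The input values are made up.

```fsharp
open System.Collections.Generic
open System.Text.Json

// hypothetical record carrying the aggregated counts
type ElementCounts = { Divs : int; Scripts : int; Spans : int }

// reducer that returns structured data serialized as a JSON string
// (System.Text.Json stands in for ServiceStack.Text here)
let countReducerJson (results : Dictionary<int, string>) =
    let counts = { Divs = int results.[0]; Scripts = int results.[1]; Spans = int results.[2] }
    JsonSerializer.Serialize<ElementCounts>(counts)

// illustrative results keyed by activity index
let results = Dictionary<int, string>()
results.[0] <- "2"
results.[1] <- "1"
results.[2] <- "1"
printfn "%s" (countReducerJson results)
```

The next activity can then take ElementCounts as its TInput and have the JSON deserialized for it, rather than parsing a formatted string.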

 

Next

In the next post, I’ll show you how you can add child workflows in the mix. As I’ve mentioned above, workflows also implement the ISchedulable interface and can be scheduled into the workflow in the same way as activities.

To find out the latest announcements and updates on the Amazon.SimpleWorkflow.Extensions project, please follow the official Twitter account @swf_extensions, and as always, your feedback and comments on the project will be much appreciated!


  • ????????? ???????

    Is your declarative workflow design scalable to many machines? It seems like your decider is hardcoded in the workflow definition. Also, how can I create child activities on the fly? For example, I need to parse a data collection from the workflow input and run a parallel activity for each element.

  • Yan Cui

    Just realised the code snippet’s formatting is all messed up when I went to a new blog theme, sorry about that!

    For your questions:
    1) does it scale to many machines?

    Yes. SWF is task based, so just run the same code (i.e. declaring your workflow and starting it) on every machine, and both the decider and the activity worker will run on each machine.

    2) can I create child activities on the fly? Like I need to parse workflow input data collection and for each element run a parallel activity.

    There’s no support for `Parallel.ForEach`-style parallelism in the library.

  • ????????? ???????

    Why do you pick only the first matching event instead of iterating through ALL the events in the workflow history? Here’s your code:

    let events = task.Events

    let keyEvt = events |> Seq.pick (function | KeyEvent evtType -> Some evtType | _ -> None)

  • Yan Cui

    That bit of code is looking for the last important event that requires me to do something in the decision worker, e.g. to retry a failed activity, schedule a new activity, fail the workflow, etc.