Yan Cui
I help clients go faster for less using serverless technologies.
Within a workflow, not all activities have to be performed sequentially. In fact, to increase throughput and/or reduce the overall time required to finish a workflow, you might want to perform several activities in parallel provided that they don’t have any inter-dependencies and can be performed independently.
In this post we’re going to see how to use the SWF extensions library to parallelize activities by scheduling several activity tasks in a single step, and then aggregate their results into a single input for the next activity in the workflow.
The parallelized activities receive their input from either:
- the workflow execution’s input if this is the first step of the workflow, or
- the result of the preceding activity/child workflow in the workflow
As of now, the library requires you to specify a ‘reducer’, which is responsible for aggregating the results of the parallel activities into a single string that is returned as the result of that step in the workflow. Right now the reducer has the signature Dictionary<int, string> -> string, and it comes with some caveats, as you will see in the example. I’ll look to address these oddities and clean up the API in future versions of the library, so please bear with me for now.
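To make the reducer’s shape concrete, here is a minimal F# sketch that runs without SWF. It assumes the dictionary keys are the zero-based indices of the parallel activities and the values are their results as strings. `labelledReducer` is a hypothetical helper and not part of the library.

```fsharp
open System.Collections.Generic

// Hypothetical helper: builds a reducer of the shape Dictionary<int, string> -> string
// by pairing each zero-based activity index with a display label.
let labelledReducer (labels : string[]) (results : Dictionary<int, string>) =
    labels
    |> Array.mapi (fun i label -> sprintf "%s : %s" label results.[i])
    |> String.concat "\n"

let reducer = labelledReducer [| "Divs"; "Scripts"; "Spans" |]

// Simulated input: one string result per parallel activity, keyed by its index.
let sample = Dictionary<int, string>()
sample.[0] <- "12"
sample.[1] <- "4"
sample.[2] <- "7"

printfn "%s" (reducer sample)
```

Whatever the parallel activities return, the reducer only ever sees strings and must produce a string, which is the oddity the caveats below discuss.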
The aggregate result of these parallel activities can then be passed along as the input to the subsequent activity as per the example in my previous post.
Example: Count HTML element types
Suppose that, given a URL, you want to count how many of each type of HTML element (e.g. <div>, <span>, …) the returned HTML contains. Counting each element type is independent of the others, so the counts can be carried out in parallel. For convenience, we can add an echo activity before and after the count activities so that we can print the input URL and the results to the screen. You will end up with a workflow that looks something like this:
The implementation of this workflow is as follows:
```fsharp
#r "bin/Release/AWSSDK.dll"
#r "bin/Release/SWF.Extensions.Core.dll"

open Amazon.SimpleWorkflow
open Amazon.SimpleWorkflow.Extensions

open System.Collections.Generic
open System.Net

let echo str = printfn "%s" str; str

// a function to count the number of occurrences of a pattern inside the HTML returned
// by the specified URL address
let countMatches (pattern : string) (address : string) =
    use webClient = new WebClient()   // disposed once the page has been downloaded
    let html = webClient.DownloadString address

    seq { 0..html.Length - pattern.Length }
    |> Seq.map (fun i -> html.Substring(i, pattern.Length))
    |> Seq.filter ((=) pattern)
    |> Seq.length

let echoActivity =
    Activity("echo", "echo input", echo,
             taskHeartbeatTimeout = 60,
             taskScheduleToStartTimeout = 10,
             taskStartToCloseTimeout = 10,
             taskScheduleToCloseTimeout = 20)

let countDivs =
    Activity<string, int>("count_divs", "count the number of <div> elements",
                          countMatches "<div",
                          taskHeartbeatTimeout = 60,
                          taskScheduleToStartTimeout = 10,
                          taskStartToCloseTimeout = 10,
                          taskScheduleToCloseTimeout = 20)

let countScripts =
    Activity<string, int>("count_scripts", "count the number of <script> elements",
                          countMatches "<script",
                          taskHeartbeatTimeout = 60,
                          taskScheduleToStartTimeout = 10,
                          taskStartToCloseTimeout = 10,
                          taskScheduleToCloseTimeout = 20)

let countSpans =
    Activity<string, int>("count_spans", "count the number of <span> elements",
                          countMatches "<span",
                          taskHeartbeatTimeout = 60,
                          taskScheduleToStartTimeout = 10,
                          taskStartToCloseTimeout = 10,
                          taskScheduleToCloseTimeout = 20)

// the parallel activities; their zero-based position here is the key in the reducer's dictionary
let countActivities = [| countDivs :> ISchedulable
                         countScripts :> ISchedulable
                         countSpans :> ISchedulable |]

let countReducer (results : Dictionary<int, string>) =
    sprintf "Divs : %d\nScripts : %d\nSpans : %d\n"
            (int results.[0]) (int results.[1]) (int results.[2])

let countElementsWorkflow =
    Workflow(domain = "theburningmonk.com", name = "count_html_elements",
             description = "this workflow counts",
             version = "1")
    ++> echoActivity
    ++> (countActivities, countReducer)
    ++> echoActivity

let awsKey    = "PUT-YOUR-AWS-KEY-HERE"
let awsSecret = "PUT-YOUR-AWS-SECRET-HERE"
let client = new AmazonSimpleWorkflowClient(awsKey, awsSecret)

countElementsWorkflow.Start(client)
```
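Stripped of SWF, the count stage is a plain fan-out/fan-in. The following F# sketch only illustrates that shape locally and is not how the library schedules work. It reuses the counting logic of `countMatches` on a made-up, in-memory HTML string, so no network access is needed. It then runs one `async` job per element type with `Async.Parallel`, which returns results in the order the jobs were supplied.

```fsharp
// Same counting logic as countMatches above, but over an in-memory string
// so the sketch runs without network access.
let countIn (html : string) (pattern : string) =
    seq { 0 .. html.Length - pattern.Length }
    |> Seq.filter (fun i -> html.Substring(i, pattern.Length) = pattern)
    |> Seq.length

// Hypothetical sample page standing in for the downloaded HTML.
let html =
    "<html><body><div><span>a</span></div><div><script></script><span>b</span></div></body></html>"

let patterns = [| "<div"; "<script"; "<span" |]

// Fan out: one independent job per element type.
// Fan in: Async.Parallel preserves input order, so counts.[i] belongs to patterns.[i].
let counts =
    patterns
    |> Array.map (fun p -> async { return countIn html p })
    |> Async.Parallel
    |> Async.RunSynchronously

Array.iter2 (printfn "%s : %d") patterns counts
```

Because each result lines up with its position in `patterns`, this mirrors the index-keyed results that the workflow’s reducer receives.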
Thanks to Tomas Petricek’s FSharp.Formatting project I’m now able to provide code snippets with intellisense! Tomas, you rock!
Running the above example and starting a workflow execution with the input
outputs the following to the console:
If you take a look at the history of events below, the decision task following the completion of stage 0 (the first echo activity) was completed with the state (tracked in the Execution Context property):
{"CurrentStageNumber":1,"NumberOfActions":3,"Results":{}}
Without going into too much detail about the inner workings of the generated decider, this JSON-serialized state tells us that the workflow has moved into stage 1, which has a total of 3 actions, each represented by an activity task that counts a particular type of HTML element.
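To make the shape of that state concrete, here is a hypothetical F# model of it. The field names mirror the JSON above, but `StageState`, `recordResult` and `isStageComplete` are illustrative names rather than the library’s actual types, and the sample results are made up. In this model, results arrive in any order, keyed by the zero-based index of each action. The stage is only ready to be reduced once every action has reported back.

```fsharp
// Hypothetical model of the decider state shown above; not the library's actual type.
type StageState =
    { CurrentStageNumber : int
      NumberOfActions : int
      Results : Map<int, string> }

// Record the (string) result of the action at zero-based index idx.
let recordResult (idx : int) (result : string) (state : StageState) =
    { state with Results = state.Results |> Map.add idx result }

// In this model, the stage can be reduced only when every action has a result.
let isStageComplete (state : StageState) =
    state.Results.Count = state.NumberOfActions

let start = { CurrentStageNumber = 1; NumberOfActions = 3; Results = Map.empty }

// Activities may finish in any order, e.g. index 2 first, then index 0.
let partial = start |> recordResult 2 "7" |> recordResult 0 "12"
let complete = partial |> recordResult 1 "4"

printfn "partial complete? %b" (isStageComplete partial)
printfn "complete? %b" (isStageComplete complete)
```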
Switching to the Activities tab, you can see that 3 activities were completed at stage index 1 of the count_html_elements workflow, judging by the Activity ID and Version of the 3 activities:
Caveats
Looking at the example code, a couple of questions jump out straight away:
Q. What is the ISchedulable interface?
A. The ISchedulable interface represents anything that can be scheduled as part of a workflow, i.e. an activity or a child workflow. Both IActivity and IWorkflow inherit from it, although in all the examples so far we’ve only worked directly with the concrete implementation types of these two interfaces.
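A simplified F# model of that relationship may help. Only the `Name`, `Description` and `MaxAttempts` members come from the library’s own `ISchedulable` definition. `Process` and `NumberOfStages` on the derived interfaces, as well as `EchoActivity` and `CountWorkflow`, are hypothetical stand-ins, and the real `IActivity`/`IWorkflow` interfaces are not reproduced here.

```fsharp
// Simplified, hypothetical model of the interface hierarchy.
type ISchedulable =
    abstract member Name : string
    abstract member Description : string
    abstract member MaxAttempts : int

type IActivity =
    inherit ISchedulable
    abstract member Process : string -> string   // hypothetical member

type IWorkflow =
    inherit ISchedulable
    abstract member NumberOfStages : int         // hypothetical member

type EchoActivity() =
    interface IActivity with
        member this.Name = "echo"
        member this.Description = "echo input"
        member this.MaxAttempts = 1
        member this.Process input = input

type CountWorkflow() =
    interface IWorkflow with
        member this.Name = "count_html_elements"
        member this.Description = "this workflow counts"
        member this.MaxAttempts = 1
        member this.NumberOfStages = 3

// Once upcast, activities and workflows can sit side by side in one array,
// just as countActivities does in the example.
let schedulables : ISchedulable[] =
    [| EchoActivity() :> ISchedulable
       CountWorkflow() :> ISchedulable |]

schedulables |> Array.iter (fun s -> printfn "%s: %s" s.Name s.Description)
```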
Q. Why does the reducer take a Dictionary<int, string>?
A. As far as the reducer is concerned, it probably doesn’t need to. The main reason I’ve used a dictionary here is that when intermediate results are available (e.g. 2 out of 3 parallel activities have completed), I wanted to be able to show the current set of results in the Execution Context of the workflow execution (see screenshot above). Because not all the results are back yet, I needed to be able to show each result against its originating activity. Hence a dictionary, where the key is the zero-based index of the activity in the input array and the value is the string representation of its result.
Q. Why, then, are the Dictionary’s values strings when the scheduled activities can be generic and return arbitrary types?
A. Because not all the activities have to return the same type.
Under the hood, the generic Activity<TInput, TOutput> marshals data to and from JSON strings using the ServiceStack.Text JSON serializer, and the Activity type is just a special case where both TInput and TOutput are strings.
When results are recorded and retrieved via SWF, they are already in string format. Although it’s possible to inspect the originating activity’s generic type parameters to work out the returned type, catering for different return types would require the dictionary to be a Dictionary<int, object> instead, which is no better.
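Here is a hedged F# sketch of the marshalling idea, showing how a typed `'TInput -> 'TOutput` function can be exposed as a `string -> string` one. It uses `System.Text.Json` in place of ServiceStack.Text, and `marshal` is a hypothetical helper, not the library’s implementation. It also suggests why the example’s reducer can call `int results.[0]`: an `int` result presumably travels as its JSON text, e.g. `"5"`.

```fsharp
open System.Text.Json

// Hypothetical sketch: wrap a typed processor in a string -> string function by
// deserializing the input from JSON and serializing the output back to JSON.
// System.Text.Json stands in for ServiceStack.Text here.
let marshal (processor : 'TInput -> 'TOutput) : string -> string =
    fun input ->
        let typedInput = JsonSerializer.Deserialize<'TInput>(input)
        let output = processor typedInput
        JsonSerializer.Serialize(output)

// A string -> int processor, like the count activities in the example.
let countChars (s : string) = s.Length

let wrapped = marshal countChars

// The input is the JSON encoding of the string hello.
let result = wrapped "\"hello\""
printfn "%s" result
```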
Q. So what if I want to return anything other than a string from the reducer function?
A. For now, you can serialize the return value to a string yourself using the ServiceStack.Text JSON serializer (for better compatibility). I’ll add support for the library to do this automatically in the 1.1.0 release; it slipped my mind at the time, sorry…
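A workaround, sketched in F#, is to build a typed value inside the reducer and serialize it yourself while keeping the required Dictionary<int, string> -> string shape. The sketch uses `System.Text.Json` purely as a readily available stand-in. The advice above is to use ServiceStack.Text for compatibility with the library. `ElementCounts` and `countReducerJson` are hypothetical names.

```fsharp
open System.Collections.Generic
open System.Text.Json

// A hypothetical structured result for the count workflow.
type ElementCounts = { Divs : int; Scripts : int; Spans : int }

// Same Dictionary<int, string> -> string shape as countReducer, but the result is
// a typed record serialized to JSON rather than hand-formatted text.
let countReducerJson (results : Dictionary<int, string>) =
    let counts =
        { Divs = int results.[0]
          Scripts = int results.[1]
          Spans = int results.[2] }
    JsonSerializer.Serialize(counts)

let sample = Dictionary<int, string>()
sample.[0] <- "12"
sample.[1] <- "4"
sample.[2] <- "7"

let json = countReducerJson sample
printfn "%s" json
```

The next activity in the workflow would then deserialize that JSON string back into the record.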
Next
In the next post, I’ll show you how to add child workflows into the mix. As mentioned above, workflows also implement the ISchedulable interface and can be scheduled into a workflow in the same way as activities.
To find out the latest announcements and updates on the Amazon.SimpleWorkflow.Extensions project, please follow the official Twitter account @swf_extensions. As always, your feedback and comments on the project will be much appreciated!
Is your declarative workflow design scalable to many machines? It seems like your decider is hardcoded in the workflow definition. Also, how can I create child activities on the fly? For example, I need to parse a collection in the workflow input data and run a parallel activity for each element.
Just realised the code snippet’s formatting is all messed up when I went to a new blog theme, sorry about that!
For your questions:
1) does it scale to many machines?
Yes, SWF is task based, so just run the same code (i.e. declaring your workflow and starting it) on every machine, and both the decider and the activity worker will run on each machine.
2) can I create child activities on the fly? Like I need to parse workflow input data collection and for each element run a parallel activity.
There’s no support for `Parallel.ForEach`-style parallelism in the library.
Why do you pick only the first matched event instead of iterating through ALL the event history of the workflow? Here’s your code:
let events = task.Events
let keyEvt = events |> Seq.pick (function | KeyEvent evtType -> Some evtType | _ -> None)
That bit of code is looking for the last important event that requires me to do something in the decision worker, e.g. to retry a failed activity, schedule a new activity, fail the workflow, etc.
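Why does the first match give the *last* important event? A hedged F# sketch helps, assuming the history is listed newest first (SWF’s PollForDecisionTask supports a reverse-order option). The `HistoryEvent` type and the `KeyEvent` active pattern below are hypothetical simplifications, not the library’s definitions. Under that assumption, `Seq.pick` stops at the most recent event that needs a decision.

```fsharp
// Hypothetical, heavily simplified event model; real SWF history events carry far more data.
type HistoryEvent =
    | DecisionTaskStarted
    | ActivityTaskCompleted of activityId : string
    | ActivityTaskFailed of activityId : string
    | WorkflowExecutionStarted

// Hypothetical partial active pattern: which events require the decider to act?
let (|KeyEvent|_|) evt =
    match evt with
    | ActivityTaskCompleted _ | ActivityTaskFailed _ | WorkflowExecutionStarted -> Some evt
    | DecisionTaskStarted -> None

// Assumed newest-first ordering, so the first key event is the most recent one.
let events =
    [ DecisionTaskStarted
      ActivityTaskFailed "count_spans"
      ActivityTaskCompleted "count_divs"
      WorkflowExecutionStarted ]

let keyEvt = events |> Seq.pick (function | KeyEvent evtType -> Some evtType | _ -> None)
printfn "%A" keyEvt
```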