
AWS Lambda – use recursive function to process SQS messages (Part 2)

First of all, apologies for taking months to write this follow-up to part 1 – I have been extremely busy since joining Yubl. We have done a lot of work with AWS Lambda and I hope to share more of the lessons we have learnt soon, but for now take a look at this slide deck to get a flavour.

 

At the end of part 1 we had a recursive Lambda function (sketched below) that will:

  1. fetch messages from SQS using long-polling
  2. process any received messages
  3. recurse by invoking itself asynchronously
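
To jog your memory, here’s roughly what that loop looks like in Node.js with the aws-sdk – a minimal sketch only, with placeholder names rather than the actual ones used in the sample code:

'use strict';

// minimal sketch of the recursive loop from part 1 (Node.js + aws-sdk);
// QUEUE_URL and processMessage are placeholders, not the names used in the sample code
const AWS    = require('aws-sdk');
const sqs    = new AWS.SQS();
const lambda = new AWS.Lambda();

const QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'; // placeholder

// placeholder for your actual processing logic (including deleting the message afterwards)
const processMessage = (msg) => Promise.resolve(msg);

module.exports.handler = (event, context, callback) => {
  sqs.receiveMessage({
      QueueUrl: QUEUE_URL,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20                                // 1. fetch messages with long-polling
    }).promise()
    .then(res => Promise.all((res.Messages || []).map(processMessage)))   // 2. process them
    .then(() => lambda.invoke({                          // 3. recurse by invoking ourselves asynchronously
      FunctionName: context.functionName,
      InvocationType: 'Event',
      Payload: JSON.stringify(event)
    }).promise())
    .then(() => callback(null, 'recursed'))
    .catch(callback);                                    // an unhandled error here kills the loop
};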

However, on its own it has a number of problems:

  • any unhandled exceptions will kill the loop
  • there’s no way to scale the no. of processing loops elastically

 

We can address these concerns with a simple mechanism where:

  • a DynamoDB table stores a row for each loop that should be running
  • when scaling up, a new row is added to the table, and a new loop is started with the Hash Key (let’s call it a token) as input;
  • on each recursion, the looping function will check if its token is still valid
    • if yes, then the function will upsert a last_used timestamp against its token
    • if not (ie, the token has been deleted) then the loop terminates
  • when scaling down, one of the tokens is chosen at random and deleted from the table, which causes the corresponding loop to terminate at the end of its current recursion (see above)
  • another Lambda function is triggered every X mins to look for tokens whose last_used timestamp is older than some threshold, and starts a new loop in its place

Depending on how long it takes the processing function to work through its SQS messages – bounded by the max 5 min execution time – you can either adjust the threshold accordingly, or provide another GUID when starting each loop (say, a loop_id) and extend the loop’s token check to make sure the loop_id associated with its token matches its own.
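
To make the scale-up step concrete, here’s a rough sketch of what a scale_up handler might look like. The table name, function name, key schema and hard-coded queue URL are all assumptions purely for illustration – the actual schema is defined in s-resources-cf.json in the code sample.

'use strict';

// rough sketch of a scale_up handler: add a token row to DynamoDB, then start a new loop.
// TABLE_NAME, PROCESS_MSG_FUNC, the key schema and the hard-coded queue URL are illustrative assumptions.
const AWS    = require('aws-sdk');
const crypto = require('crypto');
const dynamo = new AWS.DynamoDB.DocumentClient();
const lambda = new AWS.Lambda();

const TABLE_NAME       = 'task_tokens';   // placeholder
const PROCESS_MSG_FUNC = 'process-msg';   // placeholder

module.exports.handler = (event, context, callback) => {
  // in practice the queue URL is derived from the SNS message (see the later section)
  const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue';
  const token    = crypto.randomBytes(16).toString('hex');

  dynamo.put({
      TableName: TABLE_NAME,
      Item: { queue_url: queueUrl, token: token }        // last_used is set by the loop itself
    }).promise()
    .then(() => lambda.invoke({                          // start a new processing loop with the token as input
      FunctionName: PROCESS_MSG_FUNC,
      InvocationType: 'Event',
      Payload: JSON.stringify({ queueUrl: queueUrl, token: token })
    }).promise())
    .then(() => callback(null, 'scaled up'))
    .catch(callback);
};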

 

High Level Overview

To implement this mechanism, we will need a few things:

  • a DynamoDB table to store the tokens
  • a function to look for dead loops and restart them
  • a CloudWatch event to trigger the restart function every X mins
  • a function to scale_up the no. of processing loops
  • a function to scale_down the no. of processing loops
  • CloudWatch alarm(s) to trigger scaling activity based on no. of messages in the queue
  • SNS topics for the alarms to publish to, which in turn trigger the scale_up and scale_down functions accordingly*

* the SNS topics are required purely because there’s no support for CloudWatch alarms to trigger Lambda functions directly yet.

(architecture diagram)

A working code sample is available on GitHub, which includes:

  • handler code for each of the aforementioned functions
  • s-resources-cf.json that will create the DynamoDB table when deployed
  • _meta/variables/s-variables-*.json files, which also specify the name of the DynamoDB table (and are referenced by s-resources-cf.json above)

I’ll leave you to explore the code sample at your own leisure, and instead touch on a few of the intricacies.

 

Setting up Alarms

Assuming that your queue is mostly empty – because your processing is keeping pace with the rate of new items – a simple staggered set-up should suffice here, for instance:

(screenshot: a staggered set of CloudWatch alarms on the queue’s message count)

Each of the alarms would send notification to the relevant SNS topic on ALARM and OK.
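
If you’d rather script the alarms than click through the console, something along these lines would do – the alarm name, queue name, thresholds and SNS topic ARNs are all placeholders:

'use strict';

// sketch: create one of the staggered alarms programmatically.
// the alarm name, queue name, thresholds and SNS topic ARNs are placeholders.
const AWS        = require('aws-sdk');
const cloudwatch = new AWS.CloudWatch();

cloudwatch.putMetricAlarm({
    AlarmName:          'my-queue-depth-gte-300',
    Namespace:          'AWS/SQS',
    MetricName:         'ApproximateNumberOfMessagesVisible',
    Dimensions:         [ { Name: 'QueueName', Value: 'my-queue' } ],
    Statistic:          'Average',
    Period:             300,
    EvaluationPeriods:  1,
    Threshold:          300,
    ComparisonOperator: 'GreaterThanOrEqualToThreshold',
    AlarmActions:       [ 'arn:aws:sns:us-east-1:123456789012:scale-up' ],   // ALARM => scale up
    OKActions:          [ 'arn:aws:sns:us-east-1:123456789012:scale-down' ]  // OK => scale down
  }).promise()
  .then(() => console.log('alarm created'))
  .catch(err => console.error(err));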

(screenshot: each alarm’s notification actions for the ALARM and OK states)

 

Verifying Tokens

Since there’s no way to message a running Lambda function directly, we need some other way to signal it to stop recursing, and this is where the DynamoDB table comes in.

At the start of each recursion, the process-msg function will check the table to see if its token still exists. And since the table is also used to record a heartbeat from the function (for the restart function to identify failed loops), it needs to upsert the last_used timestamp against the token at the same time.

We can combine the two ops in one conditional write, which will fail if the token doesn’t exist, and that error will terminate the loop.
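
Here’s roughly what that conditional write looks like with the DocumentClient – the table name and key schema are assumptions for illustration, the real ones are in s-resources-cf.json:

'use strict';

// sketch of the combined heartbeat + token check; the table name and key schema
// (queue_url as hash key, token as range key) are assumptions for illustration.
const AWS    = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();

const recordHeartbeat = (queueUrl, token) =>
  dynamo.update({
    TableName: 'task_tokens',                            // placeholder
    Key: { queue_url: queueUrl, token: token },
    UpdateExpression: 'SET last_used = :now',
    ConditionExpression: 'attribute_exists(queue_url)',  // fails if the token row has been deleted
    ExpressionAttributeValues: { ':now': Date.now() }
  }).promise();
  // if the token is gone this rejects with a ConditionalCheckFailedException,
  // and that error terminates the loop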

ps. I made the assumption in the sample code that processing the SQS messages is quick, hence the timeout setting for the process-msg function is 30s (20s long polling + 10s processing) and the restart function’s threshold is a very conservative 2 mins (it could be much shorter, even after you take into account the effect of eventual consistency).

If your processing logic can take some time to execute – bear in mind that the max timeout allowed for a Lambda function is 5 mins – then here are a few options that spring to mind:

  1. adjust the restart function’s threshold to be more than 5 mins : which might not be great as it lengthens your time to recovery when things do go wrong;
  2. periodically update the last_used timestamp during processing : these also need to be conditional writes, whilst swallowing any errors;
  3. add a loop_id to the DynamoDB table and include it in the ConditionExpression (sketched below) : that way, you keep the restart function’s threshold low and allow the process-msg function to occasionally overrun; when it does, a new loop is started in its place and takes over its token with a new loop_id, so when the overrunning instance recurses it’ll be stopped (because the loop_id no longer matches)

Both options 2 & 3 strike me as reasonable approaches, depending on whether your processing logic is expected to always run for some time (eg, it involves some batch processing) or to overrun only in unlikely scenarios (eg, occasionally slow third-party API calls).
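
For completeness, here’s a sketch of the option 3 condition – again with assumed table, key schema and attribute names:

// sketch of option 3: include the loop_id in the condition so that a superseded
// (overrunning) instance fails the check on its next recursion and stops itself.
const AWS    = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();

const recordHeartbeat = (queueUrl, token, loopId) =>
  dynamo.update({
    TableName: 'task_tokens',
    Key: { queue_url: queueUrl, token: token },
    UpdateExpression: 'SET last_used = :now',
    ConditionExpression: 'attribute_exists(queue_url) AND loop_id = :loopId',
    ExpressionAttributeValues: { ':now': Date.now(), ':loopId': loopId }
  }).promise();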

 

Scanning for Failed Loops

The restart function performs a table scan against the DynamoDB table to look for tokens whose last_used timestamp is either:

  • not set : the process-msg function never managed to set it during the first recursion, perhaps due to a DynamoDB throttling issue or a temporary network issue?
  • older than threshold : the process-msg function has stopped for whatever reason

By default, a Scan operation in DynamoDB uses eventually consistent reads, which can return data that is a few seconds old. You can either set the ConsistentRead parameter to true, or be more conservative with your thresholds.

Also, there’s a size limit of 1 MB of scanned data per Scan request, so you’ll need to perform the Scan operation recursively – see here.
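
A minimal sketch of such a recursive Scan (assumed table name, attribute names and threshold):

'use strict';

// sketch: recursively Scan for dead loops, ie last_used is missing or older than the threshold.
// the table name, attribute names and threshold are assumptions for illustration.
const AWS    = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();

const THRESHOLD_MS = 2 * 60 * 1000;   // the conservative 2 min threshold mentioned earlier

const findDeadTokens = (startKey, acc) => {
  acc = acc || [];
  const params = {
    TableName: 'task_tokens',
    FilterExpression: 'attribute_not_exists(last_used) OR last_used < :cutoff',
    ExpressionAttributeValues: { ':cutoff': Date.now() - THRESHOLD_MS }
  };
  if (startKey) {
    params.ExclusiveStartKey = startKey;   // continue where the previous page left off
  }
  return dynamo.scan(params).promise().then(res => {
    acc = acc.concat(res.Items);
    return res.LastEvaluatedKey            // present when the 1 MB limit was hit
      ? findDeadTokens(res.LastEvaluatedKey, acc)
      : acc;
  });
};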

 

Find out QueueURL from SNS Message

In the s-resources-cf.json file, you might have noticed that the DynamoDB table has the URL of the SQS queue as its hash key. This is so that we can use the same mechanism to scale up/down & restart processing functions for many queues (see the section below).

But this brings up a new question: “when the scale-up/scale-down functions are called (by CloudWatch Alarms, via SNS), how do we work out which queue we’re dealing with?”

When SNS invokes our function, the payload is a CloudWatch alarm notification wrapped inside an SNS message, which contains all the information we need to work out the queue URL for the SQS queue in question.

We’re making an assumption here that the triggered CloudWatch Alarm exists in the same account & region as the scale-up and scale-down functions (which is a pretty safe bet I’d say).
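
Here’s a sketch of how the queue URL can be derived from the SNS event. The field names are based on the standard CloudWatch alarm notification – do confirm the exact shape against your own logs, and note that the casing of the dimension entries can vary, so both are handled here:

'use strict';

// sketch: derive the SQS queue URL from the CloudWatch alarm notification delivered via SNS.
module.exports.handler = (event, context, callback) => {
  const sns   = event.Records[0].Sns;
  const alarm = JSON.parse(sns.Message);                 // the alarm notification is a JSON string

  const dim       = alarm.Trigger.Dimensions[0];         // the SQS alarms have a single QueueName dimension
  const queueName = dim.value || dim.Value;              // dimension entry casing can vary

  // topic ARN format: arn:aws:sns:{region}:{account}:{topic};
  // assumes the alarm is in the same account & region as this function (see above)
  const arnParts = sns.TopicArn.split(':');
  const region   = arnParts[3];
  const account  = arnParts[4];

  const queueUrl = `https://sqs.${region}.amazonaws.com/${account}/${queueName}`;
  callback(null, queueUrl);
};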

 

Knowing when NOT to Scale Down

Finally – and these only apply to the scale-down function – there are two things to keep in mind when scaling down:

  1. leave at least 1 processing loop per queue
  2. ignore OK messages whose old state is not ALARM

The first point is easy, just check if the query result contains more than 1 token.

The second point is necessary because CloudWatch Alarms have 3 states – OK, INSUFFICIENT_DATA and ALARM – and we only want to scale down when transitioned from ALARM => OK.
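
Putting the two checks together, the scale-down handler might look something like this – the table name, key schema and attribute names are again assumptions for illustration:

'use strict';

// sketch of the scale-down handler with both guards.
const AWS    = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();

module.exports.handler = (event, context, callback) => {
  const sns   = event.Records[0].Sns;
  const alarm = JSON.parse(sns.Message);

  // 2. only act on a genuine ALARM => OK transition
  if (alarm.OldStateValue !== 'ALARM' || alarm.NewStateValue !== 'OK') {
    return callback(null, 'ignored');
  }

  // derive the queue URL as in the earlier sketch
  const dim      = alarm.Trigger.Dimensions[0];
  const arnParts = sns.TopicArn.split(':');
  const queueUrl = `https://sqs.${arnParts[3]}.amazonaws.com/${arnParts[4]}/${dim.value || dim.Value}`;

  dynamo.query({
      TableName: 'task_tokens',                          // placeholder
      KeyConditionExpression: 'queue_url = :url',        // assumed key schema
      ExpressionAttributeValues: { ':url': queueUrl }
    }).promise()
    .then(res => {
      // 1. leave at least one processing loop per queue
      if (res.Items.length <= 1) {
        return 'leaving the last loop running';
      }
      // otherwise delete a token at random; the corresponding loop stops on its next recursion
      const victim = res.Items[Math.floor(Math.random() * res.Items.length)];
      return dynamo.delete({
        TableName: 'task_tokens',
        Key: { queue_url: victim.queue_url, token: victim.token }
      }).promise();
    })
    .then(res => callback(null, res))
    .catch(callback);
};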

 

Taking It Further

As you can see, there is a fair bit of set up involved. It’d be a waste if we had to do the same for every SQS queue we want to process.

Fortunately, given the current set up there’s nothing stopping you from using the same infrastructure to manage multiple queues:

  • the DynamoDB table is keyed to the queue URL already, and
  • the scale-up and scale-down functions can already work with CloudWatch Alarms for any SQS queues

Firstly, you’d need to implement additional configuration so that given a queue URL you can work out which Lambda function to invoke to process messages from that queue.
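
That configuration could be as simple as a map from queue URL to function name – the entries below are made up:

// illustrative only: map each queue URL to the Lambda function that processes its messages
const handlersByQueueUrl = {
  'https://sqs.us-east-1.amazonaws.com/123456789012/orders':        'process-orders',
  'https://sqs.us-east-1.amazonaws.com/123456789012/notifications': 'process-notifications'
};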

Secondly, you need to decide between:

  1. use the same recursive function to poll SQS, but forward received messages to relevant Lambda functions based on the queue URL, or
  2. duplicate the recursive polling logic in each Lambda function (maybe put it in an npm package to avoid code duplication)

Depending on the volume of messages you’re dealing with, option 1 has a cost consideration given there’s a Lambda invocation per message (in addition to the polling function which is running non-stop).

 

So that’s it folks – it’s a pretty long post and I hope you find the ideas here useful. If you’ve got any feedback or suggestions, do feel free to leave them in the comments section below.

Serverless – enable caching on query string parameters in API Gateway

Since I started working at Yubl less than 2 weeks ago, I have been doing a lot of work with Amazon API Gateway & Lambda with the help of the Serverless framework. So far that experience has been really great.

One little caveat I ran into was that it wasn’t clear how to enable caching on query string and request path parameters. For instance, if I declare in my s-function.json file that my API Gateway endpoint has a query string parameter called query:

"requestParameters": {
    "integration.request.querystring.query": "method.request.querystring.query"
},

When the endpoint is deployed, you will see in the API Gateway console that ‘Caching’ is not enabled:

(screenshot: the ‘Caching’ checkbox is unticked for the query parameter)

This means that when I enable caching on my deployed stage, the query string parameter is not taken into account.

(screenshot: caching enabled on the deployed stage)

Users visiting the following URLs would get the same cached response back:

    https://my-awesome-service.com/yubl?query=one-awesome-yubl

    https://my-awesome-service.com/yubl?query=another-awesome-yubl

That’s obviously not OK.

Sadly, the same applied to request path parameters too:

"path": "yubl/{query}",
"method": "GET",
"type": "AWS",

(screenshot: ‘Caching’ not enabled for the request path parameter either)

Thankfully, the guys working on Serverless have been very active in responding on their issues page, and one of the team members picked up my ticket and offered some insight on how to make this work.

API Gateway’s own documentation alludes to two Integration parameters called cacheNamespace and cacheKeyParameters, although to say they’re poorly documented would be an understatement…

(screenshot: the API Gateway documentation entry for cacheNamespace and cacheKeyParameters)

So, after some fiddling around:

  1. create an API with cached query string param
  2. deploy it
  3. export as Swagger + API Gateway Extensions
  4. inspect the JSON to see what values are output for cacheNamespace and cacheKeyParameters

(exported Swagger snippet showing cacheKeyParameters set to “method.input.params.query”)

Interestingly, when I put the above value – “method.input.params.query” – in my s-function.json I get the following error during deployment:

Invalid cache key parameter specified

What did work, though, is the same value I had specified in my requestParameters dictionary, which is “method.request.querystring.query” in this case, ie:

"requestParameters": {
    "integration.request.querystring.query": "method.request.querystring.query"
},
"cacheKeyParametes" [
    "method.request.querystring.query"
]

However, applying the same approach to request path parameters proved fruitless… I guess we’ll just have to wait for the official documentation to be updated to see how it should work, which should be soon by the sound of things!

UPDATE 17/08/2016 : shortly after I posted this I did find a way to get request path parameters working too (which might have been a result of a Serverless update, though I can’t remember now). For request path parameters you need something along the lines of:

"requestParameters": {
    "integration.request.path.otherUserId": "method.request.path.id"
},
"cacheKeyParametes" [
    "method.request.path.id"
]