AWS Lambda – use recursive function to process SQS messages (Part 2)

First of all, apologies for taking months to write this up since part 1; I have been extremely busy since joining Yubl. We have done a lot of work with AWS Lambda and I hope to share more of the lessons we have learnt soon, but for now take a look at this slidedeck to get a flavour.


At the end of part 1 we have a recursive Lambda function that will:

  1. fetch messages from SQS using long-polling
  2. process any received messages
  3. recurse by invoking itself asynchronously

However, on its own this approach has a number of problems:

  • any unhandled exception will kill the loop
  • there’s no way to scale the number of processing loops elastically


We can address these concerns with a simple mechanism where:

  • a DynamoDB table stores a row for each loop that should be running
  • when scaling up, a new row is added to the table, and a new loop is started with the Hash Key (let’s call it a token) as input
  • on each recursion, the looping function checks whether its token is still valid
    • if yes, the function upserts a last_used timestamp against its token
    • if not (i.e. the token has been deleted), the function terminates the loop
  • when scaling down, one of the tokens is chosen at random and deleted from the table, which causes the corresponding loop to terminate at the end of its current recursion (see above)
  • another Lambda function is triggered every X mins to look for tokens whose last_used timestamp is older than some threshold, and starts a new loop in place of each one

Depending on how long it takes the processing function to process all its SQS messages (limited by the max 5 min execution time for a Lambda function at the time of writing), you can adjust the threshold accordingly. Alternatively, you can provide another GUID when starting each loop (say, a loop_id?) and extend the loop’s token check to make sure the loop_id associated with its token matches its own.


High Level Overview

To implement this mechanism, we will need a few things:

  • a DynamoDB table to store the tokens
  • a function to look for dead loops and restart them
  • a CloudWatch event to trigger the restart function every X mins
  • a function to scale_up the number of processing loops
  • a function to scale_down the number of processing loops
  • CloudWatch alarm(s) to trigger scaling activity based on the number of messages in the queue
  • SNS topics for the alarms to publish to, which in turn trigger the scale_up and scale_down functions accordingly*

* the SNS topics are required purely because there’s no support for CloudWatch alarms to trigger Lambda functions directly yet.


A working code sample is available on GitHub which includes:

  • handler code for each of the aforementioned functions
  • s-resources-cf.json, which will create the DynamoDB table when deployed
  • _meta/variables/s-variables-*.json files, which specify the name of the DynamoDB table (referenced by s-resources-cf.json above)

I’ll leave you to explore the code sample at your own leisure, and instead touch on a few of the intricacies.


Setting up Alarms

Assuming that your queue is mostly empty because your processing is keeping pace with the rate of new items, a simple staggered set up should suffice here, with one alarm per queue-depth threshold.

Each of the alarms would send a notification to the relevant SNS topic on ALARM and OK.
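
As a rough illustration of what a staggered set up could look like, the sketch below builds parameter objects in the shape that CloudWatch's PutMetricAlarm API accepts. The thresholds, alarm names, topic ARNs and the `buildStaggeredAlarms` helper are all made up for illustration; they are not taken from the sample code, and wiring ALARM to a scale-up topic and OK to a scale-down topic is an assumption about how the topics are used.

```javascript
// Hypothetical helper: one alarm definition per queue-depth threshold.
// All names and numbers below are illustrative assumptions.
function buildStaggeredAlarms(queueName, scaleUpTopicArn, scaleDownTopicArn, thresholds) {
  return thresholds.map((threshold) => ({
    AlarmName: `${queueName}-depth-${threshold}`,
    Namespace: 'AWS/SQS',
    MetricName: 'ApproximateNumberOfMessagesVisible',
    Dimensions: [{ Name: 'QueueName', Value: queueName }],
    Statistic: 'Average',
    Period: 300, // seconds
    EvaluationPeriods: 1,
    Threshold: threshold,
    ComparisonOperator: 'GreaterThanOrEqualToThreshold',
    AlarmActions: [scaleUpTopicArn], // published when the alarm enters ALARM
    OKActions: [scaleDownTopicArn],  // published when the alarm returns to OK
  }));
}

// Example: three alarms at increasing queue depths for one queue.
const exampleAlarms = buildStaggeredAlarms(
  'my-queue',
  'arn:aws:sns:us-east-1:123456789012:scale-up',
  'arn:aws:sns:us-east-1:123456789012:scale-down',
  [100, 500, 1000]
);
```

Each object could then be passed to a PutMetricAlarm call, or translated into the equivalent CloudFormation resource.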



Verifying Tokens

Since there’s no way to message a running Lambda function directly, we need some way to signal it to stop recursing, and this is where the DynamoDB table comes in.

At the start of each recursion, the process-msg function checks the table to see if its token still exists. And since the table is also used to record a heartbeat from the function (so the restart function can identify failed recursions), it also needs to upsert the last_used timestamp against the token.

We can combine the two operations into one conditional write, which fails if the token doesn’t exist, and that error terminates the loop.
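
To make the conditional write a bit more concrete, here is a minimal sketch of the UpdateItem parameters it might use. The `queue_url` and `token` key attributes, the epoch-millisecond timestamp and the `buildHeartbeatParams` helper are assumptions for illustration; check the sample code for the names actually used.

```javascript
// Builds UpdateItem parameters that refresh the heartbeat only if the token
// row still exists. Values are plain JS values (DocumentClient style).
// Table and attribute names are assumptions, not taken from the sample code.
function buildHeartbeatParams(tableName, queueUrl, token, now) {
  return {
    TableName: tableName,
    Key: { queue_url: queueUrl, token: token },
    UpdateExpression: 'SET last_used = :now',
    // Without this condition the update would recreate a deleted token.
    // With it, the write fails with ConditionalCheckFailedException instead.
    ConditionExpression: 'attribute_exists(#token)',
    ExpressionAttributeNames: { '#token': 'token' },
    ExpressionAttributeValues: { ':now': now },
  };
}

// Usage (not executed here): pass the params to your DynamoDB client's
// update call and let any error propagate so that the loop stops recursing.
const heartbeatExample = buildHeartbeatParams(
  'recursive-lambda-tokens',
  'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
  'token-1',
  1700000000000
);
```

The placeholder `#token` is used instead of the bare attribute name to stay clear of DynamoDB's reserved words.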

P.S. I made the assumption in the sample code that processing the SQS messages is quick, which is why the timeout setting for the process-msg function is 30s (20s long polling + 10s processing) and the restart function’s threshold is a very conservative 2 mins (it can be much shorter, even after you take into account the effect of eventual consistency).

If your processing logic can take some time to execute (bear in mind that the max timeout allowed for a Lambda function is 5 mins at the time of writing), then here are a few options that spring to mind:

  1. adjust the restart function’s threshold to be more than 5 mins: this might not be great, as it lengthens your time to recovery when things do go wrong;
  2. periodically update the last_used timestamp during processing: these updates also need to be conditional writes, whilst swallowing any errors;
  3. add a loop_id to the DynamoDB table and include it in the ConditionExpression: that way, you keep the restart function’s threshold low and allow the process-msg function to occasionally overrun; when it does, a new loop is started in its place and takes over its token with a new loop_id, so that when the overrunning instance finishes it will be stopped when it recurses (it fails to verify its token because the loop_id no longer matches).

Both options 2 and 3 strike me as reasonable approaches, depending on whether your processing logic is expected to always run for some time (e.g. it involves some batch processing) or only in unlikely scenarios (e.g. occasionally slow third-party API calls).
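
For option 3, a sketch of the two writes involved might look like the following: the loop's heartbeat, which now also checks the loop_id, and the restart function's takeover, which stamps a new loop_id on the token. Attribute names and both helper functions are hypothetical, and values are in DocumentClient style.

```javascript
// Heartbeat for a loop that owns `loopId`: fails if the token was deleted
// or if another loop has taken the token over with a different loop_id.
function buildLoopHeartbeatParams(tableName, queueUrl, token, loopId, now) {
  return {
    TableName: tableName,
    Key: { queue_url: queueUrl, token: token },
    UpdateExpression: 'SET last_used = :now',
    ConditionExpression: 'attribute_exists(#token) AND loop_id = :loopId',
    ExpressionAttributeNames: { '#token': 'token' },
    ExpressionAttributeValues: { ':now': now, ':loopId': loopId },
  };
}

// Takeover by the restart function: assigns a fresh loop_id to a stale token.
// The condition stops it from resurrecting a token removed by a scale-down.
function buildTakeoverParams(tableName, queueUrl, token, newLoopId, now) {
  return {
    TableName: tableName,
    Key: { queue_url: queueUrl, token: token },
    UpdateExpression: 'SET loop_id = :newLoopId, last_used = :now',
    ConditionExpression: 'attribute_exists(#token)',
    ExpressionAttributeNames: { '#token': 'token' },
    ExpressionAttributeValues: { ':newLoopId': newLoopId, ':now': now },
  };
}
```

Once the takeover has been written, the overrunning loop's next heartbeat carries the old loop_id, so its condition fails and it stops recursing.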


Scanning for Failed Loops

The restart function performs a table scan against the DynamoDB table to look for tokens whose last_used timestamp is either:

  • not set: the process-msg function never managed to set it during the first recursion, perhaps because of DynamoDB throttling or a temporary network issue
  • older than the threshold: the process-msg function has stopped for whatever reason

By default, a Scan operation in DynamoDB uses eventually consistent reads, which can return data that is a few seconds old. You can set the ConsistentRead parameter to true or be more conservative with your thresholds.

Also, there’s a size limit of 1 MB of scanned data per Scan request, so you’ll need to perform the Scan operation recursively, passing each response’s LastEvaluatedKey as the next request’s ExclusiveStartKey; see here.
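
The scan-and-filter logic could be sketched roughly as below. It uses a loop rather than recursion to follow the pagination, and `scanPage` is a hypothetical function that wraps a single DynamoDB Scan request and resolves to its response. Storing last_used as epoch milliseconds is also an assumption.

```javascript
// True when a token row has no heartbeat yet, or its heartbeat is too old.
function isStale(item, now, thresholdMs) {
  return item.last_used == null || now - item.last_used > thresholdMs;
}

// Scans page by page until there is no LastEvaluatedKey left, collecting
// stale tokens. `scanPage(params)` is a hypothetical wrapper around Scan.
async function findStaleTokens(scanPage, now, thresholdMs) {
  const stale = [];
  let startKey; // undefined on the first request
  do {
    const page = await scanPage({ ConsistentRead: true, ExclusiveStartKey: startKey });
    stale.push(...page.Items.filter((item) => isStale(item, now, thresholdMs)));
    startKey = page.LastEvaluatedKey;
  } while (startKey);
  return stale;
}
```

Injecting `scanPage` keeps the staleness check easy to test without a real table.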


Find out QueueURL from SNS Message

In the s-resources-cf.json file, you might have noticed that the DynamoDB table has the queue URL of the SQS queue as hash key. This is so that we can use the same mechanism to scale up/down and restart processing functions for many queues (see the section below).

But this brings up a new question: “when the scale-up/scale-down functions are called (by CloudWatch Alarms, via SNS), how do we work out which queue we’re dealing with?”

When SNS calls our function, the payload contains all the information we need to work out the queue URL for the SQS queue in question.

We’re making an assumption here that the triggered CloudWatch Alarm exists in the same account & region as the scale-up and scale-down functions (which is a pretty safe bet I’d say).
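
A minimal sketch of that lookup is shown below. It assumes the SNS record carries the alarm details as a JSON string in `Sns.Message`, with `AWSAccountId` and a `QueueName` entry under `Trigger.Dimensions`, and it reads the region from the topic ARN. Treat those field names and the `queueUrlFromSnsEvent` helper as assumptions and compare them against a real payload before relying on them.

```javascript
// Works out the SQS queue URL from an SNS-delivered CloudWatch alarm event.
// Assumes the alarm, topic and queue share the same account and region.
function queueUrlFromSnsEvent(event) {
  const sns = event.Records[0].Sns;
  const alarm = JSON.parse(sns.Message);     // alarm details arrive as a JSON string
  const region = sns.TopicArn.split(':')[3]; // arn:aws:sns:<region>:<account>:<topic>
  // Accept either casing for the dimension keys, as the exact shape is assumed.
  const dimension = alarm.Trigger.Dimensions.find(
    (d) => (d.name || d.Name) === 'QueueName'
  );
  if (!dimension) throw new Error('alarm has no QueueName dimension');
  const queueName = dimension.value || dimension.Value;
  return `https://sqs.${region}.amazonaws.com/${alarm.AWSAccountId}/${queueName}`;
}
```

If the alarm could live in a different account or region from the functions, the derived URL would be wrong, which is why the assumption above matters.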


Knowing when NOT to Scale Down

Finally (and these only apply to the scale-down function) there are two things to keep in mind when scaling down:

  1. leave at least 1 processing loop per queue
  2. ignore OK messages whose old state is not ALARM

The first point is easy: just check that the query result contains more than 1 token.

The second point is necessary because CloudWatch Alarms have 3 states (OK, INSUFFICIENT_DATA and ALARM) and we only want to scale down when the alarm has transitioned from ALARM to OK.
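
Both checks fit in a few lines. In the sketch below, `alarm` is the parsed alarm message (assumed to expose `OldStateValue` and `NewStateValue`) and `tokens` is the list of token rows for the queue. The helper names are hypothetical.

```javascript
// Scale down only on an ALARM -> OK transition, and only if that leaves
// at least one processing loop running for the queue.
function shouldScaleDown(alarm, tokens) {
  const recovered = alarm.OldStateValue === 'ALARM' && alarm.NewStateValue === 'OK';
  return recovered && tokens.length > 1;
}

// Picks the token to delete at random; `random` is injectable for testing.
function pickTokenToDelete(tokens, random = Math.random) {
  return tokens[Math.floor(random() * tokens.length)];
}
```

An INSUFFICIENT_DATA to OK transition is ignored here, since no scale-up would have happened for it.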


Taking It Further

As you can see, there is a fair bit of set up involved. It’d be a waste if we had to do the same for every SQS queue we want to process.

Fortunately, given the current set up there’s nothing stopping you from using the same infrastructure to manage multiple queues:

  • the DynamoDB table is keyed to the queue URL already, and
  • the scale-up and scale-down functions can already work with CloudWatch Alarms for any SQS queue

Firstly, you’d need to implement additional configuration so that, given a queue URL, you can work out which Lambda function to invoke to process messages from that queue.
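
In its simplest form, that configuration could be a lookup from queue URL to function name, as in the sketch below. The queue URLs, function names and the `processorFor` helper are invented for illustration. In practice the mapping might live in a config file or a table instead.

```javascript
// Hypothetical configuration: queue URL -> name of the Lambda function
// that should process messages from that queue.
const processors = new Map([
  ['https://sqs.us-east-1.amazonaws.com/123456789012/orders', 'process-orders'],
  ['https://sqs.us-east-1.amazonaws.com/123456789012/emails', 'process-emails'],
]);

// Looks up the processing function for a queue, failing loudly when the
// queue has not been configured.
function processorFor(queueUrl) {
  const functionName = processors.get(queueUrl);
  if (!functionName) throw new Error(`no processor configured for ${queueUrl}`);
  return functionName;
}
```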

Secondly, you need to decide between:

  1. using the same recursive function to poll SQS, but forwarding received messages to the relevant Lambda functions based on the queue URL, or
  2. duplicating the recursive polling logic in each Lambda function (maybe put it in an npm package to avoid code duplication)

Depending on the volume of messages you’re dealing with, option 1 has a cost consideration given there’s a Lambda invocation per message (in addition to the polling function, which is running non-stop).


So that’s it folks, it’s a pretty long post and I hope you find the ideas here useful. If you’ve got any feedback or suggestions do feel free to leave them in the comments section below.
