AWS Lambda — 3 pro tips for working with Kinesis streams

At Yubl, we arrived at a non-trivial serverless architecture in which Lambda and Kinesis featured prominently.

Whilst our experience using Lambda with Kinesis was great in general, there were a couple of lessons that we had to learn along the way. Here are 3 useful tips to help you avoid some of the pitfalls we fell into and accelerate your own adoption of Lambda and Kinesis.

Consider partial failures

From the Lambda documentation:

AWS Lambda polls your stream and invokes your Lambda function. Therefore, if a Lambda function fails, AWS Lambda attempts to process the erring batch of records until the time the data expires…

Because of the way Lambda functions are retried, if you allow your function to err on partial failures then the default behavior is to retry the entire batch until success or the data expires from the stream.

To decide if this default behavior is right for you, you have to answer certain questions:

  • can events be processed more than once?
  • what if those partial failures are persistent? (perhaps due to a bug in the business logic that is not handling certain edge cases gracefully)
  • is it more important to process every event till success than to keep the overall system real-time?

In the case of Yubl (which was a social networking app with a timeline feature similar to Twitter) we found that for most of our use cases it’s more important to keep the system flowing than to halt processing for any failed events, even if for a minute.

For instance, when a user creates a new post, we would distribute it to all of their followers by processing the yubl-posted event. The 2 basic choices we’re presented with are:

  1. allow errors to bubble up and fail the invocation—we give every event every opportunity to be processed; but if some events fail persistently then no one will receive new posts in their feed and the system appears unavailable
  2. catch and swallow partial failures—failed events are discarded, some users will miss some posts but the system appears to be running normally to users (even affected users might not realize that they had missed some posts)

(of course, it doesn’t have to be a binary choice, there’s plenty of room to add smarter handling for partial failures, which we will discuss shortly)

We encapsulated these 2 choices as part of our tooling so that we get the benefit of reusability and the developers can make an explicit choice (and the code makes that choice obvious to anyone reading it later on) for every Kinesis processor they create.
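To make this concrete, here is a minimal sketch of what such a wrapper might look like; kinesisProcessor, processRecord and the swallowErrors option are hypothetical names for illustration, not our actual tooling:

// a hypothetical wrapper that makes the partial-failure choice explicit;
// processRecord is the per-record business logic and returns a Promise
function kinesisProcessor(processRecord, opts) {
  return (event, context, callback) => {
    const records = event.Records.map(r =>
      JSON.parse(new Buffer(r.kinesis.data, 'base64').toString('utf8')));

    const tasks = records.map(record =>
      processRecord(record).catch(err => {
        if (opts.swallowErrors) {
          // discard the failed event so the rest of the batch keeps flowing
          console.error('swallowing failed record', err);
          return null;
        }
        // bubble up and fail the invocation, so Lambda retries the whole batch
        throw err;
      }));

    Promise.all(tasks)
      .then(() => callback(null))
      .catch(callback);
  };
}

// usage: the choice is explicit at the call site
// module.exports.handler = kinesisProcessor(distributeYubl, { swallowErrors: true });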

You would probably apply different choices depending on the problem you’re solving; the important thing is to always consider how partial failures would affect your system as a whole.

Use dead letter queues (DLQ)

AWS announced support for Dead Letter Queues (DLQ) at the end of 2016; however, at the time of writing this support only extends to asynchronous invocations (SNS, S3, IoT, etc.) and not to poll-based invocations such as Kinesis and DynamoDB streams.

That said, there’s nothing stopping you from applying the DLQ concept yourself.

First, let’s roll back the clock to a time when we didn’t have Lambda. Back then, we’d use long running applications to poll Kinesis streams ourselves. Heck, I even wrote my own producer and consumer libraries because when AWS rolled out Kinesis they totally ignored anyone not running on the JVM!

Lambda has taken over a lot of the responsibilities—polling, tracking where you are in the stream, error handling, etc.—but as we have discussed above it doesn’t remove you from the need to think for yourself. Nor does it change what good looks like for a system that processes Kinesis events, which for me must have at least these 3 qualities:

  • it should be real-time (most domains consider real-time as “within a few seconds”)
  • it should retry failed events, but retries should not violate the realtime constraint on the system
  • it should be possible to retrieve events that could not be processed so someone can investigate root cause or provide manual intervention

Back then, my long running application would:

  1. poll Kinesis for events
  2. process the events by passing them to a delegate function (your code)
  3. retry failed events 2 more times
  4. after the 2 retries are exhausted, save the failed events into an SQS queue
  5. record the last sequence number of the batch so that we don’t lose the current progress if the host VM dies or the application crashes
  6. another long running application (perhaps on another VM) would poll the SQS queue for events that couldn’t be processed in realtime
  7. process the failed events by passing them to the same delegate function as above (your code)
  8. after the max no. of retries, the events are passed off to a DLQ
  9. this triggers CloudWatch alarms and someone can manually retrieve the event from the DLQ to investigate

A Lambda function that processes Kinesis events should also:

  • retry failed events X times depending on processing time
  • send failed events to a DLQ after exhausting X retries

Since SNS already comes with DLQ support, you can simplify your setup by sending the failed events to an SNS topic instead—Lambda would then process them a further 3 times before passing them off to the designated DLQ.
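As an illustration only (the delegate function, MAX_RETRIES and the DLQ_TOPIC_ARN environment variable are made-up names, and the SNS topic is assumed to exist with its own DLQ configured), the retry-then-hand-off logic might look like this:

const AWS = require('aws-sdk');
const sns = new AWS.SNS();

// your delegate function (your code), shown here as a stub
function processRecord(record) {
  return Promise.resolve(record);
}

const MAX_RETRIES = 2;

// retry a failed record a couple of times within the invocation, then publish it
// to an SNS topic instead of failing the whole batch
function processWithRetry(record, attempt) {
  return processRecord(record).catch(err => {
    if (attempt < MAX_RETRIES) {
      return processWithRetry(record, attempt + 1);
    }
    console.error('retries exhausted, handing the record off to SNS', err);
    return sns.publish({
      TopicArn: process.env.DLQ_TOPIC_ARN,
      Message: JSON.stringify(record)
    }).promise();
  });
}

// usage: processWithRetry(record, 0) gives the record 1 attempt + 2 retries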

Avoid “hot” streams

We found that when a Kinesis stream has 5 or more Lambda function subscribers we would start to see lots of ReadProvisionedThroughputExceeded errors in CloudWatch. Fortunately these errors are silent to us as they happen to (and are handled by) the Lambda service polling the stream.

However, we occasionally see spikes in the GetRecords.IteratorAge metric, which tells us that a Lambda function will sometimes lag behind. This did not happen frequently enough to present a problem but the spikes were unpredictable and did not correlate to spikes in traffic or number of incoming Kinesis events.

Increasing the no. of shards in the stream made matters worse, and the no. of ReadProvisionedThroughputExceeded errors increased proportionally.


According to the Kinesis documentation

Each shard can support up to 5 transactions per second for reads, up to a maximum total data reads of 2 MB per second.

and Lambda documentation

If your stream has 100 active shards, there will be 100 Lambda functions running concurrently. Then, each Lambda function processes events on a shard in the order that they arrive.

One would assume that each of the aforementioned Lambda functions would be polling its shard independently. Since the problem is having too many Lambda functions poll the same shard, it makes sense that adding new shards will only escalate the problem further.


“All problems in computer science can be solved by another level of indirection.”

—David Wheeler

After speaking to the AWS support team about this, the only advice we received (and one that we had already considered) was to apply the fan-out pattern by adding another layer of Lambda functions that would distribute the Kinesis events to the others.

Whilst this is simple to implement, it has some downsides:

  • it vastly complicates the logic for handling partial failures (see above)
  • all functions now process events at the rate of the slowest function, potentially damaging the realtime-ness of the system

We also considered and discounted several other alternatives, including:

  • have one stream per subscriber—this has a significant cost implication, and more importantly it means publishers would need to publish the same event to multiple Kinesis streams in a “transaction” with no easy way to rollback (since you can’t unpublish an event in Kinesis) on partial failures
  • roll multiple subscriber logic into one—this corrodes our service boundary as different subsystems are bundled together to artificially reduce the no. of subscribers

In the end, we didn’t find a truly satisfying solution and decided to reconsider if Kinesis was the right choice for our Lambda functions on a case by case basis.

  • for subsystems that do not have to be realtime, use S3 as source instead—all our Kinesis events are persisted to S3 via Kinesis Firehose, the resulting S3 files can then be processed by these subsystems, eg. Lambda functions that stream events to Google BigQuery for BI
  • for work that is task-based (ie, order is not important), use SNS/SQS as source instead—SNS is natively supported by Lambda, and we implemented a proof-of-concept architecture for processing SQS events with recursive Lambda functions, with elastic scaling; now that SNS has DLQ support (it was not available at the time) it would definitely be the preferred option, provided that its degree of parallelism would not flood and overwhelm downstream systems such as databases, etc.
  • for everything else, continue to use Kinesis and apply the fan out pattern as an absolute last resort

Wrapping up…

So there you have it, 3 pro tips from a group of developers who have had the pleasure of working extensively with Lambda and Kinesis.

I hope you find this post useful. If you have any interesting observations or learnings from your own experience working with Lambda and Kinesis, please share them in the comments section below.

Links

Yubl’s road to serverless — Part 1, Overview

Yubl’s road to serverless — Part 2, Testing and CI/CD

Yubl’s road to serverless — Part 3, Ops

AWS Lambda — use recursive functions to process SQS messages, Part 1

AWS Lambda — use recursive functions to process SQS messages, Part 2

AWS Lambda – build yourself a URL shortener in 2 hours

An interesting requirement came up at work this week where we discussed potentially having to run our own URL Shortener because the Universal Links mechanism (in iOS 9 and above) requires a JSON manifest at

https://domain.com/apple-app-site-association

Since the OS doesn’t follow redirects this manifest has to be hosted on the URL shortener’s root domain.

Owing to a limitation in AppsFlyer, it’s currently not able to shorten links when you have Universal Links configured for your app. Whilst we can switch to another vendor, it means more work for our (already stretched) client devs, and we really like AppsFlyer‘s support for attributions.

Which brings us back to the question

“should we build a URL shortener?”

swiftly followed by

“how hard can it be to build a scalable URL shortener in 2017?”

Well, it turns out it wasn’t hard at all.

Lambda FTW

For this URL shortener we’ll need several things:

  1. a GET /{shortUrl} endpoint that will redirect you to the original URL
  2. a POST / endpoint that will accept an original URL and return the shortened URL
  3. an index.html page where someone can easily create short URLs
  4. a GET /apple-app-site-association endpoint that serves a static JSON response

all of which can be accomplished with API Gateway + Lambda.

Overall, this is the project structure I ended up with:

  • using the Serverless framework’s aws-nodejs template
  • each of the above endpoints has a corresponding handler function
  • the index.html file is in the static folder
  • the test cases are written in such a way that they can be used both as integration as well as acceptance tests
  • there’s a build.sh script which facilitates running
    • integration tests, eg ./build.sh int-test {env} {region} {aws_profile}
    • acceptance tests, eg ./build.sh acceptance-test {env} {region} {aws_profile}
    • deployment, eg ./build.sh deploy {env} {region} {aws_profile}

GET /apple-app-site-association endpoint

Seeing as this is a static JSON blob, it makes sense to precompute the HTTP response and return it every time.
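As a minimal sketch (assuming the Lambda proxy integration, and with placeholder manifest content rather than our real app IDs):

// precompute the response once, outside the handler, so it's reused on every invocation
const response = {
  statusCode: 200,
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    applinks: { apps: [], details: [] } // placeholder content only
  })
};

module.exports.handler = (event, context, callback) => callback(null, response);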

POST / endpoint

For an algorithm to shorten URLs, you can find a very simple and elegant solution on StackOverflow. All you need is an auto-incremented ID, like the ones you normally get with an RDBMS.

However, I find DynamoDB a more appropriate DB choice here because:

  • it’s a managed service, so no infrastructure for me to worry about
  • OPEX over CAPEX, man!
  • I can scale read & write throughput elastically to match the utilization level and handle any spikes in traffic

DynamoDB, however, has no concept of an auto-incremented ID, which the algorithm needs. Instead, you can use an atomic counter to simulate an auto-incremented ID (at the expense of an extra write-unit per request).
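As a rough sketch of that idea (the table name, key and attribute names below are made up for illustration):

const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB.DocumentClient();

const ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';

// simulate an auto-incremented ID with an atomic counter item
function nextId() {
  const req = {
    TableName: 'url-shortener-counters',    // hypothetical table
    Key: { id: 'url-shortener' },
    UpdateExpression: 'ADD #counter :incr',
    ExpressionAttributeNames: { '#counter': 'counter' },
    ExpressionAttributeValues: { ':incr': 1 },
    ReturnValues: 'UPDATED_NEW'
  };
  return dynamodb.update(req).promise().then(resp => resp.Attributes.counter);
}

// base62-encode the numeric ID into a short token
function encode(n) {
  let token = '';
  do {
    token = ALPHABET[n % 62] + token;
    n = Math.floor(n / 62);
  } while (n > 0);
  return token;
}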

GET /{shortUrl} endpoint

Once we have the mapping in a DynamoDB table, the redirect endpoint is a simple matter of fetching the original URL and returning it as part of the Location header.

Oh, and don’t forget to return the appropriate HTTP status code, in this case a 308 Permanent Redirect.
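A sketch of that handler, again assuming the Lambda proxy integration and made-up table and attribute names:

const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB.DocumentClient();

// look up the original URL for the short token and redirect the caller
module.exports.handler = (event, context, callback) => {
  const req = {
    TableName: 'url-shortener-urls',        // hypothetical table
    Key: { shortUrl: event.pathParameters.shortUrl }
  };

  dynamodb.get(req).promise()
    .then(resp => {
      if (!resp.Item) {
        return callback(null, { statusCode: 404, body: 'not found' });
      }
      callback(null, {
        statusCode: 308,                    // Permanent Redirect
        headers: { Location: resp.Item.longUrl }
      });
    })
    .catch(callback);
};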

GET / index page

Finally, for the index page, we’ll need to return some HTML instead (and a different content-type to go with the HTML).

I decided to put the HTML file in a static folder, which is loaded and cached the first time the function is invoked.
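A sketch of that lazy-load-and-cache approach (the file path assumes the static folder from the project structure above):

const fs = require('fs');
const path = require('path');

let html; // cached after the first invocation, for the lifetime of the container

module.exports.handler = (event, context, callback) => {
  if (!html) {
    html = fs.readFileSync(path.join(__dirname, 'static', 'index.html'), 'utf8');
  }
  callback(null, {
    statusCode: 200,
    headers: { 'Content-Type': 'text/html' },
    body: html
  });
};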

Getting ready for production

Fortunately I have had plenty of practice getting Lambda functions to production readiness, and for this URL shortener we will need to:

  • configure auto-scaling parameters for the DynamoDB table (we have an internal system for managing the auto-scaling side of things)
  • turn on caching in API Gateway for the production stage

Future Improvements

If you put in the same URL multiple times you’ll get back different short-urls; one optimization (for storage and caching) would be to return the same short-url instead.

To accomplish this, you can:

  1. add GSI to the DynamoDB table on the longUrl attribute to support efficient reverse lookup
  2. in the shortenUrl function, perform a GET with the GSI to find existing short url(s)

I think it’s better to add a GSI than to create a new table here because it avoids having “transactions” that span across multiple tables.
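As a rough sketch (the table and index names are made up), the reverse lookup could look like this:

const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB.DocumentClient();

// before minting a new ID, check whether this long URL was shortened already
function findExistingShortUrl(longUrl) {
  const req = {
    TableName: 'url-shortener-urls',        // hypothetical table
    IndexName: 'longUrl-index',             // hypothetical GSI on the longUrl attribute
    KeyConditionExpression: 'longUrl = :longUrl',
    ExpressionAttributeValues: { ':longUrl': longUrl },
    Limit: 1
  };
  return dynamodb.query(req).promise()
    .then(resp => resp.Items.length > 0 ? resp.Items[0].shortUrl : null);
}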

Useful Links

Slides for my AWS user group talk – AWS Lambda from the trenches

Hello, just a quick note to say thanks to everyone who attended the AWS user group meetup last night, was a pleasure to talk to many of you afterwards. As promised, here are the slides for my talk from last night.

AWS Lambda – comparing platform performances

As Lambda adds nodejs 6.10 to its supported platforms I wondered if there are any performance differences between the platforms. Thankfully the templates in the Serverless framework make it a relative breeze to test it out with a simple HelloWorld function.

 

The Test

see the test code here.

I created a simple Lambda function for each platform that will respond to an API Gateway event and return “hello”. This is the nodejs version.

I decided to use API Gateway as the trigger as it allows me to invoke the function and apply a constant load using standard load testing tools for HTTP. I chose Artillery because you can get going with minimal fuss and I had used it before.

For each platform, I ran a test with 10 virtual users sending 1 request per second (ie. a total of 10 req/s) for an hour.

artillery quick --duration 3600 --rate 10 -n 1 http://my.lambda.backed.api/

Since we’re interested in the performance characteristics of the different Lambda platforms, we’ll only be looking at the function Duration metric, and we’ll ignore the initial cold start times.

 

Observation 1 – C# is slower?

Unsurprisingly the invocation duration is fairly consistent across the functions, although C# is sticking out like a sore thumb.

Take this 10 mins window for instance – where there were no spikes that looked like cold starts – the C# platform is consistently higher than the rest.

 

Observation 2 – Java has very consistent performance

If you look at the max duration for the same 10 mins window – for whatever reason, I didn’t get any percentile metrics from CloudWatch for the entire duration of the test so had to settle for max instead – the Java platform was both lower and had less variance, by some distance.

If you compare the average and max duration for the Java platform over a longer time window, you’ll also see that there’s very little difference between the two (if you ignore the spike at 01:38 which might be down to GC pause as opposed to cold start) which suggests the performance of the Java platform is very consistent.

 

Observation 3 – static languages have more consistent performance?

Following on from the previous observation, it seems that both C# and Java show less variance when it comes to max duration, so perhaps it’s because both are compiled languages?

 

Observation 4 – Java packages are big…

One of the benefits of using nodejs and Python to write Lambda functions is that they produce much smaller packages, which we know translates to lower cold start times. Now, the fault might lie with the Serverless template for aws-java-maven, but my HelloWorld Java example produces a whopping 2MB package, which is orders of magnitude bigger than the nodejs and Python functions. I expected it to be bigger than nodejs, but perhaps closer to the size of the C# package.

 

Conclusions

Take these results with a pinch of salt. Things are evolving at an incredible pace and whatever performance discrepancies we’re seeing today can change quickly as AWS improves all the platforms behind the scenes.

Even as I observe that the C# platform appears to be slower in this test, we’re talking about sub-millisecond difference for a HelloWorld example, hardly representative of a real world application. The DotNetCore platform itself (which C# Lambda functions run on) is also evolving quickly, and any future performance improvements in that underlying platform will be transferred to you at no cost, so don’t let this post dissuade you from writing Lambda functions in C#.

Yubl’s road to Serverless architecture – Part 3 – ops

Note: see here for the rest of the series.

 

A couple of folks asked me about our strategy for monitoring, logging, etc. after part 2, and having watched Chris Swan talk about “Serverless Operations is not a Solved Problem” at the Serverless meetup it’s a good time for us to talk about how we approached ops with AWS Lambda.

 

NoOps != No Ops

The notion of “NoOps” has often been mentioned alongside serverless technologies (I have done it myself), but it doesn’t mean that you no longer have to worry about ops.

To me, “ops” is the umbrella term for everything related to keeping my systems operational and performing within acceptable parameters, including (but not limited to) resource provisioning, configuration management, monitoring and being on hand to deal with any live issues. The responsibilities of keeping the systems up and running will always exist regardless of whether your software is running on VMs in the cloud, on-premise hardware, or as small Lambda functions.

Within your organization, someone needs to fulfill these responsibilities. It might be that you have a dedicated ops team, or perhaps your developers will share those responsibilities.

NoOps, to me, means no ops specialization in my organization – ie. no dedicated ops team – because the skills and efforts required to fulfill the ops responsibilities do not justify the need for such specialization. As an organization it’s in your best interest to delay such specialization for as long as you can, both from a financial point of view and, perhaps more importantly, because Conway’s law tells us that having an ops team is the surefire way to end up with a set of operational procedures/processes, tools and infrastructure whose complexity will in turn justify the existence of said ops team.

At Yubl, as we migrated to a serverless architecture, our deployment pipeline became more streamlined, our toolchain became simpler, and we found less need for a dedicated ops team; in fact, we were in the process of disbanding our ops team altogether.

 

Logging

Whenever you write to stdout from your Lambda function – eg. when you do console.log in your nodejs code – it ends up in the function’s Log Group in CloudWatch Logs.

Centralised Logging

However, CloudWatch Logs is not easily searchable, and once you have a dozen Lambda functions you will want to collect your logs in one central place. The ELK stack is the de facto standard for centralised logging these days; you can run your own ELK stack on EC2, and elastic.co also offers a hosted version of Elasticsearch and Kibana.

To ship your logs from CloudWatch Logs to ELK you can subscribe the Log Group to a cloudwatch-logs-to-elk function that is responsible for shipping the logs.

You can subscribe the Log Group manually via the AWS management console.

But you don’t want a manual step that everyone needs to remember every time they create a new Lambda function. Instead, it’s better to set up a rule in CloudWatch Events to invoke a subscribe-log-group Lambda function that sets up the subscription for new Log Groups (see the sketch after the list below).

2 things to keep in mind:

  • lots of services create logs in CloudWatch Logs, so you’d want to filter Log Groups by name; Lambda function logs have the prefix /aws/lambda/
  • don’t subscribe the Log Group for the cloudwatch-logs-to-elk function (or whatever you decide to call it), otherwise you create an infinite loop where the function’s own logs trigger it and produce more logs, and so on
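Putting those two caveats together, here is a minimal sketch of what the subscribe-log-group function might look like, assuming the CloudWatch Events rule matches the CreateLogGroup API call (via CloudTrail) and the shipper function’s ARN is passed in through an environment variable:

const AWS = require('aws-sdk');
const cloudWatchLogs = new AWS.CloudWatchLogs();

module.exports.handler = (event, context, callback) => {
  const logGroupName = event.detail.requestParameters.logGroupName;

  // only subscribe Lambda Log Groups, and never the log shipper's own Log Group
  if (!logGroupName.startsWith('/aws/lambda/') ||
      logGroupName.endsWith('cloudwatch-logs-to-elk')) {
    return callback(null, 'skipped');
  }

  const req = {
    logGroupName: logGroupName,
    destinationArn: process.env.SHIPPER_FUNCTION_ARN, // ARN of cloudwatch-logs-to-elk
    filterName: 'ship-to-elk',
    filterPattern: ''                                 // match everything
  };

  // note: the shipper function also needs a resource policy that allows
  // CloudWatch Logs to invoke it
  cloudWatchLogs.putSubscriptionFilter(req).promise()
    .then(() => callback(null, 'subscribed'))
    .catch(callback);
};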

 

Distributed Tracing

Having all your logs in one easily searchable place is great, but as your architecture expands into more and more services that depend on one another, you will need to correlate logs from different services to understand all the events that occurred during one user request.

For instance, when a user creates a new post in the Yubl app we distribute the post to all of the user’s followers. Many things happen along this flow:

  1. user A’s client calls the legacy API to create the new post
  2. the legacy API fires a yubl-posted event into a Kinesis stream
  3. the distribute-yubl function is invoked to handle this event
  4. distribute-yubl function calls the relationship-api to find user A’s followers
  5. distribute-yubl function then performs some business logic, groups user A’s followers into batches and, for each batch, fires a message to an SNS topic
  6. the add-to-feed function is invoked for each SNS message and adds the new post to each follower’s feed

If one of user A’s followers didn’t receive his new post in the feed then the problem can lie in a number of different places. To make such investigations easier we need to be able to see all the relevant logs in chronological order, and that’s where correlation IDs (eg. initial request-id, user-id, yubl-id, etc.) come in.

Because the handling of the initial user request flows through API calls, Kinesis events and SNS messages, it means the correlation IDs also need to be captured and passed through API calls, Kinesis events and SNS messages.

Our approach was to roll our own client libraries which will pass the captured correlation IDs along.

Capturing Correlation IDs

All of our Lambda functions are created with a wrapper that wraps your handler code with additional goodness, such as capturing the correlation IDs into a global.CONTEXT object (which works because nodejs is single-threaded).
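A heavily simplified sketch of the idea (the header names and the fallback to the request ID are for illustration only, not our actual library):

// wrap(handler) captures correlation IDs before invoking the real handler
function wrap(handler) {
  return (event, context, callback) => {
    const headers = event.headers || {};
    global.CONTEXT = {
      'x-correlation-id': headers['x-correlation-id'] || context.awsRequestId,
      'x-correlation-user-id': headers['x-correlation-user-id']
    };
    return handler(event, context, callback);
  };
}

// usage: module.exports.handler = wrap(myHandler);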

Forwarding Correlation IDs

Our HTTP client library is a thin wrapper around the superagent HTTP client and injects the captured correlation IDs into outgoing HTTP headers.

We also have a client library for publishing Kinesis events, which can inject the correlation IDs into the record payload.

For SNS, you can include the correlation IDs as message attributes when publishing a message.

Zipkin and Amazon X-Ray

Since then, AWS has announced X-Ray, but it’s still in preview so I have not had a chance to see how it works in practice, and it doesn’t support Lambda at the time of writing.

There is also Zipkin, but it requires you to run additional infrastructure on EC2, and whilst it has a wide range of support for instrumentation, the path to adoption in a serverless environment (where you don’t have or need traditional web frameworks) is not clear to me.

 

Monitoring

Out of the box you get a number of basic metrics from CloudWatch – invocation counts, durations, errors, etc.

You can also publish custom metrics to CloudWatch (eg. user created, post viewed) using the AWS SDK. However, since these are HTTP calls you have to be conscious of the latencies they’ll add for user-facing functions (ie. those serving APIs). You can mitigate the added latencies by publishing them in a fire-and-forget fashion, and/or budgeting the amount of time (say, to a max of 50ms) you can spend publishing metrics at the end of a request.

Because you have to do everything during the invocation of a function, it forces you to make trade offs.

Another approach is to take a leaf from Datadog’s book and use special log messages and process them after the fact. For instance, if you write logs in the format MONITORING|epoch_timestamp|metric_value|metric_type|metric_name like the ones below:

console.log("MONITORING|1489795335|27.4|latency|user-api-latency");

console.log("MONITORING|1489795335|8|count|yubls-served");

then you can process these log messages (see Logging section above) and publish them as metrics instead. With this approach you’ll be trading off liveness of metrics for less API latency overhead.
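For illustration, a sketch of the extraction step might look like the following, here publishing to CloudWatch custom metrics with a made-up namespace (you could just as easily forward them to Graphite or another backend):

const AWS = require('aws-sdk');
const cloudWatch = new AWS.CloudWatch();

// parse a "MONITORING|epoch_timestamp|metric_value|metric_type|metric_name" log line
// and publish it as a custom metric
function publishMetric(logMessage) {
  const [, timestamp, value, type, name] = logMessage.trim().split('|');

  const req = {
    Namespace: 'yubl-custom-metrics',       // hypothetical namespace
    MetricData: [{
      MetricName: name,
      Timestamp: new Date(parseInt(timestamp, 10) * 1000),
      Value: parseFloat(value),
      Unit: type === 'latency' ? 'Milliseconds' : 'Count'
    }]
  };
  return cloudWatch.putMetricData(req).promise();
}

// usage: publishMetric('MONITORING|1489795335|27.4|latency|user-api-latency');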

Of course, you can employ both approaches in your architecture and use the appropriate one for each situation:

  • for functions on the critical path (that will directly impact the latency your users experience), choose the approach of publishing metrics as special log messages;
  • for other functions (cron jobs, kinesis processors, etc.) where invocation duration doesn’t significantly impact a user’s experience, publish metrics as part of the invocation

Dashboards + Alerts

We have a number of dashboards set up in CloudWatch as well as Graphite (using hostedgraphite, for our legacy stack running on EC2), and they’re displayed on large monitors near the server team area. We also set up alerts against various metrics such as API latencies and error count, and have OpsGenie set up to alert whoever’s on call that week.

Consider alternatives to CloudWatch

Whilst CloudWatch is a good, cost-effective solution for monitoring (and in some cases the only way to get metrics out of AWS services such as Kinesis and DynamoDB), it has its drawbacks.

Its UI and customization options are not on par with competitors such as Graphite, Datadog and Sysdig, and it lacks advanced features such as anomaly detection and finding correlations, which you find in Stackdriver and Wavefront.

The biggest limitation, however, is that CloudWatch metrics are only granular to the minute. It means your time to discovery of issues is measured in minutes (you need a few data points to separate real issues that require manual intervention from temporary blips) and consequently your time to recovery is likely to be measured in tens of minutes. As you scale up and the cost of unavailability goes up, you need to invest effort to cut down both, which might mean you need more granular metrics than CloudWatch is able to give you.

Another good reason for not using CloudWatch is that you really don’t want your monitoring system to fail at the same time as the system it monitors. Over the years we have experienced a number of AWS outages that impacted both our core systems running on EC2 and CloudWatch itself. As our system failed and recovered, we didn’t have the visibility to see what was happening and how it was impacting our users.

 

Config Management

Whatever approach you use for config management you should always ensure that:

  • sensitive data (eg. credentials, connection strings) are encrypted in-flight, and at rest
  • access to sensitive data should be based on roles
  • you can easily and quickly propagate config changes

Nowadays, you can add environment variables to your Lambda functions directly, and have them encrypted with KMS.

This was the approach we started with, albeit using environment variables in the Serverless framework since it wasn’t a feature of the Lambda service at the time. Once we had a dozen functions that shared config values (eg. MongoDB connection strings), this approach became cumbersome, and it was laborious and painfully slow to propagate config changes manually (by updating and re-deploying every function that required the updated config value).

It was at this point in our evolution that we moved to a centralised config service. Having considered consul (which I know a lot of folks use) we decided to write our own using API Gateway, Lambda and DynamoDB because:

  • we don’t need many of consul‘s features, only the kv store
  • consul is another thing we’d have to run and manage
  • consul is another thing we’d have to learn
  • even running consul with 2 nodes (you need some redundancy for production) would still be an order of magnitude more expensive

Sensitive data is encrypted (by a developer) using KMS and stored in the config API in its encrypted form; when a Lambda function starts up, it asks the config API for the config values it needs and uses KMS to decrypt the encrypted blob.
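As a sketch of the decryption flow only (the CONFIG_API_URL environment variable, URL format and response shape are invented for illustration, and the x-api-key header required by API Gateway is omitted for brevity):

const AWS = require('aws-sdk');
const https = require('https');
const kms = new AWS.KMS();

// fetch an encrypted config value from the config API, then decrypt it with KMS
function getConfigValue(key) {
  return new Promise((resolve, reject) => {
    https.get(`${process.env.CONFIG_API_URL}/${key}`, res => {
      let body = '';
      res.on('data', chunk => body += chunk);
      res.on('end', () => resolve(body));
    }).on('error', reject);
  })
  .then(encryptedBlob => kms.decrypt({
    CiphertextBlob: new Buffer(encryptedBlob, 'base64')
  }).promise())
  .then(resp => resp.Plaintext.toString('utf8'));
}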

We secured access to the config API with API keys created in API Gateway; in the event these keys are compromised, attackers will be able to update config values via this API. You can take this a step further (which we didn’t get around to in the end) by securing the POST endpoint with IAM roles, which would require developers to make signed requests to update config values.

Attackers can still retrieve sensitive data in encrypted form, but they will not be able to decrypt them as KMS also requires role-based access.

Client library

As most of our Lambda functions need to talk to the config API, we invested effort into making our client library really robust and baked in caching support and periodic polling to refresh config values from the source.

 

So that’s it folks, I hope you’ve enjoyed this post. Click here to see the rest of the series.

The emergence of AWS Lambda and other serverless technologies has significantly simplified the skills and tools required to fulfill the ops responsibilities inside an organization. However, this new paradigm has also introduced new limitations and challenges for existing toolchains and requires us to come up with new answers. Things are changing at an incredibly fast pace, and I for one am excited to see what new practices and tools emerge from this space!