The one mistake everyone makes when using Kinesis with Lambda

Yan Cui

I help clients go faster for less using serverless technologies.

AWS Kinesis and Lambda are a great combo for processing large amounts of data in real time. However, there’s a common oversight that many developers make when integrating these two services.

There are established best practices for configuring Lambda’s EventSourceMapping [1] for Kinesis:

  • Configure an OnFailure destination for failed records.
  • Enable BisectBatchOnFunctionError.
  • Override MaximumRetryAttempts. Choose a value that gives failed messages a reasonable number of retries before sending them to the OnFailure destination.
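
As a minimal sketch, the three settings above map onto parameters of Lambda's CreateEventSourceMapping API. The helper name, the ARNs and the default of 3 retries are placeholders chosen for illustration, not recommendations from this post.

```python
def build_event_source_mapping_params(stream_arn, function_name,
                                      on_failure_arn, max_retries=3):
    """Build keyword arguments for Lambda's CreateEventSourceMapping API.

    max_retries=3 is an arbitrary placeholder; pick a value that suits
    your workload.
    """
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "StartingPosition": "LATEST",
        # Split a failing batch in two so a poison message gets isolated.
        "BisectBatchOnFunctionError": True,
        # Stop retrying after this many attempts, then hand over to OnFailure.
        "MaximumRetryAttempts": max_retries,
        # Where the failure metadata is sent once retries are exhausted.
        "DestinationConfig": {"OnFailure": {"Destination": on_failure_arn}},
    }


params = build_event_source_mapping_params(
    stream_arn="arn:aws:kinesis:us-east-1:xxx:stream/MyKinesisStream",
    function_name="kinesis-processor",
    on_failure_arn="arn:aws:sqs:us-east-1:xxx:kinesis-processor-dlq",
)

# With boto3 installed and credentials configured, this could be applied as:
#   import boto3
#   boto3.client("lambda").create_event_source_mapping(**params)
```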

This is great advice for building a resilient data processing pipeline and guarding against poison messages. But one crucial detail is missing: the message sent to the OnFailure destination does not contain the event payload.

This is what will be captured in the OnFailure destination:

{
  "requestContext": {
    "requestId": "cf6fa2a6-48e2-49a6-bc23-7ed2b154b176",
    "functionArn": "arn:aws:lambda:us-east-1:xxx:function:kinesis-processor",
    "condition": "RetryAttemptsExhausted",
    "approximateInvokeCount": 4
  },
  "responseContext": {
    "statusCode": 200,
    "executedVersion": "$LATEST",
    "functionError": "Unhandled"
  },
  "version": "1.0",
  "timestamp": "2023-12-10T22:06:23.446Z",
  "KinesisBatchInfo": {
    "shardId": "shardId-000000000000",
    "startSequenceNumber": "49647238774018758449143790847474952390342044683891376130",
    "endSequenceNumber": "49647238774018758449143790941112301748593148785147772930",
    "approximateArrivalOfFirstRecord": "2023-12-10T22:05:59.280Z",
    "approximateArrivalOfLastRecord": "2023-12-10T22:06:19.196Z",
    "batchSize": 3,
    "streamArn": "arn:aws:kinesis:us-east-1:xxx:stream/MyKinesisStream"
  }
}

The KinesisBatchInfo contains the shard ID, sequence numbers and batch size. The actual data payload – the heart of the event that you need for processing and analysis – is not included.
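
In other words, the message is a pointer to the failed batch rather than the batch itself. A small sketch of pulling that pointer out of the message (the function name and the returned field names are made up for illustration):

```python
import json


def locate_failed_batch(on_failure_message: str) -> dict:
    """Extract what is needed to find the failed records in the stream."""
    info = json.loads(on_failure_message)["KinesisBatchInfo"]
    return {
        # The stream name is the part of the ARN after "stream/".
        "stream_name": info["streamArn"].split("/", 1)[1],
        "shard_id": info["shardId"],
        "start_sequence_number": info["startSequenceNumber"],
        "end_sequence_number": info["endSequenceNumber"],
        "batch_size": info["batchSize"],
    }


sample = json.dumps({
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000000",
        "startSequenceNumber": "49647238774018758449143790847474952390342044683891376130",
        "endSequenceNumber": "49647238774018758449143790941112301748593148785147772930",
        "batchSize": 3,
        "streamArn": "arn:aws:kinesis:us-east-1:xxx:stream/MyKinesisStream",
    }
})
pointer = locate_failed_batch(sample)
```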

This omission can lead to significant issues:

  1. Difficulty in debugging: Without the actual event data, pinpointing the cause of the failure becomes a complex task.
  2. Data loss: If you are not able to fetch the record before it expires from the stream, then the data is lost forever. Default retention for a Kinesis stream is 24 hours. If the error occurs on a Saturday, your developers likely won’t see it until they are back in the office on Monday. By then, the record is no longer available in the stream.
  3. Increased complexity: Developers need to implement additional mechanisms to retrieve and store the original event data.
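
To make the data loss scenario concrete, here is a rough sketch of the arithmetic. It assumes the timestamp format shown in KinesisBatchInfo and the default 24-hour retention; the function names and the Saturday/Monday dates are invented for the example, and the arrival timestamps are approximate.

```python
from datetime import datetime, timedelta, timezone

# Format of approximateArrivalOfFirstRecord, e.g. "2023-12-10T22:05:59.280Z"
KINESIS_TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def expires_at(approximate_arrival: str, retention_hours: int = 24) -> datetime:
    """Estimate when a record ages out of the stream."""
    arrived = datetime.strptime(approximate_arrival, KINESIS_TS_FORMAT)
    return arrived.replace(tzinfo=timezone.utc) + timedelta(hours=retention_hours)


def is_recoverable(approximate_arrival: str, now: datetime,
                   retention_hours: int = 24) -> bool:
    """True if the record should still be readable from the stream at `now`."""
    return now < expires_at(approximate_arrival, retention_hours)


# A batch that arrived late on a Saturday, looked at on Monday morning.
saturday_arrival = "2023-12-09T22:05:59.280Z"
monday_morning = datetime(2023, 12, 11, 9, 0, tzinfo=timezone.utc)

print(expires_at(saturday_arrival))                      # 2023-12-10 22:05:59.280000+00:00
print(is_recoverable(saturday_arrival, monday_morning))  # False
```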

Most people don’t realise this and many have been caught off-guard when they encounter a failed message for the first time!

Hydrating event data

To handle this situation, you need to hydrate the event data yourself.

Instead of using an SQS queue as the OnFailure destination directly, let’s add a layer of indirection:

  • For the Kinesis function, use an SNS topic as the OnFailure destination.
  • Subscribe another Lambda function to this topic. This hydrate function retrieves the failed records from the Kinesis stream.
  • Because the hydrate function is invoked asynchronously by SNS, we can configure an OnSuccess destination to write its return value to the original SQS DLQ. This way, we don’t have to maintain the code to write to SQS.

The downside is that the message captured in the SQS queue is more verbose (see below). The part we care about is the responsePayload, where the hydrate function has enriched the original OnFailure message with the KinesisBatch array.

{
  "version": "1.0",
  "timestamp": "2023-12-10T22:43:49.863Z",
  "requestContext": {
    "requestId": "2944cfad-d4c4-4b28-ad45-bfdd7f4526ef",
    "functionArn": "arn:aws:lambda:us-east-1:374852340823:function:hydrate-kinesis-dlq-dev-hydrate:$LATEST",
    "condition": "Success",
    "approximateInvokeCount": 1
  },
  "requestPayload": {
    "Records": [
      {
        "EventSource": "aws:sns",
        "EventVersion": "1.0",
        "EventSubscriptionArn": "arn:aws:sns:us-east-1:374852340823:hydrate-kinesis-dlq-dev-dlq-topic:aee08643-65ae-4caa-b117-de4c20cda524",
        "Sns": {
          "Type": "Notification",
          "MessageId": "002d4204-3a47-5737-b1b5-1ff47b22a32b",
          "TopicArn": "arn:aws:sns:us-east-1:374852340823:hydrate-kinesis-dlq-dev-dlq-topic",
          "Subject": null,
          "Message": "{\"requestContext\":{\"requestId\":\"5577e27b-a674-48cc-9fcb-aa6cf675e939\",\"functionArn\":\"arn:aws:lambda:us-east-1:374852340823:function:hydrate-kinesis-dlq-dev-kinesis\",\"condition\":\"RetryAttemptsExhausted\",\"approximateInvokeCount\":4},\"responseContext\":{\"statusCode\":200,\"executedVersion\":\"$LATEST\",\"functionError\":\"Unhandled\"},\"version\":\"1.0\",\"timestamp\":\"2023-12-10T22:43:49.459Z\",\"KinesisBatchInfo\":{\"shardId\":\"shardId-000000000000\",\"startSequenceNumber\":\"49647238774018758449143791361301066723689190929215782914\",\"endSequenceNumber\":\"49647238774018758449143791361304693501148036740885250050\",\"approximateArrivalOfFirstRecord\":\"2023-12-10T22:42:50.398Z\",\"approximateArrivalOfLastRecord\":\"2023-12-10T22:43:20.392Z\",\"batchSize\":4,\"streamArn\":\"arn:aws:kinesis:us-east-1:374852340823:stream/hydrate-kinesis-dlq-dev-KinesisStream-nrH7Hk48mKe2\"}}",
          "Timestamp": "2023-12-10T22:43:49.506Z",
          "SignatureVersion": "1",
          "Signature": "MauQRhErmYexthVBYzNAsLk2LSkJLWBGdRZj1ubK4py1Mss7XSGM4dTlI3NUMct9xivRqNDt1n/iGRr1QVsF8zYCV5YKJUnYQeXZKkoL81RrTTtq2xg35IoU5VQz3gvvl6nwc5C765K+62W1/mPCGgy5byyepnr/FsFyXesl2wbQZ/nje26JYqT/4xaVm9BrExI83tXXhVMxcYymWs46/Qcq1MEWAhjPiTG4TmAdAB3vwX99ZAikqoIm7zL8TxG838Dy52B09T44GHunfyEcL5uIGHAOr1J7YHc+OVNsgSFxNa3yB+nQFEyHJso9pTeA+w2F7bYGeEVyhMSPjmuFsg==",
          "SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-01d088a6f77103d0fe307c0069e40ed6.pem",
          "UnsubscribeUrl": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:374852340823:hydrate-kinesis-dlq-dev-dlq-topic:aee08643-65ae-4caa-b117-de4c20cda524",
          "MessageAttributes": {}
        }
      }
    ]
  },
  "responseContext": {
    "statusCode": 200,
    "executedVersion": "$LATEST"
  },
  "responsePayload": {
    "requestContext": {
      "requestId": "5577e27b-a674-48cc-9fcb-aa6cf675e939",
      "functionArn": "arn:aws:lambda:us-east-1:374852340823:function:hydrate-kinesis-dlq-dev-kinesis",
      "condition": "RetryAttemptsExhausted",
      "approximateInvokeCount": 4
    },
    "responseContext": {
      "statusCode": 200,
      "executedVersion": "$LATEST",
      "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2023-12-10T22:43:49.459Z",
    "KinesisBatchInfo": {
      "shardId": "shardId-000000000000",
      "startSequenceNumber": "49647238774018758449143791361301066723689190929215782914",
      "endSequenceNumber": "49647238774018758449143791361304693501148036740885250050",
      "approximateArrivalOfFirstRecord": "2023-12-10T22:42:50.398Z",
      "approximateArrivalOfLastRecord": "2023-12-10T22:43:20.392Z",
      "batchSize": 4,
      "streamArn": "arn:aws:kinesis:us-east-1:374852340823:stream/hydrate-kinesis-dlq-dev-KinesisStream-nrH7Hk48mKe2"
    },
    "KinesisBatch": [
      "Hello, theburningmonk!",
      "Hello, theburningmonk!",
      "Hello, theburningmonk!",
      "Hello, theburningmonk!"
    ]
  }
}
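
Whatever consumes the DLQ only needs to unwrap that envelope. A minimal sketch, assuming the SQS message body is the JSON document above (the function name is hypothetical):

```python
import json


def extract_hydrated_failure(sqs_message_body: str) -> dict:
    """Return the enriched OnFailure message from an OnSuccess envelope."""
    envelope = json.loads(sqs_message_body)
    condition = envelope["requestContext"]["condition"]
    if condition != "Success":
        # Anything else means this is not the hydrate function's return value.
        raise ValueError(f"unexpected condition: {condition}")
    return envelope["responsePayload"]


# Trimmed-down version of the message shown above.
body = json.dumps({
    "requestContext": {"condition": "Success", "approximateInvokeCount": 1},
    "responsePayload": {
        "KinesisBatchInfo": {"shardId": "shardId-000000000000", "batchSize": 4},
        "KinesisBatch": ["Hello, theburningmonk!"] * 4,
    },
})
failure = extract_hydrated_failure(body)
```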

If you’re interested in trying this out yourself, then please take a look at this GitHub repo [2]. In particular, look at the hydrate function to see how to retrieve the records from the Kinesis stream.
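
For a rough idea of what such a function involves, here is a sketch in Python. It is not the code from the repo. It assumes the record data is UTF-8 text, that the batch is the contiguous range between the start and end sequence numbers on the shard, and that ten GetRecords calls are enough; `MAX_GET_RECORDS_CALLS`, `fetch_batch` and the optional `kinesis` argument are names invented for this example.

```python
import json

MAX_GET_RECORDS_CALLS = 10  # arbitrary safety cap for this sketch


def fetch_batch(kinesis, info: dict) -> list:
    """Re-read the failed batch described by KinesisBatchInfo."""
    start = int(info["startSequenceNumber"])
    end = int(info["endSequenceNumber"])
    iterator = kinesis.get_shard_iterator(
        StreamName=info["streamArn"].split("/", 1)[1],
        ShardId=info["shardId"],
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        StartingSequenceNumber=info["startSequenceNumber"],
    )["ShardIterator"]

    batch = []
    for _ in range(MAX_GET_RECORDS_CALLS):
        if iterator is None or len(batch) >= info["batchSize"]:
            break
        response = kinesis.get_records(ShardIterator=iterator,
                                       Limit=info["batchSize"])
        for record in response["Records"]:
            sequence_number = int(record["SequenceNumber"])
            if sequence_number > end:
                return batch  # past the failed batch, stop reading
            if sequence_number >= start:
                # Assumes UTF-8 text so the result stays JSON-serialisable.
                batch.append(record["Data"].decode("utf-8"))
        iterator = response.get("NextShardIterator")
    return batch


def handler(event, context, kinesis=None):
    """SNS-triggered entry point; returns the enriched OnFailure message."""
    if kinesis is None:
        import boto3  # imported lazily so the sketch can run without AWS
        kinesis = boto3.client("kinesis")
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    message["KinesisBatch"] = fetch_batch(kinesis, message["KinesisBatchInfo"])
    return message
```

The return value is what the OnSuccess destination would deliver as the responsePayload. If the records have already expired from the stream, KinesisBatch may come back empty or incomplete, so it is worth alerting on that case.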

It’s worth noting that this approach also works for DynamoDB streams. In the GitHub repo linked above, you will find working examples for both Kinesis and DynamoDB streams.
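
For DynamoDB streams, the OnFailure message carries a DDBStreamBatchInfo block instead of KinesisBatchInfo, and the records are read through the DynamoDB Streams API. The sketch below is again not the repo's code. It assumes the `streams` argument is a `boto3.client("dynamodbstreams")` (or something with the same methods), keeps only a few fields from each record, and uses invented names throughout.

```python
DDB_GET_RECORDS_MAX_LIMIT = 1000  # upper bound for DynamoDB Streams GetRecords


def fetch_ddb_batch(streams, info: dict, max_calls: int = 10) -> list:
    """Re-read the failed batch described by DDBStreamBatchInfo."""
    start = int(info["startSequenceNumber"])
    end = int(info["endSequenceNumber"])
    iterator = streams.get_shard_iterator(
        StreamArn=info["streamArn"],
        ShardId=info["shardId"],
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        SequenceNumber=info["startSequenceNumber"],
    )["ShardIterator"]

    batch = []
    for _ in range(max_calls):
        if iterator is None or len(batch) >= info["batchSize"]:
            break
        response = streams.get_records(
            ShardIterator=iterator,
            Limit=min(info["batchSize"], DDB_GET_RECORDS_MAX_LIMIT),
        )
        for record in response["Records"]:
            change = record["dynamodb"]
            sequence_number = int(change["SequenceNumber"])
            if sequence_number > end:
                return batch  # past the failed batch, stop reading
            if sequence_number >= start:
                # Keep a JSON-friendly subset; binary attributes would
                # need extra encoding, which this sketch does not handle.
                batch.append({
                    "eventName": record["eventName"],
                    "Keys": change["Keys"],
                    "NewImage": change.get("NewImage"),
                    "OldImage": change.get("OldImage"),
                })
        iterator = response.get("NextShardIterator")
    return batch
```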

Next steps

I hope this post helps you avoid a very common mistake people make when working with Kinesis and Lambda.

To learn more about building production-ready serverless applications, check out my upcoming workshop [3]. I’m working on some new lessons to incorporate the latest changes from re:Invent 2023. And I’m adding support for CDK as well, due to popular demand!

Hope to see you there.

Links

[1] EventSourceMapping documentation

[2] GitHub repo for a working demo

[3] Production-Ready Serverless workshop
