
Building aggregations with DynamoDB Streams

Forrest Brazeal

With the Cloud Adventure: Race To Cloud Castle game in full swing (get your paths in by Sept 4!), now seems like a good time to break down one interesting aspect of the game’s construction: the leaderboard aggregations.

How the game works

In “Cloud Adventure”, players compete to find and report paths marked by gems hidden across various cloud services. When you click “Submit Path” on the game page, an event flow kicks off behind the scenes.

If your path is valid, we store it in a DynamoDB table.

This works great for storing and retrieving paths. By storing our user identifier in the partition key and our submitted path in the sort key, we can query the user identifier to get all paths associated with this user in one shot — great for checking your progress in the game.
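Concretely, a stored path item might look something like the following sketch. (The key values here are my illustrative assumptions about the shape of the data, not the game’s exact records.)

```python
# Hypothetical example of a stored path item: the user identifier lives in
# the partition key ("pk") and the submitted path in the sort key ("sk").
item = {
    "pk": "user#forrest",           # user identifier (partition key)
    "sk": "s3#lambda#dynamodb",     # submitted path (sort key)
    "user_submitted_path": True,    # marks this item as a path record
}
```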

Adding Aggregations

But we want to do more than just store and retrieve paths. As you can see on the leaderboard page, we want to ask the following questions about our data:

  • How many people (“active players”) have submitted at least one valid path to the leaderboard?
  • How many people have found X valid paths, where X is any number between 1 and 12?
  • How many total paths (with overlap) have been discovered by all players combined?

There’s no simple query you can write against the DynamoDB model laid out above to find the answers to these questions; you’d have to scan the whole table for each and do a bunch of client-side math. Slow and expensive!

Instead, we need to build some aggregates. And if you’re feeling like this must be much more complex than putting the data in a relational database and writing SUM queries, don’t worry: with a bit of help from AWS SAM, we’ll be up and running in just a few minutes.

Table layout

Remember, DynamoDB tables are “schema-on-read”, not “schema-on-write” — you can throw any data into the attributes you want; it’s up to the client to interpret them correctly. This allows us to do something called index overloading. We can store our aggregations as additional records in the same DynamoDB table. After all, why configure two tables when we can just as easily use one?
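To make index overloading concrete, here is a sketch of how path records and aggregate records can coexist in the same table. (The specific key values are illustrative assumptions.)

```python
# Index overloading: one table, two kinds of records, distinguished by pk.
items = [
    {"pk": "user#forrest", "sk": "s3#lambda#dynamodb"},                 # a player's path
    {"pk": "AGGREGATES", "sk": "TOTAL_PATHS_FOUND", "data_value": 42},  # an aggregate
]

# A query on pk="AGGREGATES" returns only aggregate rows;
# a query on pk="user#forrest" returns only that player's paths.
aggregates = [i for i in items if i["pk"] == "AGGREGATES"]
```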

This is why I gave my partition and sort keys generic names (“pk” and “sk”), so I don’t limit myself to thinking of them as “username” or “path” fields.

Setting up streams

Now all we have to do is keep these aggregates up to date. We could do this by synchronously updating a bunch of aggregation fields in DynamoDB whenever a user reports a new path. But that would quickly start to bog down the user experience.

Instead, we can treat our path updates as events and process them asynchronously, behind the user’s back. Sneaky!

Handily, DynamoDB gives us a cool feature called Streams, which behaves a lot like a direct Kinesis integration with our table. Once we set this up, we can get any changes to table data exported as events for downstream processing.

(And I do mean “any events”. DynamoDB Streams don’t support filtering. So our handlers are going to have to pick through all the table changes in order to find the ones they care about.)
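For reference, each record Lambda receives from the stream looks roughly like the sketch below (attribute values are illustrative; this is the shape the handler later in this post picks through):

```python
# Approximate shape of a single DynamoDB Streams record.
# Note the DynamoDB JSON format: each attribute is wrapped in a type
# descriptor like {"S": ...} for strings or {"BOOL": ...} for booleans.
record = {
    "eventName": "INSERT",  # also MODIFY or REMOVE
    "dynamodb": {
        "Keys": {
            "pk": {"S": "user#forrest"},
            "sk": {"S": "s3#lambda#dynamodb"},
        },
        "NewImage": {
            "pk": {"S": "user#forrest"},
            "sk": {"S": "s3#lambda#dynamodb"},
            "user_submitted_path": {"BOOL": True},
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}

# The filtering check we need to do ourselves, since Streams won't do it for us:
is_relevant = (
    record["eventName"] == "INSERT"
    and "user_submitted_path" in record["dynamodb"]["NewImage"]
)
```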

The good news is that we can define DynamoDB Streams with minimal config in an AWS SAM template. Here is my table resource:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: SAM template for DynamoDB aggregations
Resources:
  ResultsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      AttributeDefinitions:
        - AttributeName: "pk"
          AttributeType: "S"
        - AttributeName: "sk"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "pk"
          KeyType: "HASH"
        - AttributeName: "sk"
          KeyType: "RANGE"
      BillingMode: "PAY_PER_REQUEST"

CloudFormation uses the StreamSpecification property to intuit that we want to wire up a stream to this table. The NEW_AND_OLD_IMAGES value means that we want to see both the old and the modified data on the stream each time we make a change to the table.

Notice that we are NOT defining any additional secondary indexes on this table — we are going to make our aggregations work using nothing but the main table index!

Now we just need to connect the stream to the Lambda function that will process the aggregations. Under the Events section of our Lambda resource, we’ll add a function trigger of type DynamoDB, wired to the stream we implicitly created on our table. The StartingPosition of TRIM_HORIZON means we want to start from the oldest available record in the stream, and the BatchSize of 10 means we’ll work with up to ten records at a time.

  AggregatorFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: backend/
      Handler: handler.aggregation_handler
      Runtime: python3.8
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref ResultsTable
      Environment:
        Variables:
          TABLE_NAME: !Ref ResultsTable
      Events:
        DynamoDBEvent:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt ResultsTable.StreamArn
            StartingPosition: TRIM_HORIZON
            BatchSize: 10

Building the aggregates

As you can see in the function definition, I chose to write my Lambda in Python 3. You’ll note that I’m using Python’s logging module to write structured output to CloudWatch — very helpful if you are trying to parse through mounds of debug logs! I’ve also chosen to initialize my boto3 resource and table object at module scope, outside the function handler, so I only incur the initialization time penalty on cold start.

import os
import logging

import boto3
from boto3.dynamodb.conditions import Key

logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ["TABLE_NAME"])

def aggregation_handler(event, context):
    logger.info(event)
    for record in event['Records']:
        if record['eventName'] == "INSERT" and "user_submitted_path" in record["dynamodb"]["NewImage"]:
            # Make sure the user hasn't already reached their 12-path limit
            num_paths = len(get_recorded_paths_for(record["dynamodb"]["Keys"]["pk"]["S"]))
            if num_paths <= 12:
                # Perform aggregations (covered below)

Inside the aggregation_handler itself (the code that gets triggered when DynamoDB Streams invokes the function), I’m looping through the set of up to 10 records that DynamoDB will send me.

Note: because this is a single-producer system where my own code controls what gets written to the table, I’m not super concerned about malformed events making their way onto the stream. But if I were, this code would need more error handling to deal with unexpected data in the records. Otherwise, the stream will back up as Lambda tries and fails to process the same record set over and over again.
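If you did need that extra resilience, a per-record guard might look something like the sketch below. (This is not the game’s actual code — the helper name and exception choices are my assumptions; a dead-letter queue would be the sturdier production option.)

```python
import logging

logger = logging.getLogger()

def safe_process(records, process_one):
    """Process each stream record, skipping malformed ones instead of
    failing the whole batch (which would make Lambda retry it forever)."""
    processed, skipped = 0, 0
    for record in records:
        try:
            process_one(record)
            processed += 1
        except (KeyError, TypeError) as err:
            # Log and move on rather than block the stream on one bad record.
            logger.warning("Skipping malformed record: %s", err)
            skipped += 1
    return processed, skipped
```

The trade-off is that a skipped record is silently dropped from the aggregates, so in practice you would want to route failures somewhere durable for inspection.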

Inside the loop, we’ll first need to perform a filtering check on the record, because not every record on the stream is relevant for our aggregations. In our case, we are looking for newly inserted records that contain a user_submitted_path — i.e., a path we need to aggregate into our various metrics.

Assuming this is a relevant record, we will then have to make a single DynamoDB Query to retrieve the full set of previously reported paths for this user. It’s important to note that this is the only additional query we need to make for our aggregations; all other information is contained in the streamed record. The query helper function looks something like this:

def get_recorded_paths_for(user):
    response = table.query(KeyConditionExpression=Key('pk').eq(user))
    logger.info(response)
    return response.get('Items', [])

Okay, now that we have our data, we’ll just build the aggregations as follows:

                # Aggregation #1: update total paths found
                response = table.update_item(
                    Key={
                        'pk': 'AGGREGATES',
                        'sk': 'TOTAL_PATHS_FOUND'
                    },
                    ReturnValues='UPDATED_NEW',
                    UpdateExpression='ADD data_value :d',
                    ExpressionAttributeValues={':d': 1}
                )
                logger.info(response)
                #Aggregation #2: update number of players who have found this number of paths
                response = table.update_item(
                    Key={
                        'pk': 'AGGREGATES',
                        'sk': str(num_paths) + '_PATHS_FOUND'
                    },
                    ReturnValues='UPDATED_NEW',
                    UpdateExpression='ADD data_value :d',
                    ExpressionAttributeValues={':d': 1}
                )
                logger.info(response)
                #Aggregation #3: update total number of players (if necessary)
                if num_paths == 1:
                    response = table.update_item(
                        Key={
                            'pk': 'AGGREGATES',
                            'sk': 'TOTAL_PLAYERS'
                        },
                        ReturnValues='UPDATED_NEW',
                        UpdateExpression='ADD data_value :d',
                        ExpressionAttributeValues={':d': 1}
                    )
                    logger.info(response)

I’m using the ADD action here on my DynamoDB updates, rather than the generally recommended SET, because ADD initializes the attribute (and creates the item) if it does not already exist — exactly what we want in this case.

And in case you’re wondering, yes, each of these updates triggers a change on the table, which itself places a new record on the stream and triggers this same aggregation function over again! Now you see why it’s so important to place that filter check at the top of the code — otherwise we’d be stuck in an infinite Lambda loop.

You might be wondering: would it make more sense just to put our aggregates in a separate DynamoDB table, so we wouldn’t trigger a recursive streaming action when we update them? Yeah, maybe! As long as we don’t think we’d ever need to write a query that retrieves aggregate and individual data at the same time.

Retrieving our aggregates

Now, from my front end, I can make a single API call that requests the aggregates, and retrieve them as follows:

table.query(KeyConditionExpression=Key('pk').eq("AGGREGATES"))

This makes loading the leaderboard quite fast – try it out! And in case you’re wondering, the total time to asynchronously build and update the aggregates is consistently sub-second, so the leaderboard updates in what feels like real time.
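The items that single query returns can then be flattened into something the leaderboard page renders directly. A minimal sketch (the helper name and item values are my assumptions, but the item shape matches the aggregate records written above):

```python
def to_leaderboard(items):
    """Flatten the AGGREGATES query results into a sort-key -> value dict."""
    return {item["sk"]: item["data_value"] for item in items}

# Items as the boto3 resource API would return them from the query:
items = [
    {"pk": "AGGREGATES", "sk": "TOTAL_PLAYERS", "data_value": 120},
    {"pk": "AGGREGATES", "sk": "TOTAL_PATHS_FOUND", "data_value": 345},
]
leaderboard = to_leaderboard(items)
```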

Takeaways

Yes, this may seem like a more complex and fragile way to build aggregations than just putting our data in a relational database and doing a bit of math every time we load the leaderboard. But because the aggregates are precomputed, this leaderboard will load quickly, in constant time — even at millions of records. And at small scale, my table costs remain comfortably in the free tier.

Moreover, once you get the DynamoDB Streams infrastructure set up, the work to add additional aggregations doesn’t feel any worse to me than mapping a new backend function to a relational database with an ORM library. Your mileage there may vary.

Either way, enjoy the game, and I hope the knowledge of how the leaderboard works makes it a little more fun!
