Best practices discovered while processing over 200 billion records on AWS every month with Amazon Kinesis Streams
After building a mission-critical data production pipeline at ironSource that processes over 200 billion records every month, we’d like to share some of our rules written in blood.
An Overview of Amazon Kinesis Streams
Kinesis is a stream-as-a-service built from shards, and it scales by adding more of them. The service is commonly chosen for its ease of use and low operational overhead alongside its competitive pricing, which is what usually sets Kinesis Streams apart from Kafka.
Like any managed service, Amazon Kinesis has some limitations you should be familiar with, along with ways to work within them through scaling and throttling. It is wise to use the AWS-provided producers, consumers and tools, since they already implement these best practices.
Reduce costs with the Amazon Kinesis Producer Library (KPL)
At large scale, it’s hard to change the architecture once in production, and cost becomes a major pain point. The service is billed per 25 KB payload unit, so it makes sense to aggregate messages when your records are smaller than that.
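To make the billing argument concrete, here is a rough back-of-the-envelope sketch. It assumes each record is rounded up to whole 25 KB units and that 1 KB means 1,024 bytes; the message size and counts are made-up illustration values, not figures from our pipeline.

```python
import math

# Assumption: one PUT payload unit is 25 KB, with 1 KB taken as 1,024 bytes.
PUT_PAYLOAD_UNIT_BYTES = 25 * 1024


def payload_units(record_size_bytes: int) -> int:
    """Units billed for one record: its size rounded up to whole 25 KB units."""
    return max(1, math.ceil(record_size_bytes / PUT_PAYLOAD_UNIT_BYTES))


def units_for_batch(message_size_bytes: int, message_count: int,
                    messages_per_record: int) -> int:
    """Total units when `messages_per_record` messages share each record."""
    full_records, remainder = divmod(message_count, messages_per_record)
    units = full_records * payload_units(message_size_bytes * messages_per_record)
    if remainder:
        # The last, partially filled record is still billed at least one unit.
        units += payload_units(message_size_bytes * remainder)
    return units


# Illustration: one million 500-byte messages, sent one by one vs. 50 per record.
unaggregated = units_for_batch(500, 1_000_000, 1)
aggregated = units_for_batch(500, 1_000_000, 50)
print(unaggregated, aggregated)
```

With these illustrative numbers, packing 50 small messages into each record cuts the billed payload units by a factor of 50, because every tiny record would otherwise be billed as a full unit.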
When sending data into your Kinesis stream you should compress and aggregate several messages into one in order to reduce costs.
The Amazon Kinesis Producer Library (KPL) aggregates multiple logical user records, packed with Protocol Buffers, into a single Amazon Kinesis record for efficient puts into the stream. The KPL itself does not compress payloads, so any compression has to happen before records are handed to it. The core of the library is written by AWS in C++, and only Java bindings are provided. An open-source Golang implementation is also available.
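For producers that cannot use the KPL, the same idea can be approximated by hand. The sketch below is not the KPL aggregation format, and the KCL will not de-aggregate it automatically. It simply batches JSON messages into newline-delimited, gzip-compressed blobs. The function names and the 25 KB batch limit are assumptions chosen for illustration.

```python
import gzip
import json
from typing import Iterable, Iterator, List

# Hypothetical limit: cap the uncompressed batch at one 25 KB payload unit.
MAX_BATCH_BYTES = 25 * 1024


def aggregate(messages: Iterable[dict],
              max_bytes: int = MAX_BATCH_BYTES) -> Iterator[bytes]:
    """Pack JSON messages into newline-delimited, gzip-compressed blobs."""
    batch: List[bytes] = []
    size = 0
    for message in messages:
        line = json.dumps(message, separators=(",", ":")).encode("utf-8") + b"\n"
        # Flush the current batch before it would exceed the size limit.
        if batch and size + len(line) > max_bytes:
            yield gzip.compress(b"".join(batch))
            batch, size = [], 0
        batch.append(line)
        size += len(line)
    if batch:
        yield gzip.compress(b"".join(batch))


def deaggregate(blob: bytes) -> List[dict]:
    """Consumer side: reverse of aggregate() for a single blob."""
    return [json.loads(line) for line in gzip.decompress(blob).splitlines() if line]


messages = [{"id": i, "event": "click"} for i in range(1000)]
blobs = list(aggregate(messages))
restored = [m for blob in blobs for m in deaggregate(blob)]
```

Each blob would then be sent as the data of a single Kinesis record. Because the limit applies before compression, the records actually written are smaller than the cap, so the limit could be raised if you want to fill payload units more tightly.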
Use the Kinesis Client Library (KCL)
The KCL is written by AWS and supports automatic de-aggregation of KPL user records. It takes care of many of the complex tasks associated with distributed computing, such as load-balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to resharding.
Processing methods — On-Demand / Spot-instances / Lambda
While a Kinesis stream can be processed using on-demand instances, we highly recommend using AWS spot instances to process your stream, as it is the most cost-effective method.
You can also process the data with AWS Lambda, using the Kinesis Record Aggregation & Deaggregation Modules for AWS Lambda to handle aggregated records. It is very easy to hook up a Kinesis stream to a Lambda function, but you must take cost into consideration and check whether it makes sense for your specific use case.
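As a minimal sketch of the Lambda route, the handler below decodes the base64-encoded data of each record in a Kinesis event batch. It assumes plain JSON payloads that are not KPL-aggregated (aggregated records would first need the deaggregation modules mentioned above), and `process` is a hypothetical placeholder for your own logic.

```python
import base64
import json


def process(message: dict) -> None:
    """Hypothetical placeholder for your own business logic."""
    print(message)


def handler(event, context):
    """Lambda entry point: decode and process every record in the batch."""
    processed = 0
    for record in event["Records"]:
        # Kinesis delivers each record's payload base64-encoded.
        payload = base64.b64decode(record["kinesis"]["data"])
        process(json.loads(payload))  # assumption: payloads are JSON
        processed += 1
    return {"processed": processed}
```

Since Lambda is billed per invocation and duration, the batch size configured on the event source mapping has a direct effect on the cost trade-off described above.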
Monitoring Kinesis Streams
There are two sets of metrics you should take into consideration when monitoring your Kinesis Streams with CloudWatch:
- Basic Stream-level Metrics
- Enhanced Shard-level Metrics
For the stream-level metrics, it’s good practice to set up an alarm on GetRecords.IteratorAgeMilliseconds to know if your workers are lagging behind on the stream.
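A possible shape for such an alarm, sketched with boto3's `put_metric_alarm`. The threshold of 50% of the retention period follows the AWS guidance quoted in the metrics section of this article. The alarm name, period and evaluation periods are placeholder assumptions to tune for your own stream.

```python
def iterator_age_alarm(stream_name: str, retention_hours: int = 24,
                       fraction: float = 0.5) -> dict:
    """Build put_metric_alarm parameters for the stream-level iterator age."""
    threshold_ms = retention_hours * 3600 * 1000 * fraction
    return {
        "AlarmName": f"{stream_name}-iterator-age",  # hypothetical naming scheme
        "Namespace": "AWS/Kinesis",
        "MetricName": "GetRecords.IteratorAgeMilliseconds",
        "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
        "Statistic": "Maximum",
        "Period": 60,             # assumption: evaluate one-minute datapoints
        "EvaluationPeriods": 5,   # assumption: five breaching minutes in a row
        "Threshold": threshold_ms,
        "ComparisonOperator": "GreaterThanThreshold",
    }


def create_alarm(stream_name: str) -> None:
    """Create the alarm; requires boto3 and AWS credentials."""
    import boto3  # imported here so the parameter builder works without it

    boto3.client("cloudwatch").put_metric_alarm(**iterator_age_alarm(stream_name))


params = iterator_age_alarm("my-stream")  # "my-stream" is a placeholder name
```

In practice you would also pass `AlarmActions` (for example an SNS topic) so that someone is actually notified, and probably alert well before the 50% mark.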
However, a specific worker or shard may fall out of sync without that being reflected at the stream level by the global IteratorAgeMilliseconds average. To overcome this, I recommend running a Lambda function every minute that queries IteratorAgeMilliseconds at the shard level and alerts if needed.
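The per-shard check could look roughly like the sketch below. It assumes enhanced shard-level monitoring is enabled for IteratorAgeMilliseconds on the stream, and that the caller passes in a boto3 CloudWatch client and the list of shard IDs. The function name, lookback window and threshold are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone


def lagging_shards(cloudwatch, stream_name, shard_ids, max_age_ms,
                   window_minutes=5):
    """Return {shard_id: max iterator age in ms} for shards above max_age_ms."""
    end = datetime.now(timezone.utc)
    start = end - timedelta(minutes=window_minutes)
    lagging = {}
    for shard_id in shard_ids:
        response = cloudwatch.get_metric_statistics(
            Namespace="AWS/Kinesis",
            MetricName="IteratorAgeMilliseconds",  # shard-level metric name
            Dimensions=[
                {"Name": "StreamName", "Value": stream_name},
                {"Name": "ShardId", "Value": shard_id},
            ],
            StartTime=start,
            EndTime=end,
            Period=60,
            Statistics=["Maximum"],
        )
        ages = [point["Maximum"] for point in response["Datapoints"]]
        # A shard with no datapoints is skipped here; you may prefer to alert on it.
        if ages and max(ages) > max_age_ms:
            lagging[shard_id] = max(ages)
    return lagging
```

A scheduled Lambda could call this with `boto3.client("cloudwatch")` and the shard IDs returned by the Kinesis `list_shards` call, then publish an alert for every shard in the result.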
Amazon Kinesis Streams Metrics
AWS recommends monitoring the following metrics:
- GetRecords.IteratorAgeMilliseconds: Tracks the read position across all shards and consumers in the stream. Note that if an iterator’s age passes 50% of the retention period (by default 24 hours, configurable up to 7 days), there is a risk of data loss due to record expiration. AWS advises the use of CloudWatch alarms on the maximum statistic to alert you before this loss becomes a risk. For an example scenario that uses this metric, see Consumer Record Processing Falling Behind.
- ReadProvisionedThroughputExceeded: When your consumer-side record processing is falling behind, it is sometimes difficult to know where the bottleneck is. Use this metric to determine if your reads are being throttled due to exceeding your read throughput limits. The most commonly used statistic for this metric is average.
- WriteProvisionedThroughputExceeded: This serves the same purpose as the ReadProvisionedThroughputExceeded metric, but for the producer (put) side of the stream. The most commonly used statistic for this metric is average.
- PutRecord.Success, PutRecords.Success: AWS advises the use of CloudWatch alarms on the average statistic to indicate if records are failing to reach the stream. Choose one or both put types depending on what your producer uses. If using the Kinesis Producer Library (KPL), use PutRecords.Success.
- GetRecords.Success: AWS advises the use of CloudWatch alarms on the average statistic to indicate if reads from the stream are failing.
Throttling Kinesis Streams
If you push it to the limit, Kinesis will start throttling your requests and you’ll have to re-shard your stream. Throttling can have several causes. For example, you may have sent more than 1 MB of payload or more than 1,000 records per second to a single shard. You might also be throttled because of DynamoDB limits.
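The per-shard write limits translate directly into a minimum shard count. The sketch below assumes traffic is spread evenly across shards (a hot partition key will throttle sooner) and treats 1 MB as 1,048,576 bytes. The traffic figures are invented for illustration.

```python
import math

# Per-shard write limits from the paragraph above.
WRITE_BYTES_PER_SHARD = 1024 * 1024   # assumption: 1 MB taken as 1,048,576 bytes
WRITE_RECORDS_PER_SHARD = 1000


def min_shards(bytes_per_second: float, records_per_second: float) -> int:
    """Smallest shard count that satisfies both write limits, assuming even load."""
    by_bytes = math.ceil(bytes_per_second / WRITE_BYTES_PER_SHARD)
    by_records = math.ceil(records_per_second / WRITE_RECORDS_PER_SHARD)
    return max(1, by_bytes, by_records)


# Illustration: 40 MB/s arriving as 120,000 small records per second...
print(min_shards(40 * 1024 * 1024, 120_000))
# ...versus the same bytes aggregated 50 messages per record.
print(min_shards(40 * 1024 * 1024, 2_400))
```

In this made-up case the stream is limited by record count rather than bytes, so aggregating small messages lowers the number of shards needed as well as the payload-unit bill.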
As noted in Tracking Amazon Kinesis Streams Application State, the KCL tracks the shards in the stream using an Amazon DynamoDB table. When new shards are created as a result of re-sharding, the KCL discovers the new shards and populates new rows in the table. The workers automatically discover the new shards and create processors to handle the data from them. The KCL also distributes the shards in the stream across all the available workers and record processors. Make sure you have enough read/write capacity in your DynamoDB table.
Re-Sharding a Kinesis Stream
When re-sharding a stream, scaling is much faster when the new shard count is double or half the current count. You can re-shard your stream using the UpdateShardCount API. Note that scaling a stream with more than 200 shards is unsupported via this API. For those streams, you can use the Amazon Kinesis scaling utils instead.
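One way to follow the doubling and halving advice is to plan the resize as a series of steps and apply them one at a time. This is a sketch under assumptions: the helper names are hypothetical, the `kinesis` argument is expected to be a boto3 Kinesis client, and the AWS limits on how often and how far a stream may be rescaled still apply.

```python
import math


def resharding_steps(current: int, target: int) -> list:
    """Plan shard counts that at most double or halve on each step."""
    if current < 1 or target < 1:
        raise ValueError("shard counts must be at least 1")
    steps = []
    while current != target:
        if target > current:
            current = min(current * 2, target)
        else:
            current = max(math.ceil(current / 2), target)
        steps.append(current)
    return steps


def apply_steps(kinesis, stream_name: str, steps: list) -> None:
    """Apply each step with UpdateShardCount, waiting for ACTIVE in between."""
    for count in steps:
        kinesis.update_shard_count(
            StreamName=stream_name,
            TargetShardCount=count,
            ScalingType="UNIFORM_SCALING",
        )
        # The stream is UPDATING while it re-shards; wait until it is ACTIVE.
        kinesis.get_waiter("stream_exists").wait(StreamName=stream_name)


print(resharding_steps(16, 64))
print(resharding_steps(10, 3))
```

Only the final step may land on a count that is not an exact double or half, which keeps the slow, uneven split to a single operation.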
Re-sharding a stream with hundreds of shards can take time. An alternative method involves spinning up another stream with the desired capacity, and then redirecting all the traffic to the new stream.
AWS Kinesis Resources
Developing Kinesis Producers & Consumers
- Amazon Kinesis Producer Library — Streams Producer Library (KPL) provides metrics per shard, worker, and KPL application.
- CloudWatch metrics — Streams sends Amazon CloudWatch custom metrics with detailed monitoring for each stream.
- Amazon Kinesis Agent — The Amazon Kinesis Agent publishes custom CloudWatch metrics to help assess if the agent is working as expected.
- API logging — Streams uses AWS CloudTrail to log API calls and store the data in an Amazon S3 bucket.
- Some Streams Records are Skipped When Using the Kinesis Client Library
- Records Belonging to the Same Shard are Processed by Different Record Processors at the Same Time
- Consumer Application is Reading at a Slower Rate Than Expected
- GetRecords Returns Empty Records Array Even When There is Data in the Stream
- Shard Iterator Expires Unexpectedly
- Consumer Record Processing Falling Behind
- Amazon Kinesis Scaling Utils