Prerequisites
You must meet the following requirements in order to complete this tutorial:
Amazon Web Services Account
Before you begin, ensure that you are familiar with the concepts discussed in Amazon Kinesis Data Streams Terminology and Concepts (p. 3), particularly with streams, shards, producers, and consumers.
It is also helpful to have completed the steps in the following guide: Install and Configure the AWS CLI (p. 14).
You must have an AWS account and a web browser to access the AWS Management Console.
For console access, use your IAM user name and password to sign in to the AWS Management Console from the IAM sign-in page. IAM lets you securely control access to AWS services and resources in your AWS account. For details about console and programmatic credentials, see Understanding and getting your security credentials in the AWS General Reference.
For more information about IAM and security key setup instructions, see Create an IAM User.
System Software Requirements
The system that you are using to run the application must have Java 8 or higher installed, because the AWS SDK for Java 2.x used by this tutorial requires it. To download and install the latest Java Development Kit (JDK), go to Oracle's Java SE installation site.
If you have a Java IDE, such as Eclipse, you can use it to open, edit, build, and run the source code.
You need the latest version of the AWS SDK for Java. If you are using Eclipse as your IDE, you can install the AWS Toolkit for Eclipse instead.
The consumer application requires the Kinesis Client Library (KCL) version 2.2.9 or higher, which you can obtain from GitHub at https://github.com/awslabs/amazon-kinesis-client/tree/master.
Next Steps
Step 1: Create a Data Stream (p. 22)
Step 1: Create a Data Stream
First, you must create the data stream that you will use in subsequent steps of this tutorial.
To create a stream
1. Sign in to the AWS Management Console and open the Kinesis console at https://console.aws.amazon.com/kinesis.
2. Choose Data Streams in the navigation pane.
3. In the navigation bar, expand the region selector and choose a region.
4. Choose Create Kinesis stream.
5. Enter a name for your data stream (for example, StockTradeStream).
6. Enter 1 for the number of shards, but leave Estimate the number of shards you'll need collapsed.
7. Choose Create Kinesis stream.
On the Kinesis streams list page, the status of your stream appears as CREATING while the stream is being created. When the stream is ready to use, the status changes to ACTIVE.
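A program that creates a stream should likewise wait for the ACTIVE status before writing to it. The following is a minimal sketch of such a polling loop in plain Java. The statusSource supplier is a hypothetical stand-in for a call that looks up the stream status; the class name, method name, and timing values are assumptions chosen for illustration and are not part of the tutorial source code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Illustrative sketch only: polls a status source until it reports ACTIVE.
final class StreamStatusWaiter {

    /**
     * Polls statusSource until it returns "ACTIVE" or maxAttempts is reached.
     * statusSource is a hypothetical stand-in for a real status lookup.
     *
     * @return true if ACTIVE was observed, false if attempts ran out or the thread was interrupted
     */
    static boolean waitUntilActive(Supplier<String> statusSource, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if ("ACTIVE".equals(statusSource.get())) {
                return true;
            }
            try {
                // CREATING (or any other status): wait before asking again.
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated status sequence; a real program would query the service instead.
        Iterator<String> simulated = Arrays.asList("CREATING", "CREATING", "ACTIVE").iterator();
        boolean active = waitUntilActive(simulated::next, 10, 10L);
        System.out.println("Stream active: " + active);
    }
}
```

Passing the status lookup in as a Supplier keeps the waiting logic independent of any particular client, so it can be exercised with simulated statuses as in main.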
If you choose the name of your stream, in the page that appears, the Details tab displays a summary of your data stream configuration. The Monitoring section displays monitoring information for the stream.
Next Steps
Step 2: Create an IAM Policy and User (p. 23)
Step 2: Create an IAM Policy and User
Security best practices for AWS dictate the use of fine-grained permissions to control access to different resources. AWS Identity and Access Management (IAM) allows you to manage users and user permissions in AWS. An IAM policy explicitly lists actions that are allowed and the resources on which the actions are applicable.
The following are the minimum permissions generally required for Kinesis Data Streams producers and consumers.
Producer
Actions | Resource | Purpose
DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis data stream | Before attempting to write records, the producer checks if the data stream exists, if it's active, and if the shards are contained in the data stream.
SubscribeToShard, RegisterStreamConsumer | Kinesis data stream | Subscribes and registers consumers to a shard.
PutRecord, PutRecords | Kinesis data stream | Writes records to Kinesis Data Streams.
Consumer
Actions | Resource | Purpose
DescribeStream | Kinesis data stream | Before attempting to read records, the consumer checks if the data stream exists, if it's active, and if the shards are contained in the data stream.
GetRecords, GetShardIterator | Kinesis data stream | Reads records from a shard.
CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB table | If the consumer is developed using the Kinesis Client Library (KCL) (either version 1.x or 2.x), it needs permissions to a DynamoDB table to track the processing state of the application.
DeleteItem | Amazon DynamoDB table | For when the consumer performs split/merge operations on Kinesis Data Streams shards.
PutMetricData | Amazon CloudWatch log | The KCL also uploads metrics to CloudWatch, which are useful for monitoring the application.
For this tutorial, you will create a single IAM policy that grants all of the above permissions. In production, you might want to create two policies, one for producers and one for consumers.
To create an IAM policy
1. Locate the Amazon Resource Name (ARN) for the new data stream that you created in the step above. You can find this ARN listed as Stream ARN at the top of the Details tab. The ARN format is as follows:
arn:aws:kinesis:region:account:stream/name
region
The AWS region code; for example, us-west-2. For more information, see Region and Availability Zone Concepts.
account
The AWS account ID, as shown in Account Settings.
name
The name of the data stream that you created in the step above, which is StockTradeStream.
2. Determine the ARN for the DynamoDB table to be used by the consumer (and to be created by the first consumer instance). It must be in the following format:
arn:aws:dynamodb:region:account:table/name
The region and account ID are identical to the values in the ARN of the data stream that you're using for this tutorial, but the name is the name of the DynamoDB table created and used by the consumer application. KCL uses the application name as the table name. In this step, use StockTradesProcessor for the DynamoDB table name, because that is the application name used in later steps in this tutorial.
3. In the IAM console, in Policies (https://console.aws.amazon.com/iam/home#policies), choose Create policy. If this is the first time that you have worked with IAM policies, choose Get Started, Create Policy.
4. Choose Select next to Policy Generator.
5. Choose Amazon Kinesis as the AWS service.
6. Select DescribeStream, GetShardIterator, GetRecords, PutRecord, and PutRecords as the allowed actions.
7. Enter the ARN of the data stream that you're using in this tutorial.
8. Use Add Statement for each of the following:
AWS Service | Actions | ARN
Amazon DynamoDB | CreateTable, DeleteItem, DescribeTable, GetItem, PutItem, Scan, UpdateItem | The ARN of the DynamoDB table that you determined in step 2 of this procedure.
Amazon CloudWatch | PutMetricData | *
The asterisk (*) is used when specifying an ARN is not required. In this case, it's because there is no specific resource in CloudWatch on which the PutMetricData action is invoked.
9. Choose Next Step.
10. Change Policy Name to StockTradeStreamPolicy, review the code, and choose Create Policy.
The resulting policy document should look like this:
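{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords",
        "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards",
        "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer"
      ],
      "Resource": ["arn:aws:kinesis:us-west-2:123:stream/StockTradeStream"]
    },
    {
      "Effect": "Allow",
      "Action": ["kinesis:SubscribeToShard", "kinesis:DescribeStreamConsumer"],
      "Resource": ["arn:aws:kinesis:us-west-2:123:stream/StockTradeStream/*"]
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable", "dynamodb:DeleteItem", "dynamodb:DescribeTable",
        "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Scan", "dynamodb:UpdateItem"
      ],
      "Resource": ["arn:aws:dynamodb:us-west-2:123:table/StockTradesProcessor"]
    },
    {
      "Effect": "Allow",
      "Action": ["cloudwatch:PutMetricData"],
      "Resource": ["*"]
    }
  ]
}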
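The two ARN formats described in steps 1 and 2 of the policy procedure can also be assembled programmatically, which helps avoid typing mistakes when filling in the policy. The following is a small sketch in plain Java; the class and method names are hypothetical, and the account ID 123 is the same placeholder used in the policy document, not a real account.

```java
// Illustrative sketch only: assembles the ARN strings described in the procedure.
final class ArnSketch {

    // Format: arn:aws:kinesis:region:account:stream/name
    static String kinesisStreamArn(String region, String account, String streamName) {
        return String.format("arn:aws:kinesis:%s:%s:stream/%s", region, account, streamName);
    }

    // Format: arn:aws:dynamodb:region:account:table/name
    static String dynamoDbTableArn(String region, String account, String tableName) {
        return String.format("arn:aws:dynamodb:%s:%s:table/%s", region, account, tableName);
    }

    public static void main(String[] args) {
        String region = "us-west-2";
        String account = "123"; // placeholder account ID, not a real one
        System.out.println(kinesisStreamArn(region, account, "StockTradeStream"));
        System.out.println(dynamoDbTableArn(region, account, "StockTradesProcessor"));
    }
}
```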
To create an IAM user
1. Open the IAM console at https://console.aws.amazon.com/iam/.
2. On the Users page, choose Add user.
3. For User name, type StockTradeStreamUser.
4. For Access type, choose Programmatic access, and then choose Next: Permissions.
5. Choose Attach existing policies directly.
6. Search by name for the policy that you created in the procedure above (StockTradeStreamPolicy). Select the box to the left of the policy name, and then choose Next: Review.
7. Review the details and summary, and then choose Create user.
8. Copy the Access key ID, and save it privately. Under Secret access key, choose Show, and save that key privately also.
9. Paste the access and secret keys to a local file in a safe place that only you can access. For this application, create a file named ~/.aws/credentials (with strict permissions). The file should be in the following format:
[default]
aws_access_key_id=access key
aws_secret_access_key=secret access key
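Before running the applications, it can be useful to confirm that the credentials file has the expected shape. The following sketch is a simplified, hypothetical checker written in plain Java; it is not the credentials loader that the AWS SDK uses, and it only verifies that a profile section contains both key names shown above. The values in main are placeholders.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a simplified check of the credentials file format.
final class CredentialsFileCheck {

    /** Returns the key=value pairs found under [profile] in the given lines. */
    static Map<String, String> readProfile(List<String> lines, String profile) {
        Map<String, String> values = new LinkedHashMap<>();
        boolean inProfile = false;
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            if (line.startsWith("[") && line.endsWith("]")) {
                // A section header such as [default] starts a new profile.
                inProfile = line.substring(1, line.length() - 1).trim().equals(profile);
                continue;
            }
            int eq = line.indexOf('=');
            if (inProfile && eq > 0) {
                values.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
            }
        }
        return values;
    }

    /** True if the profile defines both keys that the file format above requires. */
    static boolean hasBothKeys(Map<String, String> profile) {
        return profile.containsKey("aws_access_key_id")
                && profile.containsKey("aws_secret_access_key");
    }

    public static void main(String[] args) {
        // Placeholder values only; a real check would read ~/.aws/credentials,
        // for example with java.nio.file.Files.readAllLines.
        List<String> lines = Arrays.asList(
                "[default]",
                "aws_access_key_id=EXAMPLE_ACCESS_KEY",
                "aws_secret_access_key=EXAMPLE_SECRET_KEY");
        System.out.println("default profile complete: " + hasBothKeys(readProfile(lines, "default")));
    }
}
```

The sketch deliberately prints only whether the keys are present, never their values.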
To attach an IAM policy to a user
1. In the IAM console, open Policies and choose Policy Actions.
2. Choose StockTradeStreamPolicy and Attach.
3. Choose StockTradeStreamUser and Attach Policy.
Next Steps
Step 3: Download and Build the Code (p. 26)
Step 3: Download and Build the Code
This topic provides sample implementation code for ingesting sample stock trades into the data stream (the producer) and for processing that data (the consumer).
To download and build the code
1. Download the source code from the https://github.com/aws-samples/amazon-kinesis-learning GitHub repo to your computer.
2. Create a project in your IDE with the source code, adhering to the provided directory structure.
3. Add the following libraries to the project:
• Amazon Kinesis Client Library (KCL)
• AWS SDK
• Apache HttpCore
• Apache HttpClient
• Apache Commons Lang
• Apache Commons Logging
• Guava (Google Core Libraries For Java)
• Jackson Annotations
• Jackson Core
• Jackson Databind
• Jackson Dataformat: CBOR
• Joda Time
4. Depending on your IDE, the project might be built automatically. If not, build the project using the appropriate steps for your IDE.
If you complete these steps successfully, you are now ready to move to the next section, the section called “Step 4: Implement the Producer” (p. 27).
Next Steps
Step 4: Implement the Producer (p. 27)
Step 4: Implement the Producer
This tutorial uses the real-world scenario of stock market trade monitoring. The following principles briefly explain how this scenario maps to the producer and its supporting code structure.
Refer to the source code and review the following information.
StockTrade class
An individual stock trade is represented by an instance of the StockTrade class. This instance contains attributes such as the ticker symbol, price, number of shares, the type of the trade (buy or sell), and an ID uniquely identifying the trade. This class is implemented for you.
Stream record
A stream is a sequence of records. A record is a serialization of a StockTrade instance in JSON format. For example:
{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator class
StockTradeGenerator has a method called getRandomTrade() that returns a new randomly generated stock trade every time it is invoked. This class is implemented for you.
StockTradesWriter class
The main class of the producer. Its main method continuously retrieves a random trade and then sends it to Kinesis Data Streams by performing the following tasks:
1. Reads the data stream name and region name as input.
2. Uses the KinesisAsyncClientBuilder to set the region, credentials, and client configuration.
3. Checks that the stream exists and is active (if not, it exits with an error).
4. In a continuous loop, calls the StockTradeGenerator.getRandomTrade() method and then the sendStockTrade method to send the trade to the stream every 100 milliseconds.
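The generate-and-send loop in task 4 can be sketched without the AWS SDK as follows. This is a simplified illustration, not the tutorial's StockTradesWriter: randomTradeJson is a hypothetical stand-in for StockTradeGenerator.getRandomTrade() combined with JSON serialization, the sender callback stands in for the sendStockTrade method, and the loop is bounded so that the example terminates.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Random;
import java.util.function.Consumer;

// Illustrative sketch only: the shape of the producer loop, with no AWS calls.
final class ProducerLoopSketch {

    private static final String[] TICKERS = {"AMZN", "GE", "WMT", "BUD"};
    private static final Random RANDOM = new Random();

    /** Hypothetical stand-in for a randomly generated trade, serialized as JSON. */
    static String randomTradeJson(long id) {
        String ticker = TICKERS[RANDOM.nextInt(TICKERS.length)];
        String type = RANDOM.nextBoolean() ? "BUY" : "SELL";
        double price = 10 + RANDOM.nextInt(40000) / 100.0;
        int quantity = 1 + RANDOM.nextInt(1000);
        return String.format(Locale.ROOT,
                "{\"tickerSymbol\":\"%s\",\"tradeType\":\"%s\",\"price\":%.2f,\"quantity\":%d,\"id\":%d}",
                ticker, type, price, quantity, id);
    }

    /** Generates count trades, hands each to sender, and pauses pauseMillis between sends. */
    static void run(int count, long pauseMillis, Consumer<byte[]> sender) {
        for (long id = 1; id <= count; id++) {
            sender.accept(randomTradeJson(id).getBytes(StandardCharsets.UTF_8));
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                return;
            }
        }
    }

    public static void main(String[] args) {
        // The real producer loops indefinitely and sends to Kinesis; this prints 3 trades.
        run(3, 100L, bytes -> System.out.println(new String(bytes, StandardCharsets.UTF_8)));
    }
}
```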
The sendStockTrade method of the StockTradesWriter class has the following code:
private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient,
        String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }

    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest request = PutRecordRequest.builder()
            // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
            .partitionKey(trade.getTickerSymbol())
            .streamName(streamName)
            .data(SdkBytes.fromByteArray(bytes))
            .build();
    try {
        kinesisClient.putRecord(request).get();
    } catch (InterruptedException e) {
        LOG.info("Interrupted, assuming shutdown.");
    } catch (ExecutionException e) {
        LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
    }
}
Refer to the following code breakdown:
• The PutRecord API expects a byte array, and you need to convert trade to JSON format. This single line of code performs that operation:
byte[] bytes = trade.toJsonAsBytes();
• Before you can send the trade, you create a new PutRecordRequest instance (called request in this case). Each request requires the stream name, partition key, and a data blob.
PutRecordRequest request = PutRecordRequest.builder()
        // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
        .partitionKey(trade.getTickerSymbol())
        .streamName(streamName)
        .data(SdkBytes.fromByteArray(bytes))
        .build();
The example uses a stock ticker symbol as the partition key, which maps the record to a specific shard. In practice, you should have hundreds or thousands of partition keys per shard so that records are evenly dispersed across your stream. For more information about how to add data to a stream, see Writing Data to Amazon Kinesis Data Streams (p. 84).
Now request is ready to send to the client (the put operation):
kinesisClient.putRecord(request).get();
• Error checking and logging are always useful additions. This code logs error conditions:
if (bytes == null) {
    LOG.warn("Could not get JSON bytes for stock trade");
    return;
}
Add the try/catch block around the put operation:
try {
    kinesisClient.putRecord(request).get();
} catch (InterruptedException e) {
    LOG.info("Interrupted, assuming shutdown.");
} catch (ExecutionException e) {
    LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
}
This is because a Kinesis Data Streams put operation can fail because of a network error, or because the data stream has reached its throughput limits and is being throttled. It is recommended that you carefully consider your retry policy for put operations to avoid data loss, such as using a simple retry.
• Status logging is helpful but optional:
LOG.info("Putting trade: " + trade.toString());
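One possible shape for the retry policy recommended in the breakdown above is a bounded retry with exponential backoff. The following is a minimal sketch in plain Java, not the tutorial's implementation: callWithRetry, its parameters, and the simulated operation are all hypothetical. It assumes the operation reports failure by throwing an unchecked exception, as happens, for example, when join() rather than get() is called on a CompletableFuture.

```java
import java.util.function.Supplier;

// Illustrative sketch only: bounded retry with exponential backoff.
final class RetrySketch {

    /**
     * Calls operation up to maxAttempts times, doubling the wait after each failure.
     * Returns the operation's result, or rethrows the last failure.
     */
    static <T> T callWithRetry(Supplier<T> operation, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoff = initialBackoffMillis;
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break; // no attempts left, so do not sleep again
                }
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying if the thread is interrupted
                }
                backoff *= 2; // exponential backoff
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated put that fails twice (as if throttled) and then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated throttling");
            }
            return "ok";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Capping the number of attempts keeps a persistent failure from blocking the producer loop; what to do with a record that still fails (drop, buffer, or log) remains an application decision.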
The producer shown here uses the Kinesis Data Streams API single record functionality, PutRecord.
In practice, if an individual producer generates many records, it is often more efficient to use the multiple records functionality of PutRecords and send batches of records at a time. For more information, see Writing Data to Amazon Kinesis Data Streams (p. 84).
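As a rough illustration of batching, the sketch below splits a list of pending records into groups that respect the PutRecords limit of 500 records per request. It is written in plain Java with hypothetical class and method names, uses integers as stand-ins for records, and does not call the Kinesis API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: groups records into batches sized for PutRecords.
final class BatchSketch {

    // PutRecords accepts at most 500 records in a single request.
    static final int MAX_RECORDS_PER_REQUEST = 500;

    /** Splits records into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            pending.add(i); // integers stand in for serialized trades
        }
        for (List<Integer> batch : toBatches(pending, MAX_RECORDS_PER_REQUEST)) {
            System.out.println("batch of " + batch.size() + " records");
        }
    }
}
```

A PutRecords request can succeed for some records and fail for others, so a real producer would also inspect the per-record results in the response and resend only the failed entries.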
To run the producer
1. Verify that the access key and secret key pair retrieved in Step 2: Create an IAM Policy and User (p. 23) are saved in the file ~/.aws/credentials.
2. Run the StockTradesWriter class with the following arguments:
StockTradeStream us-west-2
If you created your stream in a region other than us-west-2, you have to specify that region here instead.
You should see output similar to the following:
Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Your stock trades are now being ingested by Kinesis Data Streams.
Next Steps
Step 5: Implement the Consumer (p. 30)
Step 5: Implement the Consumer
The consumer application in this tutorial continuously processes the stock trades in your data stream. It then outputs the most popular stocks being bought and sold every minute. The application is built on top of the Kinesis Client Library (KCL), which does much of the heavy lifting common to consumer apps.
For more information, see Using the Kinesis Client Library (p. 113).
Refer to the source code and review the following information.
StockTradesProcessor class
Main class of the consumer, provided for you, which performs the following tasks:
• Reads the application, data stream, and region names, passed in as arguments.
• Creates a KinesisAsyncClient instance with the region name.
• Creates a StockTradeRecordProcessorFactory instance that serves instances of ShardRecordProcessor, implemented by a StockTradeRecordProcessor instance.
• Creates a ConfigsBuilder instance with the KinesisAsyncClient, StreamName, ApplicationName, and StockTradeRecordProcessorFactory instance. This is useful for creating all configurations with default values.
• Creates a KCL scheduler (previously, in KCL versions 1.x it was known as the KCL worker) with the ConfigsBuilder instance.
• The scheduler creates a new thread for each shard (assigned to this consumer instance), which continuously loops to read records from the data stream. It then invokes the StockTradeRecordProcessor instance to process each batch of records received.
StockTradeRecordProcessor class
Implementation of the ShardRecordProcessor interface, which requires five methods: initialize, processRecords, leaseLost, shardEnded, and shutdownRequested.
The initialize and shutdownRequested methods are used by the KCL to let the record processor know when it should be ready to start receiving records and when it should expect to stop receiving records, respectively, so it can perform any application-specific setup and termination tasks. leaseLost and shardEnded are used to implement any logic for what to do when a lease is lost or when processing has reached the end of a shard. In this example, we simply log messages indicating these events.
The code for these methods is provided for you. The main processing happens in the processRecords method, which in turn uses processRecord for each record. This latter method is provided as mostly empty skeleton code for you to implement in the next step, where it is explained in greater detail.
Also of note are the support methods reportStats and resetStats, which are empty in the original source code.
The processRecords method is implemented for you, and performs the following steps:
• For each record passed in, it calls processRecord on it.
• If at least 1 minute has elapsed since the last report, calls reportStats(), which prints out the latest stats, and then resetStats() which clears the stats so that the next interval includes only new records.
• Sets the next reporting time.
• If at least 1 minute has elapsed since the last checkpoint, calls checkpoint().
• Sets the next checkpointing time.
This method uses 60-second intervals for the reporting and checkpointing rate. For more information about checkpointing, see Using the Kinesis Client Library.
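The interval logic described above can be sketched as follows. This is a hypothetical plain-Java illustration rather than the tutorial's StockTradeRecordProcessor: the class and field names are assumptions, the current time is passed in as a parameter so the logic can be exercised without waiting, and counters stand in for the calls to reportStats(), resetStats(), and checkpoint().

```java
// Illustrative sketch only: 60-second reporting and checkpointing intervals.
final class IntervalSketch {

    static final long REPORTING_INTERVAL_MILLIS = 60_000L;
    static final long CHECKPOINT_INTERVAL_MILLIS = 60_000L;

    private long nextReportingTimeMillis;
    private long nextCheckpointTimeMillis;
    private int reports;
    private int checkpoints;

    IntervalSketch(long startMillis) {
        nextReportingTimeMillis = startMillis + REPORTING_INTERVAL_MILLIS;
        nextCheckpointTimeMillis = startMillis + CHECKPOINT_INTERVAL_MILLIS;
    }

    /** Called once per batch, after every record in the batch has been processed. */
    void afterBatch(long nowMillis) {
        if (nowMillis >= nextReportingTimeMillis) {
            reports++; // stands in for reportStats() followed by resetStats()
            nextReportingTimeMillis = nowMillis + REPORTING_INTERVAL_MILLIS;
        }
        if (nowMillis >= nextCheckpointTimeMillis) {
            checkpoints++; // stands in for checkpoint()
            nextCheckpointTimeMillis = nowMillis + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    int reports() {
        return reports;
    }

    int checkpoints() {
        return checkpoints;
    }

    public static void main(String[] args) {
        IntervalSketch sketch = new IntervalSketch(0L);
        // Simulated batch arrival times, 30 seconds apart.
        for (long now = 30_000L; now <= 120_000L; now += 30_000L) {
            sketch.afterBatch(now);
        }
        System.out.println("reports=" + sketch.reports() + " checkpoints=" + sketch.checkpoints());
    }
}
```

Because the checks run only when a batch arrives, the actual gap between reports can exceed 60 seconds on a quiet shard.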
StockStats class
This class provides data retention and statistics tracking for the most popular stocks over time. This code is provided for you and contains the following methods:
• addStockTrade(StockTrade): injects the given StockTrade into the running statistics.
• toString(): returns the statistics in a formatted string.
This class keeps track of the most popular stock by keeping a running count of the total number of trades for each stock and the maximum count. It updates these counts whenever a stock trade arrives.
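The running-count approach can be illustrated with a much smaller class. The sketch below is a hypothetical simplification in plain Java, not the provided StockStats code: it counts trades per ticker symbol only and updates the running maximum as each trade arrives.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: running trade counts with a tracked maximum.
final class PopularStockSketch {

    private final Map<String, Long> countsByTicker = new HashMap<>();
    private String mostPopularTicker;
    private long mostPopularCount;

    /** Records one trade for the ticker and updates the running maximum. */
    void addTrade(String tickerSymbol) {
        long count = countsByTicker.merge(tickerSymbol, 1L, Long::sum);
        if (count > mostPopularCount) {
            mostPopularTicker = tickerSymbol;
            mostPopularCount = count;
        }
    }

    String mostPopularTicker() {
        return mostPopularTicker;
    }

    long mostPopularCount() {
        return mostPopularCount;
    }

    @Override
    public String toString() {
        return mostPopularTicker == null
                ? "no trades yet"
                : mostPopularTicker + " (" + mostPopularCount + " trades)";
    }

    public static void main(String[] args) {
        PopularStockSketch stats = new PopularStockSketch();
        stats.addTrade("AMZN");
        stats.addTrade("GE");
        stats.addTrade("AMZN");
        System.out.println("Most popular: " + stats);
    }
}
```

Tracking the maximum on each update avoids scanning the whole map when the statistics are printed, and replacing the instance with a new one resets all counts for the next interval.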
Add code to the methods of the StockTradeRecordProcessor class, as shown in the following steps.
To implement the consumer
1. Implement the processRecord method by copying the record data into a correctly sized byte array, parsing it into a StockTrade object, and adding the trade to the statistics, logging a warning if there's a problem.
byte[] arr = new byte[record.data().remaining()];
record.data().get(arr);
StockTrade trade = StockTrade.fromJsonAsBytes(arr);
if (trade == null) {
    log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
    return;
}
stockStats.addStockTrade(trade);
2. Implement a simple reportStats method. Feel free to modify the output format to suit your preferences.
System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
        stockStats + "\n" +
        "****************************************************************\n");
3. Implement the resetStats method, which creates a new stockStats instance.
stockStats = new StockStats();
4. Implement the following methods required by the ShardRecordProcessor interface:
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
    log.info("Lost lease, so terminating.");
}

@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
    try {
        log.info("Reached shard end checkpointing.");
        shardEndedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at shard end. Giving up.", e);
    }
}

@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    log.info("Scheduler is shutting down, checkpointing.");
    checkpoint(shutdownRequestedInput.checkpointer());
}

private void checkpoint(RecordProcessorCheckpointer checkpointer) {
    log.info("Checkpointing shard " + kinesisShardId);
    try {
        checkpointer.checkpoint();
    } catch (ShutdownException se) {
        // Ignore checkpoint if the processor instance has been shutdown (fail over).
        log.info("Caught shutdown exception, skipping checkpoint.", se);
    } catch (ThrottlingException e) {
        // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
        log.error("Caught throttling exception, skipping checkpoint.", e);
    } catch (InvalidStateException e) {
        // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
        log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
    }
}
To run the consumer
1. Run the producer that you wrote in Step 4: Implement the Producer (p. 27) to inject simulated stock trade records into your stream.
2. Verify that the access key and secret key pair retrieved earlier (when creating the IAM user) are saved in the file ~/.aws/credentials.
3. Run the StockTradesProcessor class with the following arguments:
StockTradesProcessor StockTradeStream us-west-2
Note that if you created your stream in a region other than us-west-2, you have to specify that region here instead.