When you create your application, Kinesis Data Analytics creates the following Amazon CloudWatch resources if they don't already exist:
• A log group called /aws/kinesis-analytics-java/MyApplication.
• A log stream called kinesis-analytics-log-stream.
Write Sample Records to the Input Stream
In this section, you use a Python script to write sample records to the Amazon MSK topic for the application to process.
1. Connect to the client instance you created in Step 4: Create a Client Machine of the Getting Started Using Amazon MSK tutorial.
2. Install Python 3, pip, and the kafka-python library:
$ sudo yum install python37
$ curl -O https://bootstrap.pypa.io/get-pip.py
$ python3 get-pip.py --user
$ pip install kafka-python
3. Create a file named stock.py with the following contents. Replace the BROKERS value with the bootstrap broker list that you recorded previously.
from kafka import KafkaProducer
import json
import random
import traceback
from datetime import datetime

BROKERS = "<Bootstrap Brokers List>"

# Producer that serializes each record as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='PLAINTEXT')


def getStock():
    # Build one sample stock record with a timestamp, ticker, and price
    data = {}
    now = datetime.now()
    str_now = now.strftime("%Y-%m-%d %H:%M:%S")
    data['event_time'] = str_now
    data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
    price = random.random() * 100
    data['price'] = round(price, 2)
    return data


while True:
    data = getStock()
    # print(data)
    try:
        future = producer.send("AWSKafkaTutorialTopic", value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        print("sent event to Kafka! topic {} partition {} offset {}".format(
            record_metadata.topic, record_metadata.partition, record_metadata.offset))
    except Exception:
        # Print the full stack trace and keep sending
        traceback.print_exc()
4. Later in the tutorial, you run the stock.py script to send data to the application.
$ python3 stock.py
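To check the shape of the records before you connect to the cluster, you can generate and serialize one locally. The following is a minimal sketch, not part of the tutorial code: it reproduces the fields that getStock() builds and the JSON encoding that the producer's value_serializer applies. The names TICKERS, get_stock, and serialize are chosen here for illustration.

```python
import json
import random
from datetime import datetime

# Same ticker symbols that stock.py chooses from
TICKERS = ['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']


def get_stock():
    """Build one sample record with the same fields as getStock() in stock.py."""
    return {
        'event_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'ticker': random.choice(TICKERS),
        'price': round(random.random() * 100, 2),
    }


def serialize(record):
    """Mirror the producer's value_serializer: JSON text encoded as UTF-8 bytes."""
    return json.dumps(record).encode('utf-8')


if __name__ == "__main__":
    # Print the bytes that would be sent as the Kafka message value
    print(serialize(get_stock()))
```

Each message value is a single JSON object with the keys event_time, ticker, and price. This sketch doesn't need a Kafka broker, so it can run on any machine that has Python 3.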
Download and Examine the Apache Flink Streaming Java Code
The Java application code for this example is available from GitHub.
To download the Java application code
1. Clone the remote repository using the following command:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
2. Navigate to the amazon-kinesis-data-analytics-java-examples/GettingStartedTable directory.
Note the following about the application code:
• A Project Object Model (pom.xml) file contains information about the application's configuration and dependencies, including the Kinesis Data Analytics libraries.
• The StreamingJob.java file contains the main method that defines the application's functionality.
• The application uses a FlinkKafkaConsumer to read from the Amazon MSK topic. The following snippet creates a FlinkKafkaConsumer object:
final FlinkKafkaConsumer<StockRecord> consumer =
    new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProps);
• Your application creates source and sink connectors to access external resources using StreamExecutionEnvironment and TableEnvironment objects.
• The application creates source and sink connectors using dynamic application properties, so you can specify your application parameters (such as your S3 bucket) without recompiling the code.
// Read the parameters from the Kinesis Analytics environment
Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
Properties flinkProperties = null;

String kafkaTopic = parameter.get("kafka-topic", "AWSKafkaTutorialTopic");
String brokers = parameter.get("brokers", "");
String s3Path = parameter.get("s3Path", "");

if (applicationProperties != null) {
    flinkProperties = applicationProperties.get("FlinkApplicationProperties");
}

if (flinkProperties != null) {
    kafkaTopic = flinkProperties.get("kafka-topic").toString();
    brokers = flinkProperties.get("brokers").toString();
    s3Path = flinkProperties.get("s3Path").toString();
}
For more information about runtime properties, see Runtime Properties (p. 21).
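The order in which the Java snippet resolves its settings can be easier to follow in isolation. The following Python sketch is an illustration only, not part of the application: it models the application properties as nested dictionaries and applies the same rule, which is to start from the defaults and then overwrite every setting when a FlinkApplicationProperties group is present. The names DEFAULTS and resolve_settings are hypothetical.

```python
# Defaults that the Java snippet passes to parameter.get(...)
DEFAULTS = {
    "kafka-topic": "AWSKafkaTutorialTopic",
    "brokers": "",
    "s3Path": "",
}


def resolve_settings(application_properties, defaults=DEFAULTS):
    """Return the effective settings, preferring the runtime property group."""
    settings = dict(defaults)
    # applicationProperties can be null in the Java code, so tolerate None here
    group = (application_properties or {}).get("FlinkApplicationProperties")
    if group is not None:
        for key in settings:
            # Like flinkProperties.get(key).toString(), this fails if the key is missing
            settings[key] = str(group[key])
    return settings


if __name__ == "__main__":
    runtime = {
        "FlinkApplicationProperties": {
            "kafka-topic": "AWSKafkaTutorialTopic",
            "brokers": "<Bootstrap Brokers List>",
            "s3Path": "<bucket>",
        }
    }
    print(resolve_settings(runtime))
```

As in the Java code, once the property group exists, all three keys are expected to be in it. A group that omits one of them raises an error instead of falling back to the default.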
Compile the Application Code
In this section, you use Apache Maven to compile the Java code for the application. For information about installing Apache Maven and the Java Development Kit (JDK), see Prerequisites for Completing the Exercises (p. 77).
To compile the application code
1. To use your application code, you compile and package it into a JAR file. You can compile and package your code in one of two ways:
• Use the command-line Maven tool. Create your JAR file by running the following command in the directory that contains the pom.xml file:
mvn package -Dflink.version=1.13.2
• Use your development environment. See your development environment documentation for details.
Note
The provided source code relies on libraries from Java 11.
You can either upload your package as a JAR file, or you can compress your package and upload it as a ZIP file. If you create your application using the AWS CLI, you specify your code content type (JAR or ZIP).
2. If there are errors while compiling, verify that your JAVA_HOME environment variable is correctly set.
If the application compiles successfully, the following file is created:
target/aws-kinesis-analytics-java-apps-1.0.jar
Upload the Apache Flink Streaming Java Code
In this section, you create an Amazon S3 bucket and upload your application code.
To upload the application code
1. Open the Amazon S3 console at https://console.aws.amazon.com/s3/.
2. Choose Create bucket.
3. Enter ka-app-code-<username> in the Bucket name field. Add a suffix to the bucket name, such as your user name, to make it globally unique. Choose Next.
4. In the Configure options step, keep the settings as they are, and choose Next.
5. In the Set permissions step, keep the settings as they are, and choose Next.
6. Choose Create bucket.
7. In the Amazon S3 console, choose the ka-app-code-<username> bucket, and choose Upload.
8. In the Select files step, choose Add files. Navigate to the aws-kinesis-analytics-java-apps-1.0.jar file that you created in the previous step. Choose Next.
9. You don't need to change any of the settings for the object, so choose Upload.
Your application code is now stored in an Amazon S3 bucket where your application can access it.
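If you prefer to script these console steps, the following sketch shows one possible equivalent using the AWS SDK for Python (boto3). It is not part of the tutorial and makes several assumptions: boto3 is installed, AWS credentials are configured, and the bucket is created in us-west-2 (the Region that appears in the role names later in this exercise). The function names bucket_name and upload_application_jar are hypothetical.

```python
import os


def bucket_name(username):
    """Build the bucket name used in this tutorial: ka-app-code-<username>."""
    return f"ka-app-code-{username}"


def upload_application_jar(jar_path, username, region="us-west-2"):
    """Create the bucket and upload the JAR file. Requires boto3 and AWS credentials."""
    # Imported here so that bucket_name() can be used without boto3 installed
    import boto3

    s3 = boto3.client("s3", region_name=region)
    bucket = bucket_name(username)
    # Assumption: a Region other than us-east-1, which needs a LocationConstraint
    s3.create_bucket(
        Bucket=bucket,
        CreateBucketConfiguration={"LocationConstraint": region},
    )
    # Use the file name as the object key, as in the console procedure
    key = os.path.basename(jar_path)
    s3.upload_file(jar_path, bucket, key)
    return bucket, key
```

For example, calling upload_application_jar("target/aws-kinesis-analytics-java-apps-1.0.jar", "<username>") would store the object under the key aws-kinesis-analytics-java-apps-1.0.jar, which is the value that you enter for Path to Amazon S3 object when you configure the application.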
Create and Run the Kinesis Data Analytics Application
Follow these steps to create, configure, update, and run the application using the console.
Create the Application
1. Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
2. On the Kinesis Data Analytics dashboard, choose Create analytics application.
3. On the Kinesis Analytics - Create application page, provide the application details as follows:
• For Application name, enter MyApplication.
• For Description, enter My java test app.
• For Runtime, choose Apache Flink.
• Leave the version as Apache Flink version 1.13.2 (Recommended version).
4. For Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.
5. Choose Create application.
Note
When you create a Kinesis Data Analytics application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:
• Policy: kinesis-analytics-service-MyApplication-us-west-2
• Role: kinesis-analytics-MyApplication-us-west-2
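The naming pattern can be written as a small helper, which is useful if you later look up these resources for an application with a different name or Region. This is a sketch that is inferred only from the two example names above. The function name iam_resource_names is hypothetical and is not an AWS API.

```python
def iam_resource_names(application_name, region):
    """Derive the console-created IAM policy and role names for an application."""
    return {
        "policy": f"kinesis-analytics-service-{application_name}-{region}",
        "role": f"kinesis-analytics-{application_name}-{region}",
    }


if __name__ == "__main__":
    names = iam_resource_names("MyApplication", "us-west-2")
    print(names["policy"])
    print(names["role"])
```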
Edit the IAM Policy
Edit the IAM policy to add permissions to access the Amazon S3 bucket.
To edit the IAM policy to add S3 bucket permissions
1. Open the IAM console at https://console.aws.amazon.com/iam/.
2. Choose Policies. Choose the kinesis-analytics-service-MyApplication-us-west-2 policy that the console created for you in the previous section.
3. On the Summary page, choose Edit policy. Choose the JSON tab.
4. Add the highlighted section of the following policy example to the policy.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadCode",
"arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
"arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
Configure the Application
Use the following procedure to configure the application.
To configure the application
1. On the MyApplication page, choose Configure.
2. On the Configure application page, provide the Code location:
• For Amazon S3 bucket, enter ka-app-code-<username>.
• For Path to Amazon S3 object, enter aws-kinesis-analytics-java-apps-1.0.jar.
3. Under Access to application resources, for Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.
4. Under Properties, choose Create group. For Group ID, enter FlinkApplicationProperties.
5. Enter the following application properties and values:
Key                        Value
kafka-topic                AWSKafkaTutorialTopic
brokers                    Your Amazon MSK cluster's Bootstrap Brokers list
s3Path                     ka-app-<username>
security.protocol          SSL
ssl.truststore.location    /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
ssl.truststore.password    changeit
6. Under Monitoring, ensure that the Monitoring metrics level is set to Application.
7. For CloudWatch logging, select the Enable check box.
8. In the Virtual Private Cloud (VPC) section, choose VPC configuration based on Amazon MSK cluster. Choose AWSKafkaTutorialCluster.
9. Choose Update.
Note
When you choose to enable Amazon CloudWatch logging, Kinesis Data Analytics creates a log group and log stream for you. The names of these resources are as follows:
• Log group: /aws/kinesis-analytics/MyApplication
• Log stream: kinesis-analytics-log-stream
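The property group that you enter in the console corresponds to a structure with a PropertyGroupId and a PropertyMap, which is the form used when you define runtime properties outside the console. The following sketch builds that structure for the keys in this procedure. The function name flink_property_group and its parameters are hypothetical, and the broker and S3 values shown in the usage are placeholders that you replace with your own.

```python
import json


def flink_property_group(brokers, s3_path, topic="AWSKafkaTutorialTopic"):
    """Build the FlinkApplicationProperties group with the keys from this procedure."""
    return {
        "PropertyGroupId": "FlinkApplicationProperties",
        "PropertyMap": {
            "kafka-topic": topic,
            "brokers": brokers,
            "s3Path": s3_path,
            "security.protocol": "SSL",
            "ssl.truststore.location":
                "/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts",
            "ssl.truststore.password": "changeit",
        },
    }


if __name__ == "__main__":
    group = flink_property_group("<Bootstrap Brokers List>", "<s3Path value>")
    print(json.dumps(group, indent=2))
```

The three keys that the application code reads (kafka-topic, brokers, and s3Path) must all be present in the group, because the code reads each of them without a fallback once the group exists.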
Run the Application
Use the following procedure to run the application.
To run the application
1. On the MyApplication page, choose Run. Confirm the action.
2. When the application is running, refresh the page. The console shows the Application graph.
3. From your Amazon EC2 client, run the Python script you created previously to write records to the Amazon MSK cluster for your application to process:
$ python3 stock.py
Stop the Application
To stop the application, on the MyApplication page, choose Stop. Confirm the action.
Next Step
Clean Up AWS Resources (p. 103)
Clean Up AWS Resources
This section includes procedures for cleaning up AWS resources created in the Getting Started (Table API) tutorial.
This topic contains the following sections.
• Delete Your Kinesis Data Analytics Application (p. 103)