Amazon Kinesis Data Analytics Developer Guide
Copyright © Amazon Web Services, Inc. and/or its affiliates. All rights reserved.
Amazon's trademarks and trade dress may not be used in connection with any product or service that is not Amazon's, in any manner that is likely to cause confusion among customers, or in any manner that disparages or discredits Amazon. All other trademarks not owned by Amazon are the property of their respective owners, who may or may not be affiliated with, connected to, or sponsored by Amazon.
Table of Contents
What Is Kinesis Data Analytics for Apache Flink? ... 1
Getting Started ... 1
How It Works ... 2
Programming Your Apache Flink Application ... 2
DataStream API ... 2
Table API ... 3
Creating Your Kinesis Data Analytics Application ... 3
Creating Applications ... 3
Building your Kinesis Data Analytics Application Code ... 4
Creating your Kinesis Data Analytics Application ... 5
Starting your Kinesis Data Analytics Application ... 5
Verifying your Kinesis Data Analytics Application ... 6
Using Apache Beam ... 6
Running Applications ... 7
Application and Job Status ... 7
Batch workloads ... 8
Application Resources ... 8
Kinesis Data Analytics Application Resources ... 8
Apache Flink Application Resources ... 8
DataStream API ... 9
DataStream API Connectors ... 9
DataStream API Operators ... 14
DataStream API Timestamps ... 15
Table API ... 15
Table API Connectors ... 15
Table API Time Attributes ... 16
Using Python ... 17
Programming an Application ... 17
Creating an Application ... 19
Monitoring ... 20
Runtime Properties ... 21
Working with Runtime Properties in the Console ... 21
Working with Runtime Properties in the CLI ... 21
Accessing Runtime Properties in a Kinesis Data Analytics Application ... 23
Fault Tolerance ... 24
Configuring Checkpointing ... 24
Checkpointing API Examples ... 25
Snapshots ... 27
Scaling ... 30
Configuring Application Parallelism and ParallelismPerKPU ... 30
Allocating Kinesis Processing Units ... 31
Updating Your Application's Parallelism ... 31
Automatic Scaling ... 32
Tagging ... 33
Adding Tags when an Application is Created ... 34
Adding or Updating Tags for an Existing Application ... 34
Listing Tags for an Application ... 34
Removing Tags from an Application ... 35
Apache Flink Dashboard ... 35
Accessing Your Application's Apache Flink Dashboard ... 36
Studio notebooks ... 38
Creating a Studio notebook ... 39
Interactive analysis of streaming data ... 39
Flink interpreters ... 40
Apache Flink table environment variables ... 40
Deploying as an application with durable state ... 41
Scala/Python criteria ... 42
SQL criteria ... 42
IAM Permissions ... 42
Connectors and dependencies ... 43
Default connectors ... 43
Dependencies and custom connectors ... 44
User-Defined Functions ... 44
Enabling Checkpointing ... 45
Setting the checkpointing interval ... 45
Setting the checkpointing type ... 45
Working with AWS Glue ... 45
Table properties ... 46
Examples and Tutorials ... 47
Creating a Studio notebook Tutorial ... 47
Deploying as an Application with Durable State Tutorial ... 61
Examples ... 64
Studio Notebooks vs. SQL Applications ... 72
Troubleshooting ... 72
Stopping a stuck application ... 72
Canceling jobs ... 72
Restarting the Apache Flink interpreter ... 73
Appendix: Creating custom IAM policies ... 73
AWS Glue ... 73
CloudWatch Logs ... 74
Kinesis streams ... 75
Amazon MSK clusters ... 76
Getting Started (DataStream API) ... 77
Application Components ... 77
Prerequisites ... 77
Step 1: Set Up an Account ... 78
Sign Up for AWS ... 78
Create an IAM User ... 78
Next Step ... 80
Step 2: Set Up the AWS CLI ... 80
Next Step ... 81
Step 3: Create an Application ... 81
Create Two Amazon Kinesis Data Streams ... 81
Write Sample Records to the Input Stream ... 82
Download and Examine the Apache Flink Streaming Java Code ... 83
Compile the Application Code ... 83
Upload the Apache Flink Streaming Java Code ... 84
Create and Run the Kinesis Data Analytics Application ... 84
Next Step ... 92
Step 4: Clean Up ... 92
Delete Your Kinesis Data Analytics Application ... 92
Delete Your Kinesis Data Streams ... 92
Delete Your Amazon S3 Object and Bucket ... 92
Delete Your IAM Resources ... 93
Delete Your CloudWatch Resources ... 93
Next Step ... 93
Step 5: Next Steps ... 93
Getting Started (Table API) ... 95
Application Components ... 95
Prerequisites ... 95
Create an Application ... 96
Create Dependent Resources ... 96
Write Sample Records to the Input Stream ... 97
Download and Examine the Apache Flink Streaming Java Code ... 98
Compile the Application Code ... 99
Upload the Apache Flink Streaming Java Code ... 99
Create and Run the Kinesis Data Analytics Application ... 100
Next Step ... 103
Clean Up ... 103
Delete Your Kinesis Data Analytics Application ... 103
Delete Your Amazon MSK Cluster ... 103
Delete Your VPC ... 103
Delete Your Amazon S3 Objects and Bucket ... 104
Delete Your IAM Resources ... 104
Delete Your CloudWatch Resources ... 104
Next Step ... 104
Next Steps ... 104
Getting Started (Python) ... 105
Application Components ... 105
Prerequisites ... 105
Create an Application ... 106
Create Dependent Resources ... 106
Write Sample Records to the Input Stream ... 107
Create and Examine the Apache Flink Streaming Python Code ... 108
Upload the Apache Flink Streaming Python Code ... 109
Create and Run the Kinesis Data Analytics Application ... 110
Next Step ... 113
Clean Up ... 113
Delete Your Kinesis Data Analytics Application ... 113
Delete Your Kinesis Data Streams ... 113
Delete Your Amazon S3 Objects and Bucket ... 114
Delete Your IAM Resources ... 114
Delete Your CloudWatch Resources ... 114
Examples ... 115
DataStream API Examples ... 115
Tumbling Window ... 115
Sliding Window ... 122
S3 Sink ... 128
MSK Replication ... 137
EFO Consumer ... 141
Kinesis Data Firehose Sink ... 148
Cross-Account ... 159
Custom Truststore ... 165
Apache Beam ... 170
Python Examples ... 176
Tumbling Window ... 177
Sliding Window ... 183
S3 Sink ... 190
Kinesis Data Analytics Solutions ... 197
AWS Streaming Data Solution ... 198
Clickstream Lab ... 198
Custom Scaling ... 198
CloudWatch Dashboard ... 198
Amazon MSK ... 198
More Kinesis Data Analytics Solutions on GitHub ... 199
Security ... 200
Data Protection ... 200
Data Encryption ... 200
Identity and Access Management ... 201
Trust Policy ... 202
Permissions Policy ... 202
Monitoring ... 204
Compliance Validation ... 204
Resilience ... 204
Disaster Recovery ... 205
Versioning ... 205
Infrastructure Security ... 205
Security Best Practices ... 206
Implement least privilege access ... 206
Use IAM roles to access other Amazon services ... 206
Implement Server-Side Encryption in Dependent Resources ... 206
Use CloudTrail to Monitor API Calls ... 206
Logging and Monitoring ... 208
Setting Up Logging ... 208
Setting Up CloudWatch Logging Using the Console ... 209
Setting Up CloudWatch Logging Using the CLI ... 210
Application Monitoring Levels ... 213
Logging Best Practices ... 214
Logging Troubleshooting ... 214
Next Step ... 215
Analyzing Logs ... 215
Run a Sample Query ... 215
Example Queries ... 216
Metrics and Dimensions ... 217
Application Metrics ... 218
Kinesis Data Streams Connector Metrics ... 224
Amazon MSK Connector Metrics ... 225
Apache Zeppelin Metrics ... 226
Viewing CloudWatch Metrics ... 226
Metrics ... 227
Custom Metrics ... 228
Alarms ... 231
Writing Custom Messages ... 235
Write to CloudWatch Logs Using Log4J ... 235
Write to CloudWatch Logs Using SLF4J ... 236
Using AWS CloudTrail ... 237
Kinesis Data Analytics Information in CloudTrail ... 237
Understanding Kinesis Data Analytics Log File Entries ... 238
Performance ... 240
Troubleshooting Performance ... 240
The Data Path ... 240
Performance Troubleshooting Solutions ... 240
Performance Best Practices ... 242
Manage scaling properly ... 242
Monitor external dependency resource usage ... 243
Run your Apache Flink application locally ... 244
Monitoring Performance ... 244
Performance Monitoring using CloudWatch Metrics ... 244
Performance Monitoring using CloudWatch Logs and Alarms ... 244
Quota ... 245
Maintenance ... 246
Best Practices ... 248
Fault tolerance: checkpoints and savepoints ... 248
Performance and parallelism ... 248
Logging ... 249
Coding ... 249
Studio notebook refresh interval ... 249
Studio notebook optimum performance ... 249
Earlier Versions ... 251
Using the Apache Flink Kinesis Streams Connector with previous Apache Flink versions ... 251
Building Applications with Apache Flink 1.8.2 ... 252
Building Applications with Apache Flink 1.6.2 ... 253
Upgrading Applications ... 253
Available Connectors in Apache Flink 1.6.2 and 1.8.2 ... 254
Getting Started: Flink 1.11.3 ... 254
Application Components ... 254
Prerequisites ... 255
Step 1: Set Up an Account ... 255
Step 2: Set Up the AWS CLI ... 257
Step 3: Create an Application ... 258
Step 4: Clean Up ... 269
Step 5: Next Steps ... 270
Getting Started: Flink 1.8.2 ... 271
Application Components ... 77
Prerequisites ... 272
Step 1: Set Up an Account ... 272
Step 2: Set Up the AWS CLI ... 274
Step 3: Create an Application ... 275
Step 4: Clean Up ... 286
Getting Started: Flink 1.6.2 ... 287
Application Components ... 288
Prerequisites ... 288
Step 1: Set Up an Account ... 288
Step 2: Set Up the AWS CLI ... 291
Step 3: Create an Application ... 292
Step 4: Clean Up ... 302
Flink Settings ... 304
Apache Flink Configuration ... 304
State Backend ... 304
Checkpointing ... 304
Savepointing ... 305
Heap Sizes ... 305
Using an Amazon VPC ... 306
Amazon VPC Concepts ... 306
VPC Application Permissions ... 307
Permissions Policy for Accessing an Amazon VPC ... 307
Internet and Service Access ... 307
Related Information ... 308
VPC API ... 308
CreateApplication ... 308
AddApplicationVpcConfiguration ... 309
DeleteApplicationVpcConfiguration ... 309
UpdateApplication ... 310
Example: Using a VPC ... 310
Troubleshooting ... 311
Development Troubleshooting ... 311
Enabling Flamegraphs ... 311
Issue with EFO connector 1.13.2 ... 311
Compile Error: "Could not resolve dependencies for project" ... 311
Invalid Choice: "kinesisanalyticsv2" ... 312
UpdateApplication Action Isn't Reloading Application Code ... 312
Runtime Troubleshooting ... 312
Troubleshooting Tools ... 312
Application Issues ... 312
Application is Restarting ... 315
Throughput is Too Slow ... 317
Application State Data is Accumulating ... 318
Checkpointing Timing Out ... 318
Document History ... 320
API Example Code ... 323
AddApplicationCloudWatchLoggingOption ... 323
AddApplicationInput ... 324
AddApplicationInputProcessingConfiguration ... 324
AddApplicationOutput ... 325
AddApplicationReferenceDataSource ... 325
AddApplicationVpcConfiguration ... 326
CreateApplication ... 326
CreateApplicationSnapshot ... 327
DeleteApplication ... 327
DeleteApplicationCloudWatchLoggingOption ... 327
DeleteApplicationInputProcessingConfiguration ... 328
DeleteApplicationOutput ... 328
DeleteApplicationReferenceDataSource ... 328
DeleteApplicationSnapshot ... 328
DeleteApplicationVpcConfiguration ... 328
DescribeApplication ... 329
DescribeApplicationSnapshot ... 329
DiscoverInputSchema ... 329
ListApplications ... 330
ListApplicationSnapshots ... 330
StartApplication ... 330
StopApplication ... 330
UpdateApplication ... 331
API Reference ... 332
What Is Amazon Kinesis Data Analytics for Apache Flink?
With Amazon Kinesis Data Analytics for Apache Flink, you can use Java, Scala, or SQL to process and analyze streaming data. The service enables you to author and run code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics.
You can build Java and Scala applications in Kinesis Data Analytics using open-source libraries based on Apache Flink. Apache Flink is a popular framework and engine for processing data streams.
Note: Although Kinesis Data Analytics supports Apache Flink applications written in Scala version 2.12, this guide only contains code examples written in Java.
Kinesis Data Analytics provides the underlying infrastructure for your Apache Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots). You can use the high-level Flink programming features (such as operators, functions, sources, and sinks) in the same way that you use them when hosting the Flink infrastructure yourself.
Getting Started
You can start by creating a Kinesis Data Analytics application that continuously reads and processes streaming data. Then, author your code using your IDE of choice, and test it with live streaming data. You can also configure destinations where you want Kinesis Data Analytics to send the results.
To get started, we recommend that you read the following sections:
• Kinesis Data Analytics for Apache Flink: How It Works (p. 2)
• Getting Started with Amazon Kinesis Data Analytics for Apache Flink (DataStream API) (p. 77)
Kinesis Data Analytics for Apache Flink: How It Works
Kinesis Data Analytics for Apache Flink is a fully managed Amazon service that enables you to use an Apache Flink application to process streaming data.
Programming Your Apache Flink Application
An Apache Flink application is a Java or Scala application that is created with the Apache Flink framework. You author and build your Apache Flink application locally.
Applications primarily use either the DataStream API or the Table API. The other Apache Flink APIs are also available for you to use, but they are less commonly used in building streaming applications.
The features of the two APIs are as follows:
DataStream API
The Apache Flink DataStream API programming model is based on two components:
• Data stream: The structured representation of a continuous flow of data records.
• Transformation operator: Takes one or more data streams as input, and produces one or more data streams as output.
Applications created with the DataStream API do the following:
• Read data from a Data Source (such as a Kinesis stream or Amazon MSK topic).
• Apply transformations to the data, such as filtering, aggregation, or enrichment.
• Write the transformed data to a Data Sink.
Applications that use the DataStream API can be written in Java or Scala, and can read from a Kinesis data stream, an Amazon MSK topic, or a custom source.
Your application processes data by using a connector. Apache Flink uses the following types of connectors:
• Source: A connector used to read external data.
• Sink: A connector used to write to external locations.
• Operator: A connector used to process data within the application.
A typical application consists of at least one data stream with a source, a data stream with one or more operators, and at least one data sink.
For more information about using the DataStream API, see DataStream API (p. 9).
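The read, transform, and write shape described above can be sketched in plain Java. This is only an analogy built on java.util.stream so that it runs without Flink on the classpath; it does not use the Flink DataStream API, and the class name PipelineShape and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

// Plain-Java analogy of a source -> operators -> sink pipeline (not Flink code).
final class PipelineShape {

    /** Reads records from a source, applies two transformations, and writes to a sink. */
    static void run(List<String> source, Consumer<String> sink) {
        source.stream()
              .filter(record -> !record.isEmpty())             // operator: filtering
              .map(record -> record.toUpperCase(Locale.ROOT))  // operator: stand-in for enrichment
              .forEach(sink);                                  // sink: receives transformed records
    }

    public static void main(String[] args) {
        List<String> output = new ArrayList<>();
        run(Arrays.asList("aapl", "", "amzn"), output::add);
        System.out.println(output); // [AAPL, AMZN]
    }
}
```

In a real application, the list would be replaced by a source connector, the lambda steps by Flink operators, and the consumer by a sink connector.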
Table API
The Apache Flink Table API programming model is based on the following components:
• Table Environment: An interface to underlying data that you use to create and host one or more tables.
• Table: An object providing access to a SQL table or view.
• Table Source: Used to read data from an external source, such as an Amazon MSK topic.
• Table Function: A SQL query or API call used to transform data.
• Table Sink: Used to write data to an external location, such as an Amazon S3 bucket.
Applications created with the Table API do the following:
• Create a TableEnvironment by connecting to a Table Source.
• Create a table in the TableEnvironment using either SQL queries or Table API functions.
• Run a query on the table using either Table API or SQL.
• Apply transformations on the results of the query using Table Functions or SQL queries.
• Write the query or function results to a Table Sink.
Applications that use the Table API can be written in Java or Scala, and can query data using either API calls or SQL queries.
For more information about using the Table API, see Table API (p. 15).
Creating Your Kinesis Data Analytics Application
A Kinesis Data Analytics application is an AWS resource that is hosted by the Kinesis Data Analytics service. Your Kinesis Data Analytics application hosts your Apache Flink application and provides it with the following settings:
• Runtime Properties (p. 21): Parameters that you can provide to your application. You can change these parameters without recompiling your application code.
• Fault Tolerance (p. 24): How your application recovers from interrupts and restarts.
• Logging and Monitoring (p. 208): How your application logs events to CloudWatch Logs.
• Scaling (p. 30): How your application provisions computing resources.
You create your Kinesis Data Analytics application using either the console or the AWS CLI. To get started creating a Kinesis Data Analytics application, see Getting Started (DataStream API) (p. 77).
Creating a Kinesis Data Analytics for Apache Flink Application
This topic contains information about creating a Kinesis Data Analytics for Apache Flink application.
This topic contains the following sections:
• Building your Kinesis Data Analytics Application Code (p. 4)
• Creating your Kinesis Data Analytics Application (p. 5)
• Starting your Kinesis Data Analytics Application (p. 5)
• Verifying your Kinesis Data Analytics Application (p. 6)
• Creating Kinesis Data Analytics applications with Apache Beam (p. 6)
Building your Kinesis Data Analytics Application Code
This section describes the components you use to build the application code for your Kinesis Data Analytics application.
We recommend that you use the latest supported version of Apache Flink for your application code.
The latest version of Apache Flink that Kinesis Data Analytics supports is 1.13.2. For information about upgrading Kinesis Data Analytics applications, see Upgrading Applications (p. 253).
You build your application code using Apache Maven. An Apache Maven project uses a pom.xml file to specify the versions of components that it uses.
Note: Kinesis Data Analytics supports JAR files up to 512 MB in size. If you use a JAR file larger than this, your application will fail to start.
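Because an oversized JAR only fails when the application starts, it can help to check the file size before you upload it. The following is a minimal sketch of such a check. The class name JarSizeCheck and the default path target/my-app.jar are hypothetical, and treating 1 MB as 1024 * 1024 bytes is an assumption, since the guide does not say how the limit is measured.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical pre-upload check; not part of any AWS tool or SDK.
final class JarSizeCheck {
    // 512 MB limit stated in this guide; the byte conversion is an assumption.
    static final long MAX_JAR_BYTES = 512L * 1024 * 1024;

    /** Returns true if a file of the given size is within the limit. */
    static boolean fitsLimit(long sizeBytes) {
        return sizeBytes <= MAX_JAR_BYTES;
    }

    /** Returns true if the file at the given path is within the limit. */
    static boolean fitsLimit(Path jar) throws IOException {
        return fitsLimit(Files.size(jar));
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical default path; pass your own JAR path as the first argument.
        Path jar = Paths.get(args.length > 0 ? args[0] : "target/my-app.jar");
        if (!Files.exists(jar)) {
            System.out.println("No file found at " + jar);
        } else if (fitsLimit(jar)) {
            System.out.println(jar + " is within the size limit.");
        } else {
            System.out.println(jar + " exceeds the size limit.");
        }
    }
}
```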
Use the following component versions for Kinesis Data Analytics applications:
Component                                                                   Version
Java                                                                        11 (recommended)
Scala                                                                       2.12
Kinesis Data Analytics for Flink Runtime (aws-kinesisanalytics-runtime)     1.2.0
Kinesis Data Analytics Flink Connectors (aws-kinesisanalytics-flink)        2.0.0
AWS Kinesis Connector (flink-connector-kinesis)                             1.13.2
Apache Beam (Beam applications only)                                        2.33.0, with Jackson version 2.12.2
For an example of a pom.xml file for a Kinesis Data Analytics application that uses Apache Flink version 1.13.2, see the Kinesis Data Analytics Getting Started Application.
For information about creating a Kinesis Data Analytics application that uses Apache Beam, see Using Apache Beam (p. 6).
Specifying your Application's Apache Flink Version
When using Kinesis Data Analytics for Flink Runtime version 1.1.0 and later, you specify the version of Apache Flink that your application uses when you compile your application. You provide the version of Apache Flink with the -Dflink.version parameter as follows:
mvn package -Dflink.version=1.13.2
For building applications with older versions of Apache Flink, see Earlier Versions (p. 251).
Creating your Kinesis Data Analytics Application
Once you have built your application code, you do the following to create your Kinesis Data Analytics application:
• Upload your Application code: Upload your application code to an Amazon S3 bucket. You specify the S3 bucket name and object name of your application code when you create your application. For a tutorial that shows how to upload your application code, see the section called “Upload the Apache Flink Streaming Java Code” (p. 84) in the Getting Started (DataStream API) (p. 77) tutorial.
• Create your Kinesis Data Analytics application: Use one of the following methods to create your Kinesis Data Analytics application:
• Create your Kinesis Data Analytics application using the AWS console: You can create and configure your application using the AWS console.
When you create your application using the console, your application's dependent resources (such as CloudWatch Logs streams, IAM roles, and IAM policies) are created for you.
When you create your application using the console, you specify what version of Apache Flink your application uses by selecting it from the pull-down on the Kinesis Analytics - Create application page.
For a tutorial about how to use the console to create an application, see the section called “Create and Run the Application (Console)” (p. 84) in the Getting Started (DataStream API) (p. 77) tutorial.
• Create your Kinesis Data Analytics application using the AWS CLI: You can create and configure your application using the AWS CLI.
When you create your application using the CLI, you must also create your application's dependent resources (such as CloudWatch Logs streams, IAM roles, and IAM policies) manually.
When you create your application using the CLI, you specify what version of Apache Flink your application uses by using the RuntimeEnvironment parameter of the CreateApplication action.
For a tutorial about how to use the CLI to create an application, see the section called “Create and Run an Application Using the CLI” (p. 87) in the Getting Started (DataStream API) (p. 77) tutorial.
Note: You cannot change the RuntimeEnvironment of an existing application. If you need to change the RuntimeEnvironment of an existing application, you must delete the application and create it again.
Starting your Kinesis Data Analytics Application
After you have built your application code, uploaded it to S3, and created your Kinesis Data Analytics application, you then start your application. Starting a Kinesis Data Analytics application typically takes several minutes.
Use one of the following methods to start your application:
• Start your Kinesis Data Analytics application using the AWS console: You can run your application by choosing Run on your application's page in the AWS console.
• Start your Kinesis Data Analytics application using the AWS API: You can run your application using
Verifying your Kinesis Data Analytics Application
You can verify that your application is working in the following ways:
• Using CloudWatch Logs: You can use CloudWatch Logs and CloudWatch Logs Insights to verify that your application is running properly. For information about using CloudWatch Logs with your Kinesis Data Analytics application, see Logging and Monitoring (p. 208).
• Using CloudWatch Metrics: You can use CloudWatch Metrics to monitor your application's activity, or activity in the resources your application uses for input or output (such as Kinesis streams, Kinesis Data Firehose delivery streams, or Amazon S3 buckets.) For more information about CloudWatch metrics, see Working with Metrics in the Amazon CloudWatch User Guide.
• Monitoring Output Locations: If your application writes output to a location (such as an Amazon S3 bucket or database), you can monitor that location for written data.
Creating Kinesis Data Analytics applications with Apache Beam
You can use the Apache Beam framework with your Kinesis Data Analytics application to process streaming data. Kinesis Data Analytics applications that use Apache Beam use the Apache Flink runner to execute Beam pipelines.
For a tutorial about how to use Apache Beam in a Kinesis Data Analytics application, see Apache Beam (p. 170).
This topic contains the following sections:
• Using Apache Beam with Kinesis Data Analytics (p. 6)
• Beam Capabilities (p. 6)
Using Apache Beam with Kinesis Data Analytics
Note the following about using the Apache Flink runner with Kinesis Data Analytics:
• Apache Beam metrics are not viewable in the Kinesis Data Analytics console.
• Apache Beam is only supported with Kinesis Data Analytics applications that use Apache Flink version 1.8 and above. Apache Beam is not supported with Kinesis Data Analytics applications that use Apache Flink version 1.6.
Beam Capabilities
Kinesis Data Analytics supports the same Apache Beam capabilities as the Apache Flink runner.
For information about what features are supported with the Apache Flink runner, see the Beam Compatibility Matrix.
We recommend that you test your Apache Flink application in the Kinesis Data Analytics service to verify that we support all the features that your application needs.
Running a Kinesis Data Analytics for Apache Flink Application
This topic contains information about running a Kinesis Data Analytics for Apache Flink application.
When you run your Kinesis Data Analytics application, the Kinesis Data Analytics service creates an Apache Flink job. An Apache Flink job is the execution lifecycle of your Kinesis Data Analytics application.
The execution of the job, and the resources it uses, are managed by the Job Manager. The Job Manager separates the execution of the application into tasks. Each task is managed by a Task Manager. When you monitor your application's performance, you can examine the performance of each Task Manager, or of the Job Manager as a whole.
For information about Apache Flink jobs, see Jobs and Scheduling in the Apache Flink Documentation.
Application and Job Status
Both your application and the application's job have a current execution status:
• Application status: Your application has a current status that describes its phase of execution.
Application statuses include the following:
• Steady application statuses: Your application typically stays in these statuses until you make a status change:
• READY: A new or stopped application is in the READY status until you run it.
• RUNNING: An application that has successfully started is in the RUNNING status.
• Transient application statuses: An application in these statuses is typically in the process of transitioning to another status. If an application stays in a transient status for a length of time, you can stop the application using the StopApplication action with the Force parameter set to true.
These statuses include the following:
• STARTING: Occurs after the StartApplication action. The application is transitioning from the READY to the RUNNING status.
• STOPPING: Occurs after the StopApplication action. The application is transitioning from the RUNNING to the READY status.
• DELETING: Occurs after the DeleteApplication action. The application is in the process of being deleted.
• UPDATING: Occurs after the UpdateApplication action. The application is updating, and will transition back to the RUNNING or READY status.
• AUTOSCALING: The application has the AutoScalingEnabled property of the ParallelismConfiguration set to true, and the service is increasing the parallelism of the application. When the application is in this status, the only valid API action you can use is the StopApplication action with the Force parameter set to true. For information about automatic scaling, see Automatic Scaling (p. 32).
• FORCE_STOPPING: Occurs after the StopApplication action is called with the Force parameter set to true. The application is in the process of being force stopped. The application transitions from the STARTING, UPDATING, STOPPING, or AUTOSCALING status to the READY status.
• ROLLING_BACK: Occurs after the RollbackApplication action is called. The application is in the process of being rolled back to a previous version. The application transitions from the UPDATING or AUTOSCALING status to the RUNNING status.
• ROLLED_BACK: When you successfully roll back an application, this becomes the status of the version that you rolled back from. For information about rolling back an application, see RollbackApplication.
• MAINTENANCE: Occurs while Kinesis Data Analytics applies patches to your application. For more information, see Maintenance (p. 246).
You can check your application's status using the console, or by using the DescribeApplication action.
• Job status: When your application is in the RUNNING status, your job has a status that describes its current execution phase. A job starts in the CREATED status, and then proceeds to the RUNNING status when it has started. If error conditions occur, your job enters one of the following statuses:
• For applications using Apache Flink 1.11 and later, your job enters the RESTARTING status.
• For applications using Apache Flink 1.8 and prior, your job enters the FAILING status.
The job then proceeds to either the RESTARTING or FAILED status, depending on whether the job can be restarted.
You can check the job's status by examining your application's CloudWatch log for status changes.
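The application statuses listed above can be modeled as a small enum, which is one way to decide in client code whether an application has settled or whether a force stop applies. This is an illustrative sketch only. The AppStatus type and its methods are hypothetical and are not AWS SDK types; the steady set and the force-stop set are taken from the status descriptions in this section.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative model only; this enum is hypothetical and is not an AWS SDK type.
enum AppStatus {
    READY, RUNNING, STARTING, STOPPING, DELETING, UPDATING, AUTOSCALING,
    FORCE_STOPPING, ROLLING_BACK, ROLLED_BACK, MAINTENANCE;

    // Statuses from which a force stop is described as returning the application to READY.
    private static final Set<AppStatus> FORCE_STOP_SOURCES =
            EnumSet.of(STARTING, UPDATING, STOPPING, AUTOSCALING);

    /** Steady statuses: the application stays here until you make a status change. */
    boolean isSteady() {
        return this == READY || this == RUNNING;
    }

    /** True if the guide lists this status as one that FORCE_STOPPING transitions from. */
    boolean isForceStopSource() {
        return FORCE_STOP_SOURCES.contains(this);
    }
}

final class StatusDemo {
    public static void main(String[] args) {
        // Print how each status is classified by the two helpers.
        for (AppStatus status : AppStatus.values()) {
            System.out.println(status
                    + " steady=" + status.isSteady()
                    + " forceStopSource=" + status.isForceStopSource());
        }
    }
}
```

A polling loop built on this model would typically wait until isSteady() returns true, and consider StopApplication with Force set to true only if the application stays in one of the force-stop source statuses for too long.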
Batch workloads
Kinesis Data Analytics supports running Apache Flink batch workloads. In a batch job, when an Apache Flink job gets to the FINISHED status, the Kinesis Data Analytics application status is set to READY. For more information about Flink job statuses, see Jobs and Scheduling.
Application Resources
This section describes the system resources that your application uses. Understanding how Kinesis Data Analytics provisions and uses resources will help you design, create, and maintain a performant and stable Kinesis Data Analytics application.
Kinesis Data Analytics Application Resources
Kinesis Data Analytics is an AWS service that creates an environment for hosting your Apache Flink application. The Kinesis Data Analytics service provides resources using units called Kinesis Processing Units (KPUs).
One KPU represents the following system resources:
• One CPU core
• 4 GB of memory, of which one GB is native memory and three GB are heap memory
• 50 GB of disk space
KPUs run applications in distinct execution units called tasks and subtasks. You can think of a subtask as the equivalent of a thread.
The number of KPUs available to an application is equal to the application's Parallelism setting, divided by the application's ParallelismPerKPU setting.
For more information about application parallelism, see Scaling (p. 30).
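As a worked example of the KPU arithmetic above, the following sketch computes the KPU count from the two settings and the resources that count represents. The class and method names are hypothetical. Rounding up when Parallelism is not an exact multiple of ParallelismPerKPU is an assumption, because this section only states the division.

```java
// Sketch only; KpuEstimate is a hypothetical helper, not part of any AWS SDK.
final class KpuEstimate {
    // Per-KPU resources as listed in this section.
    static final int CPU_CORES_PER_KPU = 1;
    static final int MEMORY_GB_PER_KPU = 4;
    static final int DISK_GB_PER_KPU = 50;

    /** KPUs = Parallelism / ParallelismPerKPU, rounded up (the rounding is an assumption). */
    static int requiredKpus(int parallelism, int parallelismPerKpu) {
        if (parallelism < 1 || parallelismPerKpu < 1) {
            throw new IllegalArgumentException("Both settings must be at least 1");
        }
        return (parallelism + parallelismPerKpu - 1) / parallelismPerKpu;
    }

    public static void main(String[] args) {
        int kpus = requiredKpus(8, 2);
        System.out.println("KPUs: " + kpus);                            // KPUs: 4
        System.out.println("CPU cores: " + kpus * CPU_CORES_PER_KPU);   // CPU cores: 4
        System.out.println("Memory (GB): " + kpus * MEMORY_GB_PER_KPU); // Memory (GB): 16
        System.out.println("Disk (GB): " + kpus * DISK_GB_PER_KPU);     // Disk (GB): 200
    }
}
```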
Apache Flink Application Resources
The Apache Flink environment allocates resources for your application using units called task slots.
When Kinesis Data Analytics allocates resources for your application, it assigns one or more Apache Flink task slots to a single KPU. The number of slots assigned to a single KPU is equal to your application's ParallelismPerKPU setting. For more information about task slots, see Job Scheduling in the Apache Flink documentation.
Operator Parallelism
You can set the maximum number of subtasks that an operator can use. This value is called Operator Parallelism. By default, the parallelism of each operator in your application is equal to the application's parallelism. This means that by default, each operator in your application can use all of the available subtasks in the application if needed.
You can set the parallelism of the operators in your application using the setParallelism method.
Using this method, you can control the number of subtasks each operator can use at one time.
For more information about operator chaining, see Task chaining and resource groups in the Apache Flink Documentation.
Operator Chaining
Normally, each operator uses a separate subtask to execute, but if several operators always execute in sequence, the runtime can assign them all to the same task. This process is called Operator Chaining.
Several sequential operators can be chained into a single task if they all operate on the same data. The following are some of the criteria needed for this to be true:
• The operators do 1-to-1 simple forwarding.
• The operators all have the same operator parallelism.
When your application chains operators into a single subtask, it conserves system resources, because the service doesn't need to perform network operations and allocate subtasks for each operator. To determine if your application is using operator chaining, look at the job graph in the Kinesis Data Analytics console. Each vertex in the graph represents one or more operators. The graph shows operators that have been chained as a single vertex.
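The two criteria listed above can be illustrated with a toy model. This is not how the Apache Flink runtime is implemented, and Flink applies further conditions that are not modeled here. The Operator class, its fields, and the chain_operators function are hypothetical names used only for this sketch.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Operator:
    name: str
    parallelism: int
    # True if this operator receives records from the previous one by simple 1-to-1 forwarding
    forwards_one_to_one: bool


def chain_operators(operators: List[Operator]) -> List[List[str]]:
    """Group sequential operators into chains using only the two criteria above."""
    chains: List[List[Operator]] = []
    for op in operators:
        previous = chains[-1][-1] if chains else None
        if previous and op.forwards_one_to_one and previous.parallelism == op.parallelism:
            chains[-1].append(op)   # same parallelism and simple forwarding: extend the chain
        else:
            chains.append([op])     # otherwise start a new chain (a new vertex in the job graph)
    return [[op.name for op in chain] for chain in chains]


pipeline = [
    Operator("source", 4, False),
    Operator("map", 4, True),
    Operator("filter", 4, True),
    Operator("keyed-window", 4, False),  # keyBy repartitions the data, so no simple forwarding
    Operator("sink", 2, True),           # different parallelism breaks the chain
]
print(chain_operators(pipeline))
# [['source', 'map', 'filter'], ['keyed-window'], ['sink']]
```

In this model, each inner list corresponds to one vertex in the job graph.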
DataStream API
Your Apache Flink application uses the Apache Flink DataStream API to transform data in a data stream.
This section contains the following topics:
• Using Connectors to Move Data in Kinesis Data Analytics for Apache Flink With the DataStream API (p. 9): These components move data between your application and external data sources and destinations.
• Transforming Data Using Operators in Kinesis Data Analytics for Apache Flink With the DataStream API (p. 14): These components transform or group data elements within your application.
• Tracking Events in Kinesis Data Analytics for Apache Flink Using the DataStream API (p. 15): This topic describes how Kinesis Data Analytics tracks events when using the DataStream API.
Using Connectors to Move Data in Kinesis Data Analytics for Apache Flink With the DataStream API
In the Amazon Kinesis Data Analytics for Apache Flink DataStream API, connectors are software components that move data into and out of a Kinesis Data Analytics application. Connectors are flexible integrations that enable you to read from files and directories. Connectors consist of complete modules for interacting with Amazon services and third-party systems.
Types of connectors include the following:
• Sources (p. 10): Provide data to your application from a Kinesis data stream, file, or other data source.
• Sinks (p. 11): Send data from your application to a Kinesis data stream, Kinesis Data Firehose delivery stream, or other data destination.
• Asynchronous I/O (p. 14): Provides asynchronous access to a data source (such as a database) to enrich stream events.
Available Connectors
The Apache Flink framework contains connectors for accessing data from a variety of sources. For information about connectors available in the Apache Flink framework, see Connectors in the Apache Flink documentation.
Adding Streaming Data Sources to Kinesis Data Analytics for Apache Flink
Apache Flink provides connectors for reading from files, sockets, collections, and custom sources. In your application code, you use an Apache Flink source to receive data from a stream. This section describes the sources that are available for Amazon services.
Kinesis Data Streams
The FlinkKinesisConsumer source provides streaming data to your application from an Amazon Kinesis data stream.
Creating a FlinkKinesisConsumer
The following code example demonstrates creating a FlinkKinesisConsumer:
// Configure the consumer: the AWS Region and where in the stream to start reading
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
DataStream<String> input = env.addSource(
    new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
For more information about using a FlinkKinesisConsumer, see Download and Examine the Apache Flink Streaming Java Code (p. 83).
Creating a FlinkKinesisConsumer that uses an EFO consumer
The FlinkKinesisConsumer now supports Enhanced Fan-Out (EFO).
If a Kinesis consumer uses EFO, the Kinesis Data Streams service gives it its own dedicated bandwidth, rather than having the consumer share the fixed bandwidth of the stream with the other consumers reading from the stream.
For more information about using EFO with the Kinesis consumer, see FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers.
You enable the EFO consumer by setting the following parameters on the Kinesis consumer:
• RECORD_PUBLISHER_TYPE: Set this parameter to EFO for your application to use an EFO consumer to access the Kinesis Data Stream data.
• EFO_CONSUMER_NAME: Set this parameter to a string value that is unique among the consumers of this stream. Re-using a consumer name in the same Kinesis Data Stream will cause the previous consumer using that name to be terminated.
To configure a FlinkKinesisConsumer to use EFO, add the following parameters to the consumer:
consumerConfig.putIfAbsent(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
consumerConfig.putIfAbsent(ConsumerConfigConstants.EFO_CONSUMER_NAME, "basic-efo-flink-app");
For an example of a Kinesis Data Analytics application that uses an EFO consumer, see EFO Consumer (p. 141).
Amazon MSK
The FlinkKafkaConsumer source provides streaming data to your application from an Amazon MSK topic.
Creating a FlinkKafkaConsumer
The following code example demonstrates creating a FlinkKafkaConsumer:
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
// Connection settings for the Amazon MSK cluster
inputProperties.setProperty("bootstrap.servers", "Cluster Bootstrap Broker String");
inputProperties.setProperty("security.protocol", "SSL");
inputProperties.setProperty("ssl.truststore.location", "/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts");
inputProperties.setProperty("ssl.truststore.password", "changeit");
DataStream<String> input = env.addSource(
    new FlinkKafkaConsumer<>("MyMSKTopic", new SimpleStringSchema(), inputProperties));
For more information about using a FlinkKafkaConsumer, see MSK Replication (p. 137).
Writing Data Using Sinks in Kinesis Data Analytics for Apache Flink
In your application code, you use an Apache Flink sink to write data from an Apache Flink stream to an AWS service, such as Kinesis Data Streams.
Apache Flink provides sinks for files, sockets, and custom sinks. The following sinks are available for AWS:
Kinesis Data Streams
Apache Flink provides information about the Kinesis Data Streams Connector in the Apache Flink documentation.
For an example of an application that uses a Kinesis data stream for input and output, see Getting Started (DataStream API) (p. 77).
Amazon S3
You can use the Apache Flink StreamingFileSink to write objects to an Amazon S3 bucket.
For an example of how to write objects to S3, see the section called “S3 Sink” (p. 128).
Kinesis Data Firehose
The FlinkKinesisFirehoseProducer is a reliable, scalable Apache Flink sink for storing application output using the Kinesis Data Firehose service. This section describes how to set up a Maven project to create and use a FlinkKinesisFirehoseProducer.
Topics
• Creating a FlinkKinesisFirehoseProducer (p. 12)
• FlinkKinesisFirehoseProducer Code Example (p. 12)
Creating a FlinkKinesisFirehoseProducer
The following code example demonstrates creating a FlinkKinesisFirehoseProducer:
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
FlinkKinesisFirehoseProducer<String> sink =
    new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
FlinkKinesisFirehoseProducer Code Example
The following code example demonstrates how to create and configure a FlinkKinesisFirehoseProducer and send data from an Apache Flink data stream to the Kinesis Data Firehose service.
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

    private static final String region = "us-east-1";
    private static final String inputStreamName = "ExampleInputStream";
    private static final String outputStreamName = "ExampleOutputStream";

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

    private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
            throws IOException {
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                applicationProperties.get("ConsumerConfigProperties")));
    }

    private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
        /*
         * com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants
         * lists all of the properties that the Firehose sink can be configured with.
         */
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

        FlinkKinesisFirehoseProducer<String> sink =
                new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
        ProducerConfigConstants config = new ProducerConfigConstants();
        return sink;
    }

    private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties()
            throws IOException {
        /*
         * com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants
         * lists all of the properties that the Firehose sink can be configured with.
         */
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        FlinkKinesisFirehoseProducer<String> sink =
                new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(),
                        applicationProperties.get("ProducerConfigProperties"));
        return sink;
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /* if you would like to use runtime configuration properties, uncomment the lines below
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        // Kinesis Firehose sink
        input.addSink(createFirehoseSinkFromStaticConfig());

        // If you would like to use runtime configuration properties, uncomment the lines below
        // input.addSink(createFirehoseSinkFromApplicationProperties());

        env.execute("Flink Streaming Java API Skeleton");
    }
}
For a complete tutorial about how to use the Kinesis Data Firehose sink, see the section called “Kinesis Data Firehose Sink” (p. 148).
Using Asynchronous I/O in Kinesis Data Analytics for Apache Flink
An Asynchronous I/O operator enriches stream data using an external data source such as a database.
Kinesis Data Analytics enriches the stream events asynchronously so that requests can be batched for greater efficiency.
For more information, see Asynchronous I/O in the Apache Flink Documentation.
Transforming Data Using Operators in Kinesis Data Analytics for Apache Flink With the DataStream API
To transform incoming data in a Kinesis Data Analytics for Apache Flink application, you use an Apache Flink operator. An Apache Flink operator transforms one or more data streams into a new data stream.
The new data stream contains modified data from the original data stream. Apache Flink provides more than 25 pre-built stream processing operators. For more information, see Operators in the Apache Flink Documentation.
This topic contains the following sections:
• Transform Operators (p. 14)
• Aggregation Operators (p. 14)
Transform Operators
The following is an example of a simple text transformation on one of the fields of a JSON data stream.
This code creates a transformed data stream. The new data stream has the same data as the original stream, with the string " Company" appended to the contents of the TICKER field.
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
Aggregation Operators
The following is an example of an aggregation operator. The code creates an aggregated data stream.
The operator creates a 5-second tumbling window and returns the sum of the PRICE values for the records in the window with the same TICKER value.
DataStream<ObjectNode> output = input
    .keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
        return node1;
    });
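To make the windowing behavior concrete, the following plain-Python sketch reproduces what a 5-second tumbling window sum per TICKER computes, without using Apache Flink. It is only an illustration: the function name tumbling_price_sums and the sample records are hypothetical, and timestamps are passed in explicitly so that the result is deterministic, whereas the Flink example above uses processing time.

```python
from collections import defaultdict

WINDOW_SECONDS = 5


def tumbling_price_sums(records, window_seconds=WINDOW_SECONDS):
    """Sum prices per (window start, ticker).

    records: iterable of (timestamp_seconds, ticker, price) tuples.
    """
    totals = defaultdict(float)
    for timestamp, ticker, price in records:
        # Each record belongs to exactly one window: [window_start, window_start + window_seconds)
        window_start = int(timestamp // window_seconds) * window_seconds
        totals[(window_start, ticker)] += price
    return dict(totals)


records = [
    (0.5, "AMZN", 10.0),
    (1.0, "MSFT", 5.0),
    (4.9, "AMZN", 2.5),
    (5.0, "AMZN", 1.0),  # falls in the next window, [5, 10)
]
print(tumbling_price_sums(records))
# {(0, 'AMZN'): 12.5, (0, 'MSFT'): 5.0, (5, 'AMZN'): 1.0}
```

Because tumbling windows do not overlap, every record contributes to exactly one sum.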
For a complete code example that uses operators, see Getting Started (DataStream API) (p. 77).
Source code for the Getting Started application is available at Getting Started in the Kinesis Data Analytics Java Examples GitHub repository.
Tracking Events in Kinesis Data Analytics for Apache Flink Using the DataStream API
Kinesis Data Analytics tracks events using the following timestamps:
• Processing Time: Refers to the system time of the machine that is executing the respective operation.
• Event Time: Refers to the time that each individual event occurred on its producing device.
• Ingestion Time: Refers to the time that events enter the Kinesis Data Analytics service.
You set the time used by the streaming environment using setStreamTimeCharacteristic:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
For more information about timestamps, see Event Time in the Apache Flink Documentation.
Table API
Your Apache Flink application uses the Apache Flink Table API to interact with data in a stream using a relational model. You use the Table API to access data using Table sources, and then use Table functions to transform and filter table data. You can transform and filter tabular data using either API functions or SQL commands.
This section contains the following topics:
• Table API Connectors (p. 15): These components move data between your application and external data sources and destinations.
• Table API Time Attributes (p. 16): This topic describes how Kinesis Data Analytics tracks events when using the Table API.
Table API Connectors
In the Apache Flink programming model, connectors are components that your application uses to read or write data from external sources, such as other AWS services.
With the Apache Flink Table API, you can use the following types of connectors:
• Table API Sources (p. 16): You use Table API source connectors to create tables within your TableEnvironment using either API calls or SQL queries.
• Table API Sinks (p. 16): You use SQL commands to write table data to external destinations such as an Amazon MSK topic or an Amazon S3 bucket.
Table API Sources
You create a table source from a data stream. The following code creates a table from an Amazon MSK topic:
//create the table
final FlinkKafkaConsumer<StockRecord> consumer =
    new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
consumer.setStartFromEarliest();

//Obtain stream
DataStream<StockRecord> events = env.addSource(consumer);
Table table = streamTableEnvironment.fromDataStream(events);
For more information about table sources, see Table & Connectors in the Apache Flink documentation.
Table API Sinks
To write table data to a sink, you create the sink in SQL, and then run the SQL-based sink on the StreamTableEnvironment object.
The following code example demonstrates how to write table data to an Amazon S3 sink:
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

//send to s3
streamTableEnvironment.executeSql(s3Sink);
filteredTable.executeInsert("sink_table");
You can use the format parameter to control what format Kinesis Data Analytics uses to write the output to the sink. For information about formats, see Formats in the Apache Flink documentation.
For more information about table sinks, see Table & Connectors in the Apache Flink documentation.
User-Defined Sources and Sinks
You can use existing Apache Kafka connectors for sending data to and from other AWS services, such as Amazon MSK and Amazon S3. For interacting with other data sources and destinations, you can define your own sources and sinks. For more information, see User-Defined Sources and Sinks in the Apache Flink documentation.
Table API Time Attributes
Each record in a data stream has several timestamps that define when events related to the record occurred:
• Event Time: A user-defined timestamp that defines when the event that created the record occurred.
• Ingestion Time: The time when your application retrieved the record from the data stream.
• Processing Time: The time when your application processed the record.
When the Apache Flink Table API creates windows based on record times, you define which of these timestamps it uses by using the setStreamTimeCharacteristic method.
For more information about using timestamps with the Table API, see Time Attributes in the Apache Flink documentation.
Using Python with Kinesis Data Analytics
Apache Flink version 1.13.2 includes support for creating applications using Python version 3.8, using the PyFlink library. You create a Kinesis Data Analytics application using Python by doing the following:
• Create your Python application code as a text file with a main method.
• Bundle your application code file and any Python or Java dependencies into a zip file, and upload it to an Amazon S3 bucket.
• Create your Kinesis Data Analytics application, specifying your Amazon S3 code location, application properties, and application settings.
At a high level, the Python Table API is a wrapper around the Java Table API. For information about the Python Table API, see Intro to the Python Table API in the Apache Flink Documentation.
Programming Your Kinesis Data Analytics for Python Application
You code your Kinesis Data Analytics for Python application using the Apache Flink Python Table API.
The Apache Flink engine translates Python Table API statements (running in the Python VM) into Java Table API statements (running in the Java VM).
You use the Python Table API by doing the following:
• Create a reference to the StreamTableEnvironment.
• Create table objects from your source streaming data by executing queries on the StreamTableEnvironment reference.
• Execute queries on your table objects to create output tables.
• Write your output tables to your destinations using a StatementSet.
To get started using the Python Table API in Kinesis Data Analytics, see Getting Started with Amazon Kinesis Data Analytics for Apache Flink for Python (p. 105).
Reading and Writing Streaming Data
To read and write streaming data, you execute SQL queries on the table environment.
Creating a Table
The following code example demonstrates a user-defined function that creates a SQL query. The SQL query creates a table that interacts with a Kinesis stream:
def create_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
Reading Streaming Data
The following code example demonstrates how to run the SQL query returned by the preceding create_table function on a table environment reference to read data:
table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
Writing Streaming Data
The following code example demonstrates how to use the SQL query from the CreateTable example to create an output table reference, and how to use a StatementSet to interact with the tables to write data to a destination Kinesis stream:
table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
.format(output_table_name, input_table_name))
Reading Runtime Properties
You can use runtime properties to configure your application without changing your application code.
You specify application properties for your application the same way as with a Kinesis Data Analytics for Java application. You can specify runtime properties in the following ways:
• Using the CreateApplication action.
• Using the UpdateApplication action.
• Configuring your application by using the console.
You retrieve application properties in code by reading a JSON file called application_properties.json that the Kinesis Data Analytics runtime creates.
The following code example demonstrates reading application properties from the application_properties.json file:
import json
import os

file_path = '/etc/flink/application_properties.json'
if os.path.isfile(file_path):
    with open(file_path, 'r') as file:
        contents = file.read()
        properties = json.loads(contents)
The following user-defined function code example demonstrates reading a property group from the application properties object:
def property_map(properties, property_group_id):
    # Return the PropertyMap of the first group whose ID matches
    for prop in properties:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
The following code example demonstrates reading a property called INPUT_STREAM_KEY from a property group that the previous example returns:
input_stream = input_property_map[INPUT_STREAM_KEY]
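The three steps above can be combined into one runnable sketch. To stay self-contained, it writes hypothetical sample content to a temporary file instead of reading /etc/flink/application_properties.json. The helper name load_properties, the key name input.stream.name, and the sample values are assumptions made for illustration. The list-of-groups layout is inferred from how the property_map example iterates over the properties object.

```python
import json
import os
import tempfile


def load_properties(file_path):
    """Return the parsed property groups, or an empty list if the file is absent."""
    if os.path.isfile(file_path):
        with open(file_path, "r") as file:
            return json.load(file)
    return []


def property_map(properties, property_group_id):
    """Return the PropertyMap of the matching group, or None if there is no match."""
    for prop in properties:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
    return None


# Hypothetical sample content, written to a temporary file so the sketch runs anywhere.
sample_groups = [
    {"PropertyGroupId": "ConsumerConfigProperties",
     "PropertyMap": {"aws.region": "us-west-2", "input.stream.name": "ExampleInputStream"}},
    {"PropertyGroupId": "ProducerConfigProperties",
     "PropertyMap": {"aws.region": "us-west-2"}},
]

with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "application_properties.json")
    with open(sample_path, "w") as file:
        json.dump(sample_groups, file)
    properties = load_properties(sample_path)

INPUT_STREAM_KEY = "input.stream.name"  # hypothetical key name
input_property_map = property_map(properties, "ConsumerConfigProperties")
print(input_property_map[INPUT_STREAM_KEY])  # ExampleInputStream
```

Returning None for an unknown group lets the caller decide whether a missing group is an error or should fall back to defaults.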
Creating your application's code package
Once you have created your Python application, you bundle your code file and dependencies into a zip file.
Your zip file must contain a Python script with a main method, and can optionally contain the following:
• Additional Python code files
• User-defined Java code in JAR files
• Java libraries in JAR files
Note
Your application zip file must contain all of the dependencies for your application. You can't reference libraries from other sources for your application.
Creating Your Python Kinesis Data Analytics Application
Specifying your Code Files
Once you have created your application's code package, you upload it to an Amazon S3 bucket. You then create your application using either the console or the CreateApplication action.
When you create your application using the CreateApplication action, you specify the code files and archives in your zip file using a special application property group called kinesis.analytics.flink.run.options. You can define the following types of files:
• python: A text file containing a Python main method.
• jarfile: A Java JAR file containing Java user-defined functions.
• pyFiles: A Python resource file containing resources to be used by the application.
• pyArchives: A zip file containing resource files for the application.
For more information about Apache Flink Python code file types, see Command Line Usage in the Apache Flink Documentation.
Note
Kinesis Data Analytics does not support the pyModule, pyExecutable, or pyRequirements file types. All of the code, requirements, and dependencies must be in your zip file. You can't specify dependencies to be installed using pip.
The following example JSON snippet demonstrates how to specify file locations within your application's zip file:
"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "kinesis.analytics.flink.run.options",
                "PropertyMap": {
                    "python": "MyApplication/main.py",
                    "jarfile": "MyApplication/lib/myJarFile.jar",
                    "pyFiles": "MyApplication/lib/myDependentFile.py",
                    "pyArchives": "MyApplication/lib/myArchive.zip"
                }
            },
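Because every file named in this property group must be present in the zip file, it can help to check the package before uploading it. The following sketch is one possible way to do that and is not part of the service tooling. The function name missing_entries is hypothetical, and the sketch assumes that each property value is a single path relative to the root of the zip file.

```python
import io
import zipfile


def missing_entries(zip_bytes, run_options):
    """Return the sorted paths named in run_options that are not in the zip archive."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        names = set(archive.namelist())
    return sorted(path for path in run_options.values() if path not in names)


# Build a small in-memory zip file that is deliberately missing one dependency.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("MyApplication/main.py", "print('hello')\n")
    archive.writestr("MyApplication/lib/myJarFile.jar", b"")

run_options = {
    "python": "MyApplication/main.py",
    "jarfile": "MyApplication/lib/myJarFile.jar",
    "pyFiles": "MyApplication/lib/myDependentFile.py",
}
print(missing_entries(buffer.getvalue(), run_options))
# ['MyApplication/lib/myDependentFile.py']
```

An empty list means that every referenced path was found in the archive.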
Monitoring Your Python Kinesis Data Analytics Application
You use your application's CloudWatch log to monitor your Python Kinesis Data Analytics application.
Kinesis Data Analytics logs the following messages for Python applications:
• Messages written to the console using print() in the application's main method.
• Messages sent in user-defined functions using the logging package. The following code example demonstrates writing to the application log from a user-defined function:
import logging
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def doNothingUdf(i):
    logging.info("Got {} in the doNothingUdf".format(str(i)))
    return i
• Error messages thrown by the application.
If the application throws an exception in the main function, it will appear in your application's logs.
The following example demonstrates a log entry for an exception thrown from Python code:
2021-03-15 16:21:20.000 --- Python Process Started ---
2021-03-15 16:21:21.000 Traceback (most recent call last):
2021-03-15 16:21:21.000 " File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
2021-03-15 16:21:21.000 main()
2021-03-15 16:21:21.000 " File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
2021-03-15 16:21:21.000 " table_env.register_function(""doNothingUdf"", doNothingUdf)"
2021-03-15 16:21:21.000 NameError: name 'doNothingUdf' is not defined
2021-03-15 16:21:21.000 --- Python Process Exited ---
2021-03-15 16:21:21.000 Run python process failed
2021-03-15 16:21:21.000 Error occurred when trying to start the job
Note
Due to performance issues, we recommend that you only use custom log messages during application development.
Querying Logs with CloudWatch Insights
The following CloudWatch Insights query searches for logs created by the Python entrypoint while executing the main function of your application:
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
Runtime Properties in Kinesis Data Analytics for Apache Flink
You can use runtime properties to configure your application without recompiling your application code.
This topic contains the following sections:
• Working with Runtime Properties in the Console (p. 21)
• Working with Runtime Properties in the CLI (p. 21)
• Accessing Runtime Properties in a Kinesis Data Analytics Application (p. 23)
Working with Runtime Properties in the Console
You can add, update, or remove runtime properties from your Kinesis Data Analytics application using the console.
Note
You can't add runtime properties when you create an application in the Kinesis Data Analytics console.
Update Runtime Properties for a Kinesis Data Analytics application
1. Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
2. Choose your Kinesis Data Analytics application. Choose Application details.
3. On the page for your application, choose Configure.
4. Expand the Properties section.
5. Use the controls in the Properties section to define a property group with key-value pairs. Use these controls to add, update, or remove property groups and runtime properties.
6. Choose Update.
Working with Runtime Properties in the CLI
You can add, update, or remove runtime properties using the AWS CLI.
This section includes example requests for API actions for configuring runtime properties for an application. For information about how to use a JSON file for input for an API action, see Kinesis Data Analytics API Example Code (p. 323).
Note
Replace the sample account ID (012345678901) in the examples following with your account ID.
Adding Runtime Properties when Creating an Application
The following example request for the CreateApplication action adds two runtime property groups (ProducerConfigProperties and ConsumerConfigProperties) when you create an application:
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_13",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties": {
            "PropertyGroups": [
                {
                    "PropertyGroupId": "ProducerConfigProperties",
                    "PropertyMap" : {
                        "flink.stream.initpos" : "LATEST",
                        "aws.region" : "us-west-2",
                        "AggregationEnabled" : "false"
                    }
                },
                {
                    "PropertyGroupId": "ConsumerConfigProperties",
                    "PropertyMap" : {
                        "aws.region" : "us-west-2"
                    }
                }
            ]
        }
    }
}
Adding and Updating Runtime Properties in an Existing Application
The following example request for the UpdateApplication action adds or updates runtime properties for an existing application:
{
    "ApplicationName": "MyApplication",
    "CurrentApplicationVersionId": 2,
    "ApplicationConfigurationUpdate": {
        "EnvironmentPropertyUpdates": {
            "PropertyGroups": [
                {
                    "PropertyGroupId": "ProducerConfigProperties",
                    "PropertyMap" : {
                        "flink.stream.initpos" : "LATEST",
                        "aws.region" : "us-west-2",
                        "AggregationEnabled" : "false"
                    }
                },
                {
                    "PropertyGroupId": "ConsumerConfigProperties",
                    "PropertyMap" : {
                        "aws.region" : "us-west-2"
                    }
                }
            ]
        }
    }
}
Note
If you use a key that has no corresponding runtime property in a property group, Kinesis Data Analytics adds the key-value pair as a new property. If you use a key for an existing runtime property in a property group, Kinesis Data Analytics updates the property value.
Removing Runtime Properties
The following example request for the UpdateApplication action removes all runtime properties and property groups from an existing application:
{
    "ApplicationName": "MyApplication",
    "CurrentApplicationVersionId": 3,
    "ApplicationConfigurationUpdate": {
        "EnvironmentPropertyUpdates": {
            "PropertyGroups": []
        }
    }
}
Important
If you omit an existing property group or an existing property key in a property group, that property group or property is removed.
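Because omitted property groups and keys are removed, one cautious way to change a single value is to start from the complete current set of property groups and modify only what needs to change. The following sketch builds such an UpdateApplication request body. The function name build_update_request is hypothetical, and current_groups stands for the existing configuration, which you would obtain yourself (for example, from a DescribeApplication response or your own records).

```python
import copy
import json


def build_update_request(app_name, version_id, current_groups, group_id, changes):
    """Build an UpdateApplication request that keeps all existing groups and keys."""
    groups = copy.deepcopy(current_groups)  # never mutate the caller's data
    for group in groups:
        if group["PropertyGroupId"] == group_id:
            group["PropertyMap"].update(changes)  # add new keys, overwrite existing ones
            break
    else:
        # No group with this ID exists yet, so add it as a new group.
        groups.append({"PropertyGroupId": group_id, "PropertyMap": dict(changes)})
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": version_id,
        "ApplicationConfigurationUpdate": {
            "EnvironmentPropertyUpdates": {"PropertyGroups": groups}
        },
    }


current_groups = [
    {"PropertyGroupId": "ProducerConfigProperties",
     "PropertyMap": {"flink.stream.initpos": "LATEST", "aws.region": "us-west-2",
                     "AggregationEnabled": "false"}},
    {"PropertyGroupId": "ConsumerConfigProperties",
     "PropertyMap": {"aws.region": "us-west-2"}},
]

request = build_update_request(
    "MyApplication", 3, current_groups, "ConsumerConfigProperties", {"aws.region": "us-east-1"}
)
print(json.dumps(request, indent=4))
```

The printed request still contains ProducerConfigProperties unchanged, so that group is not removed by the update.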
Accessing Runtime Properties in a Kinesis Data Analytics Application
You retrieve runtime properties in your Java application code using the static KinesisAnalyticsRuntime.getApplicationProperties() method, which returns a Map<String, Properties> object.
The following Java code example retrieves runtime properties for your application:
Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
You retrieve a property group (as a java.util.Properties object) as follows:
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");