
Amazon Kinesis Data Analytics

Amazon Kinesis Data Analytics Developer Guide

(2)

Amazon Kinesis Data Analytics: Amazon Kinesis Data Analytics Developer Guide

Copyright © Amazon Web Services, Inc. and/or its affiliates. All rights reserved.

Amazon's trademarks and trade dress may not be used in connection with any product or service that is not Amazon's, in any manner that is likely to cause confusion among customers, or in any manner that disparages or discredits Amazon. All other trademarks not owned by Amazon are the property of their respective owners, who may or may not be affiliated with, connected to, or sponsored by Amazon.

(3)

Table of Contents

What Is Kinesis Data Analytics for Apache Flink? ... 1

Getting Started ... 1

How It Works ... 2

Programming Your Apache Flink Application ... 2

DataStream API ... 2

Table API ... 3

Creating Your Kinesis Data Analytics Application ... 3

Creating Applications ... 3

Building your Kinesis Data Analytics Application Code ... 4

Creating your Kinesis Data Analytics Application ... 5

Starting your Kinesis Data Analytics Application ... 5

Verifying your Kinesis Data Analytics Application ... 6

Using Apache Beam ... 6

Running Applications ... 7

Application and Job Status ... 7

Batch workloads ... 8

Application Resources ... 8

Kinesis Data Analytics Application Resources ... 8

Apache Flink Application Resources ... 8

DataStream API ... 9

DataStream API Connectors ... 9

DataStream API Operators ... 14

DataStream API Timestamps ... 15

Table API ... 15

Table API Connectors ... 15

Table API Time Attributes ... 16

Using Python ... 17

Programming an Application ... 17

Creating an Application ... 19

Monitoring ... 20

Runtime Properties ... 21

Working with Runtime Properties in the Console ... 21

Working with Runtime Properties in the CLI ... 21

Accessing Runtime Properties in a Kinesis Data Analytics Application ... 23

Fault Tolerance ... 24

Configuring Checkpointing ... 24

Checkpointing API Examples ... 25

Snapshots ... 27

Scaling ... 30

Configuring Application Parallelism and ParallelismPerKPU ... 30

Allocating Kinesis Processing Units ... 31

Updating Your Application's Parallelism ... 31

Automatic Scaling ... 32

Tagging ... 33

Adding Tags when an Application is Created ... 34

Adding or Updating Tags for an Existing Application ... 34

Listing Tags for an Application ... 34

Removing Tags from an Application ... 35

Apache Flink Dashboard ... 35

Accessing Your Application's Apache Flink Dashboard ... 36

Studio notebooks ... 38

Creating a Studio notebook ... 39

Interactive analysis of streaming data ... 39

Flink interpreters ... 40


Apache Flink table environment variables ... 40

Deploying as an application with durable state ... 41

Scala/Python criteria ... 42

SQL criteria ... 42

IAM Permissions ... 42

Connectors and dependencies ... 43

Default connectors ... 43

Dependencies and custom connectors ... 44

User-Defined Functions ... 44

Enabling Checkpointing ... 45

Setting the checkpointing interval ... 45

Setting the checkpointing type ... 45

Working with AWS Glue ... 45

Table properties ... 46

Examples and Tutorials ... 47

Creating a Studio notebook Tutorial ... 47

Deploying as an Application with Durable State Tutorial ... 61

Examples ... 64

Studio Notebooks vs. SQL Applications ... 72

Troubleshooting ... 72

Stopping a stuck application ... 72

Canceling jobs ... 72

Restarting the Apache Flink interpreter ... 73

Appendix: Creating custom IAM policies ... 73

AWS Glue ... 73

CloudWatch Logs ... 74

Kinesis streams ... 75

Amazon MSK clusters ... 76

Getting Started (DataStream API) ... 77

Application Components ... 77

Prerequisites ... 77

Step 1: Set Up an Account ... 78

Sign Up for AWS ... 78

Create an IAM User ... 78

Next Step ... 80

Step 2: Set Up the AWS CLI ... 80

Next Step ... 81

Step 3: Create an Application ... 81

Create Two Amazon Kinesis Data Streams ... 81

Write Sample Records to the Input Stream ... 82

Download and Examine the Apache Flink Streaming Java Code ... 83

Compile the Application Code ... 83

Upload the Apache Flink Streaming Java Code ... 84

Create and Run the Kinesis Data Analytics Application ... 84

Next Step ... 92

Step 4: Clean Up ... 92

Delete Your Kinesis Data Analytics Application ... 92

Delete Your Kinesis Data Streams ... 92

Delete Your Amazon S3 Object and Bucket ... 92

Delete Your IAM Resources ... 93

Delete Your CloudWatch Resources ... 93

Next Step ... 93

Step 5: Next Steps ... 93

Getting Started (Table API) ... 95

Application Components ... 95

Prerequisites ... 95

Create an Application ... 96


Create Dependent Resources ... 96

Write Sample Records to the Input Stream ... 97

Download and Examine the Apache Flink Streaming Java Code ... 98

Compile the Application Code ... 99

Upload the Apache Flink Streaming Java Code ... 99

Create and Run the Kinesis Data Analytics Application ... 100

Next Step ... 103

Clean Up ... 103

Delete Your Kinesis Data Analytics Application ... 103

Delete Your Amazon MSK Cluster ... 103

Delete Your VPC ... 103

Delete Your Amazon S3 Objects and Bucket ... 104

Delete Your IAM Resources ... 104

Delete Your CloudWatch Resources ... 104

Next Step ... 104

Next Steps ... 104

Getting Started (Python) ... 105

Application Components ... 105

Prerequisites ... 105

Create an Application ... 106

Create Dependent Resources ... 106

Write Sample Records to the Input Stream ... 107

Create and Examine the Apache Flink Streaming Python Code ... 108

Upload the Apache Flink Streaming Python Code ... 109

Create and Run the Kinesis Data Analytics Application ... 110

Next Step ... 113

Clean Up ... 113

Delete Your Kinesis Data Analytics Application ... 113

Delete Your Kinesis Data Streams ... 113

Delete Your Amazon S3 Objects and Bucket ... 114

Delete Your IAM Resources ... 114

Delete Your CloudWatch Resources ... 114

Examples ... 115

DataStream API Examples ... 115

Tumbling Window ... 115

Sliding Window ... 122

S3 Sink ... 128

MSK Replication ... 137

EFO Consumer ... 141

Kinesis Data Firehose Sink ... 148

Cross-Account ... 159

Custom Truststore ... 165

Apache Beam ... 170

Python Examples ... 176

Tumbling Window ... 177

Sliding Window ... 183

S3 Sink ... 190

Kinesis Data Analytics Solutions ... 197

AWS Streaming Data Solution ... 198

Clickstream Lab ... 198

Custom Scaling ... 198

CloudWatch Dashboard ... 198

Amazon MSK ... 198

More Kinesis Data Analytics Solutions on GitHub ... 199

Security ... 200

Data Protection ... 200

Data Encryption ... 200


Identity and Access Management ... 201

Trust Policy ... 202

Permissions Policy ... 202

Monitoring ... 204

Compliance Validation ... 204

Resilience ... 204

Disaster Recovery ... 205

Versioning ... 205

Infrastructure Security ... 205

Security Best Practices ... 206

Implement least privilege access ... 206

Use IAM roles to access other Amazon services ... 206

Implement Server-Side Encryption in Dependent Resources ... 206

Use CloudTrail to Monitor API Calls ... 206

Logging and Monitoring ... 208

Setting Up Logging ... 208

Setting Up CloudWatch Logging Using the Console ... 209

Setting Up CloudWatch Logging Using the CLI ... 210

Application Monitoring Levels ... 213

Logging Best Practices ... 214

Logging Troubleshooting ... 214

Next Step ... 215

Analyzing Logs ... 215

Run a Sample Query ... 215

Example Queries ... 216

Metrics and Dimensions ... 217

Application Metrics ... 218

Kinesis Data Streams Connector Metrics ... 224

Amazon MSK Connector Metrics ... 225

Apache Zeppelin Metrics ... 226

Viewing CloudWatch Metrics ... 226

Metrics ... 227

Custom Metrics ... 228

Alarms ... 231

Writing Custom Messages ... 235

Write to CloudWatch Logs Using Log4J ... 235

Write to CloudWatch Logs Using SLF4J ... 236

Using AWS CloudTrail ... 237

Kinesis Data Analytics Information in CloudTrail ... 237

Understanding Kinesis Data Analytics Log File Entries ... 238

Performance ... 240

Troubleshooting Performance ... 240

The Data Path ... 240

Performance Troubleshooting Solutions ... 240

Performance Best Practices ... 242

Manage scaling properly ... 242

Monitor external dependency resource usage ... 243

Run your Apache Flink application locally ... 244

Monitoring Performance ... 244

Performance Monitoring using CloudWatch Metrics ... 244

Performance Monitoring using CloudWatch Logs and Alarms ... 244

Quota ... 245

Maintenance ... 246

Best Practices ... 248

Fault tolerance: checkpoints and savepoints ... 248

Performance and parallelism ... 248

Logging ... 249


Coding ... 249

Studio notebook refresh interval ... 249

Studio notebook optimum performance ... 249

Earlier Versions ... 251

Using the Apache Flink Kinesis Streams Connector with previous Apache Flink versions ... 251

Building Applications with Apache Flink 1.8.2 ... 252

Building Applications with Apache Flink 1.6.2 ... 253

Upgrading Applications ... 253

Available Connectors in Apache Flink 1.6.2 and 1.8.2 ... 254

Getting Started: Flink 1.11.3 ... 254

Application Components ... 254

Prerequisites ... 255

Step 1: Set Up an Account ... 255

Step 2: Set Up the AWS CLI ... 257

Step 3: Create an Application ... 258

Step 4: Clean Up ... 269

Step 5: Next Steps ... 270

Getting Started: Flink 1.8.2 ... 271

Application Components ... 77

Prerequisites ... 272

Step 1: Set Up an Account ... 272

Step 2: Set Up the AWS CLI ... 274

Step 3: Create an Application ... 275

Step 4: Clean Up ... 286

Getting Started: Flink 1.6.2 ... 287

Application Components ... 288

Prerequisites ... 288

Step 1: Set Up an Account ... 288

Step 2: Set Up the AWS CLI ... 291

Step 3: Create an Application ... 292

Step 4: Clean Up ... 302

Flink Settings ... 304

Apache Flink Configuration ... 304

State Backend ... 304

Checkpointing ... 304

Savepointing ... 305

Heap Sizes ... 305

Using an Amazon VPC ... 306

Amazon VPC Concepts ... 306

VPC Application Permissions ... 307

Permissions Policy for Accessing an Amazon VPC ... 307

Internet and Service Access ... 307

Related Information ... 308

VPC API ... 308

CreateApplication ... 308

AddApplicationVpcConfiguration ... 309

DeleteApplicationVpcConfiguration ... 309

UpdateApplication ... 310

Example: Using a VPC ... 310

Troubleshooting ... 311

Development Troubleshooting ... 311

Enabling Flamegraphs ... 311

Issue with EFO connector 1.13.2 ... 311

Compile Error: "Could not resolve dependencies for project" ... 311

Invalid Choice: "kinesisanalyticsv2" ... 312

UpdateApplication Action Isn't Reloading Application Code ... 312

Runtime Troubleshooting ... 312


Troubleshooting Tools ... 312

Application Issues ... 312

Application is Restarting ... 315

Throughput is Too Slow ... 317

Application State Data is Accumulating ... 318

Checkpointing Timing Out ... 318

Document History ... 320

API Example Code ... 323

AddApplicationCloudWatchLoggingOption ... 323

AddApplicationInput ... 324

AddApplicationInputProcessingConfiguration ... 324

AddApplicationOutput ... 325

AddApplicationReferenceDataSource ... 325

AddApplicationVpcConfiguration ... 326

CreateApplication ... 326

CreateApplicationSnapshot ... 327

DeleteApplication ... 327

DeleteApplicationCloudWatchLoggingOption ... 327

DeleteApplicationInputProcessingConfiguration ... 328

DeleteApplicationOutput ... 328

DeleteApplicationReferenceDataSource ... 328

DeleteApplicationSnapshot ... 328

DeleteApplicationVpcConfiguration ... 328

DescribeApplication ... 329

DescribeApplicationSnapshot ... 329

DiscoverInputSchema ... 329

ListApplications ... 330

ListApplicationSnapshots ... 330

StartApplication ... 330

StopApplication ... 330

UpdateApplication ... 331

API Reference ... 332


What Is Amazon Kinesis Data Analytics for Apache Flink?

With Amazon Kinesis Data Analytics for Apache Flink, you can use Java, Scala, or SQL to process and analyze streaming data. The service enables you to author and run code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics.

You can build Java and Scala applications in Kinesis Data Analytics using open-source libraries based on Apache Flink. Apache Flink is a popular framework and engine for processing data streams.

Note: Although Kinesis Data Analytics supports Apache Flink applications written in Scala version 2.12, this guide only contains code examples written in Java.

Kinesis Data Analytics provides the underlying infrastructure for your Apache Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots). You can use the high-level Flink programming features (such as operators, functions, sources, and sinks) in the same way that you use them when hosting the Flink infrastructure yourself.

Getting Started

You can start by creating a Kinesis Data Analytics application that continuously reads and processes streaming data. Then, author your code using your IDE of choice, and test it with live streaming data. You can also configure destinations where you want Kinesis Data Analytics to send the results.

To get started, we recommend that you read the following sections:

• Kinesis Data Analytics for Apache Flink: How It Works (p. 2)

• Getting Started with Amazon Kinesis Data Analytics for Apache Flink (DataStream API) (p. 77)


Kinesis Data Analytics for Apache Flink: How It Works

Kinesis Data Analytics for Apache Flink is a fully managed Amazon service that enables you to use an Apache Flink application to process streaming data.

Programming Your Apache Flink Application

An Apache Flink application is a Java or Scala application that is created with the Apache Flink framework. You author and build your Apache Flink application locally.

Applications primarily use either the DataStream API or the Table API. The other Apache Flink APIs are also available for you to use, but they are less commonly used in building streaming applications.

The features of the two APIs are as follows:

DataStream API

The Apache Flink DataStream API programming model is based on two components:

Data stream: The structured representation of a continuous flow of data records.

Transformation operator: Takes one or more data streams as input, and produces one or more data streams as output.

Applications created with the DataStream API do the following:

• Read data from a Data Source (such as a Kinesis stream or Amazon MSK topic).

• Apply transformations to the data, such as filtering, aggregation, or enrichment.

• Write the transformed data to a Data Sink.

Applications that use the DataStream API can be written in Java or Scala, and can read from a Kinesis data stream, an Amazon MSK topic, or a custom source.

Your application processes data by using a connector. Apache Flink uses the following types of connectors:

Source: A connector used to read external data.

Sink: A connector used to write to external locations.

Operator: A connector used to process data within the application.

A typical application consists of at least one data stream with a source, a data stream with one or more operators, and at least one data sink.

For more information about using the DataStream API, see DataStream API (p. 9).
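The source, operator, and sink structure of a typical application can be illustrated with a plain-Java model. This is a conceptual sketch only and does not use the Flink DataStream API: the class PipelineModel and its run method are hypothetical names, and the in-memory list stands in for a real source such as a Kinesis data stream. In an actual application you would build the same shape with Flink's StreamExecutionEnvironment and connectors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of the DataStream shape: source -> operators -> sink.
// This is NOT the Flink API; all names here are illustrative.
final class PipelineModel {

    // Reads each record from the "source", applies a filter operator and a
    // map operator, then hands surviving records to the "sink".
    static <I, O> void run(Iterable<I> source,
                           Predicate<I> filter,
                           Function<I, O> map,
                           Consumer<O> sink) {
        for (I record : source) {            // read from the data source
            if (!filter.test(record)) {      // operator 1: filtering
                continue;
            }
            sink.accept(map.apply(record));  // operator 2: transform, then write to the sink
        }
    }

    public static void main(String[] args) {
        List<String> source = List.of("ok:1", "error:2", "ok:3");
        List<String> sink = new ArrayList<>();
        run(source, r -> r.startsWith("ok"), r -> r.toUpperCase(), sink::add);
        System.out.println(sink);
    }
}
```

Running main prints [OK:1, OK:3]: the filter operator drops the error record, and the map operator transforms the remaining records before they reach the sink.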


Table API

The Apache Flink Table API programming model is based on the following components:

Table Environment: An interface to underlying data that you use to create and host one or more tables.

Table: An object providing access to a SQL table or view.

Table Source: Used to read data from an external source, such as an Amazon MSK topic.

Table Function: A SQL query or API call used to transform data.

Table Sink: Used to write data to an external location, such as an Amazon S3 bucket.

Applications created with the Table API do the following:

• Create a TableEnvironment by connecting to a Table Source.

• Create a table in the TableEnvironment using either SQL queries or Table API functions.

• Run a query on the table using either the Table API or SQL.

• Apply transformations on the results of the query using Table Functions or SQL queries.

• Write the query or function results to a Table Sink.

Applications that use the Table API can be written in Java or Scala, and can query data using either API calls or SQL queries.

For more information about using the Table API, see Table API (p. 15).

Creating Your Kinesis Data Analytics Application

A Kinesis Data Analytics application is an AWS resource that is hosted by the Kinesis Data Analytics service. Your Kinesis Data Analytics application hosts your Apache Flink application and provides it with the following settings:

Runtime Properties (p. 21): Parameters that you can provide to your application. You can change these parameters without recompiling your application code.

Fault Tolerance (p. 24): How your application recovers from interruptions and restarts.

Logging and Monitoring (p. 208): How your application logs events to CloudWatch Logs.

Scaling (p. 30): How your application provisions computing resources.
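Runtime properties are typically read in application code as named property groups. The sketch below shows one way to read a value with a fallback default. It assumes that the groups arrive as a Map<String, Properties>, which is the shape returned by KinesisAnalyticsRuntime.getApplicationProperties() in the aws-kinesisanalytics-runtime library; treat that call as an assumption to confirm in Runtime Properties (p. 21). RuntimeProps, the group names, and the keys are hypothetical.

```java
import java.util.Map;
import java.util.Properties;

// Reads a value from a runtime property group, falling back to a default.
// Assumption: groups arrive as Map<String, Properties>, the shape returned by
// KinesisAnalyticsRuntime.getApplicationProperties() in a real application.
final class RuntimeProps {

    static String get(Map<String, Properties> groups,
                      String groupId, String key, String defaultValue) {
        Properties group = groups.get(groupId);
        if (group == null) {
            return defaultValue; // group not configured for this application
        }
        return group.getProperty(key, defaultValue);
    }

    public static void main(String[] args) {
        // Hypothetical group and key names, for illustration only.
        Properties consumer = new Properties();
        consumer.setProperty("input.stream.name", "ExampleInputStream");
        Map<String, Properties> groups = Map.of("ConsumerConfigProperties", consumer);

        System.out.println(get(groups, "ConsumerConfigProperties", "input.stream.name", "default"));
        System.out.println(get(groups, "ProducerConfigProperties", "output.stream.name", "default"));
    }
}
```

Because the values come from the application configuration rather than from the compiled code, changing them does not require rebuilding the JAR.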

You create your Kinesis Data Analytics application using either the console or the AWS CLI. To get started creating a Kinesis Data Analytics application, see Getting Started (DataStream API) (p. 77).

Creating a Kinesis Data Analytics for Apache Flink Application

This topic contains information about creating a Kinesis Data Analytics for Apache Flink application.

This topic contains the following sections:

• Building your Kinesis Data Analytics Application Code (p. 4)


• Creating your Kinesis Data Analytics Application (p. 5)

• Starting your Kinesis Data Analytics Application (p. 5)

• Verifying your Kinesis Data Analytics Application (p. 6)

• Creating Kinesis Data Analytics applications with Apache Beam (p. 6)

Building your Kinesis Data Analytics Application Code

This section describes the components you use to build the application code for your Kinesis Data Analytics application.

We recommend that you use the latest supported version of Apache Flink for your application code.

The latest version of Apache Flink that Kinesis Data Analytics supports is 1.13.2. For information about upgrading Kinesis Data Analytics applications, see Upgrading Applications (p. 253).

You build your application code using Apache Maven. An Apache Maven project uses a pom.xml file to specify the versions of components that it uses.

Note: Kinesis Data Analytics supports JAR files up to 512 MB in size. If you use a JAR file larger than this, your application will fail to start.
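A build step can check the packaged JAR against this limit before you upload it to Amazon S3. The sketch below is a hypothetical helper, not part of any AWS tool. It assumes that "512 MB" means 512 * 1024 * 1024 bytes; confirm the exact limit against the current service quotas.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical pre-upload check against the 512 MB JAR limit.
// Assumption: "MB" is interpreted as 1024 * 1024 bytes.
final class JarSizeCheck {

    static final long MAX_JAR_BYTES = 512L * 1024 * 1024;

    static boolean withinLimit(long sizeBytes) {
        return sizeBytes <= MAX_JAR_BYTES;
    }

    static boolean withinLimit(Path jar) throws IOException {
        return withinLimit(Files.size(jar));
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical default path; pass your own JAR path as the first argument.
        Path jar = Path.of(args.length > 0 ? args[0] : "target/app.jar");
        if (!Files.exists(jar)) {
            System.out.println("No file at " + jar);
            return;
        }
        System.out.println(jar + (withinLimit(jar) ? " is within" : " exceeds") + " the 512 MB limit");
    }
}
```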

Use the following component versions for Kinesis Data Analytics applications:

Java: 11 (recommended)

Scala: 2.12

Kinesis Data Analytics for Flink Runtime (aws-kinesisanalytics-runtime): 1.2.0

Kinesis Data Analytics Flink Connectors (aws-kinesisanalytics-flink): 2.0.0

AWS Kinesis Connector (flink-connector-kinesis): 1.13.2

Apache Beam (Beam applications only): 2.33.0, with Jackson version 2.12.2

For an example of a pom.xml file for a Kinesis Data Analytics application that uses Apache Flink version 1.13.2, see the Kinesis Data Analytics Getting Started Application.

For information about creating a Kinesis Data Analytics application that uses Apache Beam, see Using Apache Beam (p. 6).

Specifying your Application's Apache Flink Version

When using Kinesis Data Analytics for Flink Runtime version 1.1.0 and later, you specify the version of Apache Flink that your application uses when you compile your application. You provide the version of Apache Flink with the -Dflink.version parameter as follows:

mvn package -Dflink.version=1.13.2

For building applications with older versions of Apache Flink, see Earlier Versions (p. 251).


Creating your Kinesis Data Analytics Application

After you build your application code, do the following to create your Kinesis Data Analytics application:

Upload your Application code: Upload your application code to an Amazon S3 bucket. You specify the S3 bucket name and object name of your application code when you create your application. For a tutorial that shows how to upload your application code, see the section called “Upload the Apache Flink Streaming Java Code” (p. 84) in the Getting Started (DataStream API) (p. 77) tutorial.

Create your Kinesis Data Analytics application: Use one of the following methods to create your Kinesis Data Analytics application:

Create your Kinesis Data Analytics application using the AWS console: You can create and configure your application using the AWS console.

When you create your application using the console, your application's dependent resources (such as CloudWatch Logs streams, IAM roles, and IAM policies) are created for you.

When you create your application using the console, you specify what version of Apache Flink your application uses by selecting it from the pull-down on the Kinesis Analytics - Create application page.

For a tutorial about how to use the console to create an application, see the section called “Create and Run the Application (Console)” (p. 84) in the Getting Started (DataStream API) (p. 77) tutorial.

Create your Kinesis Data Analytics application using the AWS CLI: You can create and configure your application using the AWS CLI.

When you create your application using the CLI, you must also create your application's dependent resources (such as CloudWatch Logs streams, IAM roles, and IAM policies) manually.

When you create your application using the CLI, you specify what version of Apache Flink your application uses by using the RuntimeEnvironment parameter of the CreateApplication action.

For a tutorial about how to use the CLI to create an application, see the section called “Create and Run an Application Using the CLI” (p. 87) in the Getting Started (DataStream API) (p. 77) tutorial.

Note: You cannot change the RuntimeEnvironment of an existing application. If you need to change the RuntimeEnvironment of an existing application, you must delete the application and create it again.

Starting your Kinesis Data Analytics Application

After you have built your application code, uploaded it to S3, and created your Kinesis Data Analytics application, you then start your application. Starting a Kinesis Data Analytics application typically takes several minutes.

Use one of the following methods to start your application:

Start your Kinesis Data Analytics application using the AWS console: You can run your application by choosing Run on your application's page in the AWS console.

Start your Kinesis Data Analytics application using the AWS API: You can run your application using


Verifying your Kinesis Data Analytics Application

You can verify that your application is working in the following ways:

Using CloudWatch Logs: You can use CloudWatch Logs and CloudWatch Logs Insights to verify that your application is running properly. For information about using CloudWatch Logs with your Kinesis Data Analytics application, see Logging and Monitoring (p. 208).

Using CloudWatch Metrics: You can use CloudWatch Metrics to monitor your application's activity, or activity in the resources your application uses for input or output (such as Kinesis streams, Kinesis Data Firehose delivery streams, or Amazon S3 buckets). For more information about CloudWatch metrics, see Working with Metrics in the Amazon CloudWatch User Guide.

Monitoring Output Locations: If your application writes output to a location (such as an Amazon S3 bucket or database), you can monitor that location for written data.

Creating Kinesis Data Analytics applications with Apache Beam

You can use the Apache Beam framework with your Kinesis Data Analytics application to process streaming data. Kinesis Data Analytics applications that use Apache Beam use the Apache Flink runner to execute Beam pipelines.

For a tutorial about how to use Apache Beam in a Kinesis Data Analytics application, see Apache Beam (p. 170).

This topic contains the following sections:

• Using Apache Beam with Kinesis Data Analytics (p. 6)

• Beam Capabilities (p. 6)

Using Apache Beam with Kinesis Data Analytics

Note the following about using the Apache Flink runner with Kinesis Data Analytics:

• Apache Beam metrics are not viewable in the Kinesis Data Analytics console.

• Apache Beam is supported only with Kinesis Data Analytics applications that use Apache Flink version 1.8 and later. Apache Beam is not supported with Kinesis Data Analytics applications that use Apache Flink version 1.6.

Beam Capabilities

Kinesis Data Analytics supports the same Apache Beam capabilities as the Apache Flink runner.

For information about what features are supported with the Apache Flink runner, see the Beam Compatibility Matrix.

We recommend that you test your Apache Flink application in the Kinesis Data Analytics service to verify that we support all the features that your application needs.


Running a Kinesis Data Analytics for Apache Flink Application

This topic contains information about running a Kinesis Data Analytics for Apache Flink application.

When you run your Kinesis Data Analytics application, the Kinesis Data Analytics service creates an Apache Flink job. An Apache Flink job is the execution lifecycle of your Kinesis Data Analytics application.

The execution of the job, and the resources it uses, are managed by the Job Manager. The Job Manager separates the execution of the application into tasks. Each task is managed by a Task Manager. When you monitor your application's performance, you can examine the performance of each Task Manager, or of the Job Manager as a whole.

For information about Apache Flink jobs, see Jobs and Scheduling in the Apache Flink Documentation.

Application and Job Status

Both your application and the application's job have a current execution status:

Application status: Your application has a current status that describes its phase of execution.

Application statuses include the following:

Steady application statuses: Your application typically stays in these statuses until you make a status change:

READY: A new or stopped application is in the READY status until you run it.

RUNNING: An application that has successfully started is in the RUNNING status.

Transient application statuses: An application in these statuses is typically in the process of transitioning to another status. If an application stays in a transient status for an extended period, you can stop the application using the StopApplication action with the Force parameter set to true.

These statuses include the following:

• STARTING: Occurs after the StartApplication action. The application is transitioning from the READY to the RUNNING status.

• STOPPING: Occurs after the StopApplication action. The application is transitioning from the RUNNING to the READY status.

• DELETING: Occurs after the DeleteApplication action. The application is in the process of being deleted.

• UPDATING: Occurs after the UpdateApplication action. The application is updating, and will transition back to the RUNNING or READY status.

• AUTOSCALING: The application has the AutoScalingEnabled property of the ParallelismConfiguration set to true, and the service is increasing the parallelism of the application. When the application is in this status, the only valid API action you can use is the StopApplication action with the Force parameter set to true. For information about automatic scaling, see Automatic Scaling (p. 32).

• FORCE_STOPPING: Occurs after the StopApplication action is called with the Force parameter set to true. The application is in the process of being force stopped. The application transitions from the STARTING, UPDATING, STOPPING, or AUTOSCALING status to the READY status.

• ROLLING_BACK: Occurs after the RollbackApplication action is called. The application is in the process of being rolled back to a previous version. The application transitions from the UPDATING or AUTOSCALING status to the RUNNING status.

• ROLLED_BACK: When you successfully roll back an application, this becomes the status of the version that you rolled back from. For information about rolling back an application, see RollbackApplication.

• MAINTENANCE: Occurs while Kinesis Data Analytics applies patches to your application. For more information, see Maintenance (p. 246).


You can check your application's status using the console, or by using the DescribeApplication action.
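The grouping of statuses above, together with the rule about force-stopping an application that stays in a transient status, can be modeled as a small Java enum. This is an illustrative sketch: AppStatus and shouldForceStop are hypothetical names, and the time threshold is a value you would choose yourself, because the service does not define one here. The set of force-stoppable statuses is taken from the FORCE_STOPPING description. ROLLED_BACK is left out because it labels the version you rolled back from rather than a status the application waits in.

```java
import java.time.Duration;
import java.util.EnumSet;
import java.util.Set;

// Simplified model of the application statuses described above.
enum AppStatus {
    READY, RUNNING,                                   // steady statuses
    STARTING, STOPPING, DELETING, UPDATING,           // transient statuses
    AUTOSCALING, FORCE_STOPPING, ROLLING_BACK, MAINTENANCE;

    // Statuses that FORCE_STOPPING can transition from, per the status list.
    static final Set<AppStatus> FORCE_STOPPABLE =
            EnumSet.of(STARTING, UPDATING, STOPPING, AUTOSCALING);

    boolean isSteady() {
        return this == READY || this == RUNNING;
    }

    // Hypothetical policy: suggest StopApplication with Force=true once the
    // application has stayed in a force-stoppable status longer than a
    // threshold that you choose.
    boolean shouldForceStop(Duration timeInStatus, Duration threshold) {
        return FORCE_STOPPABLE.contains(this) && timeInStatus.compareTo(threshold) > 0;
    }
}
```

For example, STARTING.shouldForceStop(Duration.ofMinutes(30), Duration.ofMinutes(15)) returns true, while RUNNING never qualifies because it is a steady status.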

Job status: When your application is in the RUNNING status, your job has a status that describes its current execution phase. A job starts in the CREATED status, and then proceeds to the RUNNING status when it has started. If error conditions occur, the job enters the following status:

• For applications using Apache Flink 1.11 and later, the job enters the RESTARTING status.

• For applications using Apache Flink 1.8 and prior, the job enters the FAILING status.

The job then proceeds to either the RESTARTING or FAILED status, depending on whether it can be restarted.

You can check the job's status by examining your application's CloudWatch log for status changes.
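When you scan the CloudWatch log, the status to look for after an error depends on the Apache Flink version. The lookup below is a hypothetical helper that encodes only the two cases this guide describes; it rejects versions between 1.8 and 1.11 rather than guess, and it does not model the later transition to RESTARTING or FAILED.

```java
// Which job status to expect after an error, based on the application's
// Apache Flink version (major.minor). Only the cases described in this
// guide are encoded; anything else is rejected.
final class JobErrorStatus {

    static String onError(int major, int minor) {
        if (major != 1) {
            throw new IllegalArgumentException("Unexpected Flink major version: " + major);
        }
        if (minor >= 11) {
            return "RESTARTING"; // Apache Flink 1.11 and later
        }
        if (minor <= 8) {
            return "FAILING";    // Apache Flink 1.8 and prior
        }
        throw new IllegalArgumentException("Flink 1." + minor + " is not described here");
    }
}
```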

Batch workloads

Kinesis Data Analytics supports running Apache Flink batch workloads. In a batch job, when an Apache Flink job reaches the FINISHED status, the Kinesis Data Analytics application status is set to READY. For more information about Flink job statuses, see Jobs and Scheduling.

Application Resources

This section describes the system resources that your application uses. Understanding how Kinesis Data Analytics provisions and uses resources will help you design, create, and maintain a performant and stable Kinesis Data Analytics application.

Kinesis Data Analytics Application Resources

Kinesis Data Analytics is an AWS service that creates an environment for hosting your Apache Flink application. The Kinesis Data Analytics service provides resources using units called Kinesis Processing Units (KPUs).

One KPU represents the following system resources:

• One CPU core

• 4 GB of memory, of which one GB is native memory and three GB are heap memory

• 50 GB of disk space

KPUs run applications in distinct execution units called tasks and subtasks. You can think of a subtask as the equivalent of a thread.

The number of KPUs available to an application is equal to the application's Parallelism setting, divided by the application's ParallelismPerKPU setting.

For more information about application parallelism, see Scaling (p. 30).

Apache Flink Application Resources

The Apache Flink environment allocates resources for your application using units called task slots.

When Kinesis Data Analytics allocates resources for your application, it assigns one or more Apache Flink task slots to a single KPU. The number of slots assigned to a single KPU is equal to your application's ParallelismPerKPU setting. For more information about task slots, see Job Scheduling in the Apache Flink documentation.
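The resource arithmetic in this section can be worked through in code: KPUs equal Parallelism divided by ParallelismPerKPU, each KPU carries ParallelismPerKPU task slots, and each KPU provides 1 CPU core, 4 GB of memory, and 50 GB of disk. KpuMath is a hypothetical helper, and the sketch assumes that a fractional KPU count rounds up to a whole KPU, which this section does not state explicitly.

```java
// Resource arithmetic from this section.
// Assumption: a fractional KPU count rounds up to a whole KPU.
final class KpuMath {

    static int kpus(int parallelism, int parallelismPerKpu) {
        if (parallelism < 1 || parallelismPerKpu < 1) {
            throw new IllegalArgumentException("settings must be >= 1");
        }
        return (parallelism + parallelismPerKpu - 1) / parallelismPerKpu; // ceiling division
    }

    // Each KPU is assigned ParallelismPerKPU task slots.
    static int taskSlots(int parallelism, int parallelismPerKpu) {
        return kpus(parallelism, parallelismPerKpu) * parallelismPerKpu;
    }

    static int memoryGb(int kpus) { return kpus * 4; }  // 1 GB native + 3 GB heap per KPU
    static int diskGb(int kpus)   { return kpus * 50; } // 50 GB per KPU

    public static void main(String[] args) {
        int k = kpus(8, 2);
        System.out.printf("KPUs=%d slots=%d cores=%d memory=%dGB disk=%dGB%n",
                k, taskSlots(8, 2), k, memoryGb(k), diskGb(k));
    }
}
```

With Parallelism 8 and ParallelismPerKPU 2, main prints KPUs=4 slots=8 cores=4 memory=16GB disk=200GB.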


Operator Parallelism

You can set the maximum number of subtasks that an operator can use. This value is called Operator Parallelism. By default, the parallelism of each operator in your application is equal to the application's parallelism. This means that by default, each operator in your application can use all of the available subtasks in the application if needed.

You can set the parallelism of the operators in your application using the setParallelism method.

Using this method, you can control the number of subtasks each operator can use at one time.

For more information about operator chaining, see Task chaining and resource groups in the Apache Flink Documentation.
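Since a subtask can be thought of as a thread, operator parallelism can be pictured as a cap on how many worker threads one operator uses. The following Python sketch is an analogy built on that assumption, not Flink code: `run_operator` is a hypothetical helper that applies a function to each record with a thread pool sized to the operator's parallelism, which defaults to the application's parallelism when no override (the equivalent of calling setParallelism) is given.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional, Tuple


def run_operator(records: Iterable, fn: Callable, app_parallelism: int,
                 operator_parallelism: Optional[int] = None) -> Tuple[List, int]:
    """Apply fn to every record with at most N concurrent workers.

    N is operator_parallelism if given (like setParallelism), otherwise the
    application's parallelism. Returns the results in input order and the
    peak number of calls observed running at the same time.
    """
    workers = app_parallelism if operator_parallelism is None else operator_parallelism
    active = 0
    peak = 0
    lock = threading.Lock()

    def subtask(record):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        try:
            return fn(record)
        finally:
            with lock:
                active -= 1

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() returns results in the same order as the input records.
        results = list(pool.map(subtask, records))
    return results, peak


results, peak = run_operator(range(10), lambda x: x * 2, app_parallelism=4, operator_parallelism=2)
print(results, peak)
```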

Operator Chaining

Normally, each operator uses a separate subtask to execute, but if several operators always execute in sequence, the runtime can assign them all to the same task. This process is called Operator Chaining.

Several sequential operators can be chained into a single task if they all operate on the same data. The following are some of the criteria needed for this to be true:

• The operators do 1-to-1 simple forwarding.

• The operators all have the same operator parallelism.

When your application chains operators into a single subtask, it conserves system resources, because the service doesn't need to perform network operations and allocate subtasks for each operator. To determine whether your application is using operator chaining, look at the job graph in the Kinesis Data Analytics console. Each vertex in the job graph represents one or more operators, and operators that have been chained appear as a single vertex.
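The two criteria listed above can be expressed as a small grouping function. This Python sketch models a linear pipeline only and uses hypothetical `Operator` and `build_chains` names; Flink checks additional conditions (the list above gives only some of the criteria), so a real job graph may chain differently.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Operator:
    name: str
    parallelism: int
    # True if records flow 1-to-1 from the previous operator (no repartitioning such as keyBy).
    forward_from_previous: bool = True


def build_chains(pipeline: List[Operator]) -> List[List[str]]:
    """Group a linear pipeline into chains: same parallelism and 1-to-1 forwarding."""
    chains: List[List[str]] = []
    previous: Optional[Operator] = None
    for op in pipeline:
        can_chain = (
            previous is not None
            and op.forward_from_previous
            and op.parallelism == previous.parallelism
        )
        if can_chain:
            chains[-1].append(op.name)  # join the current chain (same task)
        else:
            chains.append([op.name])    # start a new chain (new vertex)
        previous = op
    return chains


pipeline = [
    Operator("Source: Kinesis", 4),
    Operator("Map", 4),
    Operator("Filter", 4),
    Operator("Window", 4, forward_from_previous=False),  # keyBy repartitions the data
    Operator("Sink", 2),                                  # different parallelism
]
print(build_chains(pipeline))
# [['Source: Kinesis', 'Map', 'Filter'], ['Window'], ['Sink']]
```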

DataStream API

Your Apache Flink application uses the Apache Flink DataStream API to transform data in a data stream.

This section contains the following topics:

• Using Connectors to Move Data in Kinesis Data Analytics for Apache Flink With the DataStream API (p. 9): These components move data between your application and external data sources and destinations.

• Transforming Data Using Operators in Kinesis Data Analytics for Apache Flink With the DataStream API (p. 14): These components transform or group data elements within your application.

• Tracking Events in Kinesis Data Analytics for Apache Flink Using the DataStream API (p. 15): This topic describes how Kinesis Data Analytics tracks events when using the DataStream API.

Using Connectors to Move Data in Kinesis Data Analytics for Apache Flink With the DataStream API

In the Amazon Kinesis Data Analytics for Apache Flink DataStream API, connectors are software components that move data into and out of a Kinesis Data Analytics application. Connectors are flexible integrations that enable you to read from files and directories. Connectors consist of complete modules for interacting with Amazon services and third-party systems.

Types of connectors include the following:

• Sources (p. 10): Provide data to your application from a Kinesis data stream, file, or other data source.

• Sinks (p. 11): Send data from your application to a Kinesis data stream, Kinesis Data Firehose delivery stream, or other data destination.

• Asynchronous I/O (p. 14): Provides asynchronous access to a data source (such as a database) to enrich stream events.

Available Connectors

The Apache Flink framework contains connectors for accessing data from a variety of sources. For information about connectors available in the Apache Flink framework, see Connectors in the Apache Flink documentation.

Adding Streaming Data Sources to Kinesis Data Analytics for Apache Flink

Apache Flink provides connectors for reading from files, sockets, collections, and custom sources. In your application code, you use an Apache Flink source to receive data from a stream. This section describes the sources that are available for Amazon services.

Kinesis Data Streams

The FlinkKinesisConsumer source provides streaming data to your application from an Amazon Kinesis data stream.

Creating a FlinkKinesisConsumer

The following code example demonstrates creating a FlinkKinesisConsumer:

// Configure the consumer with the stream's Region and starting position
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
// Read each record from the Kinesis data stream as a String
DataStream<String> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

For more information about using a FlinkKinesisConsumer, see Download and Examine the Apache Flink Streaming Java Code (p. 83).

Creating a FlinkKinesisConsumer that uses an EFO consumer

The FlinkKinesisConsumer now supports Enhanced Fan-Out (EFO).

If a Kinesis consumer uses EFO, the Kinesis Data Streams service gives it its own dedicated bandwidth, rather than having the consumer share the fixed bandwidth of the stream with the other consumers reading from the stream.

For more information about using EFO with the Kinesis consumer, see FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers.

You enable the EFO consumer by setting the following parameters on the Kinesis consumer:

RECORD_PUBLISHER_TYPE: Set this parameter to EFO for your application to use an EFO consumer to access the Kinesis Data Stream data.


EFO_CONSUMER_NAME: Set this parameter to a string value that is unique among the consumers of this stream. Reusing a consumer name on the same Kinesis data stream terminates the previous consumer that uses that name.

To configure a FlinkKinesisConsumer to use EFO, add the following parameters to the consumer:

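// consumerConfig is the Properties object passed to the FlinkKinesisConsumer
consumerConfig.putIfAbsent(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
consumerConfig.putIfAbsent(ConsumerConfigConstants.EFO_CONSUMER_NAME, "basic-efo-flink-app");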

For an example of a Kinesis Data Analytics application that uses an EFO consumer, see EFO Consumer (p. 141).
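The following Python sketch builds the same two settings as a plain dictionary and adds a guard for the naming rule described above. It is illustrative only: the string keys are assumed to be the values behind ConsumerConfigConstants.RECORD_PUBLISHER_TYPE and ConsumerConfigConstants.EFO_CONSUMER_NAME (check them against your connector version), and `names_in_use` is a hypothetical list of consumer names already registered on the stream, which this sketch does not look up.

```python
from typing import Dict, Iterable

# Assumed values of the Flink ConsumerConfigConstants fields; verify for your version.
RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher"
EFO_CONSUMER_NAME = "flink.stream.efo.consumername"


def add_efo_settings(consumer_config: Dict[str, str], consumer_name: str,
                     names_in_use: Iterable[str]) -> Dict[str, str]:
    """Return a copy of consumer_config with EFO enabled.

    Refuses a consumer name that is already in use on the stream, because
    reusing a name terminates the previous consumer with that name.
    """
    if consumer_name in set(names_in_use):
        raise ValueError(f"EFO consumer name {consumer_name!r} is already in use on this stream")
    config = dict(consumer_config)
    # Mirror putIfAbsent: keep any value that is already set.
    config.setdefault(RECORD_PUBLISHER_TYPE, "EFO")
    config.setdefault(EFO_CONSUMER_NAME, consumer_name)
    return config


print(add_efo_settings({"aws.region": "us-west-2"}, "basic-efo-flink-app", names_in_use=[]))
```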

Amazon MSK

The FlinkKafkaConsumer source provides streaming data to your application from an Amazon MSK topic.

Creating a FlinkKafkaConsumer

The following code example demonstrates creating a FlinkKafkaConsumer:

Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
// Replace the placeholder with your cluster's bootstrap broker string
inputProperties.setProperty("bootstrap.servers", "Cluster Bootstrap Broker String");
// Connect to the brokers over TLS using the JVM's default trust store
inputProperties.setProperty("security.protocol", "SSL");
inputProperties.setProperty("ssl.truststore.location", "/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts");
inputProperties.setProperty("ssl.truststore.password", "changeit");
// Read each message from the MyMSKTopic topic as a String
DataStream<String> input = env.addSource(new FlinkKafkaConsumer<>("MyMSKTopic", new SimpleStringSchema(), inputProperties));

For more information about using a FlinkKafkaConsumer, see MSK Replication (p. 137).

Writing Data Using Sinks in Kinesis Data Analytics for Apache Flink

In your application code, you use an Apache Flink sink to write data from an Apache Flink stream to an AWS service, such as Kinesis Data Streams.

Apache Flink provides sinks for files, sockets, and custom sinks. The following sinks are available for AWS:

Kinesis Data Streams

Apache Flink provides information about the Kinesis Data Streams Connector in the Apache Flink documentation.

For an example of an application that uses a Kinesis data stream for input and output, see Getting Started (DataStream API) (p. 77).

Amazon S3

You can use the Apache Flink StreamingFileSink to write objects to an Amazon S3 bucket.

For an example of how to write objects to Amazon S3, see the section called “S3 Sink” (p. 128).


Kinesis Data Firehose

The FlinkKinesisFirehoseProducer is a reliable, scalable Apache Flink sink for storing application output using the Kinesis Data Firehose service. This section describes how to set up a Maven project to create and use a FlinkKinesisFirehoseProducer.

Topics

• Creating a FlinkKinesisFirehoseProducer (p. 12)

• FlinkKinesisFirehoseProducer Code Example (p. 12)

Creating a FlinkKinesisFirehoseProducer

The following code example demonstrates creating a FlinkKinesisFirehoseProducer:

Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

FlinkKinesisFirehoseProducer Code Example

The following code example demonstrates how to create and configure a FlinkKinesisFirehoseProducer and send data from an Apache Flink data stream to the Kinesis Data Firehose service.

package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

    private static final String region = "us-east-1";
    private static final String inputStreamName = "ExampleInputStream";
    private static final String outputStreamName = "ExampleOutputStream";

    // Kinesis source configured with values hard-coded in the application
    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

    // Kinesis source configured from the "ConsumerConfigProperties" runtime property group
    private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException {
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                applicationProperties.get("ConsumerConfigProperties")));
    }

    private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
        /*
         * com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants
         * lists all of the properties that the Firehose sink can be configured with.
         */
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
        return sink;
    }

    private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
        /*
         * com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants
         * lists all of the properties that the Firehose sink can be configured with.
         */
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(),
                applicationProperties.get("ProducerConfigProperties"));
        return sink;
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /* If you would like to use runtime configuration properties, uncomment the line below
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        // Kinesis Data Firehose sink
        input.addSink(createFirehoseSinkFromStaticConfig());

        // If you would like to use runtime configuration properties, uncomment the line below
        // input.addSink(createFirehoseSinkFromApplicationProperties());

        env.execute("Flink Streaming Java API Skeleton");
    }
}

For a complete tutorial about how to use the Kinesis Data Firehose sink, see the section called “Kinesis Data Firehose Sink” (p. 148).

Using Asynchronous I/O in Kinesis Data Analytics for Apache Flink

An Asynchronous I/O operator enriches stream data using an external data source such as a database.

Kinesis Data Analytics enriches the stream events asynchronously so that requests can be batched for greater efficiency.

For more information, see Asynchronous I/O in the Apache Flink Documentation.
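The idea can be illustrated with Python's asyncio. This is an analogy, not the Flink Async I/O API: `enrich_async` and `fake_company_lookup` are hypothetical, the lookup sleeps instead of querying a real database, and the `capacity` limit plays the role of the cap on in-flight requests that Flink's async operator takes as a parameter. Results come back in input order, similar in spirit to Flink's ordered async mode.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def enrich_async(events: List[Dict], lookup: Callable[[str], Awaitable[Dict]],
                       capacity: int) -> List[Dict]:
    """Enrich each event with an async lookup, with at most `capacity` lookups in flight."""
    semaphore = asyncio.Semaphore(capacity)

    async def enrich_one(event: Dict) -> Dict:
        async with semaphore:
            extra = await lookup(event["TICKER"])
        return {**event, **extra}

    # gather() returns results in the order of its arguments, not completion order.
    return await asyncio.gather(*(enrich_one(e) for e in events))


async def fake_company_lookup(ticker: str) -> Dict:
    # Stand-in for a database call; a real lookup would use an async client.
    await asyncio.sleep(0.01)
    return {"COMPANY": ticker + " Inc."}


events = [{"TICKER": "AMZN", "PRICE": 10.0}, {"TICKER": "MSFT", "PRICE": 20.0}]
print(asyncio.run(enrich_async(events, fake_company_lookup, capacity=2)))
```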

Transforming Data Using Operators in Kinesis Data Analytics for Apache Flink With the DataStream API

To transform incoming data in a Kinesis Data Analytics for Apache Flink application, you use an Apache Flink operator. An Apache Flink operator transforms one or more data streams into a new data stream.

The new data stream contains modified data from the original data stream. Apache Flink provides more than 25 pre-built stream processing operators. For more information, see Operators in the Apache Flink Documentation.

This topic contains the following sections:

• Transform Operators (p. 14)

• Aggregation Operators (p. 14)

Transform Operators

The following is an example of a simple text transformation on one of the fields of a JSON data stream.

This code creates a transformed data stream. The new data stream has the same data as the original stream, with the string " Company" appended to the contents of the TICKER field.

DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            // Append " Company" to the TICKER field; put() returns the modified node
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    });
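The same one-record-in, one-record-out transformation can be sketched in plain Python over JSON strings. This is an illustration, not the Flink API: `append_company` is a hypothetical function, and the list comprehension stands in for DataStream.map. Unlike the Java example, which modifies the ObjectNode in place with put(), the sketch returns a copy so the input record is left unchanged.

```python
import json


def append_company(record: dict) -> dict:
    """Return a copy of the record with " Company" appended to TICKER."""
    transformed = dict(record)
    transformed["TICKER"] = record["TICKER"] + " Company"
    return transformed


raw_events = ['{"TICKER": "AMZN", "PRICE": 101.5}', '{"TICKER": "MSFT", "PRICE": 42.0}']
# Like DataStream.map, the function is applied to each element independently.
output = [append_company(json.loads(e)) for e in raw_events]
print(output)
# [{'TICKER': 'AMZN Company', 'PRICE': 101.5}, {'TICKER': 'MSFT Company', 'PRICE': 42.0}]
```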

Aggregation Operators

The following is an example of an aggregation operator. The code creates an aggregated data stream.

The operator creates a 5-second tumbling window and returns the sum of the PRICE values for the records in the window with the same TICKER value.

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
        return node1;
    });
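The following Python sketch shows what the 5-second tumbling window computes. It is a simplified model, not Flink code: `tumbling_sum` is hypothetical, each record carries an explicit timestamp that stands in for processing time, and windows are aligned to multiples of the window size, as Flink's tumbling windows are by default.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

WINDOW_SECONDS = 5


def tumbling_sum(records: Iterable[Tuple[float, str, float]],
                 window_seconds: int = WINDOW_SECONDS) -> Dict[Tuple[int, str], float]:
    """Sum PRICE per (window start, TICKER) over non-overlapping windows.

    Each record is (timestamp_seconds, ticker, price); the timestamp stands in
    for the processing time at which the record reached the window operator.
    """
    totals: Dict[Tuple[int, str], float] = defaultdict(float)
    for ts, ticker, price in records:
        # Tumbling windows start at multiples of the window size.
        window_start = int(ts // window_seconds) * window_seconds
        totals[(window_start, ticker)] += price
    return dict(totals)


records = [
    (0.5, "AMZN", 10.0),
    (3.9, "AMZN", 5.0),
    (4.2, "MSFT", 7.0),
    (6.1, "AMZN", 1.0),  # falls into the next window, [5, 10)
]
print(tumbling_sum(records))
# {(0, 'AMZN'): 15.0, (0, 'MSFT'): 7.0, (5, 'AMZN'): 1.0}
```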

For a complete code example that uses operators, see Getting Started (DataStream API) (p. 77).

Source code for the Getting Started application is available at Getting Started in the Kinesis Data Analytics Java Examples GitHub repository.

Tracking Events in Kinesis Data Analytics for Apache Flink Using the DataStream API

Kinesis Data Analytics tracks events using the following timestamps:

Processing Time: Refers to the system time of the machine that is executing the respective operation.

Event Time: Refers to the time that each individual event occurred on its producing device.

Ingestion Time: Refers to the time that events enter the Kinesis Data Analytics service.

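You set the time characteristic used by the streaming environment by calling setStreamTimeCharacteristic with one of the following values. (In Apache Flink 1.12 and later, this method is deprecated and event time is the default.)

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);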

For more information about timestamps, see Event Time in the Apache Flink Documentation.
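The choice of time characteristic matters when records arrive late. The following Python sketch assigns the same three records to 10-second tumbling windows by event time and by arrival time (standing in for processing time). It is a simplified model with hypothetical names: real event-time processing in Flink also relies on watermarks to decide when a window is complete, which the sketch ignores.

```python
from typing import Dict, List, Tuple


def window_assignments(records: List[Tuple[str, float, float]], use_event_time: bool,
                       window_seconds: int = 10) -> Dict[int, List[str]]:
    """Map each window start to the IDs of the records assigned to it.

    Each record is (record_id, event_time, arrival_time), both in seconds.
    With use_event_time=False, the arrival time plays the role of processing time.
    """
    windows: Dict[int, List[str]] = {}
    for record_id, event_time, arrival_time in records:
        ts = event_time if use_event_time else arrival_time
        start = int(ts // window_seconds) * window_seconds
        windows.setdefault(start, []).append(record_id)
    return windows


# "b" was produced at t=8 but arrived late, at t=12 (for example, after a network delay).
records = [("a", 1.0, 2.0), ("b", 8.0, 12.0), ("c", 11.0, 13.0)]
print(window_assignments(records, use_event_time=True))   # {0: ['a', 'b'], 10: ['c']}
print(window_assignments(records, use_event_time=False))  # {0: ['a'], 10: ['b', 'c']}
```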

Table API

Your Apache Flink application uses the Apache Flink Table API to interact with data in a stream using a relational model. You use the Table API to access data using Table sources, and then use Table functions to transform and filter table data. You can transform and filter tabular data using either API functions or SQL commands.

This section contains the following topics:

• Table API Connectors (p. 15): These components move data between your application and external data sources and destinations.

• Table API Time Attributes (p. 16): This topic describes how Kinesis Data Analytics tracks events when using the Table API.

Table API Connectors

In the Apache Flink programming model, connectors are components that your application uses to read data from, or write data to, external systems, such as other AWS services.

With the Apache Flink Table API, you can use the following types of connectors:

• Table API Sources (p. 16): You use Table API source connectors to create tables within your TableEnvironment using either API calls or SQL queries.

• Table API Sinks (p. 16): You use SQL commands to write table data to external destinations such as an Amazon MSK topic or an Amazon S3 bucket.


Table API Sources

You create a table source from a data stream. The following code creates a table from an Amazon MSK topic:

// Create a Kafka consumer that reads StockRecord objects from the Amazon MSK topic
final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
consumer.setStartFromEarliest();

// Obtain the stream
DataStream<StockRecord> events = env.addSource(consumer);

// Create a table from the stream
Table table = streamTableEnvironment.fromDataStream(events);

For more information about table sources, see Table & Connectors in the Apache Flink documentation.

Table API Sinks

To write table data to a sink, you create the sink in SQL, and then run the SQL-based sink on the StreamTableEnvironment object.

The following code example demonstrates how to write table data to an Amazon S3 sink:

final String s3Sink = "CREATE TABLE sink_table ("
        + "event_time TIMESTAMP,"
        + "ticker STRING,"
        + "price DOUBLE,"
        + "dt STRING,"
        + "hr STRING"
        + ")"
        + " PARTITIONED BY (ticker,dt,hr)"
        + " WITH"
        + "("
        + " 'connector' = 'filesystem',"
        + " 'path' = '" + s3Path + "',"
        + " 'format' = 'json'"
        + ") ";

// Register the sink table, then send the table data to S3
streamTableEnvironment.executeSql(s3Sink);
// filteredTable is a Table defined elsewhere in the application
filteredTable.executeInsert("sink_table");
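Because the sink table is declared with PARTITIONED BY (ticker,dt,hr), each row is written under a directory derived from those three columns. The following Python sketch computes such a directory. It assumes the filesystem connector's default key=value (Hive-style) directory layout; `partition_path` and the bucket path are hypothetical, and the file names inside each directory are generated by Flink and not modeled here.

```python
from typing import Dict, Sequence


def partition_path(base_path: str, record: Dict[str, object],
                   partition_keys: Sequence[str] = ("ticker", "dt", "hr")) -> str:
    """Build a key=value partition directory for a record, in partition-key order."""
    parts = [f"{key}={record[key]}" for key in partition_keys]
    return "/".join([base_path.rstrip("/")] + parts)


record = {"event_time": "2021-03-15 16:21:20", "ticker": "AMZN", "price": 101.5,
          "dt": "2021-03-15", "hr": "16"}
print(partition_path("s3://my-bucket/output", record))
# s3://my-bucket/output/ticker=AMZN/dt=2021-03-15/hr=16
```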

You can use the format parameter to control what format Kinesis Data Analytics uses to write the output to the sink. For information about formats, see Formats in the Apache Flink documentation.

For more information about table sinks, see Table & Connectors in the Apache Flink documentation.

User-Defined Sources and Sinks

You can use existing connectors, such as the Apache Kafka connector for Amazon MSK and the filesystem connector for Amazon S3, to send data to and receive data from other AWS services. For interacting with other data sources and destinations, you can define your own sources and sinks. For more information, see User-Defined Sources and Sinks in the Apache Flink documentation.

Table API Time Attributes

Each record in a data stream has several timestamps that define when events related to the record occurred:


Event Time: A user-defined timestamp that defines when the event that created the record occurred.

Ingestion Time: The time when your application retrieved the record from the data stream.

Processing Time: The time when your application processed the record.

When the Apache Flink Table API creates windows based on record times, you specify which of these timestamps it uses with the setStreamTimeCharacteristic method.

For more information about using timestamps with the Table API, see Time Attributes in the Apache Flink documentation.

Using Python with Kinesis Data Analytics

Apache Flink version 1.13.2 supports creating applications in Python version 3.8 with the PyFlink library. You create a Kinesis Data Analytics application using Python by doing the following:

• Create your Python application code as a text file with a main method.

• Bundle your application code file and any Python or Java dependencies into a zip file, and upload it to an Amazon S3 bucket.

• Create your Kinesis Data Analytics application, specifying your Amazon S3 code location, application properties, and application settings.

At a high level, the Python Table API is a wrapper around the Java Table API. For information about the Python Table API, see Intro to the Python Table API in the Apache Flink Documentation.

Programming Your Kinesis Data Analytics for Python Application

You code your Kinesis Data Analytics for Python application using the Apache Flink Python Table API.

The Apache Flink engine translates Python Table API statements (running in the Python VM) into Java Table API statements (running in the Java VM).

You use the Python Table API by doing the following:

• Create a reference to the StreamTableEnvironment.

• Create table objects from your source streaming data by executing queries on the StreamTableEnvironment reference.

• Execute queries on your table objects to create output tables.

• Write your output tables to your destinations using a StatementSet.

To get started using the Python Table API in Kinesis Data Analytics, see Getting Started with Amazon Kinesis Data Analytics for Apache Flink for Python (p. 105).

Reading and Writing Streaming Data

To read and write streaming data, you execute SQL queries on the table environment.

Creating a Table

The following code example demonstrates a helper function that returns a SQL statement. The statement creates a table that interacts with a Kinesis stream:


def create_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)

Reading Streaming Data

The following code example demonstrates how to run the preceding create_table query on a table environment reference to create a table that reads data from the input stream:

table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))

Writing Streaming Data

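The following code example demonstrates how to write data to a destination Kinesis stream by running an INSERT INTO statement that copies the rows of the input table into an output table, which you can create with the create_table function. To submit several INSERT statements as a single job, you can add them to a StatementSet instead of running each one with execute_sql. The following example runs a single statement: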

table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"

.format(output_table_name, input_table_name))

Reading Runtime Properties

You can use runtime properties to configure your application without changing your application code.

You specify application properties for your application the same way as with a Kinesis Data Analytics for Java application. You can specify runtime properties in the following ways:

• Using the CreateApplication action.

• Using the UpdateApplication action.

• Configuring your application by using the console.

You retrieve application properties in code by reading a JSON file called application_properties.json that the Kinesis Data Analytics runtime creates.

The following code example demonstrates reading application properties from the application_properties.json file:

import json
import os

file_path = '/etc/flink/application_properties.json'
if os.path.isfile(file_path):
    with open(file_path, 'r') as file:
        contents = file.read()
        properties = json.loads(contents)

The following helper function retrieves a property group from the application properties object:

def property_map(properties, property_group_id):
    for prop in properties:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

The following code example demonstrates reading a property called INPUT_STREAM_KEY from a property group that the previous example returns:

input_stream = input_property_map[INPUT_STREAM_KEY]
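The snippets above can be combined into one runnable sketch. To run anywhere, it parses an inline JSON string instead of /etc/flink/application_properties.json. The file structure (a list of objects with PropertyGroupId and PropertyMap keys) is inferred from the property_map function, and the group ID and key names are hypothetical. The sketch returns None explicitly for a missing group, which the original function does implicitly.

```python
import json
from typing import Dict, List, Optional

# Hypothetical file contents: a JSON array of property groups.
SAMPLE_CONTENTS = """
[
  {"PropertyGroupId": "consumer.config.0",
   "PropertyMap": {"input.stream.name": "ExampleInputStream", "aws.region": "us-west-2"}}
]
"""

INPUT_STREAM_KEY = "input.stream.name"  # hypothetical key name


def property_map(properties: List[Dict], property_group_id: str) -> Optional[Dict[str, str]]:
    """Return the PropertyMap for property_group_id, or None if the group is absent."""
    for prop in properties:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
    return None


properties = json.loads(SAMPLE_CONTENTS)
input_property_map = property_map(properties, "consumer.config.0")
input_stream = input_property_map[INPUT_STREAM_KEY]
print(input_stream)  # ExampleInputStream
```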

Creating your application's code package

Once you have created your Python application, you bundle your code file and dependencies into a zip file.

Your zip file must contain a Python script with a main method, and can optionally contain the following:

• Additional Python code files

• User-defined Java code in JAR files

• Java libraries in JAR files

Note

Your application zip file must contain all of the dependencies for your application. You can't reference libraries from other sources for your application.
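A code package can be assembled with Python's standard zipfile module. The following sketch builds the archive in memory and checks that the main script is present before you upload it. The file names and contents are hypothetical placeholders, and the check covers only the presence of the main script, not the other requirements above.

```python
import io
import zipfile
from typing import Dict


def build_code_package(files: Dict[str, bytes]) -> bytes:
    """Create a zip archive in memory from {path inside archive: file contents}."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for path, data in files.items():
            archive.writestr(path, data)
    return buffer.getvalue()


def require_main_script(package: bytes, main_script: str) -> None:
    """Raise ValueError if main_script is not in the archive."""
    with zipfile.ZipFile(io.BytesIO(package)) as archive:
        if main_script not in archive.namelist():
            raise ValueError(f"{main_script} is missing from the code package")


package = build_code_package({
    "MyApplication/main.py": b"def main():\n    print('hello')\n\n\nif __name__ == '__main__':\n    main()\n",
    "MyApplication/lib/helpers.py": b"# additional Python code\n",
})
require_main_script(package, "MyApplication/main.py")
# Save `package` to a .zip file and upload it to your Amazon S3 bucket.
```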

Creating Your Python Kinesis Data Analytics Application

Specifying your Code Files

Once you have created your application's code package, you upload it to an Amazon S3 bucket. You then create your application using either the console or the CreateApplication action.

When you create your application using the CreateApplication action, you specify the code files and archives in your zip file using a special application property group called kinesis.analytics.flink.run.options. You can define the following types of files:

python: A text file containing a Python main method.

jarfile: A Java JAR file containing Java user-defined functions.

pyFiles: A Python resource file containing resources to be used by the application.

pyArchives: A zip file containing resource files for the application.

For more information about Apache Flink Python code file types, see Command Line Usage in the Apache Flink Documentation.


Note

Kinesis Data Analytics does not support the pyModule, pyExecutable, or pyRequirements file types. All of the code, requirements, and dependencies must be in your zip file. You can't specify dependencies to be installed using pip.

The following example JSON snippet demonstrates how to specify file locations within your application's zip file:

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "kinesis.analytics.flink.run.options",
                "PropertyMap": {
                    "python": "MyApplication/main.py",
                    "jarfile": "MyApplication/lib/myJarFile.jar",
                    "pyFiles": "MyApplication/lib/myDependentFile.py",
                    "pyArchives": "MyApplication/lib/myArchive.zip"
                }
            },
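When you build a CreateApplication request programmatically, the run options group can be generated and checked before you send it. The following Python sketch is illustrative: `run_options_group` is hypothetical, it rejects the unsupported pyModule, pyExecutable, and pyRequirements keys named in the note above, and treating the python key as required is an assumption of the sketch.

```python
import json
from typing import Dict

SUPPORTED_KEYS = {"python", "jarfile", "pyFiles", "pyArchives"}
UNSUPPORTED_KEYS = {"pyModule", "pyExecutable", "pyRequirements"}


def run_options_group(options: Dict[str, str]) -> Dict:
    """Build the kinesis.analytics.flink.run.options property group after validating keys."""
    unsupported = set(options) & UNSUPPORTED_KEYS
    if unsupported:
        raise ValueError(f"not supported by Kinesis Data Analytics: {sorted(unsupported)}")
    unknown = set(options) - SUPPORTED_KEYS
    if unknown:
        raise ValueError(f"unknown run option keys: {sorted(unknown)}")
    if "python" not in options:
        # Assumption of this sketch: the main script location must be given.
        raise ValueError("the 'python' key (main script path) is required")
    return {"PropertyGroupId": "kinesis.analytics.flink.run.options", "PropertyMap": dict(options)}


group = run_options_group({
    "python": "MyApplication/main.py",
    "pyArchives": "MyApplication/lib/myArchive.zip",
})
print(json.dumps(group, indent=2))
```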

Monitoring Your Python Kinesis Data Analytics Application

You use your application's CloudWatch log to monitor your Python Kinesis Data Analytics application.

Kinesis Data Analytics logs the following messages for Python applications:

• Messages written to the console using print() in the application's main method.

• Messages sent in user-defined functions using the logging package. The following code example demonstrates writing to the application log from a user-defined function:

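from pyflink.table import DataTypes
from pyflink.table.udf import udf
import logging

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def doNothingUdf(i):
    # Messages logged with the logging package appear in the application's CloudWatch log
    logging.info("Got {} in the doNothingUdf".format(str(i)))
    return i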

• Error messages thrown by the application.

If the application throws an exception in the main function, it will appear in your application's logs.

The following example demonstrates a log entry for an exception thrown from Python code:

2021-03-15 16:21:20.000 --- Python Process Started ---
2021-03-15 16:21:21.000 Traceback (most recent call last):
2021-03-15 16:21:21.000 " File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
2021-03-15 16:21:21.000 main()
2021-03-15 16:21:21.000 " File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
2021-03-15 16:21:21.000 " table_env.register_function(""doNothingUdf"", doNothingUdf)"
2021-03-15 16:21:21.000 NameError: name 'doNothingUdf' is not defined
2021-03-15 16:21:21.000 --- Python Process Exited ---
2021-03-15 16:21:21.000 Run python process failed
2021-03-15 16:21:21.000 Error occurred when trying to start the job

Note

Because custom log messages can affect application performance, we recommend that you use them only during application development.

Querying Logs with CloudWatch Insights

The following CloudWatch Insights query searches for logs created by the Python entrypoint while executing the main function of your application:

fields @timestamp, message

| sort @timestamp asc

| filter logger like /PythonDriver/

| limit 1000
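To make the query's steps concrete, the following Python sketch applies the same filter, sort, and limit to an in-memory list of log events. It is illustrative only: the event dictionaries, their logger values, and the `python_driver_messages` function are hypothetical and do not reproduce the exact CloudWatch Logs record format.

```python
from typing import Dict, List


def python_driver_messages(events: List[Dict], limit: int = 1000) -> List[Dict]:
    """Keep events whose logger contains "PythonDriver", oldest first,
    and return only the @timestamp and message fields (at most `limit`)."""
    matching = [e for e in events if "PythonDriver" in e.get("logger", "")]
    matching.sort(key=lambda e: e["@timestamp"])  # ISO-like strings sort chronologically
    return [{"@timestamp": e["@timestamp"], "message": e["message"]} for e in matching[:limit]]


events = [  # hypothetical log events
    {"@timestamp": "2021-03-15 16:21:21.000", "logger": "example.PythonDriver",
     "message": "Traceback (most recent call last):"},
    {"@timestamp": "2021-03-15 16:21:20.000", "logger": "example.PythonDriver",
     "message": "--- Python Process Started ---"},
    {"@timestamp": "2021-03-15 16:21:22.000", "logger": "example.TaskExecutor",
     "message": "unrelated message"},
]
for entry in python_driver_messages(events):
    print(entry["@timestamp"], entry["message"])
```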

Runtime Properties in Kinesis Data Analytics for Apache Flink

You can use runtime properties to configure your application without recompiling your application code.

This topic contains the following sections:

• Working with Runtime Properties in the Console (p. 21)

• Working with Runtime Properties in the CLI (p. 21)

• Accessing Runtime Properties in a Kinesis Data Analytics Application (p. 23)

Working with Runtime Properties in the Console

You can add, update, or remove runtime properties from your Kinesis Data Analytics application using the console.

Note

You can't add runtime properties when you create an application in the Kinesis Data Analytics console.

Update Runtime Properties for a Kinesis Data Analytics application

1. Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.

2. Choose your Kinesis Data Analytics application. Choose Application details.

3. On the page for your application, choose Configure.

4. Expand the Properties section.

5. Use the controls in the Properties section to define a property group with key-value pairs. Use these controls to add, update, or remove property groups and runtime properties.

6. Choose Update.

Working with Runtime Properties in the CLI

You can add, update, or remove runtime properties using the AWS CLI.


This section includes example requests for API actions for configuring runtime properties for an application. For information about how to use a JSON file for input for an API action, see Kinesis Data Analytics API Example Code (p. 323).

Note

Replace the sample account ID (012345678901) in the following examples with your account ID.

Adding Runtime Properties when Creating an Application

The following example request for the CreateApplication action adds two runtime property groups (ProducerConfigProperties and ConsumerConfigProperties) when you create an application:

{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_13",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties": {
            "PropertyGroups": [
                {
                    "PropertyGroupId": "ProducerConfigProperties",
                    "PropertyMap": {
                        "flink.stream.initpos": "LATEST",
                        "aws.region": "us-west-2",
                        "AggregationEnabled": "false"
                    }
                },
                {
                    "PropertyGroupId": "ConsumerConfigProperties",
                    "PropertyMap": {
                        "aws.region": "us-west-2"
                    }
                }
            ]
        }
    }
}
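If you generate request files like this one from code, the PropertyGroups list can be built from a simpler nested dictionary. The following Python sketch is a hypothetical helper, not part of any AWS SDK. It reproduces the EnvironmentProperties section of the request above and rejects non-string values, assuming (as in every example here) that property values are strings.

```python
import json
from typing import Dict, List


def to_property_groups(groups: Dict[str, Dict[str, str]]) -> List[Dict]:
    """Convert {group_id: {key: value}} into the PropertyGroups list used in requests."""
    property_groups = []
    for group_id, props in groups.items():
        for key, value in props.items():
            if not isinstance(value, str):
                raise TypeError(f"{group_id}.{key} must be a string, got {type(value).__name__}")
        property_groups.append({"PropertyGroupId": group_id, "PropertyMap": dict(props)})
    return property_groups


environment_properties = {
    "PropertyGroups": to_property_groups({
        "ProducerConfigProperties": {
            "flink.stream.initpos": "LATEST",
            "aws.region": "us-west-2",
            "AggregationEnabled": "false",
        },
        "ConsumerConfigProperties": {"aws.region": "us-west-2"},
    })
}
print(json.dumps(environment_properties, indent=2))
```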

Adding and Updating Runtime Properties in an Existing Application

The following example request for the UpdateApplication action adds or updates runtime properties for an existing application:

{
    "ApplicationName": "MyApplication",
    "CurrentApplicationVersionId": 2,
    "ApplicationConfigurationUpdate": {
        "EnvironmentPropertyUpdates": {
            "PropertyGroups": [
                {
                    "PropertyGroupId": "ProducerConfigProperties",
                    "PropertyMap": {
                        "flink.stream.initpos": "LATEST",
                        "aws.region": "us-west-2",
                        "AggregationEnabled": "false"
                    }
                },
                {
                    "PropertyGroupId": "ConsumerConfigProperties",
                    "PropertyMap": {
                        "aws.region": "us-west-2"
                    }
                }
            ]
        }
    }
}

Note

If you use a key that has no corresponding runtime property in a property group, Kinesis Data Analytics adds the key-value pair as a new property. If you use a key for an existing runtime property in a property group, Kinesis Data Analytics updates the property value.

Removing Runtime Properties

The following example request for the UpdateApplication action removes all runtime properties and property groups from an existing application:

{
    "ApplicationName": "MyApplication",
    "CurrentApplicationVersionId": 3,
    "ApplicationConfigurationUpdate": {
        "EnvironmentPropertyUpdates": {
            "PropertyGroups": []
        }
    }
}

Important

If you omit an existing property group or an existing property key in a property group, that property group or property is removed.
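Taken together, the two rules above mean that the PropertyGroups list in an UpdateApplication request effectively describes the complete set of runtime properties you want the application to have. The following Java sketch is a local, hypothetical helper (PropertyUpdatePreview and describeChanges are illustrative names, not part of any AWS library) that compares the current property groups with a planned request and lists which properties would be added, updated, or removed under those documented rules. It only models the rules locally; it does not call the Kinesis Data Analytics API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical helper: previews the effect of an EnvironmentPropertyUpdates
// request using the documented rules: new keys are added, existing keys are
// updated, and omitted keys or groups are removed.
public class PropertyUpdatePreview {

    public static List<String> describeChanges(
            Map<String, Map<String, String>> current,
            Map<String, Map<String, String>> requested) {
        List<String> changes = new ArrayList<>();
        // Visit every group ID that appears on either side, in sorted order.
        TreeSet<String> groupIds = new TreeSet<>(current.keySet());
        groupIds.addAll(requested.keySet());
        for (String groupId : groupIds) {
            Map<String, String> before = current.getOrDefault(groupId, Map.of());
            Map<String, String> after = requested.get(groupId);
            if (after == null) {
                // Group omitted from the request: the whole group is removed.
                changes.add("REMOVE GROUP " + groupId);
                continue;
            }
            TreeSet<String> keys = new TreeSet<>(before.keySet());
            keys.addAll(after.keySet());
            for (String key : keys) {
                String oldValue = before.get(key);
                String newValue = after.get(key);
                String path = groupId + "/" + key;
                if (newValue == null) {
                    changes.add("REMOVE " + path);                  // key omitted: removed
                } else if (oldValue == null) {
                    changes.add("ADD " + path + "=" + newValue);    // new key: added
                } else if (!oldValue.equals(newValue)) {
                    changes.add("UPDATE " + path + "=" + newValue); // existing key: updated
                }
                // Keys present on both sides with equal values are unchanged.
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> current = Map.of(
                "ProducerConfigProperties",
                Map.of("aws.region", "us-east-1", "AggregationEnabled", "true"),
                "ConsumerConfigProperties",
                Map.of("aws.region", "us-west-2"));
        Map<String, Map<String, String>> requested = Map.of(
                "ProducerConfigProperties",
                Map.of("aws.region", "us-west-2", "flink.stream.initpos", "LATEST"));
        describeChanges(current, requested).forEach(System.out::println);
    }
}
```

Running main prints the following, showing that the omitted ConsumerConfigProperties group and the omitted AggregationEnabled key would both be removed:

REMOVE GROUP ConsumerConfigProperties
REMOVE ProducerConfigProperties/AggregationEnabled
UPDATE ProducerConfigProperties/aws.region=us-west-2
ADD ProducerConfigProperties/flink.stream.initpos=LATEST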

Accessing Runtime Properties in a Kinesis Data Analytics Application

You retrieve runtime properties in your Java application code using the static KinesisAnalyticsRuntime.getApplicationProperties() method, which returns a Map<String, Properties> object.

The following Java code example retrieves runtime properties for your application:

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

You retrieve a property group (as a java.util.Properties object) as follows:

Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
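Every value in a PropertyMap is a string, and get returns null if a property group is missing, so application code usually parses values and supplies defaults. The following sketch illustrates one way to do this. It assumes a hand-built Map<String, Properties> standing in for the result of KinesisAnalyticsRuntime.getApplicationProperties(), so it runs without the Kinesis Data Analytics runtime library. RuntimePropertyReader, group, and getBoolean are hypothetical names, and the us-east-1 fallback region is an arbitrary example default.

```java
import java.util.Map;
import java.util.Properties;

// Sketch: reading typed runtime property values with defaults.
// The map built in main is a stand-in for
// KinesisAnalyticsRuntime.getApplicationProperties().
public class RuntimePropertyReader {

    // Returns the named property group, or an empty Properties object if the
    // group is absent, so callers can fall back to defaults instead of null checks.
    public static Properties group(Map<String, Properties> all, String groupId) {
        Properties p = all.get(groupId);
        return p != null ? p : new Properties();
    }

    // Property values are strings; parse a boolean, using the default if the key is missing.
    public static boolean getBoolean(Properties p, String key, boolean defaultValue) {
        String raw = p.getProperty(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        // Mirrors the ProducerConfigProperties group from the CreateApplication example;
        // ConsumerConfigProperties is deliberately left out to show the fallback path.
        Properties producer = new Properties();
        producer.setProperty("flink.stream.initpos", "LATEST");
        producer.setProperty("aws.region", "us-west-2");
        producer.setProperty("AggregationEnabled", "false");
        Map<String, Properties> applicationProperties =
                Map.of("ProducerConfigProperties", producer);

        Properties producerProps = group(applicationProperties, "ProducerConfigProperties");
        String region = producerProps.getProperty("aws.region", "us-east-1");
        boolean aggregation = getBoolean(producerProps, "AggregationEnabled", true);

        // Missing group: the empty Properties object makes getProperty return the default.
        Properties consumerProps = group(applicationProperties, "ConsumerConfigProperties");
        String consumerRegion = consumerProps.getProperty("aws.region", region);

        System.out.println(region + " " + aggregation + " " + consumerRegion);
    }
}
```

Running main prints us-west-2 false us-west-2. Returning an empty Properties object for a missing group keeps the default-handling logic in one place, through Properties.getProperty(key, defaultValue).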
