
Python Examples

From the document Amazon Kinesis Data Analytics (pages 184-187)

2. In the Kinesis Data Streams panel, choose ExampleInputStream.

3. In the ExampleInputStream page, choose Delete Kinesis Stream and then confirm the deletion.

4. In the Kinesis streams page, choose the ExampleOutputStream, choose Actions, choose Delete, and then confirm the deletion.

Delete Your Amazon S3 Object and Bucket

1. Open the Amazon S3 console at https://console.aws.amazon.com/s3/.

2. Choose the ka-app-code-<username> bucket.

3. Choose Delete and then enter the bucket name to confirm deletion.

Delete Your IAM Resources

1. Open the IAM console at https://console.aws.amazon.com/iam/.

2. In the navigation bar, choose Policies.

3. In the filter control, enter kinesis.

4. Choose the kinesis-analytics-service-MyApplication-<your-region> policy.

5. Choose Policy Actions and then choose Delete.

6. In the navigation bar, choose Roles.

7. Choose the kinesis-analytics-MyApplication-<your-region> role.

8. Choose Delete role and then confirm the deletion.

Delete Your CloudWatch Resources

1. Open the CloudWatch console at https://console.aws.amazon.com/cloudwatch/.

2. In the navigation bar, choose Logs.

3. Choose the /aws/kinesis-analytics/MyApplication log group.

4. Choose Delete Log Group and then confirm the deletion.
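The console steps for the two Kinesis data streams and the CloudWatch log group can also be scripted. The following is a minimal sketch rather than part of the original exercise. It assumes the AWS SDK for Python (Boto3) with configured credentials. The helper names delete_streams and delete_log_group and the DryRunClient class are hypothetical; delete_stream and delete_log_group are the Boto3 client operations being wrapped. Clients are passed in as arguments so the calls can be previewed with the stand-in client before they are pointed at a real account.

```python
# Sketch only: scripted equivalents of two console cleanup steps.
# With real clients, the caller would pass boto3.client("kinesis") and
# boto3.client("logs") instead of the DryRunClient used below.

STREAM_NAMES = ["ExampleInputStream", "ExampleOutputStream"]
LOG_GROUP_NAME = "/aws/kinesis-analytics/MyApplication"


class DryRunClient:
    """Hypothetical stand-in that records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, operation):
        # Any unknown attribute is treated as an API operation name.
        def record(**kwargs):
            self.calls.append((operation, kwargs))
            return {}
        return record


def delete_streams(kinesis_client, stream_names=STREAM_NAMES):
    """Request deletion of each named Kinesis data stream."""
    for name in stream_names:
        kinesis_client.delete_stream(StreamName=name)
    return list(stream_names)


def delete_log_group(logs_client, log_group_name=LOG_GROUP_NAME):
    """Request deletion of the application's CloudWatch log group."""
    logs_client.delete_log_group(logGroupName=log_group_name)
    return log_group_name


# Preview the calls without touching any AWS account.
preview_client = DryRunClient()
delete_streams(preview_client)
delete_log_group(preview_client)
for operation, kwargs in preview_client.calls:
    print(operation, kwargs)
```

The S3 bucket and IAM resources are deliberately left out of this sketch: a bucket must be empty before it can be deleted, and a role or policy must be detached first, so scripting them takes more steps than the console flow suggests.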

Next Steps

Now that you've created and run a basic Kinesis Data Analytics application that transforms data using Apache Beam, see the following application for an example of a more advanced Kinesis Data Analytics solution.

Beam on Kinesis Data Analytics Streaming Workshop: This workshop explores an end-to-end example that combines batch and streaming aspects in one uniform Apache Beam pipeline.

Python Examples

The following examples demonstrate how to create applications using Python with the Apache Flink Table API.

Topics

• Example: Creating a Tumbling Window in Python

• Example: Creating a Sliding Window in Python

• Example: Send Streaming Data to Amazon S3 in Python

Example: Creating a Tumbling Window in Python

In this exercise, you create a Python Kinesis Data Analytics application that aggregates data using a tumbling window.

Note: To set up required prerequisites for this exercise, first complete the Getting Started (Python) exercise.

This topic contains the following sections:

• Create Dependent Resources

• Write Sample Records to the Input Stream

• Download and Examine the Application Code

• Compress and Upload the Apache Flink Streaming Python Code

• Create and Run the Kinesis Data Analytics Application

• Clean Up AWS Resources

Create Dependent Resources

Before you create a Kinesis Data Analytics application for this exercise, you create the following dependent resources:

• Two Kinesis data streams (ExampleInputStream and ExampleOutputStream)

• An Amazon S3 bucket to store the application's code (ka-app-code-<username>)

You can create the Kinesis streams and Amazon S3 bucket using the console. For instructions for creating these resources, see the following topics:

• Creating and Updating Data Streams in the Amazon Kinesis Data Streams Developer Guide. Name your data streams ExampleInputStream and ExampleOutputStream.

• How Do I Create an S3 Bucket? in the Amazon Simple Storage Service User Guide. Give the Amazon S3 bucket a globally unique name by appending your login name, such as ka-app-code-<username>.
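If you prefer to script these resources instead of using the console, the request parameters can be prepared as plain dictionaries first. The following is a minimal sketch under stated assumptions: the helper names are hypothetical, ShardCount=1 is an assumed size for a small tutorial workload, and the user name and Regions in the example are placeholders. The dictionaries match the keyword arguments of the Boto3 create_stream and create_bucket client operations, but nothing is sent to AWS here.

```python
# Sketch only: build the create requests locally. To send them, a caller
# would run, for example:
#   boto3.client("kinesis").create_stream(**request)
#   boto3.client("s3", region_name=region).create_bucket(**request)

STREAM_NAMES = ["ExampleInputStream", "ExampleOutputStream"]


def bucket_name_for(username):
    """Build the tutorial's bucket name, ka-app-code-<username>."""
    # S3 bucket names must be lowercase.
    return "ka-app-code-{}".format(username.lower())


def stream_requests(stream_names=STREAM_NAMES, shard_count=1):
    """Return one create_stream keyword-argument dict per stream."""
    return [
        {"StreamName": name, "ShardCount": shard_count}
        for name in stream_names
    ]


def bucket_request(username, region):
    """Return the create_bucket keyword arguments for the code bucket."""
    request = {"Bucket": bucket_name_for(username)}
    # Outside us-east-1, create_bucket needs an explicit location constraint.
    if region != "us-east-1":
        request["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return request


# "jdoe" and "us-west-2" are placeholder values for illustration.
for request in stream_requests():
    print(request)
print(bucket_request("jdoe", "us-west-2"))
```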

Write Sample Records to the Input Stream

In this section, you use a Python script to write sample records to the stream for the application to process.

Note: This section requires the AWS SDK for Python (Boto).

Note: The Python script in this section uses the AWS CLI. You must configure your AWS CLI to use your account credentials and default region. To configure your AWS CLI, enter the following:

aws configure

1. Create a file named stock.py with the following contents:

    import datetime
    import json
    import random
    import boto3

    STREAM_NAME = "ExampleInputStream"


    def get_data():
        return {
            'EVENT_TIME': datetime.datetime.now().isoformat(),
            'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
            'PRICE': round(random.random() * 100, 2)}


    def generate(stream_name, kinesis_client):
        while True:
            data = get_data()
            print(data)
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data),
                PartitionKey="partitionkey")


    if __name__ == '__main__':
        generate(STREAM_NAME, boto3.client('kinesis'))

2. Run the stock.py script:

    $ python stock.py

Keep the script running while completing the rest of the tutorial.
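Before sending records to a live stream, it can help to inspect the payload locally. The following is a minimal sketch, not part of the tutorial's stock.py: it builds the same record fields and the same put_record keyword arguments, but only prints them. The names build_put_record_request and preview, and the seeded random generator, are hypothetical additions for illustration.

```python
import datetime
import json
import random

STREAM_NAME = "ExampleInputStream"
TICKERS = ['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']


def get_data(rng=random):
    """Build one sample record with the same fields as stock.py."""
    return {
        'EVENT_TIME': datetime.datetime.now().isoformat(),
        'TICKER': rng.choice(TICKERS),
        'PRICE': round(rng.random() * 100, 2),
    }


def build_put_record_request(stream_name, data):
    """Return the keyword arguments that stock.py passes to put_record."""
    return {
        'StreamName': stream_name,
        'Data': json.dumps(data),
        'PartitionKey': "partitionkey",
    }


def preview(count=3, seed=0):
    """Build `count` requests locally; nothing is sent to Kinesis."""
    rng = random.Random(seed)  # seeded so tickers and prices are repeatable
    return [
        build_put_record_request(STREAM_NAME, get_data(rng))
        for _ in range(count)
    ]


for request in preview():
    print(request['Data'])
```

Each printed line is the JSON document that would become one Kinesis record, which makes it easy to compare the field names and timestamp format against the table definition used by the application.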

Download and Examine the Application Code

The Python application code for this example is available from GitHub. To download the application code, do the following:

1. Install the Git client if you haven't already. For more information, see Installing Git.

2. Clone the remote repository with the following command:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

3. Navigate to the amazon-kinesis-data-analytics-java-examples/python/TumblingWindow directory.

The application code is located in the tumbling-windows.py file. Note the following about the application code:

• The application uses a Kinesis table source to read from the source stream. The following snippet calls the create_table function to create the Kinesis table source:

    table_env.execute_sql(
        create_table(input_table_name, input_stream, input_region, stream_initpos)
    )

The create_table function uses a SQL command to create a table that is backed by the streaming source:

    def create_table(table_name, stream_name, region, stream_initpos):
        return """ CREATE TABLE {0} (
                    ticker VARCHAR(6),
                    price DOUBLE,
                    event_time TIMESTAMP(3),
                    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                  )
                  PARTITIONED BY (ticker)
                  WITH (
                    'connector' = 'kinesis',
                    'stream' = '{1}',
                    'aws.region' = '{2}',
                    'scan.stream.initpos' = '{3}',
                    'sink.partitioner-field-delimiter' = ';',
                    'sink.producer.collection-max-count' = '100',
                    'format' = 'json',
                    'json.timestamp-format.standard' = 'ISO-8601'
                  ) """.format(
            table_name, stream_name, region, stream_initpos
        )
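Because create_table only formats a string, the generated statement can be printed and inspected without a Flink environment. The following sketch repeats the function and renders it with illustrative arguments; the table name input_table, the Region us-west-2, and the initial position LATEST are assumptions chosen for the example, not values taken from the application's configuration.

```python
def create_table(table_name, stream_name, region, stream_initpos):
    """Return the CREATE TABLE statement for a Kinesis-backed table."""
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )


# Illustrative arguments only; substitute the values your application uses.
ddl = create_table("input_table", "ExampleInputStream", "us-west-2", "LATEST")
print(ddl)
```

The arguments are interpolated with str.format and are not quoted or escaped, so they should come from trusted configuration rather than from user input.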

• The application uses the Tumble operator to aggregate records within a specified tumbling window, and return the aggregated records as a table object:

    tumbling_window_table = (
        input_table.window(
            Tumble.over("10.seconds").on("event_time").alias("ten_second_window")
        )
        .group_by("ticker, ten_second_window")
        .select("ticker, price.sum as price, ten_second_window.end as event_time")
    )

• The application uses the Kinesis Flink connector, from the amazon-kinesis-sql-connector-flink-2.12/1.13.2.jar file.
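To make the tumbling-window aggregation described above concrete, the following plain-Python sketch mimics what the Tumble operator computes: each record falls into exactly one 10-second window, prices are summed per ticker and window, and the window's end time is emitted as event_time. This is an illustration of the semantics only, not Flink's implementation. It assumes windows aligned to the Unix epoch and ignores watermarks and late records; the function names and sample records are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)
EPOCH = datetime(1970, 1, 1)


def window_end(event_time, size=WINDOW):
    """Return the exclusive end of the tumbling window holding event_time."""
    index = (event_time - EPOCH) // size  # whole windows since the epoch
    return EPOCH + (index + 1) * size


def tumbling_sum(records, size=WINDOW):
    """Sum price per (ticker, window) and report the window end time."""
    totals = defaultdict(float)
    for record in records:
        key = (record["ticker"], window_end(record["event_time"], size))
        totals[key] += record["price"]
    return [
        {"ticker": ticker, "price": round(total, 2), "event_time": end}
        for (ticker, end), total in sorted(totals.items())
    ]


# Hypothetical sample records for illustration.
records = [
    {"ticker": "AMZN", "price": 10.0, "event_time": datetime(2021, 1, 1, 0, 0, 1)},
    {"ticker": "AMZN", "price": 5.5, "event_time": datetime(2021, 1, 1, 0, 0, 9)},
    {"ticker": "AMZN", "price": 2.0, "event_time": datetime(2021, 1, 1, 0, 0, 10)},
    {"ticker": "MSFT", "price": 7.0, "event_time": datetime(2021, 1, 1, 0, 0, 3)},
]

for row in tumbling_sum(records):
    print(row)
```

The record stamped 00:00:10 lands in the next window because each window's end is exclusive, so the two earlier AMZN prices are summed together while the third is reported on its own.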
