Amazon Kinesis Data Analytics for SQL Applications Developer Guide



Copyright © Amazon Web Services, Inc. and/or its affiliates. All rights reserved.

Amazon's trademarks and trade dress may not be used in connection with any product or service that is not Amazon's, in any manner that is likely to cause confusion among customers, or in any manner that disparages or discredits Amazon. All other trademarks not owned by Amazon are the property of their respective owners, who may or may not be affiliated with, connected to, or sponsored by Amazon.

Table of Contents

What Is Amazon Kinesis Data Analytics for SQL Applications? ... 1

When Should I Use Amazon Kinesis Data Analytics? ... 1

Are You a First-Time User of Amazon Kinesis Data Analytics? ... 1

How It Works ... 3

Input ... 5

Configuring a Streaming Source ... 5

Configuring a Reference Source ... 7

Working with JSONPath ... 9

Mapping Streaming Source Elements to SQL Input Columns ... 13

Using the Schema Discovery Feature on Streaming Data ... 17

Using the Schema Discovery Feature on Static Data ... 18

Preprocessing Data Using a Lambda Function ... 21

Parallelizing Input Streams for Increased Throughput ... 28

Application Code ... 31

Output ... 33

Creating an Output Using the AWS CLI ... 33

Using a Lambda Function as Output ... 34

Application Output Delivery Model ... 40

Error Handling ... 41

Reporting Errors Using an In-Application Error Stream ... 41

Auto Scaling Applications ... 42

Tagging ... 42

Adding Tags when an Application is Created ... 43

Adding or Updating Tags for an Existing Application ... 43

Listing Tags for an Application ... 43

Removing Tags from an Application ... 43

Getting Started ... 45

Step 1: Set Up an Account ... 45

Sign Up for AWS ... 45

Create an IAM User ... 46

Next Step ... 46

Step 2: Set Up the AWS CLI ... 46

Next Step ... 47

Step 3: Create Your Starter Analytics Application ... 47

Step 3.1: Create an Application ... 49

Step 3.2: Configure Input ... 50

Step 3.3: Add Real-Time Analytics (Add Application Code) ... 53

Step 3.4: (Optional) Update the Application Code ... 56

Step 4 (Optional) Edit the Schema and SQL Code Using the Console ... 58

Working with the Schema Editor ... 58

Working with the SQL Editor ... 65

Streaming SQL Concepts ... 68

In-Application Streams and Pumps ... 68

Timestamps and the ROWTIME Column ... 69

Understanding Various Times in Streaming Analytics ... 69

Continuous Queries ... 71

Windowed Queries ... 72

Stagger Windows ... 72

Tumbling Windows ... 77

Sliding Windows ... 78

Stream Joins ... 82

Example 1: Report Orders Where There Are Trades Within One Minute of the Order Being Placed ... 82

Examples ... 84

Transforming Data ... 84

Preprocessing Streams with Lambda ... 84

Transforming String Values ... 84

Transforming DateTime Values ... 98

Transforming Multiple Data Types ... 102

Windows and Aggregation ... 107

Stagger Window ... 107

Tumbling Window Using ROWTIME ... 110

Tumbling Window Using an Event Timestamp ... 112

Most Frequently Occurring Values (TOP_K_ITEMS_TUMBLING) ... 115

Aggregating Partial Results ... 118

Joins ... 120

Example: Add Reference Data Source ... 120

Machine Learning ... 123

Detecting Anomalies ... 123

Example: Detect Anomalies and Get an Explanation ... 129

Example: Detect Hotspots ... 132

Alerts and Errors ... 142

Simple Alerts ... 142

Throttled Alerts ... 143

In-Application Error Stream ... 144

Solution Accelerators ... 145

Real-time insights on AWS account activity ... 145

Real-time AWS IoT device monitoring with Kinesis Data Analytics ... 146

Real-time web analytics with Kinesis Data Analytics ... 146

Amazon Connected Vehicle Solution ... 146

Security ... 147

Data Protection ... 147

Data Encryption ... 147

Identity and Access Management ... 148

Trust Policy ... 148

Permissions Policy ... 149

Monitoring ... 151

Compliance Validation ... 151

Resilience ... 151

Disaster Recovery ... 152

Infrastructure Security ... 152

Security Best Practices ... 152

Implement least privilege access ... 152

Use IAM roles to access other Amazon services ... 152

Implement Server-Side Encryption in Dependent Resources ... 153

Use CloudTrail to Monitor API Calls ... 153

Monitoring ... 154

Monitoring Tools ... 154

Automated Tools ... 155

Manual Tools ... 155

Monitoring with Amazon CloudWatch ... 155

Metrics and Dimensions ... 156

Viewing Metrics and Dimensions ... 157

Alarms ... 158

Logs ... 159

Using AWS CloudTrail ... 163

Kinesis Data Analytics Information in CloudTrail ... 164

Understanding Kinesis Data Analytics Log File Entries ... 164

Limits ... 166

Best Practices ... 168

Managing Applications ... 168

Scaling Applications ... 169

Defining Input Schema ... 169

Connecting to Outputs ... 170

Authoring Application Code ... 170

Testing Applications ... 171

Setting up a Test Application ... 171

Testing Schema Changes ... 171

Testing Code Changes ... 171

Troubleshooting ... 172

Unable to Run SQL Code ... 172

Unable to Detect or Discover My Schema ... 172

Reference Data is Out of Date ... 173

Application Not Writing to Destination ... 173

Important Application Health Parameters to Monitor ... 173

Invalid Code Errors When Running an Application ... 174

Application is Writing Errors to the Error Stream ... 174

Insufficient Throughput or High MillisBehindLatest ... 174

Authentication and Access Control ... 176

Authentication ... 176

Access Control ... 177

Overview of Managing Access ... 177

Amazon Kinesis Data Analytics Resources and Operations ... 178

Understanding Resource Ownership ... 178

Managing Access to Resources ... 178

Specifying Policy Elements: Actions, Effects, and Principals ... 180

Specifying Conditions in a Policy ... 180

Using Identity-Based Policies (IAM Policies) ... 181

Permissions Required to Use the Amazon Kinesis Data Analytics Console ... 181

Amazon-Managed (Predefined) Policies for Amazon Kinesis Data Analytics ... 182

Customer Managed Policy Examples ... 183

Amazon Kinesis Data Analytics API Permissions Reference ... 186

GetApplicationState ... 187

SQL Reference ... 188

API Reference ... 189

Actions ... 189

AddApplicationCloudWatchLoggingOption ... 190

AddApplicationInput ... 192

AddApplicationInputProcessingConfiguration ... 195

AddApplicationOutput ... 198

AddApplicationReferenceDataSource ... 201

CreateApplication ... 204

DeleteApplication ... 210

DeleteApplicationCloudWatchLoggingOption ... 212

DeleteApplicationInputProcessingConfiguration ... 214

DeleteApplicationOutput ... 216

DeleteApplicationReferenceDataSource ... 219

DescribeApplication ... 221

DiscoverInputSchema ... 225

ListApplications ... 229

ListTagsForResource ... 231

StartApplication ... 233

StopApplication ... 235

TagResource ... 237

UntagResource ... 239

UpdateApplication ... 241

Data Types ... 244

ApplicationDetail ... 246

ApplicationSummary ... 249

ApplicationUpdate ... 250

CloudWatchLoggingOption ... 251

CloudWatchLoggingOptionDescription ... 252

CloudWatchLoggingOptionUpdate ... 253

CSVMappingParameters ... 254

DestinationSchema ... 255

Input ... 256

InputConfiguration ... 258

InputDescription ... 259

InputLambdaProcessor ... 261

InputLambdaProcessorDescription ... 262

InputLambdaProcessorUpdate ... 263

InputParallelism ... 264

InputParallelismUpdate ... 265

InputProcessingConfiguration ... 266

InputProcessingConfigurationDescription ... 267

InputProcessingConfigurationUpdate ... 268

InputSchemaUpdate ... 269

InputStartingPositionConfiguration ... 270

InputUpdate ... 271

JSONMappingParameters ... 273

KinesisFirehoseInput ... 274

KinesisFirehoseInputDescription ... 275

KinesisFirehoseInputUpdate ... 276

KinesisFirehoseOutput ... 277

KinesisFirehoseOutputDescription ... 278

KinesisFirehoseOutputUpdate ... 279

KinesisStreamsInput ... 280

KinesisStreamsInputDescription ... 281

KinesisStreamsInputUpdate ... 282

KinesisStreamsOutput ... 283

KinesisStreamsOutputDescription ... 284

KinesisStreamsOutputUpdate ... 285

LambdaOutput ... 286

LambdaOutputDescription ... 287

LambdaOutputUpdate ... 288

MappingParameters ... 289

Output ... 290

OutputDescription ... 292

OutputUpdate ... 294

RecordColumn ... 296

RecordFormat ... 297

ReferenceDataSource ... 298

ReferenceDataSourceDescription ... 299

ReferenceDataSourceUpdate ... 300

S3Configuration ... 302

S3ReferenceDataSource ... 303

S3ReferenceDataSourceDescription ... 304

S3ReferenceDataSourceUpdate ... 305

SourceSchema ... 306

Tag ... 307

Document History ... 308

AWS glossary ... 311

What Is Amazon Kinesis Data Analytics for SQL Applications?

With Amazon Kinesis Data Analytics for SQL Applications, you can process and analyze streaming data using standard SQL. The service enables you to quickly author and run powerful SQL code against streaming sources to perform time series analytics, feed real-time dashboards, and create real-time metrics.

To get started with Kinesis Data Analytics, you create a Kinesis data analytics application that continuously reads and processes streaming data. The service supports ingesting data from Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose streaming sources. Then, you author your SQL code using the interactive editor and test it with live streaming data. You can also configure destinations where you want Kinesis Data Analytics to send the results.

Kinesis Data Analytics supports Amazon Kinesis Data Firehose (Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk), AWS Lambda, and Amazon Kinesis Data Streams as destinations.

When Should I Use Amazon Kinesis Data Analytics?

Amazon Kinesis Data Analytics enables you to quickly author SQL code that continuously reads, processes, and stores data in near real time. Using standard SQL queries on the streaming data, you can construct applications that transform and provide insights into your data. The following are some example scenarios for using Kinesis Data Analytics:

Generate time-series analytics – You can calculate metrics over time windows, and then stream values to Amazon S3 or Amazon Redshift through a Kinesis Data Firehose delivery stream.

Feed real-time dashboards – You can send aggregated and processed streaming data results downstream to feed real-time dashboards.

Create real-time metrics – You can create custom metrics and triggers for use in real-time monitoring, notifications, and alarms.

For information about the SQL language elements that are supported by Kinesis Data Analytics, see Amazon Kinesis Data Analytics SQL Reference.

Are You a First-Time User of Amazon Kinesis Data Analytics?

If you are a first-time user of Amazon Kinesis Data Analytics, we recommend that you read the following sections in order:

1. Read the How It Works section of this guide. This section introduces various Kinesis Data Analytics components that you work with to create an end-to-end experience. For more information, see Amazon Kinesis Data Analytics for SQL Applications: How It Works (p. 3).

2. Try the Getting Started exercises. For more information, see Getting Started with Amazon Kinesis Data Analytics for SQL Applications (p. 45).

3. Explore the streaming SQL concepts. For more information, see Streaming SQL Concepts (p. 68).

4. Try additional examples. For more information, see Example Applications (p. 84).

Amazon Kinesis Data Analytics for SQL Applications: How It Works

An application is the primary resource in Amazon Kinesis Data Analytics that you can create in your account. You can create and manage applications using the AWS Management Console or the Kinesis Data Analytics API. For a list of API operations, see Actions (p. 189).

Kinesis Data Analytics applications continuously read and process streaming data in real time. You write application code using SQL to process the incoming streaming data and produce output. Then, Kinesis Data Analytics writes the output to a configured destination. The following diagram illustrates a typical application architecture.

Each application has a name, description, version ID, and status. Amazon Kinesis Data Analytics assigns a version ID when you first create an application. This version ID is updated when you update any application configuration. For example, if you add an input configuration, add or delete a reference data source, add or delete an output configuration, or update application code, Kinesis Data Analytics updates the current application version ID. Kinesis Data Analytics also maintains timestamps for when an application was created and last updated.

In addition to these basic properties, each application consists of the following:

Input – The streaming source for your application. You can select either a Kinesis data stream or a Kinesis Data Firehose delivery stream as the streaming source. In the input configuration, you map the streaming source to an in-application input stream. The in-application stream is like a continuously updating table on which you can perform SELECT and INSERT SQL operations.

In your application code, you can create additional in-application streams to store intermediate query results.

 

You can optionally partition a single streaming source into multiple in-application input streams to improve throughput. For more information, see Limits (p. 166) and Configuring Application Input (p. 5).

 

Amazon Kinesis Data Analytics provides a timestamp column called ROWTIME in each in-application stream. For more information, see Timestamps and the ROWTIME Column (p. 69). You can use this column in time-based windowed queries. For more information, see Windowed Queries (p. 72).

 

You can optionally configure a reference data source to enrich your input data stream within the application. It results in an in-application reference table. You must store your reference data as an object in your S3 bucket. When the application starts, Amazon Kinesis Data Analytics reads the Amazon S3 object and creates an in-application table. For more information, see Configuring Application Input (p. 5).

 

Application code – A series of SQL statements that process input and produce output. You can write SQL statements against in-application streams and reference tables. You can also write JOIN queries to combine data from both of these sources.

 

For information about the SQL language elements that are supported by Kinesis Data Analytics, see Amazon Kinesis Data Analytics SQL Reference.

 

In its simplest form, application code can be a single SQL statement that selects from a streaming input and inserts results into a streaming output. It can also be a series of SQL statements where output of one feeds into the input of the next SQL statement. Further, you can write application code to split an input stream into multiple streams. You can then apply additional queries to process these streams. For more information, see Application Code (p. 31).

 

Output – In application code, query results go to in-application streams. In your application code, you can create one or more in-application streams to hold intermediate results. You can then optionally configure the application output to persist data in the in-application streams that hold your application output (also referred to as in-application output streams) to external destinations.

External destinations can be a Kinesis Data Firehose delivery stream or a Kinesis data stream. Note the following about these destinations:

• You can configure a Kinesis Data Firehose delivery stream to write results to Amazon S3, Amazon Redshift, or Amazon OpenSearch Service (OpenSearch Service).

 

• You can also write application output to a custom destination instead of Amazon S3 or Amazon Redshift. To do that, you specify a Kinesis data stream as the destination in your output configuration. Then, you configure AWS Lambda to poll the stream and invoke your Lambda function. Your Lambda function code receives stream data as input. In your Lambda function code, you can write the incoming data to your custom destination. For more information, see Using AWS Lambda with Amazon Kinesis Data Analytics.

For more information, see Configuring Application Output (p. 33).

In addition, note the following:

• Amazon Kinesis Data Analytics needs permissions to read records from a streaming source and write application output to the external destinations. You use IAM roles to grant these permissions.

 

• Kinesis Data Analytics automatically provides an in-application error stream for each application. If your application has issues while processing certain records (for example, because of a type mismatch or late arrival), that record is written to the error stream. You can configure application output to direct Kinesis Data Analytics to persist the error stream data to an external destination for further evaluation.

For more information, see Error Handling (p. 41).

 

• Amazon Kinesis Data Analytics ensures that your application output records are written to the configured destination. It uses an "at least once" processing and delivery model, even if you experience an application interruption. For more information, see Delivery Model for Persisting Application Output to an External Destination (p. 40).

Topics

• Configuring Application Input (p. 5)

• Application Code (p. 31)

• Configuring Application Output (p. 33)

• Error Handling (p. 41)

• Automatically Scaling Applications to Increase Throughput (p. 42)

• Using Tagging (p. 42)

Configuring Application Input

Your Amazon Kinesis Data Analytics application can receive input from a single streaming source and, optionally, use one reference data source. For more information, see Amazon Kinesis Data Analytics for SQL Applications: How It Works (p. 3). The sections in this topic describe the application input sources.

Topics

• Configuring a Streaming Source (p. 5)

• Configuring a Reference Source (p. 7)

• Working with JSONPath (p. 9)

• Mapping Streaming Source Elements to SQL Input Columns (p. 13)

• Using the Schema Discovery Feature on Streaming Data (p. 17)

• Using the Schema Discovery Feature on Static Data (p. 18)

• Preprocessing Data Using a Lambda Function (p. 21)

• Parallelizing Input Streams for Increased Throughput (p. 28)

Configuring a Streaming Source

At the time that you create an application, you specify a streaming source. You can also modify an input after you create the application. Amazon Kinesis Data Analytics supports the following streaming sources for your application:

• A Kinesis data stream

• A Kinesis Data Firehose delivery stream

Note

If the Kinesis data stream is encrypted, Kinesis Data Analytics accesses the data in the encrypted stream seamlessly with no further configuration needed. Kinesis Data Analytics does not store unencrypted data read from Kinesis Data Streams. For more information, see What Is Server-Side Encryption For Kinesis Data Streams?.

Kinesis Data Analytics continuously polls the streaming source for new data and ingests it into in-application streams according to the input configuration.

Note

Adding a Kinesis Stream as your application's input does not affect the data in the stream.

If another resource such as a Kinesis Data Firehose delivery stream also accessed the same Kinesis stream, both the Kinesis Data Firehose delivery stream and the Kinesis Data Analytics application would receive the same data. Throughput and throttling might be affected, however.

Your application code can query the in-application stream. As part of input configuration you provide the following:

Streaming source – You provide the Amazon Resource Name (ARN) of the stream and an IAM role that Kinesis Data Analytics can assume to access the stream on your behalf.

In-application stream name prefix – When you start the application, Kinesis Data Analytics creates the specified in-application stream. In your application code, you access the in-application stream using this name.

You can optionally map a streaming source to multiple in-application streams. For more information, see Limits (p. 166). In this case, Amazon Kinesis Data Analytics creates the specified number of in-application streams with names as follows: prefix_001, prefix_002, and prefix_003. By default, Kinesis Data Analytics maps the streaming source to one in-application stream named prefix_001.

There is a limit on the rate that you can insert rows in an in-application stream. Therefore, Kinesis Data Analytics supports multiple such in-application streams so that you can bring records into your application at a much faster rate. If you find that your application is not keeping up with the data in the streaming source, you can add units of parallelism to improve performance.

Mapping schema – You describe the record format (JSON, CSV) on the streaming source. You also describe how each record on the stream maps to columns in the in-application stream that is created.

This is where you provide column names and data types.

Note

Kinesis Data Analytics adds quotation marks around the identifiers (stream name and column names) when creating the input in-application stream. When querying this stream and the columns, you must specify them in quotation marks using the same casing (matching lowercase and uppercase letters exactly). For more information about identifiers, see Identifiers in the Amazon Kinesis Data Analytics SQL Reference.
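The naming pattern and the quoting rule described above can be sketched in a few lines of Python. This is an illustration only: the helper names `in_application_stream_names` and `quote_identifier` and the prefix `SOURCE_SQL_STREAM` are assumptions made for this example and are not part of the Kinesis Data Analytics API.

```python
def in_application_stream_names(prefix, count=1):
    """Return names in the prefix_001, prefix_002, ... pattern described above."""
    if count < 1:
        raise ValueError("count must be at least 1")
    return [f"{prefix}_{i:03d}" for i in range(1, count + 1)]


def quote_identifier(name):
    """Wrap an identifier in double quotation marks, leaving its casing untouched."""
    return f'"{name}"'


# Hypothetical prefix, mapped to three in-application streams.
names = in_application_stream_names("SOURCE_SQL_STREAM", 3)
print(names)

# Application code must reference the stream with the same casing, in quotation marks.
query = f"SELECT STREAM * FROM {quote_identifier(names[0])}"
print(query)
```

Because the identifiers are quoted, a query that refers to the same stream with different casing would not match it.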

You can create an application and configure inputs in the Amazon Kinesis Data Analytics console. The console then makes the necessary API calls. You can configure application input when you create a new application, or you can add an input configuration to an existing application. For more information, see CreateApplication (p. 204) and AddApplicationInput (p. 192). The following is the input configuration part of the CreateApplication API request body:

"Inputs": [
   {
      "InputSchema": {
         "RecordColumns": [
            {
               "Mapping": "string",
               "Name": "string",
               "SqlType": "string"
            }
         ],
         "RecordEncoding": "string",
         "RecordFormat": {
            "MappingParameters": {
               "CSVMappingParameters": {
                  "RecordColumnDelimiter": "string",
                  "RecordRowDelimiter": "string"
               },
               "JSONMappingParameters": {
                  "RecordRowPath": "string"
               }
            },
            "RecordFormatType": "string"
         }
      },
      "KinesisFirehoseInput": {
         "ResourceARN": "string",
         "RoleARN": "string"
      },
      "KinesisStreamsInput": {
         "ResourceARN": "string",
         "RoleARN": "string"
      },
      "Name": "string"
   }
]

Configuring a Reference Source

You can also optionally add a reference data source to an existing application to enrich the data coming in from streaming sources. You must store reference data as an object in your Amazon S3 bucket. When the application starts, Amazon Kinesis Data Analytics reads the Amazon S3 object and creates an in-application reference table. Your application code can then join it with an in-application stream.

You store reference data in the Amazon S3 object using supported formats (CSV, JSON). For example, suppose that your application performs analytics on stock orders. Assume the following record format on the streaming source:

Ticker, SalePrice, OrderId
AMZN    $700       1003
XYZ     $250       1004
...

In this case, you might then consider maintaining a reference data source to provide details for each stock ticker, such as company name.

Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
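To make the enrichment idea concrete, the following is a minimal sketch in plain Python rather than application SQL. It mimics joining each order with the reference data by ticker. The `enrich` function and the choice to return `None` for a ticker with no reference row are assumptions of this sketch, not documented service behavior.

```python
# Reference data keyed by ticker, mirroring the Ticker, Company example above.
reference = {"AMZN": "Amazon", "XYZ": "SomeCompany"}

# Stream records mirroring the Ticker, SalePrice, OrderId example above.
orders = [
    {"Ticker": "AMZN", "SalePrice": 700, "OrderId": 1003},
    {"Ticker": "XYZ", "SalePrice": 250, "OrderId": 1004},
]


def enrich(order, reference_table):
    """Return a copy of the order with a Company column looked up by ticker."""
    enriched = dict(order)
    # Assumption of this sketch: an unknown ticker yields None instead of dropping the row.
    enriched["Company"] = reference_table.get(order["Ticker"])
    return enriched


enriched_orders = [enrich(order, reference) for order in orders]
for row in enriched_orders:
    print(row)
```

In an application, the equivalent step would be a JOIN between the in-application stream and the in-application reference table.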

You can add an application reference data source either with the API or with the console. Amazon Kinesis Data Analytics provides the following API actions to manage reference data sources:

• AddApplicationReferenceDataSource (p. 201)

• UpdateApplication (p. 241)

For information about adding reference data using the console, see Example: Adding Reference Data to a Kinesis Data Analytics Application (p. 120).

Note the following:

• If the application is running, Kinesis Data Analytics creates an in-application reference table, and then loads the reference data immediately.

• If the application is not running (for example, it's in the ready state), Kinesis Data Analytics saves only the updated input configuration. When the application starts running, Kinesis Data Analytics loads the reference data in your application as a table.

Suppose that you want to refresh the data after Kinesis Data Analytics creates the in-application reference table. Perhaps you updated the Amazon S3 object, or you want to use a different Amazon S3 object. In this case, you can either explicitly call UpdateApplication (p. 241), or choose Actions, Synchronize reference data table in the console. Kinesis Data Analytics does not refresh the in-application reference table automatically.

There is a limit on the size of the Amazon S3 object that you can create as a reference data source. For more information, see Limits (p. 166). If the object size exceeds the limit, Kinesis Data Analytics can't load the data. The application state appears as running, but the data is not being read.

When you add a reference data source, you provide the following information:

S3 bucket and object key name – In addition to the bucket name and object key, you also provide an IAM role that Kinesis Data Analytics can assume to read the object on your behalf.

In-application reference table name – Kinesis Data Analytics creates this in-application table and populates it by reading the Amazon S3 object. This is the table name you specify in your application code.

Mapping schema – You describe the record format (JSON, CSV) and the encoding of the data stored in the Amazon S3 object. You also describe how each data element maps to columns in the in-application reference table.

The following shows the request body in the AddApplicationReferenceDataSource API request.

{
   "ApplicationName": "string",
   "CurrentApplicationVersionId": number,
   "ReferenceDataSource": {
      "ReferenceSchema": {
         "RecordColumns": [
            {
               "IsDropped": boolean,
               "Mapping": "string",
               "Name": "string",
               "SqlType": "string"
            }
         ],
         "RecordEncoding": "string",
         "RecordFormat": {
            "MappingParameters": {
               "CSVMappingParameters": {
                  "RecordColumnDelimiter": "string",
                  "RecordRowDelimiter": "string"
               },
               "JSONMappingParameters": {
                  "RecordRowPath": "string"
               }
            },
            "RecordFormatType": "string"
         }
      },
      "S3ReferenceDataSource": {
         "BucketARN": "string",
         "FileKey": "string",
         "ReferenceRoleARN": "string"
      },
      "TableName": "string"
   }
}
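As an illustration of how the request body above might be filled in for a CSV object, the following Python sketch assembles the structure as a dictionary and prints it as JSON. The function name `build_reference_source_request`, the application name, the ARNs, the file key, and the column types are all hypothetical values chosen for this example.

```python
import json


def build_reference_source_request(app_name, version_id, bucket_arn, file_key,
                                   role_arn, table_name, columns):
    """Assemble an AddApplicationReferenceDataSource body for a CSV object.

    columns is a list of (name, sql_type) pairs in file order.
    """
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": version_id,
        "ReferenceDataSource": {
            "ReferenceSchema": {
                "RecordColumns": [
                    {"Name": name, "SqlType": sql_type}
                    for name, sql_type in columns
                ],
                "RecordEncoding": "UTF-8",
                "RecordFormat": {
                    "MappingParameters": {
                        "CSVMappingParameters": {
                            "RecordColumnDelimiter": ",",
                            "RecordRowDelimiter": "\n",
                        }
                    },
                    "RecordFormatType": "CSV",
                },
            },
            "S3ReferenceDataSource": {
                "BucketARN": bucket_arn,
                "FileKey": file_key,
                "ReferenceRoleARN": role_arn,
            },
            "TableName": table_name,
        },
    }


# All values below are placeholders for illustration.
request = build_reference_source_request(
    app_name="example-app",
    version_id=1,
    bucket_arn="arn:aws:s3:::example-bucket",
    file_key="reference/companies.csv",
    role_arn="arn:aws:iam::123456789012:role/example-role",
    table_name="CompanyName",
    columns=[("Ticker", "VARCHAR(8)"), ("Company", "VARCHAR(64)")],
)
print(json.dumps(request, indent=2))
```

If you use the AWS SDK for Python, a dictionary of this shape should correspond to the keyword arguments of the `kinesisanalytics` client's `add_application_reference_data_source` method; verify the parameter names against the SDK documentation before relying on them.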

Working with JSONPath

JSONPath is a standardized way to query elements of a JSON object. JSONPath uses path expressions to navigate elements, nested elements, and arrays in a JSON document. For more information about JSON, see Introducing JSON.

Amazon Kinesis Data Analytics uses JSONPath expressions in the application's source schema to identify data elements in a streaming source that contains JSON-format data.

For more information about how to map streaming data to your application's input stream, see the section called “Mapping Streaming Source Elements to SQL Input Columns” (p. 13).

Accessing JSON Elements with JSONPath

The following sections show how to use JSONPath expressions to access various elements in JSON-formatted data. For the examples in this section, assume that the source stream contains the following JSON record:

{
  "customerName":"John Doe",
  "address":
  {
    "streetAddress":
    {
      "number":"123",
      "street":"AnyStreet"
    },
    "city":"Anytown"
  },
  "orders":
  [
    { "orderId":"23284", "itemName":"Widget", "itemPrice":"33.99" },
    { "orderId":"63122", "itemName":"Gadget", "itemPrice":"22.50" },
    { "orderId":"77284", "itemName":"Sprocket", "itemPrice":"12.00" }
  ]
}

Accessing JSON Elements

To query an element in JSON data using JSONPath, use the following syntax. Here, $ represents the root of the data hierarchy and elementName is the name of the element node to query.

$.elementName

The following expression queries the customerName element in the preceding JSON example.

$.customerName

The preceding expression returns the following from the preceding JSON record.

John Doe

Note

Path expressions are case sensitive. The expression $.customername returns null from the preceding JSON example.

Note

If no element exists at the location that the path expression specifies, the expression returns null. The following expression returns null from the preceding JSON example, because there is no matching element.

$.customerId

Accessing Nested JSON Elements

To query a nested JSON element, use the following syntax.

$.parentElement.element

The following expression queries the city element in the preceding JSON example.

$.address.city

The preceding expression returns the following from the preceding JSON record.

Anytown

You can query further levels of subelements using the following syntax.

$.parentElement.element.subElement

The following expression queries the street element in the preceding JSON example.

$.address.streetAddress.street

The preceding expression returns the following from the preceding JSON record.

AnyStreet
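The element and nested-element lookups described so far can be approximated with a small Python function. This is a sketch of the behavior described in this section (dotted paths only, case-sensitive keys, null for a missing element), not the service's JSONPath implementation. The name `resolve` is an assumption of this example, and `RECORD` is a trimmed copy of the sample record.

```python
# Trimmed copy of the sample record used in this section.
RECORD = {
    "customerName": "John Doe",
    "address": {
        "streetAddress": {"number": "123", "street": "AnyStreet"},
        "city": "Anytown",
    },
}


def resolve(record, path):
    """Resolve a dotted path such as $.address.city; return None if absent."""
    if not path.startswith("$."):
        raise ValueError("path must start with '$.'")
    node = record
    for key in path[2:].split("."):
        # Keys are compared exactly, so lookups are case sensitive.
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


for path in ("$.customerName", "$.customername", "$.customerId",
             "$.address.city", "$.address.streetAddress.street"):
    print(path, "->", resolve(RECORD, path))
```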

Accessing Arrays

You can access the data in a JSON array in the following ways:

• Retrieve all the elements in the array as a single row.

• Retrieve each element in the array as a separate row.

Retrieve All Elements in an Array in a Single Row

To query the entire contents of an array as a single row, use the following syntax.

$.arrayObject[0:]

The following expression queries the entire contents of the orders element in the preceding JSON example used in this section. It returns the array contents in a single column in a single row.

$.orders[0:]

The preceding expression returns the following from the example JSON record used in this section.

[{"orderId":"23284","itemName":"Widget","itemPrice":"33.99"}, {"orderId":"63122","itemName":"Gadget","itemPrice":"22.50"}, {"orderId":"77284","itemName":"Sprocket","itemPrice":"12.00"}]

Retrieve All Elements in an Array in Separate Rows

To query the individual elements in an array as separate rows, use the following syntax.

$.arrayObject[0:].element

The following expression queries the orderId elements in the preceding JSON example, and returns each array element as a separate row.

$.orders[0:].orderId

The preceding expression returns the following from the preceding JSON record, with each data item returned as a separate row.

23284
63122
77284

Note

If expressions that query nonarray elements are included in a schema that queries individual array elements, the nonarray elements are repeated for each element in the array. For example, suppose that a schema for the preceding JSON example includes the following expressions:

• $.customerName

• $.orders[0:].orderId

In this case, the returned data rows from the sample input stream element resemble the following, with the name element repeated for every orderId element.

John Doe 23284

John Doe 63122

John Doe 77284

Note

The following limitations apply to array expressions in Amazon Kinesis Data Analytics:

• Only one level of dereferencing is supported in an array expression. The following expression format is not supported.

$.arrayObject[0:].element[0:].subElement

• Only one array can be flattened in a schema. Multiple arrays can be referenced, with each returned as one row containing all of the elements in the array. However, only one array can have each of its elements returned as individual rows.

A schema containing elements in the following format is valid. This format returns the contents of the second array as a single column, repeated for every element in the first array.

$.arrayObjectOne[0:].element

$.arrayObjectTwo[0:]

A schema containing elements in the following format is not valid.

$.arrayObjectOne[0:].element

$.arrayObjectTwo[0:].element

Other Considerations

Additional considerations for working with JSONPath are as follows:

• If no arrays are accessed by an individual element in the JSONPath expressions in the application schema, then a single row is created in the application's input stream for each JSON record processed.

• When an array is flattened (that is, its elements are returned as individual rows), any missing elements result in a null value being created in the in-application stream.

• An array is always flattened to at least one row. If no values would be returned (that is, the array is empty or none of its elements are queried), a single row with all null values is returned.

The following expression returns records with null values from the preceding JSON example, because there is no matching element at the specified path.

$.orders[0:].itemId

The preceding expression returns the following from the preceding JSON example record.

null null null
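The flattening rules described in this section can be sketched in a few lines of Python. This is only an illustration of the documented behavior, not the service's implementation. The function name flatten_rows is hypothetical, and the sample record is reconstructed from the values shown in this section (the customerName field is assumed from the "John Doe" output rows).

```python
def flatten_rows(record, scalar_keys, array_key, element_key):
    """Return one row per element of record[array_key].

    Non-array values are repeated on every row, a missing element
    becomes None (SQL null), and an empty array still yields one row.
    """
    elements = record.get(array_key) or []
    if not elements:
        # Literal reading of the rule: a single row with all null values.
        return [tuple([None] * (len(scalar_keys) + 1))]
    scalars = tuple(record.get(key) for key in scalar_keys)
    return [scalars + (element.get(element_key),) for element in elements]


# Sample record assumed from the values shown in this section.
record = {
    "customerName": "John Doe",
    "orders": [
        {"orderId": "23284", "itemName": "Widget", "itemPrice": "33.99"},
        {"orderId": "61322", "itemName": "Gadget", "itemPrice": "22.50"},
        {"orderId": "77284", "itemName": "Sprocket", "itemPrice": "12.00"},
    ],
}

for row in flatten_rows(record, ["customerName"], "orders", "orderId"):
    print(row)
```

Calling the same function with "itemId" as the element key produces three rows whose last value is None, which corresponds to the null rows shown above.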

Related Topics

• Introducing JSON

Mapping Streaming Source Elements to SQL Input Columns

With Amazon Kinesis Data Analytics, you can process and analyze streaming data in either JSON or CSV formats using standard SQL.

• To process and analyze streaming CSV data, you assign column names and data types for the columns of the input stream. Your application imports one column from the input stream per column definition, in order.

You don't have to include all of the columns in the application input stream, but you cannot skip columns from the source stream. For example, you can import the first three columns from an input stream containing five elements, but you cannot import only columns 1, 2, and 4.

• To process and analyze streaming JSON data, you use JSONPath expressions to map JSON elements from a streaming source to SQL columns in an input stream. For more information about using JSONPath with Amazon Kinesis Data Analytics, see Working with JSONPath (p. 9). The columns in the SQL table have data types that are mapped from JSON types. For supported data types, see Data Types. For details about converting JSON data to SQL data, see Mapping JSON Data Types to SQL Data Types (p. 15).

For more information about how to configure input streams, see Configuring Application Input (p. 5).

Mapping JSON Data to SQL Columns

You can map JSON elements to input columns using the AWS Management Console or the Kinesis Data Analytics API.

• To map elements to columns using the console, see Working with the Schema Editor (p. 58).

• To map elements to columns using the Kinesis Data Analytics API, see the following section.

To map JSON elements to columns in the in-application input stream, you need a schema with the following information for each column:

Source Expression: The JSONPath expression that identifies the location of the data for the column.

Column Name: The name that your SQL queries use to reference the data.

Data Type: The SQL data type for the column.

Using the API

To map elements from a streaming source to input columns, you can use the Kinesis Data Analytics API CreateApplication (p. 204) action. To create the in-application stream, specify a schema to transform your data into a schematized version used in SQL. The CreateApplication (p. 204) action configures your application to receive input from a single streaming source. To map JSON elements or CSV columns to SQL columns, you create a RecordColumn (p. 296) object in the SourceSchema (p. 306) RecordColumns array. The RecordColumn (p. 296) object has the following schema:

{
    "Mapping": "String",
    "Name": "String",
    "SqlType": "String"
}

The fields in the RecordColumn (p. 296) object have the following values:

• Mapping: The JSONPath expression that identifies the location of the data in the input stream record.

This value is not present for an input schema for a source stream in CSV format.

• Name: The column name in the in-application SQL data stream.

• SqlType: The data type of the data in the in-application SQL data stream.

JSON Input Schema Example

The following example demonstrates the format of the InputSchema value for a JSON schema.

"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(4)",
            "Name": "TICKER_SYMBOL",
            "Mapping": "$.TICKER_SYMBOL"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "SECTOR",
            "Mapping": "$.SECTOR"
        },
        {
            "SqlType": "TINYINT",
            "Name": "CHANGE",
            "Mapping": "$.CHANGE"
        },
        {
            "SqlType": "DECIMAL(5,2)",
            "Name": "PRICE",
            "Mapping": "$.PRICE"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "JSONMappingParameters": {
                "RecordRowPath": "$"
            }
        },
        "RecordFormatType": "JSON"
    },
    "RecordEncoding": "UTF-8"
}
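When a schema has many columns, it can be easier to build the InputSchema value programmatically than to write it by hand. The following Python sketch assembles the same structure as the JSON example. The helper names record_column and json_input_schema are hypothetical and are not part of any AWS SDK; only the field names come from this guide.

```python
import json


def record_column(name, sql_type, mapping=None):
    """Build one RecordColumn; Mapping is omitted for CSV sources."""
    column = {"Name": name, "SqlType": sql_type}
    if mapping is not None:
        column["Mapping"] = mapping
    return column


def json_input_schema(columns, row_path="$", encoding="UTF-8"):
    """Wrap RecordColumn entries in an InputSchema for JSON records."""
    return {
        "RecordColumns": columns,
        "RecordFormat": {
            "RecordFormatType": "JSON",
            "MappingParameters": {
                "JSONMappingParameters": {"RecordRowPath": row_path}
            },
        },
        "RecordEncoding": encoding,
    }


schema = json_input_schema([
    record_column("TICKER_SYMBOL", "VARCHAR(4)", "$.TICKER_SYMBOL"),
    record_column("SECTOR", "VARCHAR(16)", "$.SECTOR"),
    record_column("CHANGE", "TINYINT", "$.CHANGE"),
    record_column("PRICE", "DECIMAL(5,2)", "$.PRICE"),
])
print(json.dumps({"InputSchema": schema}, indent=4))
```

The resulting dictionary has the shape that the CreateApplication action expects for the input schema of a streaming source.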

CSV Input Schema Example

The following example demonstrates the format of the InputSchema value for a schema in comma-separated value (CSV) format.

"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(16)",
            "Name": "LastName"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "FirstName"
        },
        {
            "SqlType": "INTEGER",
            "Name": "CustomerId"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "CSVMappingParameters": {
                "RecordColumnDelimiter": ",",
                "RecordRowDelimiter": "\n"
            }
        },
        "RecordFormatType": "CSV"
    },
    "RecordEncoding": "UTF-8"
}
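Because a CSV schema has no Mapping field, columns are assigned purely by position. The following Python sketch illustrates that rule with the column names from the CSV example: the first three source values are imported in order and any trailing values are ignored. The function name map_csv_row and the sample row are assumptions made for illustration, and Python's csv module may not parse every record exactly as the service does.

```python
import csv
import io


def map_csv_row(line, column_names, delimiter=","):
    """Assign source values to schema columns strictly by position.

    zip() stops at the shorter sequence, so source columns beyond the
    last schema column are dropped; columns cannot be skipped.
    """
    values = next(csv.reader(io.StringIO(line), delimiter=delimiter))
    return dict(zip(column_names, values))


columns = ["LastName", "FirstName", "CustomerId"]
row = map_csv_row("Doe,John,67321,Seattle,WA", columns)
print(row)
```

The values remain strings here; conversion to the declared SqlType is a separate step that this sketch does not cover.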

Mapping JSON Data Types to SQL Data Types

JSON data types are converted to corresponding SQL data types according to the application's input schema. For information about supported SQL data types, see Data Types. Amazon Kinesis Data Analytics converts JSON data types to SQL data types according to the following rules.

Null Literal

A null literal in the JSON input stream (for example, "City":null) converts to a SQL null regardless of destination data type.

Boolean Literal

A Boolean literal in the JSON input stream (for example, "Contacted":true) converts to SQL data as follows:

• Numeric (DECIMAL, INT, and so on): true converts to 1; false converts to 0.

• Binary (BINARY or VARBINARY):

• true: Result has lowest bit set and remaining bits cleared.

• false: Result has all bits cleared.

Conversion to VARBINARY results in a value 1 byte in length.

• BOOLEAN: Converts to the corresponding SQL BOOLEAN value.

• Character (CHAR or VARCHAR): Converts to the corresponding string value (true or false). The value is truncated to fit the length of the field.

• Datetime (DATE, TIME, or TIMESTAMP): Conversion fails and a coercion error is written to the error stream.

Number

A number literal in the JSON input stream (for example, "CustomerId":67321) converts to SQL data as follows:

• Numeric (DECIMAL, INT, and so on): Converts directly. If the converted value exceeds the size or precision of the target data type (for example, converting 123.4 to INT), conversion fails and a coercion error is written to the error stream.

• Binary (BINARY or VARBINARY): Conversion fails and a coercion error is written to the error stream.

• BOOLEAN:

• 0: Converts to false.

• All other numbers: Converts to true.

• Character (CHAR or VARCHAR): Converts to a string representation of the number.

• Datetime (DATE, TIME, or TIMESTAMP): Conversion fails and a coercion error is written to the error stream.

String

A string value in the JSON input stream (for example, "CustomerName":"John Doe") converts to SQL data as follows:

• Numeric (DECIMAL, INT, and so on): Amazon Kinesis Data Analytics attempts to convert the value to the target data type. If the value cannot be converted, conversion fails and a coercion error is written to the error stream.

• Binary (BINARY or VARBINARY): If the source string is a valid binary literal (for example, X'3F67A23A', with an even number of hexadecimal digits), the value is converted to the target data type. Otherwise, conversion fails and a coercion error is written to the error stream.

• BOOLEAN: If the source string is "true", converts to true. This comparison is case-insensitive. Otherwise, converts to false.

• Character (CHAR or VARCHAR): Converts to the string value in the input. If the value is longer than the target data type, it is truncated and no error is written to the error stream.

• Datetime (DATE, TIME, or TIMESTAMP): If the source string is in a format that can be converted to the target value, the value is converted. Otherwise, conversion fails and a coercion error is written to the error stream.

Valid datetime formats include:

• "1992-02-14"

• "1992-02-14 18:35:44.0"

Array or Object

An array or object in the JSON input stream converts to SQL data as follows:

• Character (CHAR or VARCHAR): Converts to the source text of the array or object. See Accessing Arrays (p. 10).

• All other data types: Conversion fails and a coercion error is written to the error stream.

For an example of a JSON array, see Working with JSONPath (p. 9).
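The conversion rules above can be summarized as a small Python sketch. It covers only three target types (INTEGER, BOOLEAN, and VARCHAR) and is an approximation for reasoning about the rules, not the service's implementation. The names coerce and CoercionError are hypothetical; CoercionError stands in for a coercion error written to the error stream. Arrays and objects are re-serialized with json.dumps, which only approximates "the source text" that the service returns.

```python
import json

SUPPORTED = {"INTEGER", "BOOLEAN", "VARCHAR"}


class CoercionError(ValueError):
    """Stands in for a coercion error written to the error stream."""


def coerce(value, sql_type, length=None):
    """Convert a parsed JSON value to a simplified SQL target type."""
    if sql_type not in SUPPORTED:
        raise NotImplementedError(f"{sql_type} is outside this sketch")
    if value is None:
        return None  # JSON null becomes SQL null for any target type
    if isinstance(value, bool):  # test bool first: bool is a subclass of int
        if sql_type == "INTEGER":
            return 1 if value else 0
        if sql_type == "BOOLEAN":
            return value
        return ("true" if value else "false")[:length]  # truncated to fit
    if isinstance(value, (int, float)):
        if sql_type == "INTEGER":
            if value != int(value):
                raise CoercionError(f"{value!r} loses precision as INTEGER")
            return int(value)
        if sql_type == "BOOLEAN":
            return value != 0
        return str(value)
    if isinstance(value, str):
        if sql_type == "INTEGER":
            try:
                return int(value)
            except ValueError as exc:
                raise CoercionError(f"{value!r} is not an integer") from exc
        if sql_type == "BOOLEAN":
            return value.lower() == "true"  # case-insensitive comparison
        return value[:length]  # silently truncated, no error
    if isinstance(value, (list, dict)) and sql_type == "VARCHAR":
        return json.dumps(value)  # approximation of the source text
    raise CoercionError(f"cannot convert {value!r} to {sql_type}")


print(coerce(True, "INTEGER"), coerce("TRUE", "BOOLEAN"), coerce("John Doe", "VARCHAR", 4))
```

Binary and datetime targets are left out on purpose, because their rules depend on literal formats that this sketch does not attempt to parse.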

Related Topics

• Configuring Application Input (p. 5)

• Data Types

• Working with the Schema Editor (p. 58)

• CreateApplication (p. 204)

• RecordColumn (p. 296)

• SourceSchema (p. 306)

Using the Schema Discovery Feature on Streaming Data

Providing an input schema that describes how records on the streaming input map to an in-application stream can be cumbersome and error prone. You can use the DiscoverInputSchema (p. 225) API (called the discovery API) to infer a schema. Using random samples of records on the streaming source, the API can infer a schema (that is, column names, data types, and position of the data element in the incoming data).

Note

To use the Discovery API to generate a schema from a file stored in Amazon S3, see Using the Schema Discovery Feature on Static Data (p. 18).

The console uses the Discovery API to generate a schema for a specified streaming source. Using the console, you can also update the schema, including adding or removing columns, changing column names or data types, and so on. However, make changes carefully to ensure that you do not create an invalid schema.

After you finalize a schema for your in-application stream, there are functions you can use to manipulate string and datetime values. You can use these functions in your application code when working with rows in the resulting in-application stream. For more information, see Example: Transforming DateTime Values (p. 98).

Column Naming During Schema Discovery

During schema discovery, Amazon Kinesis Data Analytics tries to retain as much of the original column name as possible from the streaming input source, except in the following cases:

• The source stream column name is a reserved SQL keyword, such as TIMESTAMP, USER, VALUES, or YEAR.

• The source stream column name contains unsupported characters. Only letters, numbers, and the underscore character ( _ ) are supported.

• The source stream column name begins with a number.

• The source stream column name is longer than 100 characters.

If a column is renamed, the renamed schema column name begins with COL_. In some cases, none of the original column name can be retained, for example, if the entire name consists of unsupported characters. In such a case, the column is named COL_#, with # being a number indicating the column's place in the column order.

After discovery completes, you can update the schema using the console to add or remove columns, or change column names, data types, or data size.

Examples of Discovery-Suggested Column Names

Source Stream Column Name    Discovery-Suggested Column Name
USER                         COL_USER
USER@DOMAIN                  COL_USERDOMAIN
@@                           COL_0
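The renaming rules and the examples in the table can be approximated with the following Python sketch. The exact algorithm that the service uses is not documented here, so treat this as an illustration only. The function name suggest_column_name is hypothetical, the reserved-word set contains only the keywords named in this guide plus MONTH (assumed from the COL_month name in the discovery response example later in this guide), and truncating long names to 100 characters is an assumption.

```python
import re

# Partial list: keywords named in this guide, plus MONTH (assumed).
RESERVED = {"TIMESTAMP", "USER", "VALUES", "YEAR", "MONTH"}
MAX_LENGTH = 100


def suggest_column_name(source_name, position):
    """Approximate the column name that schema discovery would suggest."""
    # Keep only letters, numbers, and underscores.
    cleaned = re.sub(r"[^A-Za-z0-9_]", "", source_name)
    if not cleaned:
        # Nothing of the original name survives: fall back to the position.
        return f"COL_{position}"
    needs_rename = (
        cleaned != source_name
        or cleaned.upper() in RESERVED
        or cleaned[0].isdigit()
        or len(cleaned) > MAX_LENGTH
    )
    if not needs_rename:
        return source_name
    return "COL_" + cleaned[:MAX_LENGTH]  # truncation is an assumption


for index, name in enumerate(["USER", "USER@DOMAIN", "state"]):
    print(name, "->", suggest_column_name(name, index))
```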

Schema Discovery Issues

What happens if Kinesis Data Analytics does not infer a schema for a given streaming source?

Kinesis Data Analytics infers your schema for common formats, such as CSV and JSON, which are UTF-8 encoded. Kinesis Data Analytics supports any UTF-8 encoded records (including raw text like application logs and records) with a custom column and row delimiter. If Kinesis Data Analytics doesn't infer a schema, you can define a schema manually using the schema editor on the console (or using the API).

If your data does not follow a pattern (which you can specify using the schema editor), you can define a schema as a single column of type VARCHAR(N), where N is the largest number of characters you expect your record to include. From there, you can use string and date-time manipulation to structure your data after it is in an in-application stream. For examples, see Example: Transforming DateTime Values (p. 98).

Using the Schema Discovery Feature on Static Data

The schema discovery feature can generate a schema from either the data in a stream or data in a static file that is stored in an Amazon S3 bucket. Suppose that you want to generate a schema for a Kinesis Data Analytics application for reference purposes or when live streaming data isn't available. You can use the schema discovery feature on a static file that contains a sample of the data in the expected format of your streaming or reference data. Kinesis Data Analytics can run schema discovery on sample data from a JSON or CSV file that's stored in an Amazon S3 bucket. To run schema discovery on a data file, use either the console or the DiscoverInputSchema (p. 225) API with the S3Configuration parameter specified.

Running Schema Discovery Using the Console

To run discovery on a static file using the console, do the following:

1. Add a reference data object to an S3 bucket.

2. Choose Connect reference data in the application's main page in the Kinesis Data Analytics console.

3. Provide the bucket, path and IAM role data for accessing the Amazon S3 object containing the reference data.

4. Choose Discover schema.

For more information on how to add reference data and discover schema in the console, see Example: Adding Reference Data to a Kinesis Data Analytics Application (p. 120).

Running Schema Discovery Using the API

To run discovery on a static file using the API, you provide the API with an S3Configuration structure with the following information:

• BucketARN: The Amazon Resource Name (ARN) of the Amazon S3 bucket that contains the file. For the format of an Amazon S3 bucket ARN, see Amazon Resource Names (ARNs) and Amazon Service Namespaces: Amazon Simple Storage Service (Amazon S3).

• RoleARN: The ARN of an IAM role with the AmazonS3ReadOnlyAccess policy. For information about how to add a policy to a role, see Modifying a Role.

• FileKey: The file name of the object.

To generate a schema from an Amazon S3 object using the DiscoverInputSchema API

1. Make sure that you have the AWS CLI set up. For more information, see Step 2: Set Up the AWS Command Line Interface (AWS CLI) (p. 46) in the Getting Started section.

2. Create a file named data.csv with the following contents:

year,month,state,producer_type,energy_source,units,consumption
2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615
2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535
2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890
2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601
2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681

3. Sign in to the Amazon S3 console at https://console.aws.amazon.com/s3/.

4. Create an Amazon S3 bucket and upload the data.csv file you created. Note the ARN of the created bucket. For information about creating an Amazon S3 bucket and uploading a file, see Getting Started with Amazon Simple Storage Service.

5. Open the IAM console at https://console.aws.amazon.com/iam/. Create a role with the AmazonS3ReadOnlyAccess policy. Note the ARN of the new role. For information about creating a role, see Creating a Role to Delegate Permissions to an Amazon Service. For information about how to add a policy to a role, see Modifying a Role.

6. Run the following DiscoverInputSchema command in the AWS CLI, substituting the ARNs for your Amazon S3 bucket and IAM role:

$ aws kinesisanalytics discover-input-schema --s3-configuration '{ "RoleARN": "arn:aws:iam::123456789012:role/service-role/your-IAM-role", "BucketARN": "arn:aws:s3:::your-bucket-name", "FileKey": "data.csv" }'

7. The response looks similar to the following:

{
    "InputSchema": {
        "RecordEncoding": "UTF-8",
        "RecordColumns": [
            { "SqlType": "INTEGER", "Name": "COL_year" },
            { "SqlType": "INTEGER", "Name": "COL_month" },
            { "SqlType": "VARCHAR(4)", "Name": "state" },
            { "SqlType": "VARCHAR(64)", "Name": "producer_type" },
            { "SqlType": "VARCHAR(4)", "Name": "energy_source" },
            { "SqlType": "VARCHAR(16)", "Name": "units" },
            { "SqlType": "INTEGER", "Name": "consumption" }
        ],
        "RecordFormat": {
            "RecordFormatType": "CSV",
            "MappingParameters": {
                "CSVMappingParameters": {
                    "RecordRowDelimiter": "\r\n",
                    "RecordColumnDelimiter": ","
                }
            }
        }
    },
    "RawInputRecords": [
        "year,month,state,producer_type,energy_source,units,consumption\r\n2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615\r\n2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535\r\n2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890\r\n2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601\r\n2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681"
    ],
    "ParsedInputRecords": [
        [ null, null, "state", "producer_type", "energy_source", "units", null ],
        [ "2001", "1", "AK", "TotalElectricPowerIndustry", "Coal", "ShortTons", "47615" ],
        [ "2001", "1", "AK", "ElectricGeneratorsElectricUtilities", "Coal", "ShortTons", "16535" ],
        [ "2001", "1", "AK", "CombinedHeatandPowerElectricPower", "Coal", "ShortTons", "22890" ],
        [ "2001", "1", "AL", "TotalElectricPowerIndustry", "Coal", "ShortTons", "3020601" ],
        [ "2001", "1", "AL", "ElectricGeneratorsElectricUtilities", "Coal", "ShortTons", "2987681" ]
    ]
}
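To inspect a discovery response, it can help to pair each parsed value with the column name that discovery suggested. The following Python sketch does that for a shortened copy of the response above. The function name rows_as_dicts is hypothetical; the field names come from the response shown in this procedure.

```python
def rows_as_dicts(response):
    """Pair each parsed record with the discovered column names."""
    columns = response["InputSchema"]["RecordColumns"]
    names = [column["Name"] for column in columns]
    return [dict(zip(names, row)) for row in response["ParsedInputRecords"]]


# Shortened copy of the response shown above (two parsed records only).
response = {
    "InputSchema": {
        "RecordColumns": [
            {"SqlType": "INTEGER", "Name": "COL_year"},
            {"SqlType": "INTEGER", "Name": "COL_month"},
            {"SqlType": "VARCHAR(4)", "Name": "state"},
            {"SqlType": "VARCHAR(64)", "Name": "producer_type"},
            {"SqlType": "VARCHAR(4)", "Name": "energy_source"},
            {"SqlType": "VARCHAR(16)", "Name": "units"},
            {"SqlType": "INTEGER", "Name": "consumption"},
        ]
    },
    "ParsedInputRecords": [
        [None, None, "state", "producer_type", "energy_source", "units", None],
        ["2001", "1", "AK", "TotalElectricPowerIndustry", "Coal", "ShortTons", "47615"],
    ],
}

for row in rows_as_dicts(response):
    print(row)
```

In this output, the header line of data.csv appears as the first parsed record, with null in the columns that were inferred as INTEGER.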

Preprocessing Data Using a Lambda Function

If the data in your stream needs format conversion, transformation, enrichment, or filtering, you can preprocess the data using an AWS Lambda function. You can do this before your application SQL code executes or before your application creates a schema from your data stream.

Using a Lambda function for preprocessing records is useful in the following scenarios:

• Transforming records from other formats (such as KPL or GZIP) into formats that Kinesis Data Analytics can analyze. Kinesis Data Analytics currently supports JSON or CSV data formats.

• Expanding data into a format that is more accessible for operations such as aggregation or anomaly detection. For instance, if several data values are stored together in a string, you can expand the data into separate columns.

• Data enrichment with other Amazon services, such as extrapolation or error correction.

• Applying complex string transformation to record fields.

• Data filtering for cleaning up the data.

Using a Lambda Function for Preprocessing Records

When creating your Kinesis Data Analytics application, you enable Lambda preprocessing in the Connect to a Source page.

To use a Lambda function to preprocess records in a Kinesis Data Analytics application

1. Sign in to the AWS Management Console and open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.

2. On the Connect to a Source page for your application, choose Enabled in the Record pre-processing with AWS Lambda section.

3. To use a Lambda function that you have already created, choose the function in the Lambda function drop-down list.

4. To create a new Lambda function from one of the Lambda preprocessing templates, choose the template from the drop-down list. Then choose View <template name> in Lambda to edit the function.

5. To create a new Lambda function, choose Create new. For information about creating a Lambda function, see Create a HelloWorld Lambda Function and Explore the Console in the AWS Lambda Developer Guide.

6. Choose the version of the Lambda function to use. To use the latest version, choose $LATEST.

When you choose or create a Lambda function for record preprocessing, the records are preprocessed before your application SQL code executes or your application generates a schema from the records.

Lambda Preprocessing Permissions

To use Lambda preprocessing, the application's IAM role requires the following permissions policy:

{
    "Sid": "UseLambdaFunction",
    "Effect": "Allow",
    "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
    ],
    "Resource": "<FunctionARN>"
}
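The statement above is a single policy statement. To attach it to a role, it is normally wrapped in a complete IAM policy document with a Version field and a Statement list. The following Python sketch builds such a document. The function name lambda_preprocessing_policy and the example function ARN are placeholders chosen for illustration.

```python
import json


def lambda_preprocessing_policy(function_arn):
    """Wrap the statement from this section in a full policy document."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "UseLambdaFunction",
                "Effect": "Allow",
                "Action": [
                    "lambda:InvokeFunction",
                    "lambda:GetFunctionConfiguration",
                ],
                "Resource": function_arn,
            }
        ],
    }


# Placeholder ARN; substitute the ARN of your own function.
policy = lambda_preprocessing_policy(
    "arn:aws:lambda:us-east-1:123456789012:function:my-preprocessor"
)
print(json.dumps(policy, indent=4))
```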

For more information about adding permissions policies, see Authentication and Access Control for Amazon Kinesis Data Analytics for SQL Applications (p. 176).

Lambda Preprocessing Metrics

You can use Amazon CloudWatch to monitor the number of Lambda invocations, bytes processed, successes and failures, and so on. For information about CloudWatch metrics that are emitted by Kinesis Data Analytics Lambda preprocessing, see Amazon Kinesis Analytics Metrics.

Using AWS Lambda with the Kinesis Producer Library

The Kinesis Producer Library (KPL) aggregates small user-formatted records into larger records up to 1 MB to make better use of Amazon Kinesis Data Streams throughput. The Kinesis Client Library (KCL) for Java supports deaggregating these records. However, you must use a special module to deaggregate the records when you use AWS Lambda as the consumer of your streams.

To get the necessary project code and instructions, see the Kinesis Producer Library Deaggregation Modules for AWS Lambda on GitHub. You can use the components in this project to process KPL serialized data within AWS Lambda in Java, Node.js, and Python. You can also use these components as part of a multi-lang KCL application.

Data Preprocessing Event Input Data Model/Record Response Model

To preprocess records, your Lambda function must be compliant with the required event input data and record response models.

Event Input Data Model

Kinesis Data Analytics continuously reads data from your Kinesis data stream or Kinesis Data Firehose delivery stream. For each batch of records it retrieves, the service manages how each batch gets passed to your Lambda function. Your function receives a list of records as input. Within your function, you iterate through the list and apply your business logic to accomplish your preprocessing requirements (such as data format conversion or enrichment).

The input model to your preprocessing function varies slightly, depending on whether the data was received from a Kinesis data stream or a Kinesis Data Firehose delivery stream.

If the source is a Kinesis Data Firehose delivery stream, the event input data model is as follows:

Kinesis Data Firehose Request Data Model

Field            Description
invocationId     The Lambda invocation ID (random GUID).
applicationArn   Kinesis Data Analytics application Amazon Resource Name (ARN)
streamArn        Delivery stream ARN
records

    Field                           Description
    recordId                        Record ID (random GUID)
    kinesisFirehoseRecordMetadata

        Field                         Description
        approximateArrivalTimestamp   Delivery stream record approximate arrival time

    data                            Base64-encoded source record payload

The following example shows input from a Firehose delivery stream:

{
    "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
    "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
    "streamArn": "arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
    "records": [
        {
            "recordId": "49572672223665514422805246926656954630972486059535892482",
            "data": "aGVsbG8gd29ybGQ=",
            "kinesisFirehoseRecordMetadata": {
                "approximateArrivalTimestamp": 1520280173
            }
        }
    ]
}

If the source is a Kinesis data stream, the event input data model is as follows:

Kinesis Streams Request Data Model

Field            Description
invocationId     The Lambda invocation ID (random GUID).
applicationArn   Kinesis Data Analytics application ARN
streamArn        Kinesis data stream ARN
records

    Field                         Description
    recordId                      Record ID based on the Kinesis record sequence number
    kinesisStreamRecordMetadata

        Field                         Description
        sequenceNumber                Sequence number from the Kinesis stream record
        partitionKey                  Partition key from the Kinesis stream record
        shardId                       Shard ID from the Kinesis stream record
        approximateArrivalTimestamp   Approximate arrival time of the Kinesis stream record

    data                          Base64-encoded source record payload

The following example shows input from a Kinesis data stream:

{
    "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
    "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
    "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
    "records": [
        {
            "recordId": "49572672223665514422805246926656954630972486059535892482",
            "data": "aGVsbG8gd29ybGQ=",
            "kinesisStreamRecordMetadata": {
                "shardId": "shardId-000000000003",
                "partitionKey": "7400791606",
                "sequenceNumber": "49572672223665514422805246926656954630972486059535892482",
                "approximateArrivalTimestamp": 1520280173
            }
        }
    ]
}
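Because the two event models differ only in the name and contents of the metadata field, a function can handle both sources with one code path. The following Python sketch decodes each record and reports which kind of source it came from. The function name describe_records and the source labels are hypothetical; the field names are taken from the two data models above.

```python
import base64


def describe_records(event):
    """Decode each record and report its source type and arrival time."""
    described = []
    for record in event["records"]:
        if "kinesisStreamRecordMetadata" in record:
            source = "kinesis-stream"
            metadata = record["kinesisStreamRecordMetadata"]
        else:
            source = "firehose"
            metadata = record["kinesisFirehoseRecordMetadata"]
        described.append({
            "recordId": record["recordId"],
            "source": source,
            "arrival": metadata["approximateArrivalTimestamp"],
            # The payload is base64-encoded in both models.
            "payload": base64.b64decode(record["data"]).decode("utf-8"),
        })
    return described


# Shortened copy of the Kinesis data stream example above.
stream_event = {
    "records": [
        {
            "recordId": "49572672223665514422805246926656954630972486059535892482",
            "data": "aGVsbG8gd29ybGQ=",
            "kinesisStreamRecordMetadata": {
                "shardId": "shardId-000000000003",
                "partitionKey": "7400791606",
                "sequenceNumber": "49572672223665514422805246926656954630972486059535892482",
                "approximateArrivalTimestamp": 1520280173,
            },
        }
    ]
}
print(describe_records(stream_event))
```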

Record Response Model

Your Lambda preprocessing function must return every record (identified by its record ID) that was sent to it. Each returned record must contain the following parameters; otherwise, Kinesis Data Analytics rejects the record and treats it as a data preprocessing failure. The data payload part of the record can be transformed to accomplish preprocessing requirements.

Response Data Model

records

Field       Description

recordId    The record ID is passed from Kinesis Data Analytics to Lambda during the invocation. The transformed record must contain the same record ID. Any mismatch between the ID of the original record and the ID of the transformed record is treated as a data preprocessing failure.

result      The status of the data transformation of the record. The possible values are:

            • Ok: The record was transformed successfully. Kinesis Data Analytics ingests the record for SQL processing.

            • Dropped: The record was dropped intentionally by your processing logic. Kinesis Data Analytics drops the record from SQL processing. The data payload field is optional for a Dropped record.

            • ProcessingFailed: The record could not be transformed. Kinesis Data Analytics considers it unsuccessfully processed by your Lambda function and writes an error to the error stream. For more information about the error stream, see Error Handling (p. 41). The data payload field is optional for a ProcessingFailed record.

data        The transformed data payload, after base64-encoding. Each data payload can contain multiple JSON documents if the application ingestion data format is JSON. Or each can contain multiple CSV rows (with a row delimiter specified in each row) if the application ingestion data format is CSV. The Kinesis Data Analytics service successfully parses and processes data with either multiple JSON documents or CSV rows within the same data payload.

The following example shows output from a Lambda function:

{
    "records": [
        {
            "recordId": "49572672223665514422805246926656954630972486059535892482",
            "result": "Ok",
            "data": "SEVMTE8gV09STEQ="
        }
    ]
}
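The input and response models can be combined into a minimal Python handler. This is a sketch under stated assumptions rather than one of the console blueprints: the handler name lambda_handler follows the usual Lambda convention for Python, the transformation (converting the payload to uppercase) is chosen only because it reproduces the sample values in this section, and dropping empty payloads is an illustrative filtering rule.

```python
import base64


def lambda_handler(event, context):
    """Return every input record with a result status and, if Ok, new data."""
    output = []
    for record in event["records"]:
        record_id = record["recordId"]  # must be echoed back unchanged
        try:
            payload = base64.b64decode(record["data"]).decode("utf-8")
        except ValueError:
            # Covers malformed base64 and invalid UTF-8; data is optional here.
            output.append({"recordId": record_id, "result": "ProcessingFailed"})
            continue
        if not payload.strip():
            # Illustrative filter: intentionally drop empty records.
            output.append({"recordId": record_id, "result": "Dropped"})
            continue
        transformed = payload.upper()
        output.append({
            "recordId": record_id,
            "result": "Ok",
            "data": base64.b64encode(transformed.encode("utf-8")).decode("ascii"),
        })
    return {"records": output}
```

Given the sample input shown earlier, whose data field decodes to "hello world", this handler returns the sample output above, whose data field decodes to "HELLO WORLD".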

Common Data Preprocessing Failures

The following are common reasons why preprocessing can fail.

• Not all records (with record IDs) in a batch that are sent to the Lambda function are returned back to the Kinesis Data Analytics service.

• The response is missing either the record ID, status, or data payload field. The data payload field is optional for a Dropped or ProcessingFailed record.

• The Lambda function timeout is not long enough to preprocess the data.

• The Lambda function response exceeds the response limits imposed by the AWS Lambda service.
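The first two failure causes in this list can be caught in a unit test before the function is deployed. The following Python sketch compares a handler's response with the event that produced it. The function name find_response_problems and the message wording are hypothetical; the checks mirror the requirements stated in the response data model.

```python
VALID_RESULTS = {"Ok", "Dropped", "ProcessingFailed"}


def find_response_problems(event, response):
    """List the ways a response violates the record response model."""
    problems = []
    sent = {record["recordId"] for record in event["records"]}
    returned = set()
    for record in response.get("records", []):
        record_id = record.get("recordId")
        if record_id is None:
            problems.append("a record is missing recordId")
            continue
        returned.add(record_id)
        result = record.get("result")
        if result not in VALID_RESULTS:
            problems.append(f"{record_id}: missing or unknown result {result!r}")
        elif result == "Ok" and "data" not in record:
            # data is optional only for Dropped and ProcessingFailed.
            problems.append(f"{record_id}: data is required when result is Ok")
    for record_id in sorted(sent - returned):
        problems.append(f"{record_id}: sent to the function but not returned")
    for record_id in sorted(returned - sent):
        problems.append(f"{record_id}: does not match any record that was sent")
    return problems


event = {"records": [{"recordId": "a", "data": "aGVsbG8="}, {"recordId": "b", "data": "aGVsbG8="}]}
response = {"records": [{"recordId": "a", "result": "Ok", "data": "aGVsbG8="}]}
print(find_response_problems(event, response))
```

An empty list means that the response satisfies these checks. Timeouts and response size limits are enforced by AWS Lambda and are outside the scope of this sketch.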

For data preprocessing failures, Kinesis Data Analytics continues to retry Lambda invocations on the same set of records until successful. You can monitor the following CloudWatch metrics to gain insight into failures.

• Kinesis Data Analytics application MillisBehindLatest: Indicates how far behind an application is reading from the streaming source.

• Kinesis Data Analytics application InputPreprocessing CloudWatch metrics: Indicates the number of successes and failures, among other statistics. For more information, see Amazon Kinesis Analytics Metrics.

• AWS Lambda function CloudWatch metrics and logs.

Creating Lambda Functions for Preprocessing

Your Amazon Kinesis Data Analytics application can use Lambda functions for preprocessing records as they are ingested into the application. Kinesis Data Analytics provides the following templates on the console to use as a starting point for preprocessing your data.

Topics

• Creating a Preprocessing Lambda Function in Node.js (p. 26)

• Creating a Preprocessing Lambda Function in Python (p. 26)

• Creating a Preprocessing Lambda Function in Java (p. 27)

• Creating a Preprocessing Lambda Function in .NET (p. 27)

Creating a Preprocessing Lambda Function in Node.js

The following templates for creating preprocessing Lambda functions in Node.js are available on the Kinesis Data Analytics console:

Lambda Blueprint                                  Language and version   Description
General Kinesis Data Analytics Input Processing   Node.js 6.10           A Kinesis Data Analytics record preprocessor that receives JSON or CSV records as input and then returns them with a processing status. Use this processor as a starting point for custom transformation logic.
Compressed Input Processing                       Node.js 6.10           A Kinesis Data Analytics record processor that receives compressed (GZIP or Deflate compressed) JSON or CSV records as input and returns decompressed records with a processing status.

Creating a Preprocessing Lambda Function in Python

The following templates for creating preprocessing Lambda functions in Python are available on the console:

Lambda Blueprint                             Language and version   Description
General Kinesis Analytics Input Processing   Python 2.7             A Kinesis Data Analytics record preprocessor that receives JSON or CSV records as input and then returns them with a processing status. Use
