Amazon Kinesis Data Analytics for SQL Applications Developer Guide
SQL Developer Guide
Copyright © Amazon Web Services, Inc. and/or its affiliates. All rights reserved.
Amazon's trademarks and trade dress may not be used in connection with any product or service that is not Amazon's, in any manner that is likely to cause confusion among customers, or in any manner that disparages or discredits Amazon. All other trademarks not owned by Amazon are the property of their respective owners, who may or may not be affiliated with, connected to, or sponsored by Amazon.
Table of Contents
What Is Amazon Kinesis Data Analytics for SQL Applications? ... 1
When Should I Use Amazon Kinesis Data Analytics? ... 1
Are You a First-Time User of Amazon Kinesis Data Analytics? ... 1
How It Works ... 3
Input ... 5
Configuring a Streaming Source ... 5
Configuring a Reference Source ... 7
Working with JSONPath ... 9
Mapping Streaming Source Elements to SQL Input Columns ... 13
Using the Schema Discovery Feature on Streaming Data ... 17
Using the Schema Discovery Feature on Static Data ... 18
Preprocessing Data Using a Lambda Function ... 21
Parallelizing Input Streams for Increased Throughput ... 28
Application Code ... 31
Output ... 33
Creating an Output Using the AWS CLI ... 33
Using a Lambda Function as Output ... 34
Application Output Delivery Model ... 40
Error Handling ... 41
Reporting Errors Using an In-Application Error Stream ... 41
Auto Scaling Applications ... 42
Tagging ... 42
Adding Tags when an Application is Created ... 43
Adding or Updating Tags for an Existing Application ... 43
Listing Tags for an Application ... 43
Removing Tags from an Application ... 43
Getting Started ... 45
Step 1: Set Up an Account ... 45
Sign Up for AWS ... 45
Create an IAM User ... 46
Next Step ... 46
Step 2: Set Up the AWS CLI ... 46
Next Step ... 47
Step 3: Create Your Starter Analytics Application ... 47
Step 3.1: Create an Application ... 49
Step 3.2: Configure Input ... 50
Step 3.3: Add Real-Time Analytics (Add Application Code) ... 53
Step 3.4: (Optional) Update the Application Code ... 56
Step 4 (Optional) Edit the Schema and SQL Code Using the Console ... 58
Working with the Schema Editor ... 58
Working with the SQL Editor ... 65
Streaming SQL Concepts ... 68
In-Application Streams and Pumps ... 68
Timestamps and the ROWTIME Column ... 69
Understanding Various Times in Streaming Analytics ... 69
Continuous Queries ... 71
Windowed Queries ... 72
Stagger Windows ... 72
Tumbling Windows ... 77
Sliding Windows ... 78
Stream Joins ... 82
Example 1: Report Orders Where There Are Trades Within One Minute of the Order Being Placed ... 82
Examples ... 84
Transforming Data ... 84
Preprocessing Streams with Lambda ... 84
Transforming String Values ... 84
Transforming DateTime Values ... 98
Transforming Multiple Data Types ... 102
Windows and Aggregation ... 107
Stagger Window ... 107
Tumbling Window Using ROWTIME ... 110
Tumbling Window Using an Event Timestamp ... 112
Most Frequently Occurring Values (TOP_K_ITEMS_TUMBLING) ... 115
Aggregating Partial Results ... 118
Joins ... 120
Example: Add Reference Data Source ... 120
Machine Learning ... 123
Detecting Anomalies ... 123
Example: Detect Anomalies and Get an Explanation ... 129
Example: Detect Hotspots ... 132
Alerts and Errors ... 142
Simple Alerts ... 142
Throttled Alerts ... 143
In-Application Error Stream ... 144
Solution Accelerators ... 145
Real-time insights on AWS account activity ... 145
Real-time AWS IoT device monitoring with Kinesis Data Analytics ... 146
Real-time web analytics with Kinesis Data Analytics ... 146
Amazon Connected Vehicle Solution ... 146
Security ... 147
Data Protection ... 147
Data Encryption ... 147
Identity and Access Management ... 148
Trust Policy ... 148
Permissions Policy ... 149
Monitoring ... 151
Compliance Validation ... 151
Resilience ... 151
Disaster Recovery ... 152
Infrastructure Security ... 152
Security Best Practices ... 152
Implement least privilege access ... 152
Use IAM roles to access other Amazon services ... 152
Implement Server-Side Encryption in Dependent Resources ... 153
Use CloudTrail to Monitor API Calls ... 153
Monitoring ... 154
Monitoring Tools ... 154
Automated Tools ... 155
Manual Tools ... 155
Monitoring with Amazon CloudWatch ... 155
Metrics and Dimensions ... 156
Viewing Metrics and Dimensions ... 157
Alarms ... 158
Logs ... 159
Using AWS CloudTrail ... 163
Kinesis Data Analytics Information in CloudTrail ... 164
Understanding Kinesis Data Analytics Log File Entries ... 164
Limits ... 166
Best Practices ... 168
Managing Applications ... 168
Scaling Applications ... 169
Defining Input Schema ... 169
Connecting to Outputs ... 170
Authoring Application Code ... 170
Testing Applications ... 171
Setting up a Test Application ... 171
Testing Schema Changes ... 171
Testing Code Changes ... 171
Troubleshooting ... 172
Unable to Run SQL Code ... 172
Unable to Detect or Discover My Schema ... 172
Reference Data is Out of Date ... 173
Application Not Writing to Destination ... 173
Important Application Health Parameters to Monitor ... 173
Invalid Code Errors When Running an Application ... 174
Application is Writing Errors to the Error Stream ... 174
Insufficient Throughput or High MillisBehindLatest ... 174
Authentication and Access Control ... 176
Authentication ... 176
Access Control ... 177
Overview of Managing Access ... 177
Amazon Kinesis Data Analytics Resources and Operations ... 178
Understanding Resource Ownership ... 178
Managing Access to Resources ... 178
Specifying Policy Elements: Actions, Effects, and Principals ... 180
Specifying Conditions in a Policy ... 180
Using Identity-Based Policies (IAM Policies) ... 181
Permissions Required to Use the Amazon Kinesis Data Analytics Console ... 181
Amazon-Managed (Predefined) Policies for Amazon Kinesis Data Analytics ... 182
Customer Managed Policy Examples ... 183
Amazon Kinesis Data Analytics API Permissions Reference ... 186
GetApplicationState ... 187
SQL Reference ... 188
API Reference ... 189
Actions ... 189
AddApplicationCloudWatchLoggingOption ... 190
AddApplicationInput ... 192
AddApplicationInputProcessingConfiguration ... 195
AddApplicationOutput ... 198
AddApplicationReferenceDataSource ... 201
CreateApplication ... 204
DeleteApplication ... 210
DeleteApplicationCloudWatchLoggingOption ... 212
DeleteApplicationInputProcessingConfiguration ... 214
DeleteApplicationOutput ... 216
DeleteApplicationReferenceDataSource ... 219
DescribeApplication ... 221
DiscoverInputSchema ... 225
ListApplications ... 229
ListTagsForResource ... 231
StartApplication ... 233
StopApplication ... 235
TagResource ... 237
UntagResource ... 239
UpdateApplication ... 241
Data Types ... 244
ApplicationDetail ... 246
ApplicationSummary ... 249
ApplicationUpdate ... 250
CloudWatchLoggingOption ... 251
CloudWatchLoggingOptionDescription ... 252
CloudWatchLoggingOptionUpdate ... 253
CSVMappingParameters ... 254
DestinationSchema ... 255
Input ... 256
InputConfiguration ... 258
InputDescription ... 259
InputLambdaProcessor ... 261
InputLambdaProcessorDescription ... 262
InputLambdaProcessorUpdate ... 263
InputParallelism ... 264
InputParallelismUpdate ... 265
InputProcessingConfiguration ... 266
InputProcessingConfigurationDescription ... 267
InputProcessingConfigurationUpdate ... 268
InputSchemaUpdate ... 269
InputStartingPositionConfiguration ... 270
InputUpdate ... 271
JSONMappingParameters ... 273
KinesisFirehoseInput ... 274
KinesisFirehoseInputDescription ... 275
KinesisFirehoseInputUpdate ... 276
KinesisFirehoseOutput ... 277
KinesisFirehoseOutputDescription ... 278
KinesisFirehoseOutputUpdate ... 279
KinesisStreamsInput ... 280
KinesisStreamsInputDescription ... 281
KinesisStreamsInputUpdate ... 282
KinesisStreamsOutput ... 283
KinesisStreamsOutputDescription ... 284
KinesisStreamsOutputUpdate ... 285
LambdaOutput ... 286
LambdaOutputDescription ... 287
LambdaOutputUpdate ... 288
MappingParameters ... 289
Output ... 290
OutputDescription ... 292
OutputUpdate ... 294
RecordColumn ... 296
RecordFormat ... 297
ReferenceDataSource ... 298
ReferenceDataSourceDescription ... 299
ReferenceDataSourceUpdate ... 300
S3Configuration ... 302
S3ReferenceDataSource ... 303
S3ReferenceDataSourceDescription ... 304
S3ReferenceDataSourceUpdate ... 305
SourceSchema ... 306
Tag ... 307
Document History ... 308
AWS glossary ... 311
What Is Amazon Kinesis Data Analytics for SQL Applications?
With Amazon Kinesis Data Analytics for SQL Applications, you can process and analyze streaming data using standard SQL. The service enables you to quickly author and run powerful SQL code against streaming sources to perform time series analytics, feed real-time dashboards, and create real-time metrics.
To get started with Kinesis Data Analytics, you create a Kinesis data analytics application that continuously reads and processes streaming data. The service supports ingesting data from Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose streaming sources. Then, you author your SQL code using the interactive editor and test it with live streaming data. You can also configure destinations where you want Kinesis Data Analytics to send the results.
Kinesis Data Analytics supports Amazon Kinesis Data Firehose (Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk), AWS Lambda, and Amazon Kinesis Data Streams as destinations.
When Should I Use Amazon Kinesis Data Analytics?
Amazon Kinesis Data Analytics enables you to quickly author SQL code that continuously reads, processes, and stores data in near real time. Using standard SQL queries on the streaming data, you can construct applications that transform and provide insights into your data. Following are some example scenarios for using Kinesis Data Analytics:
• Generate time-series analytics – You can calculate metrics over time windows, and then stream values to Amazon S3 or Amazon Redshift through a Kinesis Data Firehose delivery stream.
• Feed real-time dashboards – You can send aggregated and processed streaming data results downstream to feed real-time dashboards.
• Create real-time metrics – You can create custom metrics and triggers for use in real-time monitoring, notifications, and alarms.
For information about the SQL language elements that are supported by Kinesis Data Analytics, see Amazon Kinesis Data Analytics SQL Reference.
Are You a First-Time User of Amazon Kinesis Data Analytics?
If you are a first-time user of Amazon Kinesis Data Analytics, we recommend that you read the following sections in order:
1. Read the How It Works section of this guide. This section introduces various Kinesis Data Analytics components that you work with to create an end-to-end experience. For more information, see Amazon Kinesis Data Analytics for SQL Applications: How It Works (p. 3).
2. Try the Getting Started exercises. For more information, see Getting Started with Amazon Kinesis Data Analytics for SQL Applications (p. 45).
3. Explore the streaming SQL concepts. For more information, see Streaming SQL Concepts (p. 68).
4. Try additional examples. For more information, see Example Applications (p. 84).
Amazon Kinesis Data Analytics for SQL Applications: How It Works
An application is the primary resource in Amazon Kinesis Data Analytics that you can create in your account. You can create and manage applications using the AWS Management Console or the Kinesis Data Analytics API. Kinesis Data Analytics provides API operations to manage applications. For a list of API operations, see Actions (p. 189).
Kinesis Data Analytics applications continuously read and process streaming data in real time. You write application code using SQL to process the incoming streaming data and produce output. Then, Kinesis Data Analytics writes the output to a configured destination. The following diagram illustrates a typical application architecture.
Each application has a name, description, version ID, and status. Amazon Kinesis Data Analytics assigns a version ID when you first create an application. This version ID is updated when you update any application configuration. For example, if you add an input configuration, add or delete a reference data source, add or delete an output configuration, or update application code, Kinesis Data Analytics updates the current application version ID. Kinesis Data Analytics also maintains timestamps for when an application was created and last updated.
In addition to these basic properties, each application consists of the following:
• Input – The streaming source for your application. You can select either a Kinesis data stream or a Kinesis Data Firehose data delivery stream as the streaming source. In the input configuration, you map the streaming source to an in-application input stream. The in-application stream is like a continuously updating table upon which you can perform the SELECT and INSERT SQL operations.
In your application code, you can create additional in-application streams to store intermediate query results.
You can optionally partition a single streaming source into multiple in-application input streams to improve throughput. For more information, see Limits (p. 166) and Configuring Application Input (p. 5).
Amazon Kinesis Data Analytics provides a timestamp column called ROWTIME in each application stream. For more information, see Timestamps and the ROWTIME Column (p. 69). You can use this column in time-based windowed queries. For more information, see Windowed Queries (p. 72).
You can optionally configure a reference data source to enrich your input data stream within the application. This results in an in-application reference table. You must store your reference data as an object in an Amazon S3 bucket. When the application starts, Amazon Kinesis Data Analytics reads the Amazon S3 object and creates an in-application table. For more information, see Configuring Application Input (p. 5).
• Application code – A series of SQL statements that process input and produce output. You can write SQL statements against in-application streams and reference tables. You can also write JOIN queries to combine data from both of these sources.
For information about the SQL language elements that are supported by Kinesis Data Analytics, see Amazon Kinesis Data Analytics SQL Reference.
In its simplest form, application code can be a single SQL statement that selects from a streaming input and inserts results into a streaming output. It can also be a series of SQL statements where output of one feeds into the input of the next SQL statement. Further, you can write application code to split an input stream into multiple streams. You can then apply additional queries to process these streams. For more information, see Application Code (p. 31).
• Output – In application code, query results go to in-application streams. In your application code, you can create one or more in-application streams to hold intermediate results. You can then optionally configure the application output to persist data in the in-application streams that hold your application output (also referred to as in-application output streams) to external destinations.
External destinations can be a Kinesis Data Firehose delivery stream or a Kinesis data stream. Note the following about these destinations:
• You can configure a Kinesis Data Firehose delivery stream to write results to Amazon S3, Amazon Redshift, or Amazon OpenSearch Service (OpenSearch Service).
• You can also write application output to a custom destination instead of Amazon S3 or Amazon Redshift. To do that, you specify a Kinesis data stream as the destination in your output configuration. Then, you configure AWS Lambda to poll the stream and invoke your Lambda function. Your Lambda function code receives stream data as input. In your Lambda function code, you can write the incoming data to your custom destination. For more information, see Using AWS Lambda with Amazon Kinesis Data Analytics.
For more information, see Configuring Application Output (p. 33).
In addition, note the following:
• Amazon Kinesis Data Analytics needs permissions to read records from a streaming source and write application output to the external destinations. You use IAM roles to grant these permissions.
• Kinesis Data Analytics automatically provides an in-application error stream for each application. If your application has issues while processing certain records (for example, because of a type mismatch or late arrival), those records are written to the error stream. You can configure application output to direct Kinesis Data Analytics to persist the error stream data to an external destination for further evaluation.
For more information, see Error Handling (p. 41).
• Amazon Kinesis Data Analytics ensures that your application output records are written to the configured destination. It uses an "at least once" processing and delivery model, even if you experience an application interruption. For more information, see Delivery Model for Persisting Application Output to an External Destination (p. 40).
Topics
• Configuring Application Input (p. 5)
• Application Code (p. 31)
• Configuring Application Output (p. 33)
• Error Handling (p. 41)
• Automatically Scaling Applications to Increase Throughput (p. 42)
• Using Tagging (p. 42)
Configuring Application Input
Your Amazon Kinesis Data Analytics application can receive input from a single streaming source and, optionally, use one reference data source. For more information, see Amazon Kinesis Data Analytics for SQL Applications: How It Works (p. 3). The sections in this topic describe the application input sources.
Topics
• Configuring a Streaming Source (p. 5)
• Configuring a Reference Source (p. 7)
• Working with JSONPath (p. 9)
• Mapping Streaming Source Elements to SQL Input Columns (p. 13)
• Using the Schema Discovery Feature on Streaming Data (p. 17)
• Using the Schema Discovery Feature on Static Data (p. 18)
• Preprocessing Data Using a Lambda Function (p. 21)
• Parallelizing Input Streams for Increased Throughput (p. 28)
Configuring a Streaming Source
At the time that you create an application, you specify a streaming source. You can also modify an input after you create the application. Amazon Kinesis Data Analytics supports the following streaming sources for your application:
• A Kinesis data stream
• A Kinesis Data Firehose delivery stream
Note
If the Kinesis data stream is encrypted, Kinesis Data Analytics accesses the data in the encrypted stream seamlessly with no further configuration needed. Kinesis Data Analytics does not store unencrypted data read from Kinesis Data Streams. For more information, see What Is Server-Side Encryption For Kinesis Data Streams?.
Kinesis Data Analytics continuously polls the streaming source for new data and ingests it into in-application streams according to the input configuration.
Note
Adding a Kinesis data stream as your application's input does not affect the data in the stream.
If another resource, such as a Kinesis Data Firehose delivery stream, also reads from the same Kinesis data stream, both the Kinesis Data Firehose delivery stream and the Kinesis Data Analytics application receive the same data. However, because both consumers read from the same shards, throughput might decrease and reads might be throttled.
Your application code can query the in-application stream. As part of input configuration you provide the following:
• Streaming source – You provide the Amazon Resource Name (ARN) of the stream and an IAM role that Kinesis Data Analytics can assume to access the stream on your behalf.
• In-application stream name prefix – When you start the application, Kinesis Data Analytics creates the specified in-application stream. In your application code, you access the in-application stream using this name.
You can optionally map a streaming source to multiple in-application streams. For more information, see Limits (p. 166). In this case, Amazon Kinesis Data Analytics creates the specified number of in-application streams with names such as prefix_001, prefix_002, and prefix_003. By default, Kinesis Data Analytics maps the streaming source to one in-application stream named prefix_001.
There is a limit on the rate that you can insert rows in an in-application stream. Therefore, Kinesis Data Analytics supports multiple such in-application streams so that you can bring records into your application at a much faster rate. If you find that your application is not keeping up with the data in the streaming source, you can add units of parallelism to improve performance.
• Mapping schema – You describe the record format (JSON, CSV) on the streaming source. You also describe how each record on the stream maps to columns in the in-application stream that is created.
This is where you provide column names and data types.
Note
Kinesis Data Analytics adds quotation marks around the identifiers (stream name and column names) when creating the input in-application stream. When querying this stream and the columns, you must specify them in quotation marks using the same casing (matching lowercase and uppercase letters exactly). For more information about identifiers, see Identifiers in the Amazon Kinesis Data Analytics SQL Reference.
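The naming rule described above for parallel in-application streams can be sketched in Python as follows. The function name is hypothetical, and the sketch does not enforce the service's maximum parallelism, which is documented in Limits. Remember that, as the Note explains, your SQL must quote these names with the exact casing.

```python
def in_application_stream_names(prefix, parallelism=1):
    """Return the in-application stream names created for one streaming source.

    Hypothetical helper that mirrors the documented pattern prefix_001, prefix_002, ...
    The service's upper bound on parallelism is not checked here.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    # Suffixes are three digits, zero-padded, starting at 001.
    return [f"{prefix}_{index:03d}" for index in range(1, parallelism + 1)]


print(in_application_stream_names("prefix"))     # ['prefix_001']
print(in_application_stream_names("prefix", 3))  # ['prefix_001', 'prefix_002', 'prefix_003']
```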
You can create an application and configure inputs in the Amazon Kinesis Data Analytics console. The console then makes the necessary API calls. You can configure application input when you create a new application, or you can add an input configuration to an existing application. For more information, see CreateApplication (p. 204) and AddApplicationInput (p. 192). The following is the input configuration part of the CreateApplication API request body:
"Inputs": [
    {
        "InputSchema": {
            "RecordColumns": [
                {
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "KinesisFirehoseInput": {
            "ResourceARN": "string",
            "RoleARN": "string"
        },
        "KinesisStreamsInput": {
            "ResourceARN": "string",
            "RoleARN": "string"
        },
        "NamePrefix": "string"
    }
]
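As a rough illustration, the following Python sketch builds one entry of the Inputs list for a Kinesis data stream that carries CSV records, and prints it as JSON. The helper name, ARNs, stream prefix, and column definitions are hypothetical placeholders. The in-application stream name prefix goes in NamePrefix, the field name used in the CreateApplication API reference; check that reference before relying on any field.

```python
import json


def csv_stream_input(stream_arn, role_arn, name_prefix, columns,
                     row_delimiter="\n", column_delimiter=","):
    """Build one hypothetical "Inputs" entry for a Kinesis data stream of CSV records.

    columns is a list of (column_name, sql_type) pairs, in source order.
    """
    return {
        "NamePrefix": name_prefix,
        "KinesisStreamsInput": {"ResourceARN": stream_arn, "RoleARN": role_arn},
        "InputSchema": {
            "RecordEncoding": "UTF-8",
            "RecordFormat": {
                "RecordFormatType": "CSV",
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordRowDelimiter": row_delimiter,
                        "RecordColumnDelimiter": column_delimiter,
                    }
                },
            },
            # CSV columns are assigned by position, so no JSONPath "Mapping" is set.
            "RecordColumns": [
                {"Name": name, "SqlType": sql_type} for name, sql_type in columns
            ],
        },
    }


inputs = [
    csv_stream_input(
        "arn:aws:kinesis:us-east-1:111122223333:stream/ExampleStream",  # placeholder
        "arn:aws:iam::111122223333:role/ExampleKdaRole",                # placeholder
        "SOURCE_SQL_STREAM",
        [("ticker", "VARCHAR(4)"), ("sale_price", "REAL"), ("order_id", "INTEGER")],
    )
]
print(json.dumps({"Inputs": inputs}, indent=2))
```

Only one of KinesisStreamsInput or KinesisFirehoseInput is set here, because an application reads from a single streaming source.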
Configuring a Reference Source
You can also optionally add a reference data source to an existing application to enrich the data coming in from streaming sources. You must store reference data as an object in your Amazon S3 bucket. When the application starts, Amazon Kinesis Data Analytics reads the Amazon S3 object and creates an in-application reference table. Your application code can then join it with an in-application stream.
You store reference data in the Amazon S3 object using supported formats (CSV, JSON). For example, suppose that your application performs analytics on stock orders. Assume the following record format on the streaming source:
Ticker, SalePrice, OrderId
AMZN, $700, 1003
XYZ, $250, 1004
...
In this case, you might then consider maintaining a reference data source to provide details for each stock ticker, such as company name.
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
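Conceptually, the application enriches each stream record by looking up its ticker in the reference table, much like a SQL inner JOIN. The following Python sketch models that lookup in memory. It is not how Kinesis Data Analytics executes joins, and the field names, integer prices, and the unmatched ABC order are assumptions made for illustration.

```python
# Stream records and reference rows modeled on the stock example above.
orders = [
    {"ticker": "AMZN", "sale_price": 700, "order_id": 1003},
    {"ticker": "XYZ", "sale_price": 250, "order_id": 1004},
    {"ticker": "ABC", "sale_price": 99, "order_id": 1005},  # hypothetical: no reference row
]
companies = {"AMZN": "Amazon", "XYZ": "SomeCompany"}  # Ticker -> Company


def enrich(order_stream, reference_table):
    """Inner-join semantics: emit orders that have a company, drop the rest."""
    for order in order_stream:
        company = reference_table.get(order["ticker"])
        if company is not None:
            yield {**order, "company": company}


for row in enrich(orders, companies):
    print(row)
```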
You can add an application reference data source either with the API or with the console. Amazon Kinesis Data Analytics provides the following API actions to manage reference data sources:
• AddApplicationReferenceDataSource (p. 201)
• UpdateApplication (p. 241)
For information about adding reference data using the console, see Example: Adding Reference Data to a Kinesis Data Analytics Application (p. 120).
Note the following:
• If the application is running, Kinesis Data Analytics creates an in-application reference table, and then loads the reference data immediately.
• If the application is not running (for example, it's in the ready state), Kinesis Data Analytics saves only the updated input configuration. When the application starts running, Kinesis Data Analytics loads the reference data in your application as a table.
Suppose that you want to refresh the data after Kinesis Data Analytics creates the in-application reference table. Perhaps you updated the Amazon S3 object, or you want to use a different Amazon S3 object. In this case, you can either explicitly call UpdateApplication (p. 241), or choose Actions, Synchronize reference data table in the console. Kinesis Data Analytics does not refresh the in-application reference table automatically.
There is a limit on the size of the Amazon S3 object that you can create as a reference data source. For more information, see Limits (p. 166). If the object size exceeds the limit, Kinesis Data Analytics can't load the data. The application state appears as running, but the data is not being read.
When you add a reference data source, you provide the following information:
• S3 bucket and object key name – In addition to the bucket name and object key, you also provide an IAM role that Kinesis Data Analytics can assume to read the object on your behalf.
• In-application reference table name – Kinesis Data Analytics creates this in-application table and populates it by reading the Amazon S3 object. This is the table name you specify in your application code.
• Mapping schema – You describe the record format (JSON, CSV) and the encoding of the data stored in the Amazon S3 object. You also describe how each data element maps to columns in the in-application reference table.
The following shows the request body in the AddApplicationReferenceDataSource API request.
{
    "ApplicationName": "string",
    "CurrentApplicationVersionId": number,
    "ReferenceDataSource": {
        "ReferenceSchema": {
            "RecordColumns": [
                {
                    "IsDropped": boolean,
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "S3ReferenceDataSource": {
            "BucketARN": "string",
            "FileKey": "string",
            "ReferenceRoleARN": "string"
        },
        "TableName": "string"
    }
}
Working with JSONPath
JSONPath is a standardized way to query elements of a JSON object. JSONPath uses path expressions to navigate elements, nested elements, and arrays in a JSON document. For more information about JSON, see Introducing JSON.
Amazon Kinesis Data Analytics uses JSONPath expressions in the application's source schema to identify data elements in a streaming source that contains JSON-format data.
For more information about how to map streaming data to your application's input stream, see the section called “Mapping Streaming Source Elements to SQL Input Columns” (p. 13).
Accessing JSON Elements with JSONPath
Following, you can find how to use JSONPath expressions to access various elements in JSON-formatted data. For the examples in this section, assume that the source stream contains the following JSON record:
{
    "customerName":"John Doe",
    "address":
    {
        "streetAddress":
        {
            "number":"123",
            "street":"AnyStreet"
        },
        "city":"Anytown"
    },
    "orders":
    [
        { "orderId":"23284", "itemName":"Widget", "itemPrice":"33.99" },
        { "orderId":"63122", "itemName":"Gadget", "itemPrice":"22.50" },
        { "orderId":"77284", "itemName":"Sprocket", "itemPrice":"12.00" }
    ]
}
Accessing JSON Elements
To query an element in JSON data using JSONPath, use the following syntax. Here, $ represents the root of the data hierarchy and elementName is the name of the element node to query.
$.elementName
The following expression queries the customerName element in the preceding JSON example.
$.customerName
The preceding expression returns the following from the preceding JSON record.
John Doe
Note
Path expressions are case sensitive. The expression $.customername returns null from the preceding JSON example.
Note
If no element appears at the location where the path expression specifies, the expression returns null. The following expression returns null from the preceding JSON example, because there is no matching element.
$.customerId
Accessing Nested JSON Elements
To query a nested JSON element, use the following syntax.
$.parentElement.element
The following expression queries the city element in the preceding JSON example.
$.address.city
The preceding expression returns the following from the preceding JSON record.
Anytown
You can query further levels of subelements using the following syntax.
$.parentElement.element.subElement
The following expression queries the street element in the preceding JSON example.
$.address.streetAddress.street
The preceding expression returns the following from the preceding JSON record.
AnyStreet
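To make the lookup rules above concrete, the following Python sketch resolves simple dotted paths of the form $.a.b.c against a parsed record. A missing element yields None (standing in for SQL null), and keys are matched case sensitively. It is a hypothetical helper, not the service's JSONPath engine, and it handles only dotted names, not array syntax. The sample record writes streetAddress as a JSON object so that $.address.streetAddress.street resolves.

```python
import json

# The sample record from this section, parsed into Python dictionaries and lists.
RECORD = json.loads("""
{
  "customerName": "John Doe",
  "address": {
    "streetAddress": {"number": "123", "street": "AnyStreet"},
    "city": "Anytown"
  },
  "orders": [
    {"orderId": "23284", "itemName": "Widget", "itemPrice": "33.99"},
    {"orderId": "63122", "itemName": "Gadget", "itemPrice": "22.50"},
    {"orderId": "77284", "itemName": "Sprocket", "itemPrice": "12.00"}
  ]
}
""")


def resolve_dotted_path(record, path):
    """Resolve a simple '$.a.b.c' path; return None if any step is missing."""
    if not path.startswith("$."):
        raise ValueError("path must start with '$.'")
    node = record
    for name in path[2:].split("."):
        # Dictionary keys are compared exactly, so lookups are case sensitive.
        if not isinstance(node, dict) or name not in node:
            return None
        node = node[name]
    return node


print(resolve_dotted_path(RECORD, "$.customerName"))                  # John Doe
print(resolve_dotted_path(RECORD, "$.customername"))                  # None
print(resolve_dotted_path(RECORD, "$.address.streetAddress.street"))  # AnyStreet
```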
Accessing Arrays
You can access the data in a JSON array in the following ways:
• Retrieve all the elements in the array as a single row.
• Retrieve each element in the array as a separate row.
Retrieve All Elements in an Array in a Single Row
To query the entire contents of an array as a single row, use the following syntax.
$.arrayObject[0:]
The following expression queries the entire contents of the orders element in the preceding JSON example used in this section. It returns the array contents in a single column in a single row.
$.orders[0:]
The preceding expression returns the following from the example JSON record used in this section.
[{"orderId":"23284","itemName":"Widget","itemPrice":"33.99"}, {"orderId":"63122","itemName":"Gadget","itemPrice":"22.50"}, {"orderId":"77284","itemName":"Sprocket","itemPrice":"12.00"}]
Retrieve All Elements in an Array in Separate Rows
To query the individual elements in an array as separate rows, use the following syntax.
$.arrayObject[0:].element
The following expression queries the orderId elements in the preceding JSON example, and returns each array element as a separate row.
$.orders[0:].orderId
The preceding expression returns the following from the preceding JSON record, with each data item returned as a separate row.
23284
63122
77284
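The two array forms can be modeled in Python as follows. Both helpers are hypothetical and cover only a top-level, non-empty array: single_row serializes the whole array into one value, like $.orders[0:], while separate_rows returns one value per element, like $.orders[0:].orderId. The exact text that the service produces for the single-row form (for example, its spacing) may differ from json.dumps.

```python
import json

# The "orders" array from the sample record in this section.
record = {
    "orders": [
        {"orderId": "23284", "itemName": "Widget", "itemPrice": "33.99"},
        {"orderId": "63122", "itemName": "Gadget", "itemPrice": "22.50"},
        {"orderId": "77284", "itemName": "Sprocket", "itemPrice": "12.00"},
    ]
}


def single_row(record, array_name):
    """Approximates $.arrayObject[0:]: the whole array as one value in one row."""
    return [json.dumps(record[array_name], separators=(",", ":"))]


def separate_rows(record, array_name, element):
    """Approximates $.arrayObject[0:].element: one row per array element."""
    return [item.get(element) for item in record[array_name]]


print(single_row(record, "orders"))
print(separate_rows(record, "orders", "orderId"))  # ['23284', '63122', '77284']
```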
Note
If expressions that query nonarray elements are included in a schema that queries individual array elements, the nonarray elements are repeated for each element in the array. For example, suppose that a schema for the preceding JSON example includes the following expressions:
• $.customerName
• $.orders[0:].orderId
In this case, the returned data rows from the sample input stream element resemble the following, with the name element repeated for every orderId element.
John Doe 23284
John Doe 63122
John Doe 77284
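The repetition shown above can be sketched in Python by pairing one nonarray value with every element of the flattened array. The function and its parameters are hypothetical, and the sketch assumes both the scalar and the array sit at the top level of the record.

```python
record = {
    "customerName": "John Doe",
    "orders": [{"orderId": "23284"}, {"orderId": "63122"}, {"orderId": "77284"}],
}


def expand_rows(record, scalar_key, array_key, element_key):
    """Return one (scalar, element) row per array element, repeating the scalar."""
    scalar = record.get(scalar_key)
    return [(scalar, item.get(element_key)) for item in record.get(array_key, [])]


for row in expand_rows(record, "customerName", "orders", "orderId"):
    print(*row)  # e.g. John Doe 23284
```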
Note
The following limitations apply to array expressions in Amazon Kinesis Data Analytics:
• Only one level of dereferencing is supported in an array expression. The following expression format is not supported.
$.arrayObject[0:].element[0:].subElement
• Only one array can be flattened in a schema. Multiple arrays can be referenced, with each one returned as a single row containing all of its elements. However, only one array can have each of its elements returned as individual rows.
A schema containing elements in the following format is valid. This format returns the contents of the second array as a single column, repeated for every element in the first array.
$.arrayObjectOne[0:].element
$.arrayObjectTwo[0:]
A schema containing elements in the following format is not valid.
$.arrayObjectOne[0:].element
$.arrayObjectTwo[0:].element
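One way to catch these two limitations before submitting a schema is a small check over the mapping expressions, sketched below in Python. The function is hypothetical, it only recognizes the [0:] slice form used in this section, and it treats a path that continues after [0:] as flattening that array. Several paths may flatten the same array; only flattening two different arrays is reported.

```python
def validate_array_paths(paths):
    """Check JSONPath column mappings against the two array limitations above."""
    problems = []
    flattened = set()
    for path in paths:
        slices = path.count("[0:]")
        if slices > 1:
            problems.append(f"{path}: more than one level of array dereferencing")
        elif slices == 1 and not path.endswith("[0:]"):
            # "[0:]" followed by ".element" returns one row per array element.
            flattened.add(path.split("[0:]", 1)[0])
    if len(flattened) > 1:
        problems.append("more than one array is flattened: " + ", ".join(sorted(flattened)))
    return problems


print(validate_array_paths(["$.arrayObjectOne[0:].element", "$.arrayObjectTwo[0:]"]))  # []
print(validate_array_paths(["$.arrayObjectOne[0:].element", "$.arrayObjectTwo[0:].element"]))
print(validate_array_paths(["$.arrayObject[0:].element[0:].subElement"]))
```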
Other Considerations
Additional considerations for working with JSONPath are as follows:
• If no arrays are accessed by an individual element in the JSONPath expressions in the application schema, then a single row is created in the application's input stream for each JSON record processed.
• When an array is flattened (that is, its elements are returned as individual rows), any missing elements result in a null value being created in the in-application stream.
• An array is always flattened to at least one row. If no values would be returned (that is, the array is empty or none of its elements are queried), a single row with all null values is returned.
The following expression returns records with null values from the preceding JSON example, because there is no matching element at the specified path.
$.orders[0:].itemId
The preceding expression returns the following from the preceding JSON example record.
null
null
null
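The following minimal Python sketch makes the row-generation rules concrete. It emulates them for the simple case of top-level `$.field` and `$.array[0:].field` mappings. The sample record is hypothetical, built only to match the customerName and orderId output shown earlier. `flatten` is an illustrative name, not a Kinesis Data Analytics API. The sketch reads the empty-array rule literally, so every column in that single row is null.

```python
from typing import Any, Optional


def flatten(record: dict, scalar_fields: list[str], array_field: Optional[str],
            element_fields: list[str]) -> list[tuple[Any, ...]]:
    """Emulate $.<scalar> and $.<array_field>[0:].<element> mappings for one JSON record."""
    # Non-array mappings; a missing key becomes None (SQL null).
    scalars = tuple(record.get(name) for name in scalar_fields)
    if array_field is None:
        # No array is flattened: exactly one row per JSON record.
        return [scalars]
    elements = record.get(array_field) or []
    if not elements:
        # Empty array: still one row, read literally as "all null values".
        return [(None,) * (len(scalar_fields) + len(element_fields))]
    rows = []
    for element in elements:
        item = element if isinstance(element, dict) else {}
        # One row per array element; the non-array values repeat on every row.
        rows.append(scalars + tuple(item.get(name) for name in element_fields))
    return rows


# Hypothetical record matching the customerName/orderId output shown earlier.
record = {
    "customerName": "John Doe",
    "orders": [{"orderId": 23284}, {"orderId": 63122}, {"orderId": 77284}],
}
for row in flatten(record, ["customerName"], "orders", ["orderId"]):
    print(row)
print(flatten(record, [], "orders", ["itemId"]))  # no element has an itemId
```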
Related Topics
• Introducing JSON
Mapping Streaming Source Elements to SQL Input Columns
With Amazon Kinesis Data Analytics, you can process and analyze streaming data in either JSON or CSV formats using standard SQL.
• To process and analyze streaming CSV data, you assign column names and data types for the columns of the input stream. Your application imports one column from the input stream per column definition, in order.
You don't have to include all of the columns in the application input stream, but you cannot skip columns from the source stream. For example, you can import the first three columns from an input stream containing five elements, but you cannot import only columns 1, 2, and 4.
• To process and analyze streaming JSON data, you use JSONPath expressions to map JSON elements from a streaming source to SQL columns in an input stream. For more information about using JSONPath with Amazon Kinesis Data Analytics, see Working with JSONPath (p. 9). The columns in the SQL table have data types that are mapped from JSON types. For supported data types, see Data Types. For details about converting JSON data to SQL data, see Mapping JSON Data Types to SQL Data Types (p. 15).
For more information about how to configure input streams, see Configuring Application Input (p. 5).
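For CSV sources, you can picture the positional rule as zipping the column definitions against the fields of each record. The following is a hypothetical Python illustration; `map_csv_row` is not an AWS API. It performs no SQL type conversion. It only shows that trailing source columns are dropped and that columns are consumed strictly in order.

```python
import csv
import io


def map_csv_row(line: str, columns: list[tuple[str, str]]) -> dict[str, str]:
    """Map one CSV record onto (name, sql_type) column definitions by position."""
    fields = next(csv.reader(io.StringIO(line)))
    # zip() pairs definitions with fields strictly in order and stops at the
    # shorter sequence, so extra trailing source columns are simply ignored.
    # Values stay as strings; no SQL type conversion is attempted here.
    return {name: value for (name, _sql_type), value in zip(columns, fields)}


columns = [("LastName", "VARCHAR(16)"), ("FirstName", "VARCHAR(16)"), ("CustomerId", "INTEGER")]
print(map_csv_row("Doe,John,42,extra,ignored", columns))
```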
Mapping JSON Data to SQL Columns
You can map JSON elements to input columns using the AWS Management Console or the Kinesis Data Analytics API.
• To map elements to columns using the console, see Working with the Schema Editor (p. 58).
• To map elements to columns using the Kinesis Data Analytics API, see the following section.
To map JSON elements to columns in the in-application input stream, you need a schema with the following information for each column:
• Source Expression: The JSONPath expression that identifies the location of the data for the column.
• Column Name: The name that your SQL queries use to reference the data.
• Data Type: The SQL data type for the column.
Using the API
To map elements from a streaming source to input columns, you can use the Kinesis Data Analytics API CreateApplication (p. 204) action. To create the in-application stream, specify a schema to transform your data into a schematized version used in SQL. The CreateApplication (p. 204) action configures your application to receive input from a single streaming source. To map JSON elements or CSV columns to SQL columns, you create a RecordColumn (p. 296) object in the SourceSchema (p. 306) RecordColumns array. The RecordColumn (p. 296) object has the following schema:
{
  "Mapping": "String",
  "Name": "String",
  "SqlType": "String"
}
The fields in the RecordColumn (p. 296) object have the following values:
• Mapping: The JSONPath expression that identifies the location of the data in the input stream record.
This value is not present for an input schema for a source stream in CSV format.
• Name: The column name in the in-application SQL data stream.
• SqlType: The data type of the data in the in-application SQL data stream.
JSON Input Schema Example
The following example demonstrates the format of the InputSchema value for a JSON schema.
"InputSchema": {
  "RecordColumns": [
    {
      "SqlType": "VARCHAR(4)",
      "Name": "TICKER_SYMBOL",
      "Mapping": "$.TICKER_SYMBOL"
    },
    {
      "SqlType": "VARCHAR(16)",
      "Name": "SECTOR",
      "Mapping": "$.SECTOR"
    },
    {
      "SqlType": "TINYINT",
      "Name": "CHANGE",
      "Mapping": "$.CHANGE"
    },
    {
      "SqlType": "DECIMAL(5,2)",
      "Name": "PRICE",
      "Mapping": "$.PRICE"
    }
  ],
  "RecordFormat": {
    "MappingParameters": {
      "JSONMappingParameters": {
        "RecordRowPath": "$"
      }
    },
    "RecordFormatType": "JSON"
  },
  "RecordEncoding": "UTF-8"
}
CSV Input Schema Example
The following example demonstrates the format of the InputSchema value for a schema in comma-separated value (CSV) format.
"InputSchema": {
  "RecordColumns": [
    {
      "SqlType": "VARCHAR(16)",
      "Name": "LastName"
    },
    {
      "SqlType": "VARCHAR(16)",
      "Name": "FirstName"
    },
    {
      "SqlType": "INTEGER",
      "Name": "CustomerId"
    }
  ],
  "RecordFormat": {
    "MappingParameters": {
      "CSVMappingParameters": {
        "RecordColumnDelimiter": ",",
        "RecordRowDelimiter": "\n"
      }
    },
    "RecordFormatType": "CSV"
  },
  "RecordEncoding": "UTF-8"
}
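The two examples differ only in whether each column carries a `Mapping` and in which mapping parameters are used. The Python sketch below assembles either shape as a plain dictionary. `build_input_schema` is a hypothetical helper. It assumes a record row path of `$` for JSON and the comma and newline delimiters from the CSV example.

```python
import json
from typing import Optional


def build_input_schema(columns: list[tuple[str, str, Optional[str]]],
                       record_format: str) -> dict:
    """Build an InputSchema value; each column is (name, sql_type, jsonpath_or_None)."""
    record_columns = []
    for name, sql_type, mapping in columns:
        column = {"SqlType": sql_type, "Name": name}
        if record_format == "JSON":
            # JSON sources need a JSONPath Mapping; CSV sources omit it.
            column["Mapping"] = mapping
        record_columns.append(column)
    if record_format == "JSON":
        parameters = {"JSONMappingParameters": {"RecordRowPath": "$"}}
    else:
        # Assumption: comma-delimited columns and newline-delimited rows.
        parameters = {"CSVMappingParameters": {"RecordColumnDelimiter": ",",
                                                "RecordRowDelimiter": "\n"}}
    return {
        "RecordColumns": record_columns,
        "RecordFormat": {"MappingParameters": parameters, "RecordFormatType": record_format},
        "RecordEncoding": "UTF-8",
    }


schema = build_input_schema([("TICKER_SYMBOL", "VARCHAR(4)", "$.TICKER_SYMBOL"),
                             ("PRICE", "DECIMAL(5,2)", "$.PRICE")], "JSON")
print(json.dumps({"InputSchema": schema}, indent=2))
```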
Mapping JSON Data Types to SQL Data Types
JSON data types are converted to corresponding SQL data types according to the application's input schema. For information about supported SQL data types, see Data Types. Amazon Kinesis Data Analytics converts JSON data types to SQL data types according to the following rules.
Null Literal
A null literal in the JSON input stream (for example, "City":null) converts to a SQL null regardless of destination data type.
Boolean Literal
A Boolean literal in the JSON input stream (for example, "Contacted":true) converts to SQL data as follows:
• Numeric (DECIMAL, INT, and so on): true converts to 1; false converts to 0.
• Binary (BINARY or VARBINARY):
• true: Result has lowest bit set and remaining bits cleared.
• false: Result has all bits cleared.
Conversion to VARBINARY results in a value 1 byte in length.
• BOOLEAN: Converts to the corresponding SQL BOOLEAN value.
• Character (CHAR or VARCHAR): Converts to the corresponding string value (true or false). The value is truncated to fit the length of the field.
• Datetime (DATE, TIME, or TIMESTAMP): Conversion fails and a coercion error is written to the error stream.
Number
A number literal in the JSON input stream (for example, "CustomerId":67321) converts to SQL data as follows:
• Numeric (DECIMAL, INT, and so on): Converts directly. If the converted value exceeds the size or precision of the target data type (for example, converting 123.4 to INT), conversion fails and a coercion error is written to the error stream.
• Binary (BINARY or VARBINARY): Conversion fails and a coercion error is written to the error stream.
• BOOLEAN:
• 0: Converts to false.
• All other numbers: Converts to true.
• Character (CHAR or VARCHAR): Converts to a string representation of the number.
• Datetime (DATE, TIME, or TIMESTAMP): Conversion fails and a coercion error is written to the error stream.
String
A string value in the JSON input stream (for example, "CustomerName":"John Doe") converts to SQL data as follows:
• Numeric (DECIMAL, INT, and so on): Amazon Kinesis Data Analytics attempts to convert the value to the target data type. If the value cannot be converted, conversion fails and a coercion error is written to the error stream.
• Binary (BINARY or VARBINARY): If the source string is a valid binary literal (for example, X'3F67A23A', with an even number of hexadecimal digits), the value is converted to the target data type. Otherwise, conversion fails and a coercion error is written to the error stream.
• BOOLEAN: If the source string is "true", converts to true. This comparison is case-insensitive. Otherwise, converts to false.
• Character (CHAR or VARCHAR): Converts to the string value in the input. If the value is longer than the target data type, it is truncated and no error is written to the error stream.
• Datetime (DATE, TIME, or TIMESTAMP): If the source string is in a format that can be converted to the target value, the value is converted. Otherwise, conversion fails and a coercion error is written to the error stream.
Valid datetime formats include:
• "1992-02-14"
• "1992-02-14 18:35:44.0"
Array or Object
An array or object in the JSON input stream converts to SQL data as follows:
• Character (CHAR or VARCHAR): Converts to the source text of the array or object. See Accessing Arrays (p. 10).
• All other data types: Conversion fails and a coercion error is written to the error stream.
For an example of a JSON array, see Working with JSONPath (p. 9).
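The rules above can be summarized as a conversion function. The following Python sketch covers only a subset of target types: INT/INTEGER, BOOLEAN, CHAR(n)/VARCHAR(n), DATE, and TIMESTAMP. It is not the service's implementation. `coerce` and `CoercionError` are hypothetical names, and raising `CoercionError` stands in for writing a coercion error to the error stream. The sketch makes several assumptions that go beyond the text:
• INTEGER is a 32-bit signed integer.
• Only the two datetime formats listed under String are accepted.
• Numbers are not truncated when converted to character types.
• `json.dumps` approximates the "source text" of arrays and objects.

```python
import datetime
import json
import re


class CoercionError(ValueError):
    """Stands in for a coercion error being written to the error stream."""


INT_MIN, INT_MAX = -2**31, 2**31 - 1  # assumption: INTEGER is 32-bit signed
_CHAR = re.compile(r"(?:VAR)?CHAR\((\d+)\)")


def coerce(value, sql_type: str):
    """Convert a parsed JSON value to a subset of SQL types, following the rules above."""
    if value is None:
        return None  # JSON null -> SQL null for every target type
    char = _CHAR.fullmatch(sql_type)
    if char:
        length = int(char.group(1))
        if isinstance(value, bool):
            return ("true" if value else "false")[:length]  # truncated to fit
        if isinstance(value, str):
            return value[:length]  # truncated silently, no error
        if isinstance(value, (int, float)):
            return str(value)  # truncation of numbers is not described; not applied
        return json.dumps(value)  # array/object: approximates the source text
    if isinstance(value, (list, dict)):
        raise CoercionError(f"array/object cannot become {sql_type}")
    if sql_type == "BOOLEAN":
        if isinstance(value, bool):
            return value
        if isinstance(value, (int, float)):
            return value != 0  # 0 -> false, any other number -> true
        return value.lower() == "true"  # case-insensitive; anything else -> false
    if sql_type in ("INT", "INTEGER"):
        if isinstance(value, bool):
            return int(value)  # true -> 1, false -> 0
        if isinstance(value, str):
            try:
                value = int(value)
            except ValueError:
                raise CoercionError(f"{value!r} is not an integer") from None
        if isinstance(value, float) and not value.is_integer():
            raise CoercionError(f"{value} loses precision as {sql_type}")
        if not INT_MIN <= value <= INT_MAX:
            raise CoercionError(f"{value} is out of range for {sql_type}")
        return int(value)
    if sql_type in ("DATE", "TIMESTAMP"):
        if not isinstance(value, str):
            raise CoercionError(f"{value!r} cannot become {sql_type}")  # booleans, numbers
        formats = ["%Y-%m-%d"] if sql_type == "DATE" else ["%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d"]
        for fmt in formats:
            try:
                parsed = datetime.datetime.strptime(value, fmt)
            except ValueError:
                continue
            return parsed.date() if sql_type == "DATE" else parsed
        raise CoercionError(f"{value!r} is not a valid {sql_type}")
    raise NotImplementedError(f"{sql_type} is outside this sketch")
```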
Related Topics
• Configuring Application Input (p. 5)
• Data Types
• Working with the Schema Editor (p. 58)
• CreateApplication (p. 204)
• RecordColumn (p. 296)
• SourceSchema (p. 306)
Using the Schema Discovery Feature on Streaming Data
Providing an input schema that describes how records on the streaming input map to an in-application stream can be cumbersome and error-prone. You can use the DiscoverInputSchema (p. 225) API (called the discovery API) to infer a schema. Using random samples of records on the streaming source, the API can infer a schema (that is, column names, data types, and position of the data element in the incoming data).
Note
To use the Discovery API to generate a schema from a file stored in Amazon S3, see Using the Schema Discovery Feature on Static Data (p. 18).
The console uses the Discovery API to generate a schema for a specified streaming source. Using the console, you can also update the schema, including adding or removing columns, changing column names or data types, and so on. However, make changes carefully to ensure that you do not create an invalid schema.
After you finalize a schema for your in-application stream, there are functions you can use to manipulate string and datetime values. You can use these functions in your application code when working with rows in the resulting in-application stream. For more information, see Example: Transforming DateTime Values (p. 98).
Column Naming During Schema Discovery
During schema discovery, Amazon Kinesis Data Analytics tries to retain as much of the original column name as possible from the streaming input source, except in the following cases:
• The source stream column name is a reserved SQL keyword, such as TIMESTAMP, USER, VALUES, or YEAR.
• The source stream column name contains unsupported characters. Only letters, numbers, and the underscore character ( _ ) are supported.
• The source stream column name begins with a number.
• The source stream column name is longer than 100 characters.
If a column is renamed, the renamed schema column name begins with COL_. In some cases, none of the original column name can be retained, for example, if the entire name consists of unsupported characters. In such a case, the column is named COL_#, with # being a number indicating the column's place in the column order.
After discovery completes, you can update the schema using the console to add or remove columns, or change column names, data types, or data size.
Examples of Discovery-Suggested Column Names
Source Stream Column Name    Discovery-Suggested Column Name
USER                         COL_USER
USER@DOMAIN                  COL_USERDOMAIN
@@                           COL_0
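You can approximate the renaming rules in a few lines of Python. This is a sketch, not the discovery algorithm, and it rests on several assumptions:
• `suggest_column_name` is a hypothetical name.
• `RESERVED` holds only a handful of keywords. It includes MONTH because the discovery example later in this guide renames month to COL_month. The real reserved-word list is much longer.
• Only ASCII letters count as supported.
• Over-long names are truncated to 100 characters. The text above does not say how they are shortened.

```python
import re

# Assumption: a small sample of reserved SQL keywords; the real list is longer.
RESERVED = {"TIMESTAMP", "USER", "VALUES", "YEAR", "MONTH"}
MAX_LEN = 100


def suggest_column_name(source_name: str, position: int) -> str:
    """Approximate discovery's renaming for the column at 0-based `position`."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "", source_name)  # keep supported characters only
    if not cleaned:
        return f"COL_{position}"  # nothing retainable: name it by column order
    needs_prefix = (
        cleaned != source_name            # unsupported characters were removed
        or cleaned.upper() in RESERVED    # reserved SQL keyword
        or cleaned[0].isdigit()           # begins with a number
        or len(source_name) > MAX_LEN     # longer than 100 characters
    )
    if not needs_prefix:
        return source_name
    # Assumption: over-long names are cut so the prefixed name fits MAX_LEN.
    return ("COL_" + cleaned)[:MAX_LEN]


for name in ["USER", "USER@DOMAIN", "@@", "state"]:
    print(name, suggest_column_name(name, 0))
```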
Schema Discovery Issues
What happens if Kinesis Data Analytics does not infer a schema for a given streaming source?
Kinesis Data Analytics infers your schema for common formats, such as CSV and JSON, which are UTF-8 encoded. Kinesis Data Analytics supports any UTF-8 encoded records (including raw text like application logs and records) with a custom column and row delimiter. If Kinesis Data Analytics doesn't infer a schema, you can define a schema manually using the schema editor on the console (or using the API).
If your data does not follow a pattern (which you can specify using the schema editor), you can define a schema as a single column of type VARCHAR(N), where N is the largest number of characters you expect your record to include. From there, you can use string and date-time manipulation to structure your data after it is in an in-application stream. For examples, see Example: Transforming DateTime Values (p. 98).
Using the Schema Discovery Feature on Static Data
The schema discovery feature can generate a schema from either the data in a stream or data in a static file that is stored in an Amazon S3 bucket. Suppose that you want to generate a schema for a Kinesis Data Analytics application for reference purposes or when live streaming data isn't available. You can use the schema discovery feature on a static file that contains a sample of the data in the expected format of your streaming or reference data. Kinesis Data Analytics can run schema discovery on sample data from a JSON or CSV file that's stored in an Amazon S3 bucket. To run schema discovery on a data file, use either the console or the DiscoverInputSchema (p. 225) API with the S3Configuration parameter specified.
Running Schema Discovery Using the Console
To run discovery on a static file using the console, do the following:
1. Add a reference data object to an S3 bucket.
2. Choose Connect reference data in the application's main page in the Kinesis Data Analytics console.
3. Provide the bucket, path and IAM role data for accessing the Amazon S3 object containing the reference data.
4. Choose Discover schema.
For more information on how to add reference data and discover schema in the console, see Example: Adding Reference Data to a Kinesis Data Analytics Application (p. 120).
Running Schema Discovery Using the API
To run discovery on a static file using the API, you provide the API with an S3Configuration structure with the following information:
• BucketARN: The Amazon Resource Name (ARN) of the Amazon S3 bucket that contains the file. For the format of an Amazon S3 bucket ARN, see Amazon Resource Names (ARNs) and Amazon Service Namespaces: Amazon Simple Storage Service (Amazon S3).
• RoleARN: The ARN of an IAM role with the AmazonS3ReadOnlyAccess policy. For information about how to add a policy to a role, see Modifying a Role.
• FileKey: The file name of the object.
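When you call the API from the AWS CLI, these three values are passed together as one JSON document. The following hypothetical Python helper builds that document. It assumes the standard `aws` partition when it forms the bucket ARN (`arn:aws:s3:::bucket-name`), so adjust it for other partitions.

```python
import json


def s3_configuration(bucket_name: str, role_arn: str, file_key: str) -> str:
    """Return the JSON document passed to the CLI's --s3-configuration option."""
    return json.dumps({
        "RoleARN": role_arn,
        # Assumption: the standard "aws" partition; bucket ARNs take the form
        # arn:aws:s3:::<bucket-name>, with empty region and account fields.
        "BucketARN": f"arn:aws:s3:::{bucket_name}",
        "FileKey": file_key,
    })


print(s3_configuration("your-bucket-name",
                       "arn:aws:iam::123456789012:role/service-role/your-IAM-role",
                       "data.csv"))
```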
To generate a schema from an Amazon S3 object using the DiscoverInputSchema API
1. Make sure that you have the AWS CLI set up. For more information, see Step 2: Set Up the AWS Command Line Interface (AWS CLI) (p. 46) in the Getting Started section.
2. Create a file named data.csv with the following contents:
year,month,state,producer_type,energy_source,units,consumption
2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615
2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535
2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890
2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601
2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681
3. Sign in to the Amazon S3 console at https://console.aws.amazon.com/s3/.
4. Create an Amazon S3 bucket and upload the data.csv file you created. Note the ARN of the created bucket. For information about creating an Amazon S3 bucket and uploading a file, see Getting Started with Amazon Simple Storage Service.
5. Open the IAM console at https://console.aws.amazon.com/iam/. Create a role with the AmazonS3ReadOnlyAccess policy. Note the ARN of the new role. For information about creating a role, see Creating a Role to Delegate Permissions to an Amazon Service. For information about how to add a policy to a role, see Modifying a Role.
6. Run the following DiscoverInputSchema command in the AWS CLI, substituting the ARNs for your Amazon S3 bucket and IAM role:
$ aws kinesisanalytics discover-input-schema --s3-configuration '{
    "RoleARN": "arn:aws:iam::123456789012:role/service-role/your-IAM-role",
    "BucketARN": "arn:aws:s3:::your-bucket-name",
    "FileKey": "data.csv"
}'
7. The response looks similar to the following:
{
  "InputSchema": {
    "RecordEncoding": "UTF-8",
    "RecordColumns": [
      {
        "SqlType": "INTEGER",
        "Name": "COL_year"
      },
      {
        "SqlType": "INTEGER",
        "Name": "COL_month"
      },
      {
        "SqlType": "VARCHAR(4)",
        "Name": "state"
      },
      {
        "SqlType": "VARCHAR(64)",
        "Name": "producer_type"
      },
      {
        "SqlType": "VARCHAR(4)",
        "Name": "energy_source"
      },
      {
        "SqlType": "VARCHAR(16)",
        "Name": "units"
      },
      {
        "SqlType": "INTEGER",
        "Name": "consumption"
      }
    ],
    "RecordFormat": {
      "RecordFormatType": "CSV",
      "MappingParameters": {
        "CSVMappingParameters": {
          "RecordRowDelimiter": "\r\n",
          "RecordColumnDelimiter": ","
        }
      }
    }
  },
  "RawInputRecords": [
    "year,month,state,producer_type,energy_source,units,consumption\r\n2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615\r\n2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535\r\n2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890\r\n2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601\r\n2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681"
  ],
  "ParsedInputRecords": [
    [null, null, "state", "producer_type", "energy_source", "units", null],
    ["2001", "1", "AK", "TotalElectricPowerIndustry", "Coal", "ShortTons", "47615"],
    ["2001", "1", "AK", "ElectricGeneratorsElectricUtilities", "Coal", "ShortTons", "16535"],
    ["2001", "1", "AK", "CombinedHeatandPowerElectricPower", "Coal", "ShortTons", "22890"],
    ["2001", "1", "AL", "TotalElectricPowerIndustry", "Coal", "ShortTons", "3020601"],
    ["2001", "1", "AL", "ElectricGeneratorsElectricUtilities", "Coal", "ShortTons", "2987681"]
  ]
}
Preprocessing Data Using a Lambda Function
If the data in your stream needs format conversion, transformation, enrichment, or filtering, you can preprocess the data using an AWS Lambda function. You can do this before your application SQL code executes or before your application creates a schema from your data stream.
Using a Lambda function for preprocessing records is useful in the following scenarios:
• Transforming records from other formats (such as KPL or GZIP) into formats that Kinesis Data Analytics can analyze. Kinesis Data Analytics currently supports JSON or CSV data formats.
• Expanding data into a format that is more accessible for operations such as aggregation or anomaly detection. For instance, if several data values are stored together in a string, you can expand the data into separate columns.
• Enriching data, for example through extrapolation or error correction, or by using other Amazon services.
• Applying complex string transformation to record fields.
• Data filtering for cleaning up the data.
Using a Lambda Function for Preprocessing Records
When creating your Kinesis Data Analytics application, you enable Lambda preprocessing on the Connect to a Source page.
To use a Lambda function to preprocess records in a Kinesis Data Analytics application
1. Sign in to the AWS Management Console and open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
2. On the Connect to a Source page for your application, choose Enabled in the Record pre-processing with AWS Lambda section.
3. To use a Lambda function that you have already created, choose the function in the Lambda function drop-down list.
4. To create a new Lambda function from one of the Lambda preprocessing templates, choose the template from the drop-down list. Then choose View <template name> in Lambda to edit the function.
5. To create a new Lambda function, choose Create new. For information about creating a Lambda function, see Create a HelloWorld Lambda Function and Explore the Console in the AWS Lambda Developer Guide.
6. Choose the version of the Lambda function to use. To use the latest version, choose $LATEST.
When you choose or create a Lambda function for record preprocessing, the records are preprocessed before your application SQL code executes or your application generates a schema from the records.
Lambda Preprocessing Permissions
To use Lambda preprocessing, the application's IAM role requires the following permissions policy:
{
  "Sid": "UseLambdaFunction",
  "Effect": "Allow",
  "Action": [
    "lambda:InvokeFunction",
    "lambda:GetFunctionConfiguration"
  ],
  "Resource": "<FunctionARN>"
}
For more information about adding permissions policies, see Authentication and Access Control for Amazon Kinesis Data Analytics for SQL Applications (p. 176).
Lambda Preprocessing Metrics
You can use Amazon CloudWatch to monitor the number of Lambda invocations, bytes processed, successes and failures, and so on. For information about CloudWatch metrics that are emitted by Kinesis Data Analytics Lambda preprocessing, see Amazon Kinesis Analytics Metrics.
Using AWS Lambda with the Kinesis Producer Library
The Kinesis Producer Library (KPL) aggregates small user-formatted records into larger records up to 1 MB to make better use of Amazon Kinesis Data Streams throughput. The Kinesis Client Library (KCL) for Java supports deaggregating these records. However, you must use a special module to deaggregate the records when you use AWS Lambda as the consumer of your streams.
To get the necessary project code and instructions, see the Kinesis Producer Library Deaggregation Modules for AWS Lambda on GitHub. You can use the components in this project to process KPL serialized data within AWS Lambda in Java, Node.js, and Python. You can also use these components as part of a multi-lang KCL application.
Data Preprocessing Event Input Data Model/Record Response Model
To preprocess records, your Lambda function must be compliant with the required event input data and record response models.
Event Input Data Model
Kinesis Data Analytics continuously reads data from your Kinesis data stream or Kinesis Data Firehose delivery stream. For each batch of records it retrieves, the service manages how each batch gets passed to your Lambda function. Your function receives a list of records as input. Within your function, you iterate through the list and apply your business logic to accomplish your preprocessing requirements (such as data format conversion or enrichment).
The input model to your preprocessing function varies slightly, depending on whether the data was received from a Kinesis data stream or a Kinesis Data Firehose delivery stream.
If the source is a Kinesis Data Firehose delivery stream, the event input data model is as follows:
Kinesis Data Firehose Request Data Model
• invocationId: The Lambda invocation ID (random GUID).
• applicationArn: Kinesis Data Analytics application Amazon Resource Name (ARN).
• streamArn: Delivery stream ARN.
• records: The batch of records, each with the following fields:
  • recordId: Record ID (random GUID).
  • kinesisFirehoseRecordMetadata: Record metadata, with the following field:
    • approximateArrivalTimestamp: Delivery stream record approximate arrival time.
  • data: Base64-encoded source record payload.
The following example shows input from a Firehose delivery stream:
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisFirehoseRecordMetadata": {
        "approximateArrivalTimestamp": 1520280173
      }
    }
  ]
}
If the source is a Kinesis data stream, the event input data model is as follows:
Kinesis Streams Request Data Model
• invocationId: The Lambda invocation ID (random GUID).
• applicationArn: Kinesis Data Analytics application ARN.
• streamArn: Kinesis data stream ARN.
• records: The batch of records, each with the following fields:
  • recordId: Record ID based on the Kinesis record sequence number.
  • kinesisStreamRecordMetadata: Record metadata, with the following fields:
    • sequenceNumber: Sequence number from the Kinesis stream record.
    • partitionKey: Partition key from the Kinesis stream record.
    • shardId: ShardId from the Kinesis stream record.
    • approximateArrivalTimestamp: Stream record approximate arrival time.
  • data: Base64-encoded source record payload.
The following example shows input from a Kinesis data stream:
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata": {
        "shardId": "shardId-000000000003",
        "partitionKey": "7400791606",
        "sequenceNumber": "49572672223665514422805246926656954630972486059535892482",
        "approximateArrivalTimestamp": 1520280173
      }
    }
  ]
}
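The only structural difference between the two input models is the name of the metadata object, so one function can read either. The following Python sketch, with the hypothetical name `read_records`, decodes each payload and extracts the arrival timestamp. It assumes that the `data` field is valid Base64, as the models above describe.

```python
import base64


def read_records(event: dict) -> list[tuple[str, bytes, object]]:
    """Return (recordId, decoded payload, approximateArrivalTimestamp) per record."""
    results = []
    for record in event["records"]:
        # Firehose sources send kinesisFirehoseRecordMetadata; Kinesis data
        # streams send kinesisStreamRecordMetadata. Both carry the timestamp.
        metadata = (record.get("kinesisFirehoseRecordMetadata")
                    or record.get("kinesisStreamRecordMetadata")
                    or {})
        payload = base64.b64decode(record["data"])
        results.append((record["recordId"], payload, metadata.get("approximateArrivalTimestamp")))
    return results


kinesis_event = {
    "records": [{
        "recordId": "49572672223665514422805246926656954630972486059535892482",
        "data": "aGVsbG8gd29ybGQ=",
        "kinesisStreamRecordMetadata": {"shardId": "shardId-000000000003",
                                        "approximateArrivalTimestamp": 1520280173},
    }]
}
print(read_records(kinesis_event))
```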
Record Response Model
Every record (with its record ID) that is sent to your Lambda preprocessing function must be returned by the function. Each returned record must contain the following parameters. Otherwise, Kinesis Data Analytics rejects it and treats it as a data preprocessing failure. The data payload part of the record can be transformed to accomplish preprocessing requirements.
Response Data Model
• records: The returned records, each with the following fields:
  • recordId: The record ID is passed from Kinesis Data Analytics to Lambda during the invocation. The transformed record must contain the same record ID. Any mismatch between the ID of the original record and the ID of the transformed record is treated as a data preprocessing failure.
  • result: The status of the data transformation of the record. The possible values are:
    • Ok: The record was transformed successfully. Kinesis Data Analytics ingests the record for SQL processing.
    • Dropped: The record was dropped intentionally by your processing logic. Kinesis Data Analytics drops the record from SQL processing. The data payload field is optional for a Dropped record.
    • ProcessingFailed: The record could not be transformed. Kinesis Data Analytics considers it unsuccessfully processed by your Lambda function and writes an error to the error stream. For more information about the error stream, see Error Handling (p. 41). The data payload field is optional for a ProcessingFailed record.
  • data: The transformed data payload, after base64-encoding. Each data payload can contain multiple JSON documents if the application ingestion data format is JSON. Or each can contain multiple CSV rows (with a row delimiter specified in each row) if the application ingestion data format is CSV. The Kinesis Data Analytics service successfully parses and processes data with either multiple JSON documents or CSV rows within the same data payload.
The following example shows output from a Lambda function:
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
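The example above is consistent with a function that uppercases each payload: `aGVsbG8gd29ybGQ=` decodes to `hello world` and `SEVMTE8gV09STEQ=` decodes to `HELLO WORLD`. The following is a minimal Python sketch of such a handler, written against the request and response models described in this section. The uppercase transformation is placeholder business logic. Reporting undecodable records as `ProcessingFailed` is a design choice for this sketch, not a requirement stated in this guide.

```python
import base64


def lambda_handler(event, context):
    """Uppercase each payload and return every record with a status."""
    output = []
    for record in event["records"]:
        try:
            text = base64.b64decode(record["data"]).decode("utf-8")
            transformed = text.upper()  # placeholder business logic
            output.append({
                "recordId": record["recordId"],  # must match the input record ID
                "result": "Ok",
                "data": base64.b64encode(transformed.encode("utf-8")).decode("ascii"),
            })
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            # The record is still returned, so no record ID goes missing.
            output.append({"recordId": record["recordId"], "result": "ProcessingFailed"})
    return {"records": output}
```

Returning every record, including failed ones, keeps the response aligned with the record IDs that were sent.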
Common Data Preprocessing Failures
The following are common reasons why preprocessing can fail.
• Not all records (with record IDs) in a batch that were sent to the Lambda function are returned to the Kinesis Data Analytics service.
• The response is missing either the record ID, status, or data payload field. The data payload field is optional for a Dropped or ProcessingFailed record.
• The Lambda function timeout is too short to preprocess the data.
• The Lambda function response exceeds the response limits imposed by the AWS Lambda service.
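You can catch the first two failure modes by comparing a response with the request that produced it, for example in a unit test of your function. The following Python sketch checks record IDs, `result` values, and the presence of `data` for `Ok` records. `find_response_problems` is a hypothetical name. The sketch cannot detect timeouts or Lambda response-size limits, because those depend on the runtime environment.

```python
VALID_RESULTS = {"Ok", "Dropped", "ProcessingFailed"}


def find_response_problems(request: dict, response: dict) -> list[str]:
    """List reasons a preprocessing response would be treated as a failure."""
    problems = []
    sent_ids = {r["recordId"] for r in request["records"]}
    returned_ids = set()
    for rec in response.get("records", []):
        record_id = rec.get("recordId")
        if record_id is None:
            problems.append("record without recordId")
            continue
        returned_ids.add(record_id)
        if record_id not in sent_ids:
            problems.append(f"{record_id}: ID was not in the request")
        result = rec.get("result")
        if result not in VALID_RESULTS:
            problems.append(f"{record_id}: missing or invalid result {result!r}")
        elif result == "Ok" and "data" not in rec:
            # data is optional only for Dropped and ProcessingFailed records.
            problems.append(f"{record_id}: Ok record has no data")
    for missing in sorted(sent_ids - returned_ids):
        problems.append(f"{missing}: not returned")
    return problems
```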
For data preprocessing failures, Kinesis Data Analytics continues to retry Lambda invocations on the same set of records until successful. You can monitor the following CloudWatch metrics to gain insight into failures.
• Kinesis Data Analytics application MillisBehindLatest: Indicates how far behind an application is reading from the streaming source.
• Kinesis Data Analytics application InputPreprocessing CloudWatch metrics: Indicates the number of successes and failures, among other statistics. For more information, see Amazon Kinesis Analytics Metrics.
• AWS Lambda function CloudWatch metrics and logs.
Creating Lambda Functions for Preprocessing
Your Amazon Kinesis Data Analytics application can use Lambda functions for preprocessing records as they are ingested into the application. Kinesis Data Analytics provides the following templates on the console to use as a starting point for preprocessing your data.
Topics
• Creating a Preprocessing Lambda Function in Node.js (p. 26)
• Creating a Preprocessing Lambda Function in Python (p. 26)
• Creating a Preprocessing Lambda Function in Java (p. 27)
• Creating a Preprocessing Lambda Function in .NET (p. 27)
Creating a Preprocessing Lambda Function in Node.js
The following templates for creating preprocessing Lambda functions in Node.js are available on the Kinesis Data Analytics console:
Lambda Blueprint (Language and version): Description
• General Kinesis Data Analytics Input Processing (Node.js 6.10): A Kinesis Data Analytics record preprocessor that receives JSON or CSV records as input and then returns them with a processing status. Use this processor as a starting point for custom transformation logic.
• Compressed Input Processing (Node.js 6.10): A Kinesis Data Analytics record processor that receives compressed (GZIP or Deflate compressed) JSON or CSV records as input and returns decompressed records with a processing status.
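To illustrate what the Compressed Input Processing blueprint does, here is a Python sketch of a handler that decompresses payloads. It is not the blueprint's code; the blueprint is written in Node.js. The sketch relies on zlib's header auto-detection, so it handles GZIP and zlib-wrapped Deflate data. It assumes that raw Deflate streams without a header do not need to be supported, and it reports such records, like any other undecodable input, as `ProcessingFailed`.

```python
import base64
import zlib


def lambda_handler(event, context):
    """Decompress GZIP or zlib-wrapped Deflate payloads and mark them Ok."""
    output = []
    for record in event["records"]:
        try:
            compressed = base64.b64decode(record["data"])
            # wbits = MAX_WBITS | 32 lets zlib auto-detect a gzip or zlib header.
            # Raw Deflate without a header fails here and is reported as failed.
            raw = zlib.decompress(compressed, zlib.MAX_WBITS | 32)
            output.append({"recordId": record["recordId"], "result": "Ok",
                           "data": base64.b64encode(raw).decode("ascii")})
        except (ValueError, zlib.error):
            output.append({"recordId": record["recordId"], "result": "ProcessingFailed"})
    return {"records": output}
```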
Creating a Preprocessing Lambda Function in Python
The following templates for creating preprocessing Lambda functions in Python are available on the console:
Lambda Blueprint Language and version Description General Kinesis
Analytics Input Processing
Python 2.7 A Kinesis Data Analytics record preprocessor that receives JSON or CSV records as input and then returns them with a processing status. Use