Step 1: Create a Table Based on the Original Dataset
The example in this topic uses an Amazon S3 readable subset of the publicly available NOAA Global Historical Climatology Network Daily (GHCN-D) dataset. The data on Amazon S3 has the following characteristics.
Location: s3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/
Total objects: 41727
Size of CSV dataset: 11.3 GB
Region: us-east-1
The original data is stored in Amazon S3 with no partitions. The data is in CSV format in files like the following.
2019-10-31 13:06:57  413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000
2019-10-31 13:06:57  412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001
2019-10-31 13:06:57   34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002
2019-10-31 13:06:57  412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100
2019-10-31 13:06:57  412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101
The file sizes in this sample are relatively small. By merging them into fewer, larger files, you reduce the total number of files, which can improve query performance. The CTAS and INSERT INTO statements in this tutorial perform this consolidation while they also convert and partition the data.
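To put "relatively small" in numbers, the following Python sketch parses the size column of `aws s3 ls --human-readable` output, such as the listing above, and computes average object sizes. The helper names are hypothetical, and the regular expression assumes the binary units (KiB, MiB, GiB) that the CLI prints. The whole-dataset figure assumes that "11.3 GB" means decimal gigabytes.

```python
import re

# Binary multipliers for the units printed by `aws s3 ls --human-readable`.
UNITS = {"Bytes": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4}
SIZE_RE = re.compile(r"(\d+(?:\.\d+)?)\s+(Bytes|KiB|MiB|GiB|TiB)\b")


def parse_ls_size(line: str) -> float:
    """Return the object size in bytes from one `aws s3 ls --human-readable` line."""
    match = SIZE_RE.search(line)
    if match is None:
        raise ValueError(f"no size column found in: {line!r}")
    value, unit = match.groups()
    return float(value) * UNITS[unit]


def mean_object_size(lines: list[str]) -> float:
    """Average size in bytes of the objects in a listing."""
    sizes = [parse_ls_size(line) for line in lines]
    return sum(sizes) / len(sizes)


sample = [
    "2019-10-31 13:06:57  413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000",
    "2019-10-31 13:06:57  412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001",
    "2019-10-31 13:06:57   34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002",
    "2019-10-31 13:06:57  412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100",
    "2019-10-31 13:06:57  412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101",
]

# Whole dataset: 11.3 GB spread over 41,727 objects (assumption: 1 GB = 10**9 bytes).
dataset_mean = 11.3 * 10**9 / 41727  # roughly 271 KB per object

print(f"sample mean:  {mean_object_size(sample) / 1024:.1f} KiB")
print(f"dataset mean: {dataset_mean / 1000:.0f} KB")
```

Objects this small mean that a full scan touches tens of thousands of files, which is the overhead that consolidation into larger Parquet files avoids.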
To create a database and table based on the sample dataset
1. In the Athena console, choose the US East (N. Virginia) AWS Region. Be sure to run all queries in this tutorial in us-east-1.
2. In the Athena query editor, run the CREATE DATABASE (p. 541) command to create a database.
CREATE DATABASE blogdb
3. Run the following statement to create a table (p. 542).
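CREATE EXTERNAL TABLE `blogdb`.`original_csv` (`id` string, `date` string, `element` string, `datavalue` bigint,
  `mflag` string, `qflag` string, `sflag` string, `obstime` bigint)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/'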
Step 2: Use CTAS to Partition, Convert, and Compress the Data
After you create a table, you can use a single CTAS (p. 130) statement to convert the data to Parquet format with Snappy compression and to partition the data by year.
The table you created in Step 1 has a date field with the date formatted as YYYYMMDD (for example, 20100104). Because the new table will be partitioned on year, the sample statement in the following procedure uses the Presto function substr("date",1,4) to extract the year value from the date field.
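The partition value comes entirely from the first four characters of the date string. The following Python sketch mirrors that derivation to show how a row's date maps to the Hive-style `year=YYYY/` folder that the CTAS statement writes under its output location, and how the 2015 to 2019 filter in the CTAS statement behaves. The function names are hypothetical and only illustrate the SQL expressions.

```python
def year_partition(date_yyyymmdd: str) -> str:
    """Mirror of substr("date",1,4): Presto's substr is 1-based, Python slices are 0-based."""
    if len(date_yyyymmdd) != 8 or not date_yyyymmdd.isdigit():
        raise ValueError(f"expected YYYYMMDD, got {date_yyyymmdd!r}")
    return date_yyyymmdd[0:4]


def partition_prefix(date_yyyymmdd: str, column: str = "year") -> str:
    """Hive-style key=value folder under which the row's Parquet files are written."""
    return f"{column}={year_partition(date_yyyymmdd)}/"


def in_ctas_range(date_yyyymmdd: str, first: int = 2015, last: int = 2019) -> bool:
    """Mirror of cast(substr("date",1,4) AS bigint) >= first AND ... <= last."""
    return first <= int(year_partition(date_yyyymmdd)) <= last


print(partition_prefix("20100104"), in_ctas_range("20100104"))
```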
To convert the data to Parquet format with Snappy compression, partitioning by year
• Run the following CTAS statement, replacing your-bucket with your Amazon S3 bucket location.
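CREATE table new_parquet
WITH (format = 'PARQUET', write_compression = 'SNAPPY', partitioned_by = ARRAY['year'],
      external_location = 's3://your-bucket/optimized-data/')
AS SELECT *, substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) >= 2015 AND cast(substr("date",1,4) AS bigint) <= 2019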
Note
In this example, the table that you create includes only the data from 2015 to 2019. In Step 3, you add new data to this table using the INSERT INTO command.
When the query completes, use the following procedure to verify the output in the Amazon S3 location that you specified in the CTAS statement.
To see the partitions and Parquet files created by the CTAS statement
1. To show the partitions created, run the following AWS CLI command. Be sure to include the final forward slash (/).
aws s3 ls s3://your-bucket/optimized-data/
The output shows the partitions.
PRE year=2015/
PRE year=2016/
PRE year=2017/
PRE year=2018/
PRE year=2019/
2. To see the Parquet files, run the following command. Piping the output to head -5, which restricts the output to the first five results, does not work on Windows.
aws s3 ls s3://your-bucket/optimized-data/ --recursive --human-readable | head -5
The output resembles the following.
2019-10-31 14:51:05    7.3 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_1be48df2-3154-438b-b61d-8fb23809679d
2019-10-31 14:51:05    7.0 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_2a57f4e2-ffa0-4be3-9c3f-28b16d86ed5a
2019-10-31 14:51:05    9.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_34381db1-00ca-4092-bd65-ab04e06dc799
2019-10-31 14:51:05    7.5 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_354a2bc1-345f-4996-9073-096cb863308d
2019-10-31 14:51:05    6.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_42da4cfd-6e21-40a1-8152-0b902da385a1
Step 3: Use INSERT INTO to Add Data
In Step 2, you used CTAS to create a table with partitions for the years 2015 to 2019. However, the original dataset also contains data for the years 2010 to 2014. Now you add that data using an INSERT INTO (p. 505) statement.
To add data to the table using one or more INSERT INTO statements
1. Run the following INSERT INTO command, specifying the years before 2015 in the WHERE clause.
INSERT INTO new_parquet
SELECT *, substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) < 2015
2. Run the aws s3 ls command again, using the following syntax.
aws s3 ls s3://your-bucket/optimized-data/
The output shows the new partitions.
PRE year=2010/
3. To see the reduction in the size of the dataset obtained by using compression and columnar storage in Parquet format, run the following command.
aws s3 ls s3://your-bucket/optimized-data/ --recursive --human-readable --summarize
The following results show that the size of the dataset after conversion to Parquet with Snappy compression is 1.2 GiB.
...
2020-01-22 18:12:02    2.8 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_f0182e6c-38f4-4245-afa2-9f5bfa8d6d8f
2020-01-22 18:11:59    3.7 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_fd9906b7-06cf-4055-a05b-f050e139946e

Total Objects: 300
   Total Size: 1.2 GiB
4. If more CSV data is added to the original table, you can add that data to the Parquet table by using INSERT INTO statements. For example, if you had new data for the year 2020, you could run the following INSERT INTO statement. The statement adds the data and the relevant partition to the new_parquet table.
INSERT INTO new_parquet
SELECT *, substr("date",1,4) AS year
FROM original_csv
WHERE cast(substr("date",1,4) AS bigint) = 2020
Note
The INSERT INTO statement supports writing a maximum of 100 partitions to the destination table. To add more than 100 partitions, run multiple INSERT INTO statements. For more information, see Using CTAS and INSERT INTO to Create a Table with More Than 100 Partitions (p. 143).
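The size reduction reported by `--summarize` can be turned into a compression ratio with a few lines of arithmetic. This Python sketch assumes that the "11.3 GB" dataset size uses decimal gigabytes while the CLI reports binary GiB; both inputs are rounded figures, so the results are approximate. It also compares object counts, since the original 41,727 CSV objects became 300 Parquet objects.

```python
GB = 10**9    # assumption: the "11.3 GB" dataset size uses decimal gigabytes
GiB = 2**30   # `aws s3 ls --human-readable --summarize` reports binary units

csv_bytes = 11.3 * GB        # original CSV dataset
parquet_bytes = 1.2 * GiB    # Total Size after CTAS plus INSERT INTO

ratio = csv_bytes / parquet_bytes        # how many times smaller the Parquet copy is
savings = 1 - parquet_bytes / csv_bytes  # fraction of storage no longer needed

# Consolidation: average Parquet object size across the 300 output objects.
avg_parquet_mib = parquet_bytes / 300 / 2**20

print(f"compression ratio ~{ratio:.1f}x, storage reduced by ~{savings:.0%}")
print(f"average Parquet object ~{avg_parquet_mib:.1f} MiB")
```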
Step 4: Measure Performance and Cost Differences
After you transform the data, you can measure the performance gains and cost savings by running the same queries on the new and old tables and comparing the results.
Note
For Athena per-query cost information, see Amazon Athena pricing.
To measure performance gains and cost differences
1. Run the following query on the original table. The query finds the number of distinct IDs for each year.
SELECT substr("date",1,4) as year, COUNT(DISTINCT id)
FROM original_csv
GROUP BY 1 ORDER BY 1 DESC
2. Note the query runtime and the amount of data scanned.
3. Run the same query on the new table, noting the query runtime and amount of data scanned.
SELECT year,
COUNT(DISTINCT id) FROM new_parquet
GROUP BY 1 ORDER BY 1 DESC
4. Compare the results and calculate the performance and cost difference. The following sample results show that the test query on the new table was faster and cheaper than the query on the old table.
Table      Runtime        Data scanned
Original   16.88 seconds  11.35 GB
New        3.79 seconds   428.05 MB
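5. Run the following sample query on the original table. The query calculates the average maximum temperature (Celsius), average minimum temperature (Celsius), and average rainfall (mm) across all stations in the dataset for 2018. Because the dataset stores these values in tenths of a unit, the query divides datavalue by 10.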
SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM original_csv
WHERE element IN ('TMIN', 'TMAX', 'PRCP') AND substr("date",1,4) = '2018' GROUP BY 1
6. Note the query runtime and the amount of data scanned.
7. Run the same query on the new table, noting the query runtime and amount of data scanned.
SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM new_parquet
WHERE element IN ('TMIN', 'TMAX', 'PRCP') and year = '2018' GROUP BY 1
8. Compare the results and calculate the performance and cost difference. The following sample results show that the test query on the new table was faster and cheaper than the query on the old table.
Table      Runtime        Data scanned
Original   18.65 seconds  11.35 GB
New        1.92 seconds   68 MB
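Because Athena bills by data scanned, the cost difference between the two tables follows directly from the "Data scanned" column, and the per-TB price cancels out of the ratio. The following Python sketch computes the speedup and cost ratio for both sample queries. It assumes the billing rule published on the Athena pricing page (data rounded up to the nearest megabyte, 10 MB minimum per query; confirm against current pricing) and converts 11.35 GB to 11,350 MB using decimal units. The names are hypothetical.

```python
import math
from dataclasses import dataclass

MIN_BILLED_MB = 10  # assumption: per-query minimum from the Athena pricing page


@dataclass
class Run:
    seconds: float
    scanned_mb: float  # assumption: 1 GB = 1,000 MB when converting console figures


def billed_mb(run: Run) -> int:
    """Data billed for one query: rounded up to whole MB, with a per-query minimum."""
    return max(MIN_BILLED_MB, math.ceil(run.scanned_mb))


def compare(original: Run, new: Run) -> tuple[float, float]:
    """Return (speedup, cost ratio); cost is proportional to billed data."""
    return original.seconds / new.seconds, billed_mb(original) / billed_mb(new)


distinct_ids = compare(Run(16.88, 11_350), Run(3.79, 428.05))
weather_2018 = compare(Run(18.65, 11_350), Run(1.92, 68))

for name, (speedup, cost_ratio) in [("distinct IDs", distinct_ids), ("2018 weather", weather_2018)]:
    print(f"{name}: {speedup:.1f}x faster, {cost_ratio:.0f}x cheaper")
```

The second query benefits more because its year = '2018' filter lets Athena prune every partition except one, on top of reading only the referenced Parquet columns.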
Summary
This topic showed you how to perform ETL operations using CTAS and INSERT INTO statements in Athena. You performed the first set of transformations using a CTAS statement that converted data to the Parquet format with Snappy compression. The CTAS statement also converted the dataset from non-partitioned to partitioned. These conversions reduced the size of the dataset and lowered the cost of running queries. When new data becomes available, you can use an INSERT INTO statement to transform and load the data into the table that you created with the CTAS statement.
Using CTAS and INSERT INTO to Create a Table with More Than 100 Partitions
You can create up to 100 partitions per query with a CREATE TABLE AS SELECT (CTAS (p. 130)) query. Similarly, you can add a maximum of 100 partitions to a destination table with an INSERT INTO statement. To work around these limitations, you can use a CTAS statement and a series of INSERT INTO statements that create or insert up to 100 partitions each.
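The workaround is a batching problem: sort the distinct partition values, cut them into consecutive groups of at most 100, and give each group its own statement. The following Python sketch (hypothetical helper names) shows that for the 2,525-partition example table, at least 26 statements are needed: one CTAS plus 25 INSERT INTO statements. The sample partition values are assumed to be one per consecutive day starting 1992-01-02; in practice you would take them from SHOW PARTITIONS.

```python
from datetime import date, timedelta

MAX_PARTITIONS_PER_QUERY = 100  # per-query limit for CTAS and INSERT INTO


def batch_partitions(values, limit=MAX_PARTITIONS_PER_QUERY):
    """Split distinct partition values into sorted, non-overlapping batches of <= limit."""
    ordered = sorted(set(values))
    return [ordered[i:i + limit] for i in range(0, len(ordered), limit)]


def where_clause(column, batch):
    """Inclusive range covering exactly one batch of string partition values.
    Assumes the values sort lexicographically in the intended order (true for ISO dates)."""
    return f"WHERE {column} >= '{batch[0]}' AND {column} <= '{batch[-1]}'"


# Hypothetical partition values: one per day, 2,525 days starting 1992-01-02.
shipdates = [(date(1992, 1, 2) + timedelta(days=i)).isoformat() for i in range(2525)]
batches = batch_partitions(shipdates)

print(len(batches), "statements: 1 CTAS +", len(batches) - 1, "INSERT INTO")
print(where_clause("l_shipdate", batches[0]))
```

Because each batch is a contiguous slice of the sorted values, the range predicates never overlap, which is the condition the Important note at the end of this procedure asks for.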
The example in this topic uses a database called tpch100 whose data resides in the Amazon S3 bucket location s3://<my-tpch-bucket>/.
To use CTAS and INSERT INTO to create a table of more than 100 partitions
1. Use a CREATE EXTERNAL TABLE statement to create a table partitioned on the field that you want.
The following example statement partitions the data by the column l_shipdate. The table has 2525 partitions.
CREATE EXTERNAL TABLE `tpch100.lineitem_parq_partitioned`(
  `l_orderkey` int, `l_partkey` int, `l_suppkey` int, `l_linenumber` int, `l_quantity` double, `l_extendedprice` double, `l_discount` double, `l_tax` double,
  `l_returnflag` string, `l_linestatus` string, `l_commitdate` string, `l_receiptdate` string, `l_shipinstruct` string, `l_comment` string)
PARTITIONED BY (`l_shipdate` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://<my-tpch-bucket>/lineitem/'
2. Run a SHOW PARTITIONS <table_name> command like the following to list the partitions.
SHOW PARTITIONS lineitem_parq_partitioned
Following are partial sample results.
/*
l_shipdate=1992-01-02
...
*/
3. Run a CTAS query to create a partitioned table.
The following example creates a table called my_lineitem_parq_partitioned and uses the WHERE clause to restrict l_shipdate to dates earlier than 1992-02-01. Because the sample dataset starts with January 1992, only partitions for January 1992 are created.
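CREATE table my_lineitem_parq_partitioned
WITH (partitioned_by = ARRAY['l_shipdate']) AS
SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax,
       l_returnflag, l_linestatus, l_commitdate, l_receiptdate, l_shipinstruct, l_comment, l_shipdate
FROM tpch100.lineitem_parq_partitioned
WHERE cast(l_shipdate as timestamp) < DATE ('1992-02-01');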
4. Run the SHOW PARTITIONS command to verify that the table contains the partitions that you want.
SHOW PARTITIONS my_lineitem_parq_partitioned;
The partitions in the example are from January 1992.
/*
l_shipdate=1992-01-02 l_shipdate=1992-01-03 l_shipdate=1992-01-04 ...
*/
5. Use an INSERT INTO statement to add partitions to the table.
The following example adds partitions for the dates from the month of February 1992.
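INSERT INTO my_lineitem_parq_partitioned
SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax,
       l_returnflag, l_linestatus, l_commitdate, l_receiptdate, l_shipinstruct, l_comment, l_shipdate
FROM tpch100.lineitem_parq_partitioned
WHERE cast(l_shipdate as timestamp) >= DATE ('1992-02-01') AND cast(l_shipdate as timestamp) < DATE ('1992-03-01');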
6. Run SHOW PARTITIONS again.
SHOW PARTITIONS my_lineitem_parq_partitioned;
The sample table now has partitions from both January and February 1992.
/*
l_shipdate=1992-01-02 l_shipdate=1992-01-03 l_shipdate=1992-01-04 l_shipdate=1992-01-05 l_shipdate=1992-01-06 ...
l_shipdate=1992-02-20 l_shipdate=1992-02-21 l_shipdate=1992-02-22 l_shipdate=1992-02-23 l_shipdate=1992-02-24 l_shipdate=1992-02-25 l_shipdate=1992-02-26 l_shipdate=1992-02-27 l_shipdate=1992-02-28 l_shipdate=1992-02-29
*/
7. Continue using INSERT INTO statements that read and add no more than 100 partitions each.
Continue until you reach the number of partitions that you require.
Important
When setting the WHERE condition, be sure that the queries don't overlap. Otherwise, some partitions might have duplicated data.
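One way to keep the WHERE conditions from overlapping is to generate them from half-open date ranges, where each range ends exactly where the next one starts, as the February example does. The following Python sketch generates one INSERT INTO statement per calendar month (at most 31 daily partitions each, well under the 100-partition limit). The helper names are hypothetical; the column list and table names are the ones used in this procedure.

```python
from datetime import date

COLUMNS = [
    "l_orderkey", "l_partkey", "l_suppkey", "l_linenumber", "l_quantity",
    "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus",
    "l_commitdate", "l_receiptdate", "l_shipinstruct", "l_comment",
    "l_shipdate",  # partition column last, matching the CTAS statement
]


def monthly_ranges(first_month: date, end_exclusive: date):
    """Half-open [start, next_start) ranges, one per calendar month."""
    ranges = []
    start = date(first_month.year, first_month.month, 1)
    while start < end_exclusive:
        # Roll over to January of the next year after December.
        nxt = date(start.year + start.month // 12, start.month % 12 + 1, 1)
        ranges.append((start, min(nxt, end_exclusive)))
        start = nxt
    return ranges


def insert_into(target: str, source: str, lo: date, hi: date) -> str:
    return (
        f"INSERT INTO {target}\n"
        f"SELECT {', '.join(COLUMNS)}\n"
        f"FROM {source}\n"
        f"WHERE cast(l_shipdate as timestamp) >= DATE ('{lo.isoformat()}') "
        f"AND cast(l_shipdate as timestamp) < DATE ('{hi.isoformat()}');"
    )


for lo, hi in monthly_ranges(date(1992, 2, 1), date(1992, 4, 1)):
    print(insert_into("my_lineitem_parq_partitioned", "tpch100.lineitem_parq_partitioned", lo, hi))
```

Using `>=` on the lower bound and `<` on the upper bound means a date on a boundary, such as 1992-03-01, belongs to exactly one statement.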
Athena Compression Support
Athena supports a variety of compression formats for reading and writing data, including reading from a table that uses multiple compression formats. For example, Athena can successfully read the data in a table that uses Parquet file format when some Parquet files are compressed with Snappy and other Parquet files are compressed with GZIP. The same principle applies for ORC, Textfile, and JSON storage formats.
Athena supports the following compression formats:
• BZIP2 – Format that uses the Burrows-Wheeler algorithm.
Note
In rare cases, a known issue in Athena engine version 1 can cause records to be silently dropped when the BZIP2 format is used. For this reason, use of the BZIP2 format in Athena engine version 1 is not recommended.
• DEFLATE – Compression algorithm based on LZSS and Huffman coding. Deflate is relevant only for the Avro file format.
• GZIP – Compression algorithm based on Deflate. GZIP is the default write compression format for files in the Parquet and Textfile storage formats. Files in the tar.gz format are not supported.
• LZ4 – This member of the Lempel-Ziv 77 (LZ77) family focuses on compression and decompression speed rather than maximum compression of data. LZ4 has the following framing formats:
• LZ4 Raw/Unframed – An unframed, standard implementation of the LZ4 block compression format.
For more information, see the LZ4 Block Format Description on GitHub.
• LZ4 Framed – The usual framing implementation of LZ4. For more information, see the LZ4 Frame Format Description on GitHub.
• LZ4 Hadoop-Compatible – The Apache Hadoop implementation of LZ4. This implementation wraps LZ4 compression with the BlockCompressorStream.java class.
• LZO – Format that uses the Lempel–Ziv–Oberhumer algorithm, which focuses on high compression and decompression speed rather than the maximum compression of data. LZO has two implementations:
• Standard LZO – For more information, see the LZO abstract on the Oberhumer website.
• LZO Hadoop-Compatible – This implementation wraps the LZO algorithm with the BlockCompressorStream.java class.
• SNAPPY – Compression algorithm that is part of the Lempel-Ziv 77 (LZ77) family. Snappy focuses on high compression and decompression speed rather than the maximum compression of data.
Some implementations of Snappy allow for framing. Framing enables decompression of streaming or file data that cannot be entirely maintained in memory. The following framing implementations are relevant for Athena:
• Snappy Raw/Unframed – The standard implementation of the Snappy format that does not use framing. For more information, see the Snappy format description on GitHub.
• Snappy-Framed – The framing implementation of the Snappy format. For more information, see the Snappy framing format description on GitHub.
• Snappy Hadoop-Compatible – The framing implementation of Snappy that the Apache Hadoop Project uses. For more information, see BlockCompressorStream.java on GitHub.
For information about the Snappy framing methods that Athena supports for each file format, see the table later on this page.
• ZLIB – Based on Deflate, ZLIB is the default write compression format for files in the ORC data storage format. For more information, see the zlib page on GitHub.
• ZSTD – The Zstandard real-time data compression algorithm is a fast compression algorithm that provides high compression ratios. The Zstandard library is open source software released under a BSD license. Athena supports reading and writing Zstandard-compressed ORC, Parquet, and textfile data. When writing Zstandard-compressed data, Athena uses Zstandard compression level 3.
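DEFLATE, GZIP, and ZLIB in the list above are closely related: GZIP and ZLIB are container formats that wrap a DEFLATE-compressed stream with different headers and checksums. Python's standard library makes the relationship visible. This sketch only illustrates the three encodings; it does not show how Athena reads them, and the payload is made-up sample data shaped like a CSV row.

```python
import gzip
import zlib

# Hypothetical payload shaped like a CSV row; repetition makes it compressible.
payload = b"USW00094728,20180101,TMAX,-83\n" * 1_000

# Raw DEFLATE: negative wbits tells zlib to emit no header and no checksum trailer.
deflater = zlib.compressobj(wbits=-15)
raw_deflate = deflater.compress(payload) + deflater.flush()

# zlib container: DEFLATE data plus a 2-byte header and an Adler-32 trailer.
zlib_stream = zlib.compress(payload)

# gzip container: DEFLATE data plus a gzip header (magic bytes 1f 8b) and a CRC-32/size trailer.
gzip_stream = gzip.compress(payload)

for name, blob in [("raw deflate", raw_deflate), ("zlib", zlib_stream), ("gzip", gzip_stream)]:
    print(f"{name:12} {len(blob):6} bytes, starts with {blob[:2].hex()}")
```

The leading bytes are how tools tell the containers apart; a raw DEFLATE stream carries no such marker, which is one reason file extensions matter for text formats.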
Compression Support in Athena by File Format
The following table summarizes the compression format support in Athena for each storage file format.
Textfile format includes TSV, CSV, JSON, and custom SerDes for text.
Compression  Avro                                     ORC   Parquet  Textfile
BZIP2        Read support only. Write not supported.  No    No       Yes
DEFLATE      Yes                                      No    No       No
GZIP         No                                       No    Yes      Yes
LZO          No                                       No    Yes      Hadoop-compatible read
ZSTD         No                                       Yes   Yes      Yes
Specifying Compression Formats
When you write CREATE TABLE or CTAS statements, you can set properties that specify the compression type Athena uses when it writes to those tables.
• For CTAS, see CTAS Table Properties (p. 546). For examples, see Examples of CTAS Queries (p. 134).
• For CREATE TABLE, see ALTER TABLE SET TBLPROPERTIES (p. 540) for a list of compression table properties.
Notes and Resources
• Currently, uppercase file extensions such as .GZ or .BZIP2 are not recognized by Athena. Avoid using datasets with uppercase file extensions, or rename the data file extensions to lowercase.
• For data in CSV, TSV, and JSON, Athena determines the compression type from the file extension. If no file extension is present, Athena treats the data as uncompressed plain text. If your data is compressed, make sure the file name includes the compression extension, such as .gz.
• The ZIP file format is not supported.
• For querying Amazon Kinesis Data Firehose logs from Athena, supported formats include GZIP compression or ORC files with SNAPPY compression.
• For more information on using compression, see section 3 ("Compress and split files") of the AWS Big Data Blog post Top 10 Performance Tuning Tips for Amazon Athena.
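The extension rules in these notes (case-sensitive extensions, no extension meaning plain text, ZIP unsupported) can be checked before you upload data. The following Python sketch applies them to object keys. The extension-to-codec mapping is hypothetical and illustrative only, because this page does not list every extension Athena recognizes; the helper names are also hypothetical.

```python
# Illustrative mapping only (hypothetical); not Athena's documented extension list.
EXTENSION_CODECS = {".gz": "GZIP", ".bz2": "BZIP2", ".lz4": "LZ4", ".zst": "ZSTD"}


def infer_compression(key: str) -> str:
    """Guess the codec of a CSV/TSV/JSON object from its case-sensitive extension."""
    name = key.rpartition("/")[2]
    if name.lower().endswith(".zip"):
        raise ValueError(f"ZIP is not supported: {key}")
    for ext, codec in EXTENSION_CODECS.items():
        if name.endswith(ext):  # case-sensitive, so ".GZ" is not recognized
            return codec
    return "NONE"  # no recognized extension: treated as uncompressed plain text


def lowercase_extension(key: str) -> str:
    """Return the key with only its final extension lowercased (for example, .GZ to .gz)."""
    prefix, sep, name = key.rpartition("/")
    stem, dot, ext = name.rpartition(".")
    if not dot:
        return key
    return f"{prefix}{sep}{stem}.{ext.lower()}"


for key in ["logs/part-0000.csv.gz", "logs/part-0000.csv.GZ", "logs/part-0000.csv"]:
    print(key, "->", infer_compression(key), "| rename to", lowercase_extension(key))
```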
SerDe Reference
Athena supports several SerDe libraries for parsing data from different data formats, such as CSV, JSON, Parquet, and ORC. Athena does not support custom SerDes.
Topics
• Using a SerDe (p. 150)
• Supported SerDes and Data Formats (p. 151)
Using a SerDe
A SerDe (Serializer/Deserializer) is a way in which Athena interacts with data in various formats.
It is the SerDe you specify, and not the DDL, that defines the table schema. In other words, the SerDe can override the DDL configuration that you specify in Athena when you create your table.
To Use a SerDe in Queries
To use a SerDe when creating a table in Athena, use one of the following methods:
• Specify ROW FORMAT DELIMITED and then use DDL statements to specify field delimiters, as in the following example. When you specify ROW FORMAT DELIMITED, Athena uses the LazySimpleSerDe by default.
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\'
COLLECTION ITEMS TERMINATED BY '|' MAP KEYS TERMINATED BY ':'
For examples of ROW FORMAT DELIMITED, see the following topics:
LazySimpleSerDe for CSV, TSV, and Custom-Delimited Files (p. 167)
Querying Amazon CloudFront Logs (p. 276)
Querying Amazon EMR Logs (p. 284)
Querying Amazon VPC Flow Logs (p. 296)
Using CTAS and INSERT INTO for ETL and Data Analysis (p. 138)
• Use ROW FORMAT SERDE to explicitly specify the type of SerDe that Athena should use when it reads and writes data to the table. The following example specifies the LazySimpleSerDe. To specify the delimiters, use WITH SERDEPROPERTIES. The properties specified by WITH SERDEPROPERTIES correspond to the separate statements (like FIELDS TERMINATED BY) in the ROW FORMAT DELIMITED example.
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ',',
  'collection.delim' = '|',
  'mapkey.delim' = ':',
  'escape.delim' = '\\'
)
For examples of ROW FORMAT SERDE, see the following topics:
Avro SerDe (p. 153)
CloudTrail SerDe (p. 155)
Grok SerDe (p. 160)
JSON SerDe Libraries (p. 163)
OpenCSVSerDe for Processing CSV (p. 157)
Regex SerDe (p. 155)
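The correspondence between the two methods can be written down as a small translation: FIELDS TERMINATED BY becomes field.delim (and serialization.format), COLLECTION ITEMS TERMINATED BY becomes collection.delim, MAP KEYS TERMINATED BY becomes mapkey.delim, and ESCAPED BY becomes escape.delim. The following Python sketch (hypothetical helper name `row_format_serde`) emits the ROW FORMAT SERDE clause from the delimiters you would otherwise give to ROW FORMAT DELIMITED. It uses only the property keys that appear in the example above.

```python
LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"


def row_format_serde(field, collection=None, mapkey=None, escape=None):
    """Translate ROW FORMAT DELIMITED delimiters into a ROW FORMAT SERDE clause."""
    # FIELDS TERMINATED BY -> field.delim (the example also sets serialization.format).
    props = {"serialization.format": field, "field.delim": field}
    # Optional clauses: COLLECTION ITEMS / MAP KEYS TERMINATED BY, ESCAPED BY.
    optional = {"collection.delim": collection, "mapkey.delim": mapkey, "escape.delim": escape}
    props.update({key: value for key, value in optional.items() if value is not None})
    # Values are inserted verbatim, so pass them as they should appear in the DDL.
    body = ",\n".join(f"  '{key}' = '{value}'" for key, value in props.items())
    return f"ROW FORMAT SERDE '{LAZY_SIMPLE_SERDE}'\nWITH SERDEPROPERTIES (\n{body}\n)"


# escape=r"\\" yields the two-character DDL literal '\\' used in the example.
print(row_format_serde(",", collection="|", mapkey=":", escape=r"\\"))
```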
Supported SerDes and Data Formats
Athena supports creating tables and querying data from CSV, TSV, custom-delimited, and JSON formats; data from Hadoop-related formats: ORC, Apache Avro, and Parquet; and logs from Logstash, AWS CloudTrail, and Apache web servers.
Note
The formats listed in this section are used by Athena for reading data. For information about formats that Athena uses for writing data when it runs CTAS queries, see Creating a Table from Query Results (CTAS) (p. 130).
To create tables and query data in these formats in Athena, specify a serializer-deserializer class (SerDe) so that Athena knows which format is used and how to parse the data.
This table lists the data formats supported in Athena and their corresponding SerDe libraries.
A SerDe is a custom library that tells the data catalog used by Athena how to handle the data. You specify a SerDe type by listing it explicitly in the ROW FORMAT part of your CREATE TABLE statement in Athena. In some cases, you can omit the SerDe name because Athena uses some SerDe types by default for certain types of data formats.
Supported Data Formats and SerDes
Data format: CSV (Comma-Separated Values)
Description: For data in CSV, each line represents a data record, and each record consists of one or more fields, separated by commas.
SerDe types supported in Athena:
• Use the LazySimpleSerDe for CSV, TSV, and Custom-Delimited Files (p. 167) if your data does not include values enclosed in quotes or if it uses the java.sql.Timestamp format.
• Use the OpenCSVSerDe for Processing CSV (p. 157) when your data includes quotes in values or uses the UNIX numeric format for TIMESTAMP (for example, 1564610311).

Data format: TSV (Tab-Separated Values)
Description: For data in TSV, each line represents a data record, and each record consists of one or more fields, separated by tabs.
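The choice between the two CSV SerDes comes down to quoting. The following Python sketch contrasts splitting a line on the field delimiter alone, which is the behavior the LazySimpleSerDe guidance warns about, with a quote-aware parser. Python's `csv` module stands in for a quote-aware parser here; it is not the OpenCSVSerDe itself. The record is hypothetical, and its last field is a UNIX numeric timestamp like the one in the table.

```python
import csv
import io
from datetime import datetime, timezone

# Hypothetical record whose second field contains the delimiter inside quotes.
line = 'USW00094728,"New York, NY",1564610311\n'

# Splitting on the field delimiter alone breaks the quoted value apart.
naive_fields = line.rstrip("\n").split(",")

# A quote-aware CSV parser keeps "New York, NY" as one field and strips the quotes.
quoted_fields = next(csv.reader(io.StringIO(line)))

# The last field is a UNIX epoch value in seconds, the numeric TIMESTAMP style.
observed_at = datetime.fromtimestamp(int(quoted_fields[2]), tz=timezone.utc)

print(naive_fields)
print(quoted_fields)
print(observed_at.isoformat())
```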