9.2 应用中 用 Spark SQL
9.2.3 SchemaRDD
读 数据和执行 会 SchemaRDD。SchemaRDD 和 数据 中的 的
。 理 ,SchemaRDD 由Row 成的RDD, 带 列数据
的 。Row 本数据 和字 等 的数 的 。
会 分中进 Row 的 。
需要特 的 , 的Spark 版本中 1.3 及 ,SchemaRDD 字 能会
DataFrame。 本书编 成时 中。
SchemaRDD RDD,所 应用 有的RDD 化 作, map()和
filter()。 而,SchemaRDD 了 的 能 。 要的 , 任
SchemaRDD 册 时 , 用HiveContext.sql SQLContext.sql 它 进行 了。 SchemaRDD 的registerTempTable()方法 , 9-9 9-11 所 。
时 用的HiveContext SQLContext 中的 时 量, 的应用
出时 时 存 了。
SchemaRDD 存 本数据 , 存 由 成的 和数 。
SchemaRDD 用HiveQL 法 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+
DDL 定义的 。 9-1 列出了 的数据 。3 表9-1:SchemaRDD中可以存储的数据类型
TINYINT Byte Byte/byte int/long ( -128 127 )
SMALLINT Short Short/short int/long ( -32768 32767
)
INT Int Int/int int long
BIGINT Long Long/long long
FLOAT Float Float /float float
DOUBLE Double Double/double float
3:编译时 -Phive打开Hive , 需打开-Phive-thriftserver 。——译者
DECIMAL Scala.math.BigDecimal java.math.BigDecimal decimal.Decimal
STRING String String string
BINARY Array[Byte] byte[] bytearray
BOOLEAN Boolean Boolean/boolean bool
TIMESTAMP java.sql.TimeStamp java.sql.TimeStamp datetime.datetime
ARRAY<DATA_TYPE> Seq List list、tuple array
MAP<KEY_TYPE,VAL_TYPE> Map Map dict
STRUCT<COL1:
COL1_TYPE, ...>
Row Row Row
, , Spark SQL 中 的Row 。所有
互 。 , 有 成的数 , 的 。
Row
Row SchemaRDD 中的记 , 本 定 的字 数 。 Scala/Java 中,
Row 有 列getter 方法, 字 的 。 的 方法get
Scala 中的apply ,读 列的序号 Object Scala 中的Any
的 , 由 的 。 Boolean、Byte、Double、Float、Int、
Long、Short和String , 有 应的getType()方法, 作 应的
。 ,getString(0)会 字 0 的 作 字 , 9-12 和 9-13 所 。
9-12: Scala 中 问 topTweet SchemaRDD 中的 text 列 第 列 val topTweetText = topTweets.map(row => row.getString(0))
9-13: Java 中 问 topTweet SchemaRDD 中的 text 列 第 列
JavaRDD<String> topTweetText = topTweets.toJavaRDD().map(new Function<Row, String>() { public String call(Row row) {
return row.getString(0);
}});
Python 中,由 有 式的 ,Row 有 同。 用row[i] 问 第i 元 。 ,Python 中的Row row.column_name的 式 用 字 问
中的字 , 9-14 所 。 定具 的列 , 会 9.3.3 中讲
出 。
9-14: Python 中 问 topTweet SchemaRDD 中的 text 列 topTweetText = topTweets.map(lambda row: row.text)
9.2.4
Spark SQL 的 存 制 Spark 中的 有 同。由 道 列的 ,所
Spark 加高效地存 数据。 了 保 用 存的 方式进行 存而
存 ,应 用 的hiveCtx.cacheTable("tableName")方法。 存数据 时,
Spark SQL 用 列式存 式 存中 数据。 存 的 会
序的生 保 存中,所 进 出, 需要 新 存数据。和 存
RDD 时的 , 同 的数据 次运行任 时, 应 数据
存 。
Spark 1.2 中,RDD 有的cache()方法 会 发 次 cacheTable()
方法的 用。
用HiveQL/SQL 存 。 需要运行CACHE TABLEtableName UNCACHE
TABLEtableName 存 者 有的 存 。 用方式 JDBC 的
行 中 用。
存的SchemaRDD RDD 的方式 Spark 的应用用 中 , 图 9-2 所 。
图9-2:Spark SQL 的 SchemaRDD 用户界面
会 9.6 Spark SQL 中的 存 制 能的 。
9.3 数据
Spark SQL 化数据 , 的读 , 数据
中读 Row 。 数据 Hive 、JSON 和 Parquet 件。 , 用 SQL 数据 中的数据 用 了 分字 时,Spark SQL 能地
用 的字 ,而 SparkContext.hadoopFile中 简 地 数据。
数 据 , 序 中 定 , 的RDD 化
SchemaRDD。 Python 者Java 运行SQL 加简 。 需要计算
数 时,SQL 加简 要同时 出 年 、 大年 、 的用
ID 数 等 。 , 自 地 RDD 和 自 Spark SQL 数据 的 SchemaRDD 进行 作。 本 中, 会讲解 数据 及 用RDD 的方式。
9.3.1 Apache Hive
Hive 中读 数据时,Spark SQL 任 Hive 的存 式 SerDe , 本 件、RCFiles、ORC、Parquet、Avro, 及 Protocol Buffer。
要 Spark SQL 的Hive , 需要 Hive 。 需要 的hive-site.xml 件 制 Spark 的 ./conf/ 。 Spark SQL 而 有 hive-site.xml 件, Spark SQL 会 用本地的Hive 元数据 , 同
地 数据读 Hive 中进行 。
9-15 9-17 了 张Hive 。Hive 有 列,分 key
和value 字 。 会 本 的 。
9-15: 用 Python Hive 读
from pyspark.sql import HiveContext hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT key, value FROM mytable") keys = rows.map(lambda row: row[0])
9-16: 用 Scala Hive 读
import org.apache.spark.sql.hive.HiveContext val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key, value FROM mytable") val keys = rows.map(row => row.getInt(0))
9-17: 用 Java Hive 读
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;
HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT key, value FROM mytable");
JavaRDD<Integer> keys = rdd.toJavaRDD().map(new Function<Row, Integer>() { public Integer call(Row row) { return row.getInt(0); }
});
9.3.2 Parquet
Parquet http://parquet.apache.org/ 行的列式存 式, 高效地存 具有 字 的记 。Parquet 式 Hadoop 生 中 用,它 Spark SQL 的 数 据 。Spark SQL 了 读 和存 Parquet 式 件的方法。
, HiveContext.parquetFile 者SQLContext.parquetFile 读 数据,
9-18 所 。
9-18:Python 中的 Parquet 数据读
# 有name和favouriteAnimal字 的Parquet 件中读 数据 rows = hiveCtx.parquetFile(parquetFile)
names = rows.map(lambda row: row.name) print "Everyone"
print names.collect()
Parquet 件 册 Spark SQL 的 时 , 张 运行 。
9-18 中 读 了数据, 9-19 所 的 数据进行 。
9-19:Python 中的 Parquet 数据
# 者
tbl = rows.registerTempTable("people")
pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal =
\"panda\"")
print "Panda friends"
print pandaFriends.map(lambda row: row.name).collect()
, 用saveAsParquetFile() SchemaRDD 的 Parquet 式保存,
9-20 所 。
9-20:Python 中的 Parquet 件保存
pandaFriends.saveAsParquetFile("hdfs://...")
要 读 JSON 数 据, 要 用hiveCtx中 的jsonFile()方 法 , 9-22 9-24
所 。 数据中 出 的 , 生成的SchemaRDD 用
printSchema方法 9-25 。
9-21: 记 {"name": "Holden"}
{"name": "Sparky The Bear", "lovesPandas":true,"knows": {"friends":["holden"]}}
9-22: Python 中 用 Spark SQL 读 JSON 数据 input = hiveCtx.jsonFile(inputFile)
9-23: Scala 中 用 Spark SQL 读 JSON 数据 val input = hiveCtx.jsonFile(inputFile) 9-24: Java 中 用 Spark SQL 读 JSON 数据
SchemaRDD input = hiveCtx.jsonFile(jsonFile);
9-25:printSchema() 出的 root
|-- knows: struct (nullable = true) | |-- friends: array (nullable = true)
| | |-- element: string (containsNull = false) |-- lovesPandas: boolean (nullable = true)
|-- name: string (nullable = true)
9-26 中 生成的 。
9-26: 的 分 root
|-- contributorsIDs: array (nullable = true) | |-- element: string (containsNull = false) |-- createdAt: string (nullable = true)
|-- currentUserRetweetId: integer (nullable = true) |-- hashtagEntities: array (nullable = true) | |-- element: struct (containsNull = false) | | |-- end: integer (nullable = true) | | |-- start: integer (nullable = true) | | |-- text: string (nullable = true) |-- id: long (nullable = true)
|-- inReplyToScreenName: string (nullable = true) |-- inReplyToStatusId: long (nullable = true) |-- inReplyToUserId: long (nullable = true) |-- isFavorited: boolean (nullable = true)
|-- isPossiblySensitive: boolean (nullable = true) |-- isTruncated: boolean (nullable = true)
|-- mediaEntities: array (nullable = true) | |-- element: struct (containsNull = false) | | |-- displayURL: string (nullable = true) | | |-- end: integer (nullable = true)
| | |-- expandedURL: string (nullable = true) |-- retweetCount: integer (nullable = true) ...
的 , 会自 而 地 问 字 和数 字 问题。
用Python, 数据 册 了 张SQL , . 问 的
元 toplevel.nextlevel 。而 SQL 中 用[element] 定
问数 中的元 , 9-27 所 。
9-27:用 SQL 数据 及数 元
select hashtagEntities[0].text from tweets LIMIT 1;
9.3.4 RDD
happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")]) happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)
happyPeopleSchemaRDD.registerTempTable("happy_people")
用Scala 的 , 的 式 会 处理 的 9-29 。
9-29: Scala 中 case class SchemaRDD
case class HappyPerson(handle: String, favouriteBeverage: String) ...
// 了 人的 , 它 成SchemaRDD
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee")))
// : 处发生了 式
// 等价 sqlCtx.createSchemaRDD(happyPeopleRDD) happyPeopleRDD.registerTempTable("happy_people")
Java 中, 用applySchema() RDD SchemaRDD, 需要 RDD 中的数 据 带有 有的getter 和 setter 方法, 序列化, 9-30 所 。
9-30: Java 中 JavaBean SchemaRDD class HappyPerson implements Serializable { private String name;
private String favouriteBeverage;
public HappyPerson() {}
public HappyPerson(String n, String b) { name = n; favouriteBeverage = b;
}
public String getName() { return name; } public void setName(String n) { name = n; }
public String getFavouriteBeverage() { return favouriteBeverage; } public void setFavouriteBeverage(String b) { favouriteBeverage = b; } };
...
ArrayList<HappyPerson> peopleList = new ArrayList<HappyPerson>();
peopleList.add(new HappyPerson("holden", "coffee"));
JavaRDD<HappyPerson> happyPeopleRDD = sc.parallelize(peopleList);
SchemaRDD happyPeopleSchemaRDD = hiveCtx.applySchema(happyPeopleRDD, HappyPerson.class);
happyPeopleSchemaRDD.registerTempTable("happy_people");
Spark SQL 的 JDBC Hive 中的 HiveServer2 。由 用了Thrift ,它 Thrift server 。 ,JDBC 需要Spark 打开Hive 的 编译。4
Spark 中 的sbin/start-thriftserver.sh 9-31 。
本 的 数 大 spark-submit 同 7.3 。 , 会
4:codegen 打开时, 有 能会 , Spark SQL 需要 分析 编译代 , , 作 能
codegen 所带 的 能 。——译者
localhost:10000 进 行 , 量 HIVE_SERVER2_THRIFT_PORT
和HIVE_SERVER2_THRIFT_BIND_HOST , Hive hive.
server2.thrift.port和hive.server2.thrift.bind.host 。 行
数--hiveconf property=value Hive 。
9-31: JDBC
./sbin/start-thriftserver.sh --master sparkMaster
Spark 自带了Beeline 序, 用它 JDBC , 9-32 和图 9-3 所 。 简 的SQL shell 运行 。
9-32: 用 Beeline JDBC
holden@hmbp2:~/repos/spark$ ./bin/beeline -u jdbc:hive2://localhost:10000 Spark assembly has been built with Hive, including Datanucleus jars on classpath scan complete in 1ms
Connecting to jdbc:hive2://localhost:10000 Connected to: Spark SQL (version 1.2.0-SNAPSHOT) Driver: spark-assembly (version 1.2.0-SNAPSHOT) Transaction isolation: TRANSACTION_REPEATABLE_READ Beeline version 1.2.0-SNAPSHOT by Apache Hive 0: jdbc:hive2://localhost:10000> show tables;
+---+
| result | +---+
| pokes | +---+
1 row selected (1.182 seconds) 0: jdbc:hive2://localhost:10000>
图9-3:启动 JDBC 服务器并使用 Beeline 客户端连接
JDBC 时,JDBC 会 台运行 所有 出 定
件中。 用JDBC 进行 的 中 了问
题, 的 。
工 具 ODBC Spark SQL。Spark SQL 的 ODBC 由Simba http://www.simba.com/ 制 作, Spark 应 处 DataBricks Cloud、Datastax 及MapR 。它 会 Microstrategy Tableau 的 能工 具所用 的工具 Spark SQL 。由 Spark SQL 用了和
Hive 同的 及 ,大 数 Hive 的 能工具
有的Hive Spark SQL 。
9.4.1 Beeline
Beeline 中, 用 的HiveQL 、列 及 数据 。
Hive 册 https://cwiki.apache.org/confluence/display/Hive/LanguageManual 中
HiveQL 的所有 法 , 的 作。
,要 本地数据 张数据 , 用CREATE TABLE 。 用LOAD DATA
进行数据读 。Hive 读 带有 定分 的 本 件, CSV 等 式的 件,
9-33 所 。 9-33:读 数据
> CREATE TABLE IF NOT EXISTS mytable (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY , ;
> LOAD DATA LOCAL INPATH learning-spark-examples/files/int_string.csv INTO TABLE mytable;
要 列 数 据 , 用SHOW TABLES 9-34 所 。 DESCRIBE
tableName 张 的 。
9-34:列 数据
> SHOW TABLES;
mytable
Time taken: 0.052 seconds
要 存数据 , 用CACHE TABLE tableName 。 存 用UNCACHE
TABLE tableName 的 存。需要 的 , , 存的 会
JDBC 的所有 共 。
, Beeline 中 计 简 , 运行EXPLAIN , 9-35 所 。
9-35:Spark SQL shell 执行EXPLAIN
spark-sql> EXPLAIN SELECT * FROM mytable where key = 1;
== Physical Plan ==
Filter (key#16 = 1)
HiveTableScan [key#16,value#17], (MetastoreRelation default, mytable, None), None Time taken: 0.551 seconds
计 ,Spark SQL HiveTableScan 用了 作。
, SQL 数据进行 。Beeline shell 用 共 的
存数据 进行快速的数据 有用的。
9.4.2
用Spark SQL 的 JDBC 的 同 序 共 存
的数据 。JDBC Thrift 序, 共 成 了 能。
中所 , 需要 册 数据 运行CACHE , 用 存了。
Spark SQL shell
了JDBC ,Spark SQL 作 的进 用的简
shell, ./bin/spark-sql 。 shell 会
conf/hive-site.xml 中的 Hive 的元数据 。 存 的元数据 ,Spark SQL
会 本地新 。 本 要 本地开发 有用。 共 的集
, 应 用JDBC , 用 beeline进行 。
9.5 数
用 自定义 数, UDF, 用Python/Java/Scala 册自定义 数, SQL
中 用。 方法 用, 用 的SQL 用 高 能 ,
用 用 册的 数而 需自 编 了。 Spark SQL 中,编
UDF 简 。Spark SQL 有自 的UDF , 有的Apache Hive UDF。
9.5.1 Spark SQL UDF
用Spark 的编 编 数, Spark SQL 的方法 进
, 捷地 册 自 的UDF。 Scala 和 Python 中, 用 生的 数和 lambda 法的 ,而 Java 中, 需要 应的UDF 。UDF 能 数据
, 用时的 数 。
Python 和 Java 中, 需要用 9-1 中列出的 SchemaRDD 应的 定
。Java 中 的 应 org.apache.spark.sql.api.java.DataType中 , 而 Python 中 需要 DataType 。
9-36 和 9-37 中, 用 计算字 的 简 的UDF,
用它 计算 的 。
9-36:Python 版本 字 UDF
# 字 的UDF
hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType()) lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10") 9-37:Scala 版本的字 UDF
registerFunction("strLenScala", (_: String).length)
val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10") Java 中定义 UDF 需要 的import 。和 定义RDD 数时 , 据
hiveCtx.udf().register("stringLengthJava", new UDF1<String, Integer>() { @Override
public Integer call(String str) throws Exception { return str.length();
}
}, DataTypes.IntegerType);
SchemaRDD tweetLength = hiveCtx.sql(
"SELECT stringLengthJava('text') FROM tweets LIMIT 10");
List<Row> lengths = tweetLength.collect();
for (Row row : result) {
System.out.println(row.get(0));
} Hive UDF, 需 用hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function")。
9.6 Spark SQL
本 开 所 的 ,Spark SQL 的高 及 加的 Spark
SQL 数据 加高效。
Spark SQL SQL 的用 用的。Spark SQL 有 件的 合 作
, 列进行 9-40 所 。 用Spark SQL 需要 第6 中
的 特 的 进行 作。
9-40:Spark SQL 列 和
SELECT SUM(user.favouritesCount), SUM(retweetCount), user.id FROM tweets GROUP BY user.id
Spark SQL 用 的了解 高效地 数据。 存数据时,Spark SQL 用
存式的列式存 。 了 存的 ,而 能地 了 中
字 时的数据读 。
Spark SQL 中的 分工作 。
需 Spark 中读 特定的记 , 的方法 读 数据集, 执行
件。 而, Spark SQL 中, 的数据存 读 的记
, 制 件,Spark SQL 中的 制 件 数据存
, 而大大 需要读 的数据。
Spark SQL 的 能 有 , 9-2 所列。
表9-2:Spark SQL中的性能选项
spark.sql.codegen false true时,Spark SQL 会
运行时编译 Java 进制代 。
高大 的 能, 进行
时会
spark.sql.inMemoryColumnarStorage.compressed false 自 存中的列式存 进行 spark.sql.inMemoryColumnarStorage.batchSize 1000 列式 存时的 处理的大 。
大 能会 存 的
spark.sql.parquet.compression.codec snappy 用 编 。 的
uncompressed/snappy/gzip/lzo
用JDBC 和Beeline shell 时, set 能 的
, 9-41 所 。
9-41:打开codegen 的Beeline beeline> set spark.sql.codegen=true;
SET spark.sql.codegen=true spark.sql.codegen=true Time taken: 1.196 seconds
的Spark SQL 应用中, Spark 中 Spark , 9-42 所 。 9-42: Scala 中打开codegen 的代
conf.set("spark.sql.codegen", "true")
的 需要 特 的 量。第 spark.sql.codegen,
Spark SQL 运行 编译 Java 进制代 。由 生成了 运行 定
的代 ,codegen 大 者 的 快。 而, 运行特 快
1 2 的 时 时,codegen 有 能会增加 开 , codegen 需要
编译的 。5codegen 的 能, 所有大 的
者 运行的 中 用codegen。
时 能需要 的第 spark.sql.inMemoryColumnarStorage.batchSize。
存SchemaRDD 时,Spark SQL 会 制定的大 1000 记 分
, 分 。 的 处理大 会 ,而 处理大 大的 ,
次处理的数据 存所能 的大 时, 有 能会 发问题。 中的
记 大 数 字 者 网 大的字 字 , 能需要
处理大 存 OOM 的 。 的 , 的 处
理大 合适的, 1000 记 时 本 法 高的 了。
9.7
, 学 了Spark 用Spark SQL 进行 化和 化数据处理的方式。 了本 的 ,第3 第6 中讲 的 作RDD 的方法同 适用 Spark SQL
中的SchemaRDD。 时 , 会 SQL 的编 合 用, 分
用SQL 的简 和编 辑的 。而 用Spark SQL 时,Spark 执
行 能 据数据的 进行 化, 中 。
5: ,codegen 打开时 开 的 会 , Spark SQL 需要 化它的编译 。所
codegen 的 开 应 运行4 5 。
第 1 0 章
Spark Streaming
应用需要 时处理收 的数据, 用 时 问 计的应用、 学
的应用, 有自 的应用。Spark Streaming Spark 应用而 计的
。它 用 用 和 处理 的API 编 式计算应用, 大
量 用 处理应用的技术 代 。
和Spark RDD 的 ,Spark Streaming 用离散化流 discretized stream 作
, 作DStream。DStream 时 而收 的数据的序列。 , 时 区 收 的数据 作 RDD 存 ,而 DStream 由 RDD 所 成的序列
化 。DStream , Flume、Kafka 者HDFS。
出 的DStream 作, 转化操作 transformation ,会生成 新的 DStream, 输出操作 output operation , 数据 中。DStream
了 RDD 所 的 作 的 作 , 增加了 时 的新 作,
。
和 处理 序 同,Spark Streaming 应用需要进行 保 24/7 工作。本
会 检查点 checkpointing 制, 数据存 件 HDFS
的 制, Spark Streaming 用 工作的 要方式。 , 会讲
时 应用, 及 应用 自 式。
, Spark 1.1 ,Spark Streaming Java 和 Scala 中 用。 的Python Spark 1.2 中 , 本数据。本 用Java 和 Scala 所有的 API, 的 Python 适用的。
10.1
10-1:Spark Streaming 的 Maven groupId = org.apache.spark artifactId = spark-streaming_2.10 version = 1.2.0
StreamingContext 开 ,它 计算 能的 要 。StreamingContext 会 出SparkContext,用 处理数据。 造 数 收用 定 时 处理 次新数据的批次间隔 batch interval 作 , 它 1 。 ,
用socketTextStream() 出 本地7777 收 的 本数据的DStream。
用socketTextStream() 出 本地7777 收 的 本数据的DStream。