• 沒有找到結果。

SchemaRDD

在文檔中 快速大数据分析 (頁 166-0)

9.2  应用中 用 Spark SQL

9.2.3 SchemaRDD

读 数据和执行 会 SchemaRDD。SchemaRDD 和 数据 中的 的

。 理 ,SchemaRDD 由Row 成的RDD, 带 列数据

的 。Row 本数据 和字 等 的数 的 。

会 分中进 Row 的 。

需要特 的 , 的Spark 版本中 1.3 及 ,SchemaRDD 字 能会

DataFrame。 本书编 成时 中。

SchemaRDD RDD,所 应用 有的RDD 化 作, map()和

filter()。 而,SchemaRDD 了 的 能 。 要的 , 任

SchemaRDD 册 时 , 用HiveContext.sql SQLContext.sql 它 进行 了。 SchemaRDD 的registerTempTable()方法 , 9-9 9-11 所 。

时 用的HiveContext SQLContext 中的 时 量, 的应用

出时 时 存 了。

SchemaRDD 存 本数据 , 存 由 成的 和数 。

SchemaRDD 用HiveQL 法 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+

DDL 定义的 。 9-1 列出了 的数据 。3 表9-1:SchemaRDD中可以存储的数据类型

TINYINT Byte Byte/byte int/long ( -128 127 )

SMALLINT Short Short/short int/long ( -32768 32767

)

INT Int Int/int int long

BIGINT Long Long/long long

FLOAT Float Float /float float

DOUBLE Double Double/double float

3:编译时 -Phive打开Hive , 需打开-Phive-thriftserver 。——译者

DECIMAL Scala.math.BigDecimal java.math.BigDecimal decimal.Decimal

STRING String String string

BINARY Array[Byte] byte[] bytearray

BOOLEAN Boolean Boolean/boolean bool

TIMESTAMP java.sql.TimeStamp java.sql.TimeStamp datetime.datetime

ARRAY<DATA_TYPE> Seq List listtuple array

MAP<KEY_TYPE,VAL_TYPE> Map Map dict

STRUCT<COL1:

COL1_TYPE, ...>

Row Row Row

, , Spark SQL 中 的Row 。所有

互 。 , 有 成的数 , 的 。

Row

Row SchemaRDD 中的记 , 本 定 的字 数 。 Scala/Java 中,

Row 有 列getter 方法, 字 的 。 的 方法get

Scala 中的apply ,读 列的序号 Object Scala 中的Any

的 , 由 的 。 Boolean、Byte、Double、Float、Int、

Long、Short和String , 有 应的getType()方法, 作 应的

。 ,getString(0)会 字 0 的 作 字 , 9-12 和 9-13 所 。

9-12: Scala 中 问 topTweet SchemaRDD 中的 text 列 第 列 val topTweetText = topTweets.map(row => row.getString(0))

9-13: Java 中 问 topTweet SchemaRDD 中的 text 列 第 列

JavaRDD<String> topTweetText = topTweets.toJavaRDD().map(new Function<Row, String>() { public String call(Row row) {

return row.getString(0);

}});

Python 中,由 有 式的 ,Row 有 同。 用row[i] 问 第i 元 。 ,Python 中的Row row.column_name的 式 用 字 问

中的字 , 9-14 所 。 定具 的列 , 会 9.3.3 中讲

出 。

9-14: Python 中 问 topTweet SchemaRDD 中的 text 列 topTweetText = topTweets.map(lambda row: row.text)

  

9.2.4

Spark SQL 的 存 制 Spark 中的 有 同。由 道 列的 ,所

Spark 加高效地存 数据。 了 保 用 存的 方式进行 存而

存 ,应 用 的hiveCtx.cacheTable("tableName")方法。 存数据 时,

Spark SQL 用 列式存 式 存中 数据。 存 的 会

序的生 保 存中,所 进 出, 需要 新 存数据。和 存

RDD 时的 , 同 的数据 次运行任 时, 应 数据

存 。

Spark 1.2 中,RDD 有的cache()方法 会 发 次 cacheTable()

方法的 用。

用HiveQL/SQL 存 。 需要运行CACHE TABLEtableName UNCACHE

TABLEtableName 存 者 有的 存 。 用方式 JDBC 的

行 中 用。

存的SchemaRDD RDD 的方式 Spark 的应用用 中 , 图 9-2 所 。

图9-2:Spark SQL 的 SchemaRDD 用户界面

会 9.6 Spark SQL 中的 存 制 能的 。

9.3 数据

Spark SQL 化数据 , 的读 , 数据

中读 Row 。 数据 Hive 、JSON 和 Parquet 件。 , 用 SQL 数据 中的数据 用 了 分字 时,Spark SQL 能地

用 的字 ,而 SparkContext.hadoopFile中 简 地 数据。

数 据 , 序 中 定 , 的RDD 化

SchemaRDD。 Python 者Java 运行SQL 加简 。 需要计算

数 时,SQL 加简 要同时 出 年 、 大年 、 的用

ID 数 等 。 , 自 地 RDD 和 自 Spark SQL 数据 的 SchemaRDD 进行 作。 本 中, 会讲解 数据 及 用RDD 的方式。

9.3.1 Apache Hive

Hive 中读 数据时,Spark SQL 任 Hive 的存 式 SerDe , 本 件、RCFiles、ORC、Parquet、Avro, 及 Protocol Buffer。

要 Spark SQL 的Hive , 需要 Hive 。 需要 的hive-site.xml 件 制 Spark 的 ./conf/ 。 Spark SQL 而 有 hive-site.xml 件, Spark SQL 会 用本地的Hive 元数据 , 同

地 数据读 Hive 中进行 。

9-15 9-17 了 张Hive 。Hive 有 列,分 key

和value 字 。 会 本 的 。

9-15: 用 Python Hive 读

from pyspark.sql import HiveContext hiveCtx = HiveContext(sc)

rows = hiveCtx.sql("SELECT key, value FROM mytable") keys = rows.map(lambda row: row[0])

9-16: 用 Scala Hive 读

import org.apache.spark.sql.hive.HiveContext val hiveCtx = new HiveContext(sc)

val rows = hiveCtx.sql("SELECT key, value FROM mytable") val keys = rows.map(row => row.getInt(0))

9-17: 用 Java Hive 读

import org.apache.spark.sql.hive.HiveContext;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SchemaRDD;

HiveContext hiveCtx = new HiveContext(sc);

SchemaRDD rows = hiveCtx.sql("SELECT key, value FROM mytable");

JavaRDD<Integer> keys = rdd.toJavaRDD().map(new Function<Row, Integer>() { public Integer call(Row row) { return row.getInt(0); }

});

9.3.2 Parquet

Parquet http://parquet.apache.org/ 行的列式存 式, 高效地存 具有 字 的记 。Parquet 式 Hadoop 生 中 用,它 Spark SQL 的 数 据 。Spark SQL 了 读 和存 Parquet 式 件的方法。

, HiveContext.parquetFile 者SQLContext.parquetFile 读 数据,

9-18 所 。

9-18:Python 中的 Parquet 数据读

# 有name和favouriteAnimal字 的Parquet 件中读 数据 rows = hiveCtx.parquetFile(parquetFile)

names = rows.map(lambda row: row.name) print "Everyone"

print names.collect()

Parquet 件 册 Spark SQL 的 时 , 张 运行 。

9-18 中 读 了数据, 9-19 所 的 数据进行 。

9-19:Python 中的 Parquet 数据

# 者

tbl = rows.registerTempTable("people")

pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal =

\"panda\"")

print "Panda friends"

print pandaFriends.map(lambda row: row.name).collect()

, 用saveAsParquetFile() SchemaRDD 的 Parquet 式保存,

9-20 所 。

9-20:Python 中的 Parquet 件保存

pandaFriends.saveAsParquetFile("hdfs://...")

要 读 JSON 数 据, 要 用hiveCtx中 的jsonFile()方 法 , 9-22 9-24

所 。 数据中 出 的 , 生成的SchemaRDD 用

printSchema方法 9-25 。

9-21: 记 {"name": "Holden"}

{"name": "Sparky The Bear", "lovesPandas":true,"knows": {"friends":["holden"]}}

9-22: Python 中 用 Spark SQL 读 JSON 数据 input = hiveCtx.jsonFile(inputFile)

9-23: Scala 中 用 Spark SQL 读 JSON 数据 val input = hiveCtx.jsonFile(inputFile) 9-24: Java 中 用 Spark SQL 读 JSON 数据

SchemaRDD input = hiveCtx.jsonFile(jsonFile);

9-25:printSchema() 出的 root

|-- knows: struct (nullable = true) | |-- friends: array (nullable = true)

| | |-- element: string (containsNull = false) |-- lovesPandas: boolean (nullable = true)

|-- name: string (nullable = true)

9-26 中 生成的 。

9-26: 的 分 root

|-- contributorsIDs: array (nullable = true) | |-- element: string (containsNull = false) |-- createdAt: string (nullable = true)

|-- currentUserRetweetId: integer (nullable = true) |-- hashtagEntities: array (nullable = true) | |-- element: struct (containsNull = false) | | |-- end: integer (nullable = true) | | |-- start: integer (nullable = true) | | |-- text: string (nullable = true) |-- id: long (nullable = true)

|-- inReplyToScreenName: string (nullable = true) |-- inReplyToStatusId: long (nullable = true) |-- inReplyToUserId: long (nullable = true) |-- isFavorited: boolean (nullable = true)

|-- isPossiblySensitive: boolean (nullable = true) |-- isTruncated: boolean (nullable = true)

|-- mediaEntities: array (nullable = true) | |-- element: struct (containsNull = false) | | |-- displayURL: string (nullable = true) | | |-- end: integer (nullable = true)

| | |-- expandedURL: string (nullable = true) |-- retweetCount: integer (nullable = true) ...

的 , 会自 而 地 问 字 和数 字 问题。

用Python, 数据 册 了 张SQL , . 问 的

toplevel.nextlevel 。而 SQL 中 用[element]

问数 中的元 , 9-27 所 。

9-27:用 SQL 数据 及数 元

select hashtagEntities[0].text from tweets LIMIT 1;

9.3.4 RDD

happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")]) happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)

happyPeopleSchemaRDD.registerTempTable("happy_people")

用Scala 的 , 的 式 会 处理 的 9-29 。

9-29: Scala 中 case class SchemaRDD

case class HappyPerson(handle: String, favouriteBeverage: String) ...

// 了 人的 , 它 成SchemaRDD

val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee")))

// : 处发生了 式

// 等价 sqlCtx.createSchemaRDD(happyPeopleRDD) happyPeopleRDD.registerTempTable("happy_people")

Java 中, 用applySchema() RDD SchemaRDD, 需要 RDD 中的数 据 带有 有的getter 和 setter 方法, 序列化, 9-30 所 。

9-30: Java 中 JavaBean SchemaRDD class HappyPerson implements Serializable { private String name;

private String favouriteBeverage;

public HappyPerson() {}

public HappyPerson(String n, String b) { name = n; favouriteBeverage = b;

}

public String getName() { return name; } public void setName(String n) { name = n; }

public String getFavouriteBeverage() { return favouriteBeverage; } public void setFavouriteBeverage(String b) { favouriteBeverage = b; } };

...

ArrayList<HappyPerson> peopleList = new ArrayList<HappyPerson>();

peopleList.add(new HappyPerson("holden", "coffee"));

JavaRDD<HappyPerson> happyPeopleRDD = sc.parallelize(peopleList);

SchemaRDD happyPeopleSchemaRDD = hiveCtx.applySchema(happyPeopleRDD, HappyPerson.class);

happyPeopleSchemaRDD.registerTempTable("happy_people");

Spark SQL 的 JDBC Hive 中的 HiveServer2 。由 用了Thrift ,它 Thrift server 。 ,JDBC 需要Spark 打开Hive 的 编译。4

Spark 中 的sbin/start-thriftserver.sh 9-31 。

本 的 数 大 spark-submit 同 7.3 。 , 会

4:codegen 打开时, 有 能会 , Spark SQL 需要 分析 编译代 , , 作 能

codegen 所带 的 能 。——译者

localhost:10000 进 行 , 量 HIVE_SERVER2_THRIFT_PORT

和HIVE_SERVER2_THRIFT_BIND_HOST , Hive hive.

server2.thrift.port和hive.server2.thrift.bind.host 。 行

数--hiveconf property=value Hive 。

9-31: JDBC

./sbin/start-thriftserver.sh --master sparkMaster

Spark 自带了Beeline 序, 用它 JDBC , 9-32 和图 9-3 所 。 简 的SQL shell 运行 。

9-32: 用 Beeline JDBC

holden@hmbp2:~/repos/spark$ ./bin/beeline -u jdbc:hive2://localhost:10000 Spark assembly has been built with Hive, including Datanucleus jars on classpath scan complete in 1ms

Connecting to jdbc:hive2://localhost:10000 Connected to: Spark SQL (version 1.2.0-SNAPSHOT) Driver: spark-assembly (version 1.2.0-SNAPSHOT) Transaction isolation: TRANSACTION_REPEATABLE_READ Beeline version 1.2.0-SNAPSHOT by Apache Hive 0: jdbc:hive2://localhost:10000> show tables;

+---+

| result | +---+

| pokes | +---+

1 row selected (1.182 seconds) 0: jdbc:hive2://localhost:10000>

图9-3:启动 JDBC 服务器并使用 Beeline 客户端连接

JDBC 时,JDBC 会 台运行 所有 出 定

件中。 用JDBC 进行 的 中 了问

题, 的 。

工 具 ODBC Spark SQL。Spark SQL 的 ODBC 由Simba http://www.simba.com/ 制 作, Spark 应 处 DataBricks Cloud、Datastax 及MapR 。它 会 Microstrategy Tableau 的 能工 具所用 的工具 Spark SQL 。由 Spark SQL 用了和

Hive 同的 及 ,大 数 Hive 的 能工具

有的Hive Spark SQL 。

9.4.1 Beeline

Beeline 中, 用 的HiveQL 、列 及 数据 。

Hive 册 https://cwiki.apache.org/confluence/display/Hive/LanguageManual 中

HiveQL 的所有 法 , 的 作。

,要 本地数据 张数据 , 用CREATE TABLE 。 用LOAD DATA

进行数据读 。Hive 读 带有 定分 的 本 件, CSV 等 式的 件,

9-33 所 。 9-33:读 数据

> CREATE TABLE IF NOT EXISTS mytable (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY , ;

> LOAD DATA LOCAL INPATH learning-spark-examples/files/int_string.csv INTO TABLE mytable;

要 列 数 据 , 用SHOW TABLES 9-34 所 。 DESCRIBE

tableName 张 的 。

9-34:列 数据

> SHOW TABLES;

mytable

Time taken: 0.052 seconds

要 存数据 , 用CACHE TABLE tableName 。 存 用UNCACHE

TABLE tableName 的 存。需要 的 , , 存的 会

JDBC 的所有 共 。

, Beeline 中 计 简 , 运行EXPLAIN , 9-35 所 。

9-35:Spark SQL shell 执行EXPLAIN

spark-sql> EXPLAIN SELECT * FROM mytable where key = 1;

== Physical Plan ==

Filter (key#16 = 1)

HiveTableScan [key#16,value#17], (MetastoreRelation default, mytable, None), None Time taken: 0.551 seconds

计 ,Spark SQL HiveTableScan 用了 作。

, SQL 数据进行 。Beeline shell 用 共 的

存数据 进行快速的数据 有用的。

9.4.2

用Spark SQL 的 JDBC 的 同 序 共 存

的数据 。JDBC Thrift 序, 共 成 了 能。

中所 , 需要 册 数据 运行CACHE , 用 存了。

Spark SQL shell

了JDBC ,Spark SQL 作 的进 用的简

shell, ./bin/spark-sql 。 shell 会

conf/hive-site.xml 中的 Hive 的元数据 。 存 的元数据 ,Spark SQL

会 本地新 。 本 要 本地开发 有用。 共 的集

, 应 用JDBC , 用 beeline进行 。

9.5 数

用 自定义 数, UDF, 用Python/Java/Scala 册自定义 数, SQL

中 用。 方法 用, 用 的SQL 用 高 能 ,

用 用 册的 数而 需自 编 了。 Spark SQL 中,编

UDF 简 。Spark SQL 有自 的UDF , 有的Apache Hive UDF。

9.5.1 Spark SQL UDF

用Spark 的编 编 数, Spark SQL 的方法 进

, 捷地 册 自 的UDF。 Scala 和 Python 中, 用 生的 数和 lambda 法的 ,而 Java 中, 需要 应的UDF 。UDF 能 数据

, 用时的 数 。

Python 和 Java 中, 需要用 9-1 中列出的 SchemaRDD 应的 定

。Java 中 的 应 org.apache.spark.sql.api.java.DataType中 , 而 Python 中 需要 DataType 。

9-36 和 9-37 中, 用 计算字 的 简 的UDF,

用它 计算 的 。

9-36:Python 版本 字 UDF

# 字 的UDF

hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType()) lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10") 9-37:Scala 版本的字 UDF

registerFunction("strLenScala", (_: String).length)

val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10") Java 中定义 UDF 需要 的import 。和 定义RDD 数时 , 据

hiveCtx.udf().register("stringLengthJava", new UDF1<String, Integer>() { @Override

public Integer call(String str) throws Exception { return str.length();

}

}, DataTypes.IntegerType);

SchemaRDD tweetLength = hiveCtx.sql(

"SELECT stringLengthJava('text') FROM tweets LIMIT 10");

List<Row> lengths = tweetLength.collect();

for (Row row : result) {

System.out.println(row.get(0));

} Hive UDF, 需 用hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function")。

9.6 Spark SQL

本 开 所 的 ,Spark SQL 的高 及 加的 Spark

SQL 数据 加高效。

Spark SQL SQL 的用 用的。Spark SQL 有 件的 合 作

, 列进行 9-40 所 。 用Spark SQL 需要 第6 中

的 特 的 进行 作。

9-40:Spark SQL 列 和

SELECT SUM(user.favouritesCount), SUM(retweetCount), user.id FROM tweets GROUP BY user.id

Spark SQL 用 的了解 高效地 数据。 存数据时,Spark SQL 用

存式的列式存 。 了 存的 ,而 能地 了 中

字 时的数据读 。

Spark SQL 中的 分工作 。

需 Spark 中读 特定的记 , 的方法 读 数据集, 执行

件。 而, Spark SQL 中, 的数据存 读 的记

, 制 件,Spark SQL 中的 制 件 数据存

, 而大大 需要读 的数据。

Spark SQL 的 能 有 , 9-2 所列。

表9-2:Spark SQL中的性能选项

spark.sql.codegen false true时,Spark SQL 会

运行时编译 Java 进制代 。

高大 的 能, 进行

时会

spark.sql.inMemoryColumnarStorage.compressed false 自 存中的列式存 进行 spark.sql.inMemoryColumnarStorage.batchSize 1000 列式 存时的 处理的大 。

大 能会 存 的

spark.sql.parquet.compression.codec snappy 用 编 。 的

uncompressed/snappy/gzip/lzo

用JDBC 和Beeline shell 时, set 能 的

, 9-41 所 。

9-41:打开codegen 的Beeline beeline> set spark.sql.codegen=true;

SET spark.sql.codegen=true spark.sql.codegen=true Time taken: 1.196 seconds

的Spark SQL 应用中, Spark 中 Spark , 9-42 所 。 9-42: Scala 中打开codegen 的代

conf.set("spark.sql.codegen", "true")

的 需要 特 的 量。第 spark.sql.codegen,

Spark SQL 运行 编译 Java 进制代 。由 生成了 运行 定

的代 ,codegen 大 者 的 快。 而, 运行特 快

1 2 的 时 时,codegen 有 能会增加 开 , codegen 需要

编译的 。5codegen 的 能, 所有大 的

者 运行的 中 用codegen。

时 能需要 的第 spark.sql.inMemoryColumnarStorage.batchSize。

存SchemaRDD 时,Spark SQL 会 制定的大 1000 记 分

, 分 。 的 处理大 会 ,而 处理大 大的 ,

次处理的数据 存所能 的大 时, 有 能会 发问题。 中的

记 大 数 字 者 网 大的字 字 , 能需要

处理大 存 OOM 的 。 的 , 的 处

理大 合适的, 1000 记 时 本 法 高的 了。

9.7

, 学 了Spark 用Spark SQL 进行 化和 化数据处理的方式。 了本 的 ,第3 第6 中讲 的 作RDD 的方法同 适用 Spark SQL

中的SchemaRDD。 时 , 会 SQL 的编 合 用, 分

用SQL 的简 和编 辑的 。而 用Spark SQL 时,Spark 执

行 能 据数据的 进行 化, 中 。

5: ,codegen 打开时 开 的 会 , Spark SQL 需要 化它的编译 。所

codegen 的 开 应 运行4 5 。

第 1 0 章

Spark Streaming

应用需要 时处理收 的数据, 用 时 问 计的应用、 学

的应用, 有自 的应用。Spark Streaming Spark 应用而 计的

。它 用 用 和 处理 的API 编 式计算应用, 大

量 用 处理应用的技术 代 。

和Spark RDD 的 ,Spark Streaming 用离散化流 discretized stream 作

, 作DStream。DStream 时 而收 的数据的序列。 , 时 区 收 的数据 作 RDD 存 ,而 DStream 由 RDD 所 成的序列

化 。DStream , Flume、Kafka 者HDFS。

出 的DStream 作, 转化操作 transformation ,会生成 新的 DStream, 输出操作 output operation , 数据 中。DStream

了 RDD 所 的 作 的 作 , 增加了 时 的新 作,

和 处理 序 同,Spark Streaming 应用需要进行 保 24/7 工作。本

会 检查点 checkpointing 制, 数据存 件 HDFS

的 制, Spark Streaming 用 工作的 要方式。 , 会讲

时 应用, 及 应用 自 式。

, Spark 1.1 ,Spark Streaming Java 和 Scala 中 用。 的Python Spark 1.2 中 , 本数据。本 用Java 和 Scala 所有的 API, 的 Python 适用的。

10.1

10-1:Spark Streaming 的 Maven groupId = org.apache.spark artifactId = spark-streaming_2.10 version = 1.2.0

StreamingContext 开 ,它 计算 能的 要 。StreamingContext 会 出SparkContext,用 处理数据。 造 数 收用 定 时 处理 次新数据的批次间隔 batch interval 作 , 它 1 。 ,

用socketTextStream() 出 本地7777 收 的 本数据的DStream。

用socketTextStream() 出 本地7777 收 的 本数据的DStream。

在文檔中 快速大数据分析 (頁 166-0)

相關文件