• 沒有找到結果。

Beeline

在文檔中 快速大数据分析 (頁 175-0)

9.4  JDBC/ODBC

9.4.1 Beeline

Beeline 中, 用 的HiveQL 、列 及 数据 。

Hive 册 https://cwiki.apache.org/confluence/display/Hive/LanguageManual 中

HiveQL 的所有 法 , 的 作。

,要 本地数据 张数据 , 用CREATE TABLE 。 用LOAD DATA

进行数据读 。Hive 读 带有 定分 的 本 件, CSV 等 式的 件,

9-33 所 。 9-33:读 数据

> CREATE TABLE IF NOT EXISTS mytable (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY , ;

> LOAD DATA LOCAL INPATH learning-spark-examples/files/int_string.csv INTO TABLE mytable;

要 列 数 据 , 用SHOW TABLES 9-34 所 。 DESCRIBE

tableName 张 的 。

9-34:列 数据

> SHOW TABLES;

mytable

Time taken: 0.052 seconds

要 存数据 , 用CACHE TABLE tableName 。 存 用UNCACHE

TABLE tableName 的 存。需要 的 , , 存的 会

JDBC 的所有 共 。

, Beeline 中 计 简 , 运行EXPLAIN , 9-35 所 。

9-35:Spark SQL shell 执行EXPLAIN

spark-sql> EXPLAIN SELECT * FROM mytable where key = 1;

== Physical Plan ==

Filter (key#16 = 1)

HiveTableScan [key#16,value#17], (MetastoreRelation default, mytable, None), None Time taken: 0.551 seconds

计 ,Spark SQL HiveTableScan 用了 作。

, SQL 数据进行 。Beeline shell 用 共 的

存数据 进行快速的数据 有用的。

9.4.2

用Spark SQL 的 JDBC 的 同 序 共 存

的数据 。JDBC Thrift 序, 共 成 了 能。

中所 , 需要 册 数据 运行CACHE , 用 存了。

Spark SQL shell

了JDBC ,Spark SQL 作 的进 用的简

shell, ./bin/spark-sql 。 shell 会

conf/hive-site.xml 中的 Hive 的元数据 。 存 的元数据 ,Spark SQL

会 本地新 。 本 要 本地开发 有用。 共 的集

, 应 用JDBC , 用 beeline进行 。

9.5 数

用 自定义 数, UDF, 用Python/Java/Scala 册自定义 数, SQL

中 用。 方法 用, 用 的SQL 用 高 能 ,

用 用 册的 数而 需自 编 了。 Spark SQL 中,编

UDF 简 。Spark SQL 有自 的UDF , 有的Apache Hive UDF。

9.5.1 Spark SQL UDF

用Spark 的编 编 数, Spark SQL 的方法 进

, 捷地 册 自 的UDF。 Scala 和 Python 中, 用 生的 数和 lambda 法的 ,而 Java 中, 需要 应的UDF 。UDF 能 数据

, 用时的 数 。

Python 和 Java 中, 需要用 9-1 中列出的 SchemaRDD 应的 定

。Java 中 的 应 org.apache.spark.sql.api.java.DataType中 , 而 Python 中 需要 DataType 。

9-36 和 9-37 中, 用 计算字 的 简 的UDF,

用它 计算 的 。

9-36:Python 版本 字 UDF

# 字 的UDF

hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType()) lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10") 9-37:Scala 版本的字 UDF

registerFunction("strLenScala", (_: String).length)

val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10") Java 中定义 UDF 需要 的import 。和 定义RDD 数时 , 据

hiveCtx.udf().register("stringLengthJava", new UDF1<String, Integer>() { @Override

public Integer call(String str) throws Exception { return str.length();

}

}, DataTypes.IntegerType);

SchemaRDD tweetLength = hiveCtx.sql(

"SELECT stringLengthJava('text') FROM tweets LIMIT 10");

List<Row> lengths = tweetLength.collect();

for (Row row : result) {

System.out.println(row.get(0));

} Hive UDF, 需 用hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function")。

9.6 Spark SQL

本 开 所 的 ,Spark SQL 的高 及 加的 Spark

SQL 数据 加高效。

Spark SQL SQL 的用 用的。Spark SQL 有 件的 合 作

, 列进行 9-40 所 。 用Spark SQL 需要 第6 中

的 特 的 进行 作。

9-40:Spark SQL 列 和

SELECT SUM(user.favouritesCount), SUM(retweetCount), user.id FROM tweets GROUP BY user.id

Spark SQL 用 的了解 高效地 数据。 存数据时,Spark SQL 用

存式的列式存 。 了 存的 ,而 能地 了 中

字 时的数据读 。

Spark SQL 中的 分工作 。

需 Spark 中读 特定的记 , 的方法 读 数据集, 执行

件。 而, Spark SQL 中, 的数据存 读 的记

, 制 件,Spark SQL 中的 制 件 数据存

, 而大大 需要读 的数据。

Spark SQL 的 能 有 , 9-2 所列。

表9-2:Spark SQL中的性能选项

spark.sql.codegen false true时,Spark SQL 会

运行时编译 Java 进制代 。

高大 的 能, 进行

时会

spark.sql.inMemoryColumnarStorage.compressed false 自 存中的列式存 进行 spark.sql.inMemoryColumnarStorage.batchSize 1000 列式 存时的 处理的大 。

大 能会 存 的

spark.sql.parquet.compression.codec snappy 用 编 。 的

uncompressed/snappy/gzip/lzo

用JDBC 和Beeline shell 时, set 能 的

, 9-41 所 。

9-41:打开codegen 的Beeline beeline> set spark.sql.codegen=true;

SET spark.sql.codegen=true spark.sql.codegen=true Time taken: 1.196 seconds

的Spark SQL 应用中, Spark 中 Spark , 9-42 所 。 9-42: Scala 中打开codegen 的代

conf.set("spark.sql.codegen", "true")

的 需要 特 的 量。第 spark.sql.codegen,

Spark SQL 运行 编译 Java 进制代 。由 生成了 运行 定

的代 ,codegen 大 者 的 快。 而, 运行特 快

1 2 的 时 时,codegen 有 能会增加 开 , codegen 需要

编译的 。5codegen 的 能, 所有大 的

者 运行的 中 用codegen。

时 能需要 的第 spark.sql.inMemoryColumnarStorage.batchSize。

存SchemaRDD 时,Spark SQL 会 制定的大 1000 记 分

, 分 。 的 处理大 会 ,而 处理大 大的 ,

次处理的数据 存所能 的大 时, 有 能会 发问题。 中的

记 大 数 字 者 网 大的字 字 , 能需要

处理大 存 OOM 的 。 的 , 的 处

理大 合适的, 1000 记 时 本 法 高的 了。

9.7

, 学 了Spark 用Spark SQL 进行 化和 化数据处理的方式。 了本 的 ,第3 第6 中讲 的 作RDD 的方法同 适用 Spark SQL

中的SchemaRDD。 时 , 会 SQL 的编 合 用, 分

用SQL 的简 和编 辑的 。而 用Spark SQL 时,Spark 执

行 能 据数据的 进行 化, 中 。

5: ,codegen 打开时 开 的 会 , Spark SQL 需要 化它的编译 。所

codegen 的 开 应 运行4 5 。

第 1 0 章

Spark Streaming

应用需要 时处理收 的数据, 用 时 问 计的应用、 学

的应用, 有自 的应用。Spark Streaming Spark 应用而 计的

。它 用 用 和 处理 的API 编 式计算应用, 大

量 用 处理应用的技术 代 。

和Spark RDD 的 ,Spark Streaming 用离散化流 discretized stream 作

, 作DStream。DStream 时 而收 的数据的序列。 , 时 区 收 的数据 作 RDD 存 ,而 DStream 由 RDD 所 成的序列

化 。DStream , Flume、Kafka 者HDFS。

出 的DStream 作, 转化操作 transformation ,会生成 新的 DStream, 输出操作 output operation , 数据 中。DStream

了 RDD 所 的 作 的 作 , 增加了 时 的新 作,

和 处理 序 同,Spark Streaming 应用需要进行 保 24/7 工作。本

会 检查点 checkpointing 制, 数据存 件 HDFS

的 制, Spark Streaming 用 工作的 要方式。 , 会讲

时 应用, 及 应用 自 式。

, Spark 1.1 ,Spark Streaming Java 和 Scala 中 用。 的Python Spark 1.2 中 , 本数据。本 用Java 和 Scala 所有的 API, 的 Python 适用的。

10.1

10-1:Spark Streaming 的 Maven groupId = org.apache.spark artifactId = spark-streaming_2.10 version = 1.2.0

StreamingContext 开 ,它 计算 能的 要 。StreamingContext 会 出SparkContext,用 处理数据。 造 数 收用 定 时 处理 次新数据的批次间隔 batch interval 作 , 它 1 。 ,

用socketTextStream() 出 本地7777 收 的 本数据的DStream。

DStream filter()进 行 化, error 的行。 , 用 出 作

print() 出 的行打印出 。 10-4 和 10-5 所 。

10-4:用 Scala 进行 式 ,打印出 error 的行 // SparkConf StreamingContext 定1 的 处理大 val ssc = new StreamingContext(conf, Seconds(1))

// 本地 7777 , 用收 的数据 DStream

val lines = ssc.socketTextStream("localhost", 7777) // DStream中 出 字 "error"的行

val errorLines = lines.filter(_.contains("error")) // 打印出有"error"的行

errorLines.print()

10-5:用 Java 进行 式 ,打印出 error 的行 // SparkConf StreamingContext 定1 的 处理大

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// 7777作 DStream

JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777);

// DStream中 出 字 "error"的行

JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() { public Boolean call(String line) {

return line.contains("error");

}});

// 打印出有"error"的行 errorLines.print();

定 了要进行的计算, 收 数据时计算 会开 。要开 收数据,

式 用StreamingContext 的start()方法。 ,Spark Streaming 会开 Spark 作 交 的SparkContext 执行。执行会 中进行,所 需要 用

$ spark-submit --class com.oreilly.learningsparkexamples.scala.StreamingLogInput \

$ASSEMBLY_JAR local[4]

$ nc localhost 7777 # 的行 发

< 处 的 >

Windows 用 用ncat http://nmap.org/ncat/ 代 的nc 。ncat nmap http://nmap.org/ 工具的 分。

会 子加 处理Apache 件。 需要生成 的

, 运行本书Git 中的 本./bin/fakelogs.sh 者./bin/fakelogs.cmd

发 7777 。

10.2

图10-1:Spark Streaming 的高层次架构

讲 ,Spark Streaming 的编 化 , DStream 图10-2 所

DStream, DStream 应用进行转化操作 新的 DStream。DStream 第3 中所讲 的RDD 的 化 作。 ,DStream 有 Time: 1413833674000 ms

---

71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error78978 HTTP/1.1" 404 505 ...

--- Time: 1413833675000 ms

---

71.19.164.174 - - [24/Sep/2014:22:27:10 +0000] "GET /error78978 HTTP/1.1" 404 505 ...

了 化 作 ,DStream 输出操作, 中 用的print()。 出 作

图10-5:Spark Streaming 在 Spark 各组件中的执行过程

Spark Streaming DStream 的 Spark RDD 所 的 : 要

10.3

StreamingContext._ 能 Scala 中 用。和 RDD , Java 中需要 mapToPair()

出 JavaPairDStream 能 用。

表10-1:DStream无状态转化操作的例子(不完整列表)

ds.flatMap(x => x.split(" ")) f: T -> Iterable[U]

filter() 由 定DStream 中

的元 成的DStream。

ds.filter(x => x != 1) f: T -> Boolean

repartition() DStream 的分区数。 ds.repartition(10) N/A

reduceByKey() 次中 同的记 。ds.reduceByKey(

(x, y) => x + y)

f: T, T -> T

groupByKey() 次中的记 据 分 。ds.groupByKey() N/A

需要记 的 , 数 作用 , DStream

10-10: Scala 中 DStream 用map()和reduceByKey()

// ApacheAccessingLog 用 Apache 中解析 的工具

val accessLogDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line)) val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))

val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y) 10-11: Java 中 DStream 用map()和reduceByKey()

// ApacheAccessingLog 用 Apache 中解析 的工具

static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> { public Tuple2<String, Long> call(ApacheAccessLog log) {

return new Tuple2<>(log.getIpAddress(), 1L);

} }

JavaDStream<ApacheAccessLog> accessLogsDStream = logData.map(new ParseFromLogLine());

JavaPairDStream<String, Long> ipDStream = accessLogsDStream.mapToPair(new IpTuple());

JavaPairDStream<String, Long> ipCountsDStream = ipDStream.reduceByKey(new LongSumReducer());

化 作 能 DStream 合数据, 时 区 。 ,

DStream 有和RDD 的 的 化 作, cogroup()、join()、

leftOuterJoin()等 4.3.3 。 DStream 用 作,

次分 执行了 应的RDD 作。

DStream 用 的 具 子。 10-12 和 10-13 中,

IP 地址 , 计数的数据和 数据量的数据 。

10-12: Scala 中 DStream val ipBytesDStream =

accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize())) val ipBytesSumDStream =

ipBytesDStream.reduceByKey((x, y) => x + y) val ipBytesRequestCountDStream =

ipCountsDStream.join(ipBytesSumDStream) 10-13: Java 中 DStream

JavaPairDStream<String, Long> ipBytesDStream = accessLogsDStream.mapToPair(new IpContentTuple());

JavaPairDStream<String, Long> ipBytesSumDStream = ipBytesDStream.reduceByKey(new LongSumReducer());

JavaPairDStream<String, Tuple2<Long, Long>> ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream);

的Spark 中 用DStream 的union() 作 它和 DStream

的 合 , 用StreamingContext.union() 合 。

, 化 作 用,DStream 了 作transform()的高

作 , 作 的RDD。 transform() 作 DStream

任 RDD RDD 的 数。 数会 数据 中的 次中 用,生成

新的 。transform()的 应用 用 RDD 的 处理代 。 ,

有 作extractOutliers()的 数,用 记 的RDD 中 出 的

RDD 能 进行 计 , transform()中 用它, 10-14 和

10-15 所 。

10-14: Scala 中 DStream 用transform()

val outlierDStream = accessLogsDStream.transform { rdd =>

extractOutliers(rdd) }

10-15: Java 中 DStream 用transform()

JavaPairDStream<String, Long> ipRawDStream = accessLogsDStream.transform(

new Function<JavaRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() { public JavaPairRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) { return extractOutliers(rdd);

} });

StreamingContext.transform DStream.transformWith(otherStream, func) 合 化 DStream。

10.3.2

DStream 的有 化 作 时 区 数据的 作 , 次的数

据 用 新的 次中计算 。 要的 和updateStateByKey(),

者 时 进行 作, 者 用 的 化

代 用 会 的 。

有 化 作需要 的StreamingContext 中打开 制 保 。 会

10.6 中 地 制, 需要 道 作 数

ssc.checkpoint() 打开它, 10-16 所 。

10-16:

ssc.checkpoint("hdfs://...")

进行本地开发时, 用本地路 /tmp 代HDFS。

的 作会 StreamingContext 的 次 的时 , 合

次的 ,计算出 的 。本 会 用 化 作 网络

问 中的 , 的 应代 、 大 , 及 。

所有 的 作 需要 数,分 时 及 , 者 StreamContext 的 次 的 数 。 时 制 次计算 的 次的数据,

的windowDuration/batchInterval 次。 有 10 次 的

DStream,要 30 的时 3 次 , 应 windowDuration

30 。而 的 次 等,用 制 新的DStream 进行计算的

。 DStream 次 10 , 次计算 次 ,

应 20 。图10-6 了 子。

DStream 用的 简 作 window(),它 新的DStream 所

的 作的 数据。 ,window()生成的DStream 中的 RDD 会

次中的数据, 数据进行count()、transform()等 作 10-17 和 10-18 。

网络 数据 有 的数据

大 :3

:2

图10-6:一个基于窗口的流数据,窗口时长为 3 个批次,滑动步长为 2 个批次;每隔 2 个批次就对 前3 个批次的数据进行一次计算

10-17: Scala 中 用window() 进行计数

val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10)) val windowCounts = accessLogsWindow.count()

10-18: Java 中 用window() 进行计数

JavaDStream<ApacheAccessLog> accessLogsWindow = accessLogsDStream.window(

Durations.seconds(30), Durations.seconds(10));

JavaDStream<Integer> windowCounts = accessLogsWindow.count();

用window() 出所有的 作,Spark Streaming 了 的

作, 用 高效而方 地 用。 ,reduceByWindow()和reduceByKeyAndWindow()

高效地进行 作。它 收 数, 执

行, +。 ,它 有 特 式, 新进 的数据和 开

的数据, Spark 增量计算 。 特 式需要 数的 数,

+ 应的 数 -。 大的 , 数 大大 高执行效 图10-7 。

网络 数据 进行 网络 数据 进行有

作的

图10-7:普通的reduceByWindow()与使用逆函数的增量式reduceByWindow()的区别

处理的 子中, 用 数 高效地 IP 地址 问量进行计

数, 10-19 和 10-20 所 。

10-19:Scala 版本的 IP 地址的 问量计数

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow(

{(x, y) => x + y}, // 加 新进 的 次中的元 {(x, y) => x - y}, // 开 的 次中的元 Seconds(30), // 时

Seconds(10)) //

10-20:Java 版本的 IP 地址的 问量计数

class ExtractIp extends PairFunction<ApacheAccessLog, String, Long> { public Tuple2<String, Long> call(ApacheAccessLog entry) {

return new Tuple2(entry.getIpAddress(), 1L);

} }

class AddLongs extends Function2<Long, Long, Long>() { public Long call(Long v1, Long v2) { return v1 + v2; } }

class SubtractLongs extends Function2<Long, Long, Long>() { public Long call(Long v1, Long v2) { return v1 - v2; } }

JavaPairDStream<String, Long> ipAddressPairDStream = accessLogsDStream.mapToPair(

new ExtractIp());

JavaPairDStream<String, Long> ipCountDStream = ipAddressPairDStream.

reduceByKeyAndWindow(

new AddLongs(), // 加 新进 的 次中的元 new SubtractLongs()

// 开 的 次中的元

Durations.seconds(30), // 时 Durations.seconds(10)); //

,DStream 了countByWindow()和countByValueAndWindow()作 数 据 进 行

计数 作的简 。countByWindow() 中元 数的DStream,而

countByValueAndWindow() 的DStream 中 的 数, 10-21 和

10-22 所 。

10-21:Scala 中的 计数 作

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}

val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10)) 10-22:Java 中的 计数 作

JavaDStream<String> ip = accessLogsDStream.map(

new Function<ApacheAccessLog, String>() { public String call(ApacheAccessLog entry) { return entry.getIpAddress();

}});

JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(

Dirations.seconds(30), Durations.seconds(10));

JavaPairDStream<String, Long> ipAddressRequestCount = ip.countByValueAndWindow(

Dirations.seconds(30), Durations.seconds(10));

UpdateStateByKey

updateStateByKey() 用 问的10 。 列

, 会 件 时 新 。

要 用updateStateByKey(), 了 update(events, oldState) 数, 收

的 件 及 应的 , 应的新 。 数的 所 。

• events: 次中收 的 件的列 能 。

oldState: 的 ,存 Option 有 的 ,

newState:由 数 , Option 式存 的Option

要 。

10-23: Scala 中 用updateStateByKey()运行 应代 的计数 def updateRunningSum(values: Seq[Long], state: Option[Long]) = { Some(state.getOrElse(0L) + values.size)

}

val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L)) val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _) 10-24: Java 中 用updateStateByKey()运行 应代 的计数

class UpdateRunningSum implements Function2<List<Long>, Optional<Long>, Optional<Long>> {

public Optional<Long> call(List<Long> nums, Optional<Long> current) { long sum = current.or(0L);

return Optional.of(sum + nums.size());

} };

JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair(

new PairFunction<ApacheAccessLog, Integer, Long>() { public Tuple2<Integer, Long> call(ApacheAccessLog log) { return new Tuple2(log.getResponseCode(), 1L);

}})

.updateStateByKey(new UpdateRunningSum());

10.4

了 序, 用 出 作 保存 了。Spark Streaming DStream 有

,Spark Streaming 有 的saveAsSequenceFile() 数, 用 10-26 和 10-27 中的方法 保存 SequenceFile 件。

10-26: Scala 中 DStream 保存 SequenceFile

val writableIpAddressRequestCount = ipAddressRequestCount.map { (ip, count) => (new Text(ip), new LongWritable(count)) } writableIpAddressRequestCount.saveAsHadoopFiles[

SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt") 10-27: Java 中 DStream 保存 SequenceFile

JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(

new PairFunction<Tuple2<String, Long>, Text, LongWritable>() { public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) { return new Tuple2(new Text(e._1()), new LongWritable(e._2()));

}});

class OutFormat extends SequenceFileOutputFormat<Text, LongWritable> {};

writableDStream.saveAsHadoopFiles(

"outputDir", "txt", Text.class, LongWritable.class, OutFormat.class);

, 有 用的 出 作foreachRDD(),它用 DStream 中的 RDD 运行任 计 ipAddressRequestCount.foreachRDD { rdd =>

rdd.foreachPartition { partition =>

rdd.foreachPartition { partition =>

在文檔中 快速大数据分析 (頁 175-0)

相關文件