本 开 所 的 ,Spark SQL 的高 及 加的 Spark
SQL 数据 加高效。
Spark SQL SQL 的用 用的。Spark SQL 有 件的 合 作
, 列进行 9-40 所 。 用Spark SQL 需要 第6 中
的 特 的 进行 作。
9-40:Spark SQL 列 和
SELECT SUM(user.favouritesCount), SUM(retweetCount), user.id FROM tweets GROUP BY user.id
Spark SQL 用 的了解 高效地 数据。 存数据时,Spark SQL 用
存式的列式存 。 了 存的 ,而 能地 了 中
字 时的数据读 。
Spark SQL 中的 分工作 。
需 Spark 中读 特定的记 , 的方法 读 数据集, 执行
件。 而, Spark SQL 中, 的数据存 读 的记
, 制 件,Spark SQL 中的 制 件 数据存
, 而大大 需要读 的数据。
Spark SQL 的 能 有 , 9-2 所列。
表9-2:Spark SQL中的性能选项
spark.sql.codegen false true时,Spark SQL 会
运行时编译 Java 进制代 。
高大 的 能, 进行
时会
spark.sql.inMemoryColumnarStorage.compressed false 自 存中的列式存 进行 spark.sql.inMemoryColumnarStorage.batchSize 1000 列式 存时的 处理的大 。
大 能会 存 的
spark.sql.parquet.compression.codec snappy 用 编 。 的
uncompressed/snappy/gzip/lzo
用JDBC 和Beeline shell 时, set 能 的
, 9-41 所 。
9-41:打开codegen 的Beeline beeline> set spark.sql.codegen=true;
SET spark.sql.codegen=true spark.sql.codegen=true Time taken: 1.196 seconds
的Spark SQL 应用中, Spark 中 Spark , 9-42 所 。 9-42: Scala 中打开codegen 的代
conf.set("spark.sql.codegen", "true")
的 需要 特 的 量。第 spark.sql.codegen,
Spark SQL 运行 编译 Java 进制代 。由 生成了 运行 定
的代 ,codegen 大 者 的 快。 而, 运行特 快
1 2 的 时 时,codegen 有 能会增加 开 , codegen 需要
编译的 。5codegen 的 能, 所有大 的
者 运行的 中 用codegen。
时 能需要 的第 spark.sql.inMemoryColumnarStorage.batchSize。
存SchemaRDD 时,Spark SQL 会 制定的大 1000 记 分
, 分 。 的 处理大 会 ,而 处理大 大的 ,
次处理的数据 存所能 的大 时, 有 能会 发问题。 中的
记 大 数 字 者 网 大的字 字 , 能需要
处理大 存 OOM 的 。 的 , 的 处
理大 合适的, 1000 记 时 本 法 高的 了。
9.7
, 学 了Spark 用Spark SQL 进行 化和 化数据处理的方式。 了本 的 ,第3 第6 中讲 的 作RDD 的方法同 适用 Spark SQL
中的SchemaRDD。 时 , 会 SQL 的编 合 用, 分
用SQL 的简 和编 辑的 。而 用Spark SQL 时,Spark 执
行 能 据数据的 进行 化, 中 。
5: ,codegen 打开时 开 的 会 , Spark SQL 需要 化它的编译 。所
codegen 的 开 应 运行4 5 。
第 1 0 章
Spark Streaming
应用需要 时处理收 的数据, 用 时 问 计的应用、 学
的应用, 有自 的应用。Spark Streaming Spark 应用而 计的
。它 用 用 和 处理 的API 编 式计算应用, 大
量 用 处理应用的技术 代 。
和Spark RDD 的 ,Spark Streaming 用离散化流 discretized stream 作
, 作DStream。DStream 时 而收 的数据的序列。 , 时 区 收 的数据 作 RDD 存 ,而 DStream 由 RDD 所 成的序列
化 。DStream , Flume、Kafka 者HDFS。
出 的DStream 作, 转化操作 transformation ,会生成 新的 DStream, 输出操作 output operation , 数据 中。DStream
了 RDD 所 的 作 的 作 , 增加了 时 的新 作,
。
和 处理 序 同,Spark Streaming 应用需要进行 保 24/7 工作。本
会 检查点 checkpointing 制, 数据存 件 HDFS
的 制, Spark Streaming 用 工作的 要方式。 , 会讲
时 应用, 及 应用 自 式。
, Spark 1.1 ,Spark Streaming Java 和 Scala 中 用。 的Python Spark 1.2 中 , 本数据。本 用Java 和 Scala 所有的 API, 的 Python 适用的。
10.1
10-1:Spark Streaming 的 Maven groupId = org.apache.spark artifactId = spark-streaming_2.10 version = 1.2.0
StreamingContext 开 ,它 计算 能的 要 。StreamingContext 会 出SparkContext,用 处理数据。 造 数 收用 定 时 处理 次新数据的批次间隔 batch interval 作 , 它 1 。 ,
用socketTextStream() 出 本地7777 收 的 本数据的DStream。
DStream filter()进 行 化, error 的行。 , 用 出 作
print() 出 的行打印出 。 10-4 和 10-5 所 。
10-4:用 Scala 进行 式 ,打印出 error 的行 // SparkConf StreamingContext 定1 的 处理大 val ssc = new StreamingContext(conf, Seconds(1))
// 本地 7777 , 用收 的数据 DStream
val lines = ssc.socketTextStream("localhost", 7777) // DStream中 出 字 "error"的行
val errorLines = lines.filter(_.contains("error")) // 打印出有"error"的行
errorLines.print()
10-5:用 Java 进行 式 ,打印出 error 的行 // SparkConf StreamingContext 定1 的 处理大
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// 7777作 DStream
JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777);
// DStream中 出 字 "error"的行
JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() { public Boolean call(String line) {
return line.contains("error");
}});
// 打印出有"error"的行 errorLines.print();
定 了要进行的计算, 收 数据时计算 会开 。要开 收数据,
式 用StreamingContext 的start()方法。 ,Spark Streaming 会开 Spark 作 交 的SparkContext 执行。执行会 中进行,所 需要 用
$ spark-submit --class com.oreilly.learningsparkexamples.scala.StreamingLogInput \
$ASSEMBLY_JAR local[4]
$ nc localhost 7777 # 的行 发
< 处 的 >
Windows 用 用ncat http://nmap.org/ncat/ 代 的nc 。ncat nmap http://nmap.org/ 工具的 分。
会 子加 处理Apache 件。 需要生成 的
, 运行本书Git 中的 本./bin/fakelogs.sh 者./bin/fakelogs.cmd
发 7777 。
10.2
图10-1:Spark Streaming 的高层次架构
讲 ,Spark Streaming 的编 化 , DStream 图10-2 所
DStream, DStream 应用进行转化操作 新的 DStream。DStream 第3 中所讲 的RDD 的 化 作。 ,DStream 有 Time: 1413833674000 ms
---
71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error78978 HTTP/1.1" 404 505 ...
--- Time: 1413833675000 ms
---
71.19.164.174 - - [24/Sep/2014:22:27:10 +0000] "GET /error78978 HTTP/1.1" 404 505 ...
了 化 作 ,DStream 输出操作, 中 用的print()。 出 作
图10-5:Spark Streaming 在 Spark 各组件中的执行过程
Spark Streaming DStream 的 Spark RDD 所 的 : 要
10.3
StreamingContext._ 能 Scala 中 用。和 RDD , Java 中需要 mapToPair()
出 JavaPairDStream 能 用。
表10-1:DStream无状态转化操作的例子(不完整列表)
ds.flatMap(x => x.split(" ")) f: T -> Iterable[U]
filter() 由 定DStream 中
的元 成的DStream。
ds.filter(x => x != 1) f: T -> Boolean
repartition() DStream 的分区数。 ds.repartition(10) N/A
reduceByKey() 次中 同的记 。ds.reduceByKey(
(x, y) => x + y)
f: T, T -> T
groupByKey() 次中的记 据 分 。ds.groupByKey() N/A
需要记 的 , 数 作用 , DStream
10-10: Scala 中 DStream 用map()和reduceByKey()
// ApacheAccessingLog 用 Apache 中解析 的工具
val accessLogDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line)) val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y) 10-11: Java 中 DStream 用map()和reduceByKey()
// ApacheAccessingLog 用 Apache 中解析 的工具
static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> { public Tuple2<String, Long> call(ApacheAccessLog log) {
return new Tuple2<>(log.getIpAddress(), 1L);
} }
JavaDStream<ApacheAccessLog> accessLogsDStream = logData.map(new ParseFromLogLine());
JavaPairDStream<String, Long> ipDStream = accessLogsDStream.mapToPair(new IpTuple());
JavaPairDStream<String, Long> ipCountsDStream = ipDStream.reduceByKey(new LongSumReducer());
化 作 能 DStream 合数据, 时 区 。 ,
DStream 有和RDD 的 的 化 作, cogroup()、join()、
leftOuterJoin()等 4.3.3 。 DStream 用 作,
次分 执行了 应的RDD 作。
DStream 用 的 具 子。 10-12 和 10-13 中,
IP 地址 , 计数的数据和 数据量的数据 。
10-12: Scala 中 DStream val ipBytesDStream =
accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize())) val ipBytesSumDStream =
ipBytesDStream.reduceByKey((x, y) => x + y) val ipBytesRequestCountDStream =
ipCountsDStream.join(ipBytesSumDStream) 10-13: Java 中 DStream
JavaPairDStream<String, Long> ipBytesDStream = accessLogsDStream.mapToPair(new IpContentTuple());
JavaPairDStream<String, Long> ipBytesSumDStream = ipBytesDStream.reduceByKey(new LongSumReducer());
JavaPairDStream<String, Tuple2<Long, Long>> ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream);
的Spark 中 用DStream 的union() 作 它和 DStream
的 合 , 用StreamingContext.union() 合 。
, 化 作 用,DStream 了 作transform()的高
作 , 作 的RDD。 transform() 作 DStream
任 RDD RDD 的 数。 数会 数据 中的 次中 用,生成
新的 。transform()的 应用 用 RDD 的 处理代 。 ,
有 作extractOutliers()的 数,用 记 的RDD 中 出 的
RDD 能 进行 计 , transform()中 用它, 10-14 和
10-15 所 。
10-14: Scala 中 DStream 用transform()
val outlierDStream = accessLogsDStream.transform { rdd =>
extractOutliers(rdd) }
10-15: Java 中 DStream 用transform()
JavaPairDStream<String, Long> ipRawDStream = accessLogsDStream.transform(
new Function<JavaRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() { public JavaPairRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) { return extractOutliers(rdd);
} });
StreamingContext.transform DStream.transformWith(otherStream, func) 合 化 DStream。
10.3.2
DStream 的有 化 作 时 区 数据的 作 , 次的数
据 用 新的 次中计算 。 要的 和updateStateByKey(),
者 时 进行 作, 者 用 的 化
代 用 会 的 。
有 化 作需要 的StreamingContext 中打开 制 保 。 会
10.6 中 地 制, 需要 道 作 数
ssc.checkpoint() 打开它, 10-16 所 。
10-16:
ssc.checkpoint("hdfs://...")
进行本地开发时, 用本地路 /tmp 代HDFS。
的 作会 StreamingContext 的 次 的时 , 合
次的 ,计算出 的 。本 会 用 化 作 网络
问 中的 , 的 应代 、 大 , 及 。
所有 的 作 需要 数,分 时 及 , 者 StreamContext 的 次 的 数 。 时 制 次计算 的 次的数据,
的windowDuration/batchInterval 次。 有 10 次 的
DStream,要 30 的时 3 次 , 应 windowDuration
30 。而 的 次 等,用 制 新的DStream 进行计算的
。 DStream 次 10 , 次计算 次 ,
应 20 。图10-6 了 子。
DStream 用的 简 作 window(),它 新的DStream 所
的 作的 数据。 ,window()生成的DStream 中的 RDD 会
次中的数据, 数据进行count()、transform()等 作 10-17 和 10-18 。
网络 数据 有 的数据
大 :3
:2
图10-6:一个基于窗口的流数据,窗口时长为 3 个批次,滑动步长为 2 个批次;每隔 2 个批次就对 前3 个批次的数据进行一次计算
10-17: Scala 中 用window() 进行计数
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10)) val windowCounts = accessLogsWindow.count()
10-18: Java 中 用window() 进行计数
JavaDStream<ApacheAccessLog> accessLogsWindow = accessLogsDStream.window(
Durations.seconds(30), Durations.seconds(10));
JavaDStream<Integer> windowCounts = accessLogsWindow.count();
用window() 出所有的 作,Spark Streaming 了 的
作, 用 高效而方 地 用。 ,reduceByWindow()和reduceByKeyAndWindow()
高效地进行 作。它 收 数, 执
行, +。 ,它 有 特 式, 新进 的数据和 开
的数据, Spark 增量计算 。 特 式需要 数的 数,
+ 应的 数 -。 大的 , 数 大大 高执行效 图10-7 。
网络 数据 进行 网络 数据 进行有
作的
图10-7:普通的reduceByWindow()与使用逆函数的增量式reduceByWindow()的区别
处理的 子中, 用 数 高效地 IP 地址 问量进行计
数, 10-19 和 10-20 所 。
10-19:Scala 版本的 IP 地址的 问量计数
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow(
{(x, y) => x + y}, // 加 新进 的 次中的元 {(x, y) => x - y}, // 开 的 次中的元 Seconds(30), // 时
Seconds(10)) //
10-20:Java 版本的 IP 地址的 问量计数
class ExtractIp extends PairFunction<ApacheAccessLog, String, Long> { public Tuple2<String, Long> call(ApacheAccessLog entry) {
return new Tuple2(entry.getIpAddress(), 1L);
} }
class AddLongs extends Function2<Long, Long, Long>() { public Long call(Long v1, Long v2) { return v1 + v2; } }
class SubtractLongs extends Function2<Long, Long, Long>() { public Long call(Long v1, Long v2) { return v1 - v2; } }
JavaPairDStream<String, Long> ipAddressPairDStream = accessLogsDStream.mapToPair(
new ExtractIp());
JavaPairDStream<String, Long> ipCountDStream = ipAddressPairDStream.
reduceByKeyAndWindow(
new AddLongs(), // 加 新进 的 次中的元 new SubtractLongs()
// 开 的 次中的元
Durations.seconds(30), // 时 Durations.seconds(10)); //
,DStream 了countByWindow()和countByValueAndWindow()作 数 据 进 行
计数 作的简 。countByWindow() 中元 数的DStream,而
countByValueAndWindow() 的DStream 中 的 数, 10-21 和
10-22 所 。
10-21:Scala 中的 计数 作
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10)) 10-22:Java 中的 计数 作
JavaDStream<String> ip = accessLogsDStream.map(
new Function<ApacheAccessLog, String>() { public String call(ApacheAccessLog entry) { return entry.getIpAddress();
}});
JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(
Dirations.seconds(30), Durations.seconds(10));
JavaPairDStream<String, Long> ipAddressRequestCount = ip.countByValueAndWindow(
Dirations.seconds(30), Durations.seconds(10));
UpdateStateByKey
updateStateByKey() 用 问的10 。 列
, 会 件 时 新 。
要 用updateStateByKey(), 了 update(events, oldState) 数, 收
的 件 及 应的 , 应的新 。 数的 所 。
• events: 次中收 的 件的列 能 。
• oldState: 的 ,存 Option 有 的 ,
。
• newState:由 数 , Option 式存 的Option
要 。
10-23: Scala 中 用updateStateByKey()运行 应代 的计数 def updateRunningSum(values: Seq[Long], state: Option[Long]) = { Some(state.getOrElse(0L) + values.size)
}
val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L)) val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _) 10-24: Java 中 用updateStateByKey()运行 应代 的计数
class UpdateRunningSum implements Function2<List<Long>, Optional<Long>, Optional<Long>> {
public Optional<Long> call(List<Long> nums, Optional<Long> current) { long sum = current.or(0L);
return Optional.of(sum + nums.size());
} };
JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair(
new PairFunction<ApacheAccessLog, Integer, Long>() { public Tuple2<Integer, Long> call(ApacheAccessLog log) { return new Tuple2(log.getResponseCode(), 1L);
}})
.updateStateByKey(new UpdateRunningSum());
10.4
了 序, 用 出 作 保存 了。Spark Streaming DStream 有
,Spark Streaming 有 的saveAsSequenceFile() 数, 用 10-26 和 10-27 中的方法 保存 SequenceFile 件。
10-26: Scala 中 DStream 保存 SequenceFile
val writableIpAddressRequestCount = ipAddressRequestCount.map { (ip, count) => (new Text(ip), new LongWritable(count)) } writableIpAddressRequestCount.saveAsHadoopFiles[
SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt") 10-27: Java 中 DStream 保存 SequenceFile
JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(
new PairFunction<Tuple2<String, Long>, Text, LongWritable>() { public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) { return new Tuple2(new Text(e._1()), new LongWritable(e._2()));
}});
class OutFormat extends SequenceFileOutputFormat<Text, LongWritable> {};
writableDStream.saveAsHadoopFiles(
"outputDir", "txt", Text.class, LongWritable.class, OutFormat.class);
, 有 用的 出 作foreachRDD(),它用 DStream 中的 RDD 运行任 计 ipAddressRequestCount.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
// 打开 存 的 数据 的
10.5
Spark Streaming 生 同的数据 。 核心 数据 打 Spark Streaming 的 Maven 工件中,而 的 spark-streaming-kafka等 加工件
。
本 会 分数据 进行 。 定 了 数据 , 会
中 Spark 的 件。 计 新的应用, 用HDFS
Kafka 简 的 开 。
10.5.1 心数据
所有用 核心数据 DStream 的方法 StreamingContext 中。
中用 中 : 字。 : 件和Akka actor。
1.
Spark 任 Hadoop 的 件 中读 数据,所 Spark Streaming
任 Hadoop 的 件 中的 件 数据 。由 , 方
式 用, 要 制 HDFS 的数据。要 Spark Streaming
处理数据, 需要 字 的 式, 件 原子化
件 Spark 的 。2 10-4 和 10-5 处理新出 的
件, 10-29 和 10-30 所 。 10-29:用 Scala 读 中的 本 件
val logData = ssc.textFileStream(logDirectory) 10-30:用 Java 读 中的 本 件
JavaDStream<String> logData = jssc.textFileStream(logsDirectory);
用所 的./bin/fakelogs_directory.sh 本 造出 。 有 数据
的 , 用mv 件 所 的 中。
了 本数据, 读 任 Hadoop 式。 5.2.6 所讲的 , 需要
Key、Value 及InputFormat Spark Streaming 。 , 有了
处理作 处理 , 的 时 区 的数据分 存 成了
SequenceFile, 10-31 中所 的 读 数据。
2: 子化 作 次 成。 Spark Streaming 要处理 件时, 的数据出 了,Spark
Streaming 会 法 新 加的数据, 子化 要。 件 中, 件
作 子化的。
10-31:用 Scala 读 中的SequenceFile ssc.fileStream[LongWritable, IntWritable,
SequenceFileInputFormat[LongWritable, IntWritable]](inputDirectory).map { case (x, y) => (x.get(), y.get())
} 2. Akka actor
核心数据 收 actorStream,它 Akka actor http://akka.io/ 作 数据
的 。要 出 actor ,需要 Akka actor, org.apache.spark.
streaming.receiver.ActorHelper 。要 数据 actor 制 Spark Streaming 中,
需要 收 新数据时 用actor 的store() 数。Akka actor ,所 会 进行 。 阅读 计算的 http://spark.apache.org/docs/latest/streaming-custom-receivers.html 及Spark 中 的 ActorWordCount https://github.com/apache/spark/
blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount. Kafka、Amazon Kinesis、Apache Flume, 及 ZeroMQ。 加 Spark 版本 的Maven 工件spark-streaming-[projectname]_2.10 加 收 。 1. Apache Kafka
Apache Kafka http://kafka.apache.org/ 速 成 了 行的 。 用 Kafka 生的 , 处理 题的 。 工 中需要 Maven 工件
spark-streaming-kafka_2.10 用它。 的KafkaUtils StreamingContext 和
JavaStreamingContext 中 的Kafka 出DStream。由 KafkaUtils 阅 题, 它 出的DStream 由成 的 题和 成。要 出 数据,需 要 用StreamingContext 、 由 号 开的ZooKeeper 列 字 、 者
的 字 字 , 及 题 题的 收 数的 用
createStream()方法 10-32 和 10-33 所 。
10-32: Scala 中用 Apache Kafka 阅Panda 题 import org.apache.spark.streaming.kafka._
...
// 题 收 数的
val topics = List(("pandas", 1), ("logs", 1)).toMap
val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics) StreamingLogInput.processLines(topicLines.map(_._2))
10-33: Java 中用 Apache Kafka 阅Panda 题 import org.apache.spark.streaming.kafka.*;
10-33: Java 中用 Apache Kafka 阅Panda 题 import org.apache.spark.streaming.kafka.*;