5.5 数据
5.5.4 Elasticsearch
Spark 用Elasticsearch-Hadoop https://github.com/elastic/elasticsearch-hadoop Elasticsearch 中读 数据。Elasticsearch 开 的、 Lucene 的 。
Elasticsearch 和 的 大 ,它会 的路 ,
而 SparkContext 中 的 。Elasticsearch 的OutputFormat 有用 Spark 所 的 ,所 用saveAsHadoopDataSet 代 , 需要
5-46: Scala 中 用 Elasticsearch 出
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.
mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets") jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-")) output.saveAsHadoopDataset(jobConf)
5-47: Scala 中 用 Elasticsearch
def mapWritableToInput(in: MapWritable): Map[String, String] = { in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1)) jobConf.set(ConfigurationOptions.ES_NODES, args(2)) val currentTweets = sc.hadoopRDD(jobConf,
classOf[EsInputFormat[Object, MapWritable]], classOf[Object], classOf[MapWritable])
// map
// MapWritable[Text, Text] Map[String, String]
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }
和 ,Elasticsearch 有 , 作 的 有效 。
出而 ,Elasticsearch 进行 , 尔会 出 的
数据 , 要存 字 的数据 , 定
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html 。
5.6
本 , 应 能 数据读 Spark 中, 计算 所 的方
式存 。 了数据 用的 同 式, 及它 应的数
据处理的方式。 掌握了读 和保存大 数据集的方法, 会
用 编 高效 大的Spark 序的方法。
第 6 章
Spark编程进阶
6.1
本 有 及的Spark 编 的 进 特 ,会 的共 量:
累加器 accumulator 广播变量 broadcast variable 。 加 用 进行 合,而 量用 高效分发 大的 。 有的RDD 化 作的 ,
数据 需要 大 代价的任 了 作。 了 用的工具 ,本 会
Spark 序交互的方式, 用R 编 的 本进行交互。
本 会 用 电 作者的 作 , 出 的 应用。
的 的 号。 号 由 分 的, 有自 的 号号 ,
所 据 号 应的 。有 作者的地理 ,用
定 。 6-1 了 。本书的 代 中 需要
中 进行处理的 号列 。
6-1: JSON 式的 , 中 字
{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",
"contactlat":"37.384733","contactlong":"-122.032164",
"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",
"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}
要用 的第 Spark 特 共 量。共 量 Spark 任 中 用的特
的 量。 中, 用Spark 共 量 的 进行计数,
及分发 张 大的 。
任 需要 时 进行 , 需要 数据 者 数生成 时, 数 file = sc.textFile(inputFile)
# Accumulator[Int] 化 0 blankLines = sc.accumulator(0) def extractCallSigns(line):
global blankLines # 问 量 if (line == ""):
blankLines += 1 return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns") print "Blank lines: %d" % blankLines.value
6-3: Scala 中 加 行 val sc = new SparkContext(...) val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // Accumulator[Int] 化 0 val callSigns = file.flatMap(line => {
if (line == "") { println("Blank lines: " + blankLines.value) 6-4: Java 中 加 行
JavaRDD<String> rdd = sc.textFile(args[1]);
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(
new FlatMapFunction<String, String>() { public Iterable<String> call(String line) { if (line.equals("")) {
blankLines.add(1);
}
return Arrays.asList(line.split(" "));
}});
callSigns.saveAsTextFile("output.txt")
System.out.println("Blank lines: "+ blankLines.value());
中, 了 作blankLines的Accumulator[Int] ,
中 行时 加1。执行 化 作 , 打印出 加 中的 。 ,
• 中 用SparkContext.accumulator(initialValue)方 法, 出 存 有
的 加 。 org.apache.spark.Accumulator[T] , 中T
initialValue的 。
的计数 时 方 , 有 需要 时, 者 需要
validSignCount = sc.accumulator(0) invalidSignCount = sc.accumulator(0) def validateSign(sign):
global validSignCount, invalidSignCount
if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
validSignCount += 1
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x + y)
# 制 计算计数
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
contactCount.saveAsTextFile(outputDir + "/contactCount") else:
print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.
value)
加 要 处理 ,对于要在行动操作中使用的累加器,Spark
只会把每个任务对各累加器的修改应用一次。 , 要 计
算时 的 加 , 它 foreach() 的行 作中。
对于在RDD 转化操作中使用的累加器,就不能保证有这种情况了。 化 作中 加
能会发生 次 新。 子, 存 有 用的RDD 第
次 LRU 存中 新用 时, 的 次 新 会发生。 会 制
RDD 据 进行 算,而 作用 会 中的 化 作 的 加 进行
新, 次发 中。 化 作中, 加 用 的。
版本的Spark 能会 行 成 新 次 加 的 , 版本
1.2.0 会进行 次 新, 化 作中的 加 时 用。
6.2.2
, 学 了 用加法 作Spark 的 加 Accumulator[Int] 。 Spark Double、Long和Float 的 加 。 ,Spark 了自定义
加 和 合 作的API 要 加的 中的 大 ,而 加 。自
定义 加 需要 AccumulatorParam, Spark API http://spark.apache.org/docs/
latest/api/scala/index.html#package 中有所 。 要 作同时 交 和 合 ,
用任 作 代 数 的加法。 了 和, 数据的 大 。
任 的a 和 b,有 a op b = b op a, 作op 交 。 任 的a、b 和 c,有 (a op b) op = a op (b op ), 作op
合 。 ,sum和max 交 合 , Spark 加 中
的 用 作。
6.3
Spark 的第 共 量 广播变量,它 序高效地 所有工作 发
大的 读 , Spark 作 用。 , 的应用需要 所有 发
大的 读 , 学 算法中的 大的特 量, 量用
。
,Spark 会自 中所有 用 的 量发 工作 。 方 ,
效。 有 : , 的任 发 制 任 进行 化的 次,
能会 多个 行 作中 用同 量, Spark 会 作分 发 。
子, 要 Spark 序, 号的 应的 。 的
, 由 用 自的 号 ,所 方法 行的。 用 Spark , 代 6-6 所 。
6-6: Python 中
# RDD contactCounts中的 号的 应 。 号
# 读 代 进行
signPrefixes = loadCallSignTable()
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes) count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts
.map(processSignCount)
signPrefixes = sc.broadcast(loadCallSignTable()) def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts
.map(processSignCount)
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
val country = lookupInArray(sign, signPrefixes.value) (country, count)
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
new PairFunction<Tuple2<String, Integer>, String, Integer> (){
public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) { String sign = callSignCount._1();
String country = lookupCountry(sign, callSignInfo.value());
return new Tuple2(country, callSignCount._2());
}}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
所 , 用 量的 简 。
(1) T的 用SparkContext.broadcast 出 Broadcast[T] 。
任 序列化的 。
val theArray = broadcastArray.value; theArray(0) = newValue 的
。 工作 执行时, 行 newValue 数 的第 元 , 工作
的任 效。 用spark.serializer 序列化
化序列化 第8 中会 用Kryo 快的序列化 , 的数
据 自 的序列化方式 Java 用java.io.Externalizable 序列
urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs) #
requests = map(lambda x: (x, http.request('GET', x)), urls) #
result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
# 的
return filter(lambda x: x[1] is not None, result) def fetchCallSigns(input):
""" 号"""
return input.mapPartitions(lambda callSigns : processCallSigns(callSigns)) contactsContactList = fetchCallSigns(validSigns)
6-11: Scala 中 用共 JSON 解析
val contactsContactLists = validSigns.distinct().mapPartitions{
signs =>
val mapper = createMapper() val client = new HttpClient() client.start()
// http
signs.map {sign =>
createExchangeForSign(sign)
// 应
}.map{ case (sign, exchange) =>
(sign, readExchangeCallLog(mapper, exchange)) }.filter(x => x._2 != null) // 的
}
6-12: Java 中 用共 JSON 解析 // 用mapPartitions 用 工作
JavaPairRDD<String, CallLog[]> contactsContactLists = validCallSigns.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) { // 列出
ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
ObjectMapper mapper = createMapper();
HttpClient client = new HttpClient();
try {
client.start();
while (input.hasNext()) {
requests.add(createRequestForSign(input.next(), client));
}
for (Tuple2<String, ContentExchange> signExchange : requests) { callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
}
mapPartitionsWithIndex() 分区序号, 及 分区中
的元 的迭代
的元 的迭代 f: (Int, Itera tor[T]) → Iter ator[U]
foreachPartitions() 元 迭代 f: (Iterator[T]) →
Unit
了 的 工作, 用mapPartitions() 的开 。有时需要
同 的数据 合 。 第3 中, 计算 时, 方法
数 RDD 元 RDD, 中 所处理的元 数。 ,
分区 次 元 ,而 用 元 执行 作, 6-13 和 6-14。
6-13: Python 中 用mapPartitions() def combineCtrs(c1, c2):
return (c1[0] + c2[0], c1[1] + c2[1]) def basicAvg(nums):
"""计算 """
nums.map(lambda num: (num, 1)).reduce(combineCtrs) 6-14: Python 中 用mapPartitions()
def partitionCtr(nums):
"""计算分区的sumCounter"""
sumCount = [0, 0]
sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs) return sumCount[0] / float(sumCount[1])
6.5 道
6-15:R 的 序
#!/usr/bin/env Rscript library("Imap") f <- file("stdin") open(f)
while(length(line <- readLines(f,n=1)) > 0) { # 处理行
contents <- Map(as.numeric, strsplit(line, ",")) mydist <- gdist(contents[[1]][1], contents[[1]][2], contents[[1]][3], contents[[1]][4],
units="m", a=6378137.0, b=6356752.3142, verbose = FALSE) write(mydist, stdout())
}
distScript = "./src/R/finddistance.R"
distScriptName = "finddistance.R"
sc.addFile(distScript) def hasDistInfo(call):
""" 次 有计算 时 需的字 """
requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
return all(map(lambda f: call[f], requiredFields)) def formatCall(call):
""" 新的 式 新 R 序解析"""
return "{0},{1},{2},{3}".format(
call["mylat"], call["mylong"],
call["contactlat"], call["contactlong"]) pipeInputs = contactsContactList.values().flatMap(
lambda calls: map(formatCall, filter(hasDistInfo, calls))) distances = pipeInputs.pipe(SparkFiles.get(distScriptName)) print distances.collect()
6-17: Scala 中 用pipe() 用finddistance.R的 序
// 用 R 序计算 次 的
// 本 加 需要 本次作 中 的 件的列 中 val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x => x.map(y =>
s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")).pipe(Seq(
SparkFiles.get(distScriptName))) println(distances.collect().toList)
6-18: Java 中 用pipe() 用finddistance.R的 序
// 用 R 序计算 次 的
// 本 加 需要 本次作 中 的 件的列 中
String distScript = "./src/R/finddistance.R";
String distScriptName = "finddistance.R";
sc.addFile(distScript);
JavaRDD<String> pipeInputs = contactsContactLists.values() .map(new VerifyCallLogs()).flatMap(
new FlatMapFunction<CallLog[], String>() { public Iterable<String> call(CallLog[] calls) { ArrayList<String> latLons = new ArrayList<String>();
for (CallLog call: calls) {
JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));
System.out.println(StringUtils.join(distances.collect(), ","));
SparkContext.addFile(path), 件列 , 工作 Spark 作
中 列 中的 件。 件 自 的本地 件 子中所 的
, 者 自HDFS Hadoop 所 的 件 , 者 HTTP、HTTPS FTP 的 URI
地址。 作 中的行 作 发时, 件 会 , 工作
SparkFiles.getRootDirectory 它 。 用SparkFiles.get(Filename)
定 件。 , 保pipe()能 工作 本的方法 。
用 的远 制工具 本 件 的 。
所有 SparkContext.addFile(path) 加的 件 存 同 中,
所 有 要 用 的 字。
本 问,RDD 的pipe()方法 RDD 中的元 地 本 道。
有 版本的findDistance, 行 数的 式 收 定的SEPARATOR。
的 , 的 方法 成工作, 用第 。
• rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
• rdd.pipe(SparkFiles.get("finddistance.R") + " ,")
第 方法中, 用 定 的 数序列的 式 本
而 第 方法中, 它作 字 , Spark 会 字
解 定 的 数序列。
需要的 , pipe() 定 行 量。 需要 量 应 的
作 pipe()的第 数 进 ,Spark 会 。
应 理解了 用pipe()、 处理RDD 中的元 , 及
的 本分发 集 , 能 工作 本。
6.6 数 RDD
Spark 数 数据的RDD 了 的 计 作。 会 第11
的 的 计方法和 学 方法的 。
Spark 的数 作 式算法 的, 次 元 的方式 出 。
计数据 会 用stats()时 次 数据计算出 , StatsCounter
。 6-2 列出了StatsCounter 的 用方法。
表6-2:StatsCounter中可用的汇总统计数据
count() RDD 中的元 数
mean() 元 的
sum() 和
max() 大
min()
variance() 元 的方
sampleVariance() 中计算出的方
stdev()
sampleStdev() 的
计算 计数据中的 , RDD 用 应的方法, rdd.
mean() 者rdd.sum()。
6-19 6-21 中, 会 用 计 数据中 。由 会
次 用同 RDD 次用 计算 计数据, 次用 , 应
RDD 存 。 的 中, 中 远的
。
6-19:用 Python
# 要 String RDD 数字数据, 能
# 用 计 数
distanceNumerics = distances.map(lambda string: float(string)) stats = distanceNumerics.stats()
stddev = std.stdev() mean = stats.mean()
reasonableDistances = distanceNumerics.filter(
lambda x: math.fabs(x - mean) < 3 * stddev) print reasonableDistances.collect()
6-20:用 Scala
// 要 , 有 地 能 的
// 要 字 RDD 它
val distanceDouble = distance.map(string => string.toDouble) val stats = distanceDoubles.stats()
val stddev = stats.stdev val mean = stats.mean
val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev) println(reasonableDistance.collect().toList)
6-21:用 Java
// 要 String RDD DoubleRDD, 能 用 计 数
JavaDoubleRDD distanceDoubles = distances.mapToDouble(new DoubleFunction<String>() { public double call(String value) {
return Double.parseDouble(value);
}});
final StatCounter stats = distanceDoubles.stats();
final Double stddev = stats.stdev();
final Double mean = stats.mean();
JavaDoubleRDD reasonableDistances =
distanceDoubles.filter(new Function<Double, Boolean>() { public Boolean call(Double x) {
scala 及src/main/java/com/oreilly/learningsparkexamples/java/ChapterSixExample.java 中 。
6.7
本 中, 了Spark 编 中的 进 特 , 用 特 的 序
高效、 大。 会 Spark 应用, 及 Spark 的SQL 、
计算 和 学 。 , 会开 的 序, 会
用 所学 的 能, 自 的Spark 应用带 和 发。
第 7 章
在集群上运行 Spark
7.1
,本书 讲 用Spark shell 学 Spark, 序 运行 Spark 本
地 式 。而Spark 的 大 处 增加 数量 用集 式运行,
序的计算能 。 编 用 集 行执行的Spark 应用所 用的 API 本书
所 的 。 , 数据集 用本地 式快速开发
的应用, 需 代 大 集 运行。
本 分 式Spark 应用的运行 , 集 运行Spark 应用时的
。Spark 的集 理 Hadoop YARN、Apache Mesos, 有 Spark 自带的 集 理 运行,所 Spark 应用 能 适应 用集 , 能用
共 的 计算 。 会 用 的 和 方法进行 。同时, 会
Spark 应用 、 、 等 方 的 。读 本 , 应 能学会
运行分 式Spark 序的 技能。 会 Spark 应用进行 和 能 。
7.2 Spark
集 运行spark , 了解 Spark 分 式 中的 图7-1 , 有 理解 集 运行Spark 的 具 。
分 式 ,Spark 集 用的 / 。 Spark 集 中,有
责中 , 分 式工作 。 中 驱动器 Driver ,
应的工作 执行器 executor 。 和大量的执行
进行 ,它 作 的Java 进 运行。 和所有的执行
Spark 应用 application 。
Spark 序
集 理 Mesos、
YARN 集 理
集 工作 集 工作 集 工作
执行 进 执行 进 执行 进
图7-1:分布式 Spark 应用中的组件
Spark 应用 作集群管理器 Cluster Manager 的 集 中的
。Spark 自带的集 理 集 理 。Spark 能运行 Hadoop YARN 和 Apache Mesos 大开 集 理 。
7.2.1
Spark 执 行 的 序 中 的main()方 法 的 进 。 它 执 行 用 编 的 用 SparkContext、 RDD, 及进行 RDD 的 化 作和行 作的代 。 ,
Spark shell 时, 了 Spark 序 记 ,Spark shell 会 加 作sc的SparkContext 。 序 ,Spark 应用
了。
序 Spark 应用中有 责。
• 把用户程序转为任务
Spark 序 责 用 序 理执行的 元, 元 任务
task 。 ,所有的Spark 序 同 的 : 序 数据
列RDD, 用 化 作 生出新的RDD, 用行 作收集 存 RDD 中的数据。Spark 序 式地 出了 由 作 成的 辑 的有向无环图
Directed Acyclic Graph,简 DAG 。 序运行时,它会 辑图 理执行计 。
Spark 会 辑执行计 作 化, 的 化执行,
作合 中等。 Spark 辑计 列步骤 stage 。而
由 任务 成。 任 会 打 集 中。任 Spark 中 的工作
元,用 序 要 成 千的 任 。
• 为执行器节点调度任务
有了 理执行计 ,Spark 序 执行 进 任 的 。执行
进 ,会 进 册自 。 , 进 应用中所有的执行
有 的记 。 执行 代 能 处理任 和存 RDD 数据的进 。
Spark 序会 据 的执行 集合, 所有任 数据所 分
合适的执行 进 。 任 执行时,执行 进 会 存数据存 ,而
进 同 会 存数据的 , 用 的任 ,
量 数据的网络 。
序会 Spark 应用的运行时的 网 出 ,
4040 。 , 本地 式 , 问http://localhost:4040 网 了。
会 第8 加 地讲解Spark 的网 用 及Spark 的作 制。
7.2.2
Spark 执行 工作进 , 责 Spark 作 中运行任 ,任 互 。
Spark 应用 时,执行 同时 , Spark 应用的生
而存 。 有执行 发生了 ,Spark 应用 执行。执行 进
有 大作用:第 ,它 责运行 成Spark 应用的任 , 进
第 ,它 自 的 理 Block Manager 用 序中要 存的RDD
存式存 。RDD 存 执行 进 的, 任 运行时 分 用 存数据
加速运算。
中
本书中,大 分 本地 式运行Spark。 本地 式 ,Spark
序和 执行 序 同 Java 进 中运行。 特 执行
序 运行 用的进 中。
7.2.3
, 了 和执行 的 。 , 和
执行 的 Spark 集 理 执行 ,而 特
, 集 理 。集 理 Spark 中的 式 件。
, 了Spark 自带的 集 理 ,Spark 运行 集 理 , YARN 和 Mesos。
Spark 中 用驱动器节点和执行器节点的 执行Spark
应用的进 。而主节点 master 和工作节点 worker 的 用
分 集 理 中的中心化的 分和分 式的 分。
,所 要 心。 ,Hadoop YARN 会 作 理
Resource Manager 的 进 , 及 列 作 理 Node
Manager 的工作 进 。而 YARN 的工作 ,Spark
运行执行 进 , 运行 进 。
7.2.4
用的 集 理 , 用Spark 的 本spark-submit
用的 集 理 , 用Spark 的 本spark-submit