• 沒有找到結果。

Elasticsearch

在文檔中 快速大数据分析 (頁 107-0)

5.5  数据

5.5.4 Elasticsearch

Spark 用Elasticsearch-Hadoop https://github.com/elastic/elasticsearch-hadoop Elasticsearch 中读 数据。Elasticsearch 开 的、 Lucene 的 。

Elasticsearch 和 的 大 ,它会 的路 ,

而 SparkContext 中 的 。Elasticsearch 的OutputFormat 有用 Spark 所 的 ,所 用saveAsHadoopDataSet 代 , 需要

5-46: Scala 中 用 Elasticsearch 出

val jobConf = new JobConf(sc.hadoopConfiguration)

jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.

mr.EsOutputFormat")

jobConf.setOutputCommitter(classOf[FileOutputCommitter])

jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets") jobConf.set(ConfigurationOptions.ES_NODES, "localhost")

FileOutputFormat.setOutputPath(jobConf, new Path("-")) output.saveAsHadoopDataset(jobConf)

5-47: Scala 中 用 Elasticsearch

def mapWritableToInput(in: MapWritable): Map[String, String] = { in.map{case (k, v) => (k.toString, v.toString)}.toMap

}

val jobConf = new JobConf(sc.hadoopConfiguration)

jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1)) jobConf.set(ConfigurationOptions.ES_NODES, args(2)) val currentTweets = sc.hadoopRDD(jobConf,

classOf[EsInputFormat[Object, MapWritable]], classOf[Object], classOf[MapWritable])

// map

// MapWritable[Text, Text] Map[String, String]

val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }

和 ,Elasticsearch 有 , 作 的 有效 。

出而 ,Elasticsearch 进行 , 尔会 出 的

数据 , 要存 字 的数据 , 定

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html 。

5.6

本 , 应 能 数据读 Spark 中, 计算 所 的方

式存 。 了数据 用的 同 式, 及它 应的数

据处理的方式。 掌握了读 和保存大 数据集的方法, 会

用 编 高效 大的Spark 序的方法。

第 6 章

Spark编程进阶

6.1

本 有 及的Spark 编 的 进 特 ,会 的共 量:

累加器 accumulator 广播变量 broadcast variable 。 加 用 进行 合,而 量用 高效分发 大的 。 有的RDD 化 作的 ,

数据 需要 大 代价的任 了 作。 了 用的工具 ,本 会

Spark 序交互的方式, 用R 编 的 本进行交互。

本 会 用 电 作者的 作 , 出 的 应用。

的 的 号。 号 由 分 的, 有自 的 号号 ,

所 据 号 应的 。有 作者的地理 ,用

定 。 6-1 了 。本书的 代 中 需要

中 进行处理的 号列 。

6-1: JSON 式的 , 中 字

{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",

"contactlat":"37.384733","contactlong":"-122.032164",

"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",

"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}

要用 的第 Spark 特 共 量。共 量 Spark 任 中 用的特

的 量。 中, 用Spark 共 量 的 进行计数,

及分发 张 大的 。

任 需要 时 进行 , 需要 数据 者 数生成 时, 数 file = sc.textFile(inputFile)

# Accumulator[Int] 化 0 blankLines = sc.accumulator(0) def extractCallSigns(line):

global blankLines # 问 量 if (line == ""):

blankLines += 1 return line.split(" ")

callSigns = file.flatMap(extractCallSigns)

callSigns.saveAsTextFile(outputDir + "/callsigns") print "Blank lines: %d" % blankLines.value

6-3: Scala 中 加 行 val sc = new SparkContext(...) val file = sc.textFile("file.txt")

val blankLines = sc.accumulator(0) // Accumulator[Int] 化 0 val callSigns = file.flatMap(line => {

if (line == "") { println("Blank lines: " + blankLines.value) 6-4: Java 中 加 行

JavaRDD<String> rdd = sc.textFile(args[1]);

final Accumulator<Integer> blankLines = sc.accumulator(0);

JavaRDD<String> callSigns = rdd.flatMap(

new FlatMapFunction<String, String>() { public Iterable<String> call(String line) { if (line.equals("")) {

blankLines.add(1);

}

return Arrays.asList(line.split(" "));

}});

callSigns.saveAsTextFile("output.txt")

System.out.println("Blank lines: "+ blankLines.value());

中, 了 作blankLines的Accumulator[Int] ,

中 行时 加1。执行 化 作 , 打印出 加 中的 。 ,

• 中 用SparkContext.accumulator(initialValue)方 法, 出 存 有

的 加 。 org.apache.spark.Accumulator[T] , 中T

initialValue的 。

的计数 时 方 , 有 需要 时, 者 需要

validSignCount = sc.accumulator(0) invalidSignCount = sc.accumulator(0) def validateSign(sign):

global validSignCount, invalidSignCount

if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):

validSignCount += 1

validSigns = callSigns.filter(validateSign)

contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x + y)

# 制 计算计数

contactCount.count()

if invalidSignCount.value < 0.1 * validSignCount.value:

contactCount.saveAsTextFile(outputDir + "/contactCount") else:

print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.

value)

加 要 处理 ,对于要在行动操作中使用的累加器,Spark

只会把每个任务对各累加器的修改应用一次。 , 要 计

算时 的 加 , 它 foreach() 的行 作中。

对于在RDD 转化操作中使用的累加器,就不能保证有这种情况了。 化 作中 加

能会发生 次 新。 子, 存 有 用的RDD 第

次 LRU 存中 新用 时, 的 次 新 会发生。 会 制

RDD 据 进行 算,而 作用 会 中的 化 作 的 加 进行

新, 次发 中。 化 作中, 加 用 的。

版本的Spark 能会 行 成 新 次 加 的 , 版本

1.2.0 会进行 次 新, 化 作中的 加 时 用。

6.2.2

, 学 了 用加法 作Spark 的 加 Accumulator[Int] 。 Spark Double、Long和Float 的 加 。 ,Spark 了自定义

加 和 合 作的API 要 加的 中的 大 ,而 加 。自

定义 加 需要 AccumulatorParam, Spark API http://spark.apache.org/docs/

latest/api/scala/index.html#package 中有所 。 要 作同时 交 和 合 ,

用任 作 代 数 的加法。 了 和, 数据的 大 。

任 的a 和 b,有 a op b = b op a,op 交 。 任 的a、b 和 c,有 (a op b) op = a op (b op ),op

合 。 ,sum和max 交 合 , Spark 加 中

的 用 作。

6.3

Spark 的第 共 量 广播变量,它 序高效地 所有工作 发

大的 读 , Spark 作 用。 , 的应用需要 所有 发

大的 读 , 学 算法中的 大的特 量, 量用

,Spark 会自 中所有 用 的 量发 工作 。 方 ,

效。 有 : , 的任 发 制 任 进行 化的 次,

能会 多个 行 作中 用同 量, Spark 会 作分 发 。

子, 要 Spark 序, 号的 应的 。 的

, 由 用 自的 号 ,所 方法 行的。 用 Spark , 代 6-6 所 。

6-6: Python 中

# RDD contactCounts中的 号的 应 。 号

# 读 代 进行

signPrefixes = loadCallSignTable()

def processSignCount(sign_count, signPrefixes):

country = lookupCountry(sign_count[0], signPrefixes) count = sign_count[1]

return (country, count)

countryContactCounts = (contactCounts

.map(processSignCount)

signPrefixes = sc.broadcast(loadCallSignTable()) def processSignCount(sign_count, signPrefixes):

country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1]

return (country, count)

countryContactCounts = (contactCounts

.map(processSignCount)

val signPrefixes = sc.broadcast(loadCallSignTable())

val countryContactCounts = contactCounts.map{case (sign, count) =>

val country = lookupInArray(sign, signPrefixes.value) (country, count)

final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());

JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(

new PairFunction<Tuple2<String, Integer>, String, Integer> (){

public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) { String sign = callSignCount._1();

String country = lookupCountry(sign, callSignInfo.value());

return new Tuple2(country, callSignCount._2());

}}).reduceByKey(new SumInts());

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

所 , 用 量的 简 。

(1) T的 用SparkContext.broadcast 出 Broadcast[T] 。

任 序列化的 。

val theArray = broadcastArray.value; theArray(0) = newValue 的

。 工作 执行时, 行 newValue 数 的第 元 , 工作

的任 效。 用spark.serializer 序列化

化序列化 第8 中会 用Kryo 快的序列化 , 的数

据 自 的序列化方式 Java 用java.io.Externalizable 序列

urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs) #

requests = map(lambda x: (x, http.request('GET', x)), urls) #

result = map(lambda x: (x[0], json.loads(x[1].data)), requests)

# 的

return filter(lambda x: x[1] is not None, result) def fetchCallSigns(input):

""" 号"""

return input.mapPartitions(lambda callSigns : processCallSigns(callSigns)) contactsContactList = fetchCallSigns(validSigns)

6-11: Scala 中 用共 JSON 解析

val contactsContactLists = validSigns.distinct().mapPartitions{

signs =>

val mapper = createMapper() val client = new HttpClient() client.start()

// http

signs.map {sign =>

createExchangeForSign(sign)

// 应

}.map{ case (sign, exchange) =>

(sign, readExchangeCallLog(mapper, exchange)) }.filter(x => x._2 != null) // 的

}

6-12: Java 中 用共 JSON 解析 // 用mapPartitions 用 工作

JavaPairRDD<String, CallLog[]> contactsContactLists = validCallSigns.mapPartitionsToPair(

new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {

public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) { // 列出

ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();

ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();

ObjectMapper mapper = createMapper();

HttpClient client = new HttpClient();

try {

client.start();

while (input.hasNext()) {

requests.add(createRequestForSign(input.next(), client));

}

for (Tuple2<String, ContentExchange> signExchange : requests) { callsignLogs.add(fetchResultFromRequest(mapper, signExchange));

}

mapPartitionsWithIndex() 分区序号, 及 分区中

的元 的迭代

的元 的迭代 f: (Int, Itera tor[T]) → Iter ator[U]

foreachPartitions() 元 迭代 f: (Iterator[T])

Unit

了 的 工作, 用mapPartitions() 的开 。有时需要

同 的数据 合 。 第3 中, 计算 时, 方法

数 RDD 元 RDD, 中 所处理的元 数。 ,

分区 次 元 ,而 用 元 执行 作, 6-13 和 6-14。

6-13: Python 中 用mapPartitions() def combineCtrs(c1, c2):

return (c1[0] + c2[0], c1[1] + c2[1]) def basicAvg(nums):

"""计算 """

nums.map(lambda num: (num, 1)).reduce(combineCtrs) 6-14: Python 中 用mapPartitions()

def partitionCtr(nums):

"""计算分区的sumCounter"""

sumCount = [0, 0]

sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs) return sumCount[0] / float(sumCount[1])

6.5 道

6-15:R 的 序

#!/usr/bin/env Rscript library("Imap") f <- file("stdin") open(f)

while(length(line <- readLines(f,n=1)) > 0) { # 处理行

contents <- Map(as.numeric, strsplit(line, ",")) mydist <- gdist(contents[[1]][1], contents[[1]][2], contents[[1]][3], contents[[1]][4],

units="m", a=6378137.0, b=6356752.3142, verbose = FALSE) write(mydist, stdout())

}

distScript = "./src/R/finddistance.R"

distScriptName = "finddistance.R"

sc.addFile(distScript) def hasDistInfo(call):

""" 次 有计算 时 需的字 """

requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]

return all(map(lambda f: call[f], requiredFields)) def formatCall(call):

""" 新的 式 新 R 序解析"""

return "{0},{1},{2},{3}".format(

call["mylat"], call["mylong"],

call["contactlat"], call["contactlong"]) pipeInputs = contactsContactList.values().flatMap(

lambda calls: map(formatCall, filter(hasDistInfo, calls))) distances = pipeInputs.pipe(SparkFiles.get(distScriptName)) print distances.collect()

6-17: Scala 中 用pipe() 用finddistance.R的 序

// 用 R 序计算 次 的

// 本 加 需要 本次作 中 的 件的列 中 val distScript = "./src/R/finddistance.R"

val distScriptName = "finddistance.R"

sc.addFile(distScript)

val distances = contactsContactLists.values.flatMap(x => x.map(y =>

s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")).pipe(Seq(

SparkFiles.get(distScriptName))) println(distances.collect().toList)

6-18: Java 中 用pipe() 用finddistance.R的 序

// 用 R 序计算 次 的

// 本 加 需要 本次作 中 的 件的列 中

String distScript = "./src/R/finddistance.R";

String distScriptName = "finddistance.R";

sc.addFile(distScript);

JavaRDD<String> pipeInputs = contactsContactLists.values() .map(new VerifyCallLogs()).flatMap(

new FlatMapFunction<CallLog[], String>() { public Iterable<String> call(CallLog[] calls) { ArrayList<String> latLons = new ArrayList<String>();

for (CallLog call: calls) {

JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));

System.out.println(StringUtils.join(distances.collect(), ","));

SparkContext.addFile(path), 件列 , 工作 Spark 作

中 列 中的 件。 件 自 的本地 件 子中所 的

, 者 自HDFS Hadoop 所 的 件 , 者 HTTP、HTTPS FTP 的 URI

地址。 作 中的行 作 发时, 件 会 , 工作

SparkFiles.getRootDirectory 它 。 用SparkFiles.get(Filename)

定 件。 , 保pipe()能 工作 本的方法 。

用 的远 制工具 本 件 的 。

所有 SparkContext.addFile(path) 加的 件 存 同 中,

所 有 要 用 的 字。

本 问,RDD 的pipe()方法 RDD 中的元 地 本 道。

有 版本的findDistance, 行 数的 式 收 定的SEPARATOR。

的 , 的 方法 成工作, 用第 。

• rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))

• rdd.pipe(SparkFiles.get("finddistance.R") + " ,")

第 方法中, 用 定 的 数序列的 式 本

而 第 方法中, 它作 字 , Spark 会 字

解 定 的 数序列。

需要的 , pipe() 定 行 量。 需要 量 应 的

作 pipe()的第 数 进 ,Spark 会 。

应 理解了 用pipe()、 处理RDD 中的元 , 及

的 本分发 集 , 能 工作 本。

6.6 数 RDD

Spark 数 数据的RDD 了 的 计 作。 会 第11

的 的 计方法和 学 方法的 。

Spark 的数 作 式算法 的, 次 元 的方式 出 。

计数据 会 用stats()时 次 数据计算出 , StatsCounter

。 6-2 列出了StatsCounter 的 用方法。

表6-2:StatsCounter中可用的汇总统计数据

count() RDD 中的元 数

mean() 元 的

sum()

max()

min()

variance() 元 的方

sampleVariance() 中计算出的方

stdev()

sampleStdev()

计算 计数据中的 , RDD 用 应的方法, rdd.

mean() 者rdd.sum()。

6-19 6-21 中, 会 用 计 数据中 。由 会

次 用同 RDD 次用 计算 计数据, 次用 , 应

RDD 存 。 的 中, 中 远的

6-19:用 Python

# 要 String RDD 数字数据, 能

# 用 计 数

distanceNumerics = distances.map(lambda string: float(string)) stats = distanceNumerics.stats()

stddev = std.stdev() mean = stats.mean()

reasonableDistances = distanceNumerics.filter(

lambda x: math.fabs(x - mean) < 3 * stddev) print reasonableDistances.collect()

6-20:用 Scala

// 要 , 有 地 能 的

// 要 字 RDD 它

val distanceDouble = distance.map(string => string.toDouble) val stats = distanceDoubles.stats()

val stddev = stats.stdev val mean = stats.mean

val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev) println(reasonableDistance.collect().toList)

6-21:用 Java

// 要 String RDD DoubleRDD, 能 用 计 数

JavaDoubleRDD distanceDoubles = distances.mapToDouble(new DoubleFunction<String>() { public double call(String value) {

return Double.parseDouble(value);

}});

final StatCounter stats = distanceDoubles.stats();

final Double stddev = stats.stdev();

final Double mean = stats.mean();

JavaDoubleRDD reasonableDistances =

distanceDoubles.filter(new Function<Double, Boolean>() { public Boolean call(Double x) {

scala 及src/main/java/com/oreilly/learningsparkexamples/java/ChapterSixExample.java 中 。

6.7

本 中, 了Spark 编 中的 进 特 , 用 特 的 序

高效、 大。 会 Spark 应用, 及 Spark 的SQL 、

计算 和 学 。 , 会开 的 序, 会

用 所学 的 能, 自 的Spark 应用带 和 发。

第 7 章

在集群上运行 Spark

7.1

,本书 讲 用Spark shell 学 Spark, 序 运行 Spark 本

地 式 。而Spark 的 大 处 增加 数量 用集 式运行,

序的计算能 。 编 用 集 行执行的Spark 应用所 用的 API 本书

所 的 。 , 数据集 用本地 式快速开发

的应用, 需 代 大 集 运行。

本 分 式Spark 应用的运行 , 集 运行Spark 应用时的

。Spark 的集 理 Hadoop YARN、Apache Mesos, 有 Spark 自带的 集 理 运行,所 Spark 应用 能 适应 用集 , 能用

共 的 计算 。 会 用 的 和 方法进行 。同时, 会

Spark 应用 、 、 等 方 的 。读 本 , 应 能学会

运行分 式Spark 序的 技能。 会 Spark 应用进行 和 能 。

7.2 Spark

集 运行spark , 了解 Spark 分 式 中的 图7-1 , 有 理解 集 运行Spark 的 具 。

分 式 ,Spark 集 用的 / 。 Spark 集 中,有

责中 , 分 式工作 。 中 驱动器 Driver ,

应的工作 执行器 executor 。 和大量的执行

进行 ,它 作 的Java 进 运行。 和所有的执行

Spark 应用 application 。

Spark 序

集 理 Mesos、

YARN 集 理

集 工作 集 工作 集 工作

执行 进 执行 进 执行 进

图7-1:分布式 Spark 应用中的组件

Spark 应用 作集群管理器 Cluster Manager 的 集 中的

。Spark 自带的集 理 集 理 。Spark 能运行 Hadoop YARN 和 Apache Mesos 大开 集 理 。

7.2.1

Spark 执 行 的 序 中 的main()方 法 的 进 。 它 执 行 用 编 的 用 SparkContext、 RDD, 及进行 RDD 的 化 作和行 作的代 。 ,

Spark shell 时, 了 Spark 序 记 ,Spark shell 会 加 作sc的SparkContext 。 序 ,Spark 应用

了。

序 Spark 应用中有 责。

• 把用户程序转为任务

Spark 序 责 用 序 理执行的 元, 元 任务

task 。 ,所有的Spark 序 同 的 : 序 数据

列RDD, 用 化 作 生出新的RDD, 用行 作收集 存 RDD 中的数据。Spark 序 式地 出了 由 作 成的 辑 的有向无环图

Directed Acyclic Graph,简 DAG 。 序运行时,它会 辑图 理执行计 。

Spark 会 辑执行计 作 化, 的 化执行,

作合 中等。 Spark 辑计 列步骤 stage 。而

由 任务 成。 任 会 打 集 中。任 Spark 中 的工作

元,用 序 要 成 千的 任 。

• 为执行器节点调度任务

有了 理执行计 ,Spark 序 执行 进 任 的 。执行

进 ,会 进 册自 。 , 进 应用中所有的执行

有 的记 。 执行 代 能 处理任 和存 RDD 数据的进 。

Spark 序会 据 的执行 集合, 所有任 数据所 分

合适的执行 进 。 任 执行时,执行 进 会 存数据存 ,而

进 同 会 存数据的 , 用 的任 ,

量 数据的网络 。

序会 Spark 应用的运行时的 网 出 ,

4040 。 , 本地 式 , 问http://localhost:4040 网 了。

会 第8 加 地讲解Spark 的网 用 及Spark 的作 制。

7.2.2

Spark 执行 工作进 , 责 Spark 作 中运行任 ,任 互 。

Spark 应用 时,执行 同时 , Spark 应用的生

而存 。 有执行 发生了 ,Spark 应用 执行。执行 进

有 大作用:第 ,它 责运行 成Spark 应用的任 , 进

第 ,它 自 的 理 Block Manager 用 序中要 存的RDD

存式存 。RDD 存 执行 进 的, 任 运行时 分 用 存数据

加速运算。

本书中,大 分 本地 式运行Spark。 本地 式 ,Spark

序和 执行 序 同 Java 进 中运行。 特 执行

序 运行 用的进 中。

7.2.3

, 了 和执行 的 。 , 和

执行 的 Spark 集 理 执行 ,而 特

, 集 理 。集 理 Spark 中的 式 件。

, 了Spark 自带的 集 理 ,Spark 运行 集 理 , YARN 和 Mesos。

Spark 中 用驱动器节点和执行器节点的 执行Spark

应用的进 。而主节点 master 和工作节点 worker 的 用

分 集 理 中的中心化的 分和分 式的 分。

,所 要 心。 ,Hadoop YARN 会 作 理

Resource Manager 的 进 , 及 列 作 理 Node

Manager 的工作 进 。而 YARN 的工作 ,Spark

运行执行 进 , 运行 进 。

7.2.4

用的 集 理 , 用Spark 的 本spark-submit

用的 集 理 , 用Spark 的 本spark-submit

在文檔中 快速大数据分析 (頁 107-0)

相關文件