3.4 Spark 数
3.4.3 Java
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = { // 问题:"query" "this.query", 要 "this"
rdd.map(x => x.split(query)) }
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// : 需要的字 出 量中
val query_ = this.query rdd.map(x => x.split(query_)) }
}
Scala 中出 了NotSerializableException, 问题 了 序列
化的 中的 数 字 。记 , 序列化 量 中的 数 的。
3.4.3 Java
Java 中, 数需要作 了Spark 的org.apache.spark.api.java.function 中的任
数 的 。 据 同的 , 定义了 同的 。
Function2<T1, T2, R> R call(T1, T2) 收 出 ,用 aggregate()
和fold()等 作中
FlatMapFunction<T, R> Iterable<R> call(T) 收 任 出,用 flatMap()
的 作中
的 数 定义 用 3-22 , 具
3-23 。
3-22: Java 中 用 进行 数
RDD<String> errors = lines.filter(new Function<String, Boolean>() { public Boolean call(String x) { return x.contains("error"); } });
3-23: Java 中 用具 进行 数
class ContainsError implements Function<String, Boolean>() { public Boolean call(String x) { return x.contains("error"); } }
RDD<String> errors = lines.filter(new ContainsError());
具 的 决 人 。 发 具 大 序时
。 用 数的 处 它 的 造 数 加 数, 3-24 所 。
3-24:带 数的 Java 数
class Contains implements Function<String, Boolean>() { private String query;
public Contains(String query) { this.query = query; } public Boolean call(String x) { return x.contains(query); } }
RDD<String> errors = lines.filter(new Contains("error"));
Java 8 中, 用lambda 式 简 地 数 。由 本书 作时,
Java 8 新, 的 用了 版本的Java, 的 法 定义
数 。 , 用lambda 式, 的 会 3-25 所 。
3-25: Java 中 用 Java 8 地 lambda 式进行 数
RDD<String> errors = lines.filter(s -> s.contains("error"));
用Java 8 的 lambda 式 , Oracle 的 http://docs.oracle.
com/javase/tutorial/java/javaOO/lambdaexpressions.html 及Databricks Spark 中 用lambda 式的 http://databricks.com/blog/2014/04/14/spark-with-java-8.html 。
和lambda 式 用方法中 的任 final 量,
Python 和 Scala 中 量 Spark。
3.5
本 会 Spark 中大 分 的 化 作和行 作。 特定数据 的RDD
加 作, ,数字 的RDD 计 数 作,而 式的
RDD 据 合数据的 作。 会 中讲
RDD , 及 应的特 作。
3.5.1 RDD
讲讲 化 作和行 作 任 数据 的RDD 。
1.
能会用 的 用的 化 作 map()和filter() 图3-2 。 化 作
map() 收 数, 数用 RDD 中的 元 , 数的 作
RDD 中 应元 的 。而 化 作filter() 收 数, RDD 中 数的 元 新的RDD 中 。
图3-2:从输入 RDD 映射与筛选得到的 RDD
用map() 的 : 的URL 集合中的 URL 应的
出 , 简 数字 方 。map()的 需要和
。 有 字 RDD, 的map() 数 用 字 解析
Double 的, 时 的 RDD RDD[String],而 出
RDD[Double]。
简 的 子,用map() RDD 中的所有数 方 3-26 3-28 所 。 3-26:Python 版计算 RDD 中 的 方
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect() for num in squared:
print "%i " % (num)
3-27:Scala 版计算 RDD 中 的 方
val input = sc.parallelize(List(1, 2, 3, 4)) val result = input.map(x => x * x)
println(result.collect().mkString(",")) 3-28:Java 版计算 RDD 中 的 方
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() { public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));
有时 , 元 生成 出元 。 能的 作 作flatMap()。
和map() , flatMap()的 数 分 应用 了 RDD 的 元 。
的 元 ,而 序列的迭代 。 出的RDD 由迭代
成的。 的 迭代 问的所有元 的RDD。flatMap()的 简
用 的字 分 , 3-29 3-31 所 。
3-29:Python 中的flatMap() 行数据 分 lines = sc.parallelize(["hello world", "hi"]) words = lines.flatMap(lambda line: line.split(" ")) words.first() # "hello"
3-30:Scala 中的flatMap() 行数据 分
val lines = sc.parallelize(List("hello world", "hi")) val words = lines.flatMap(line => line.split(" ")) words.first() // "hello"
3-31:Java 中的flatMap() 行数据 分
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
} });
words.first(); // "hello"
图3-3 中 了flatMap()和map()的区 。 flatMap() 作 的迭代
, 了 由 列 中的元 成的RDD,而 由列 成的RDD。
图3-3:RDD 的flatMap()和map()的区别 2.
RDD 本 义 的集合, 它 数学 的集合 作, 合 和
交 作。图3-4 了 作。 , 作 要 作的RDD 同数据 的。
的RDD 中 的集合 元 的 , 有 的元 。
要 的元 , 用RDD.distinct() 化 作 生成 同元 的新
RDD。 需要 ,distinct() 作的开 大, 它需要 所有数据 网络进行
shuffle , 保 元 有 。第4 会 数据 , 及 数据 。
图3-4:一些简单的集合操作
简 的集合 作 union(other),它会 RDD 中所有元 的 RDD。
用 有用, 处理 自 数据 的 件。 数学中的union() 作
同的 , 的RDD 中有 数据,Spark 的union() 作 会 数据
有 要, distinct() 同的效 。
Spark 了intersection(other)方法, RDD 中 有的元 。intersection()
运 行 时 会 所 有 的 元 RDD 的 元 会 。
intersection() union()的 ,intersection()的 能 要 , 它需要
网络 数据 发 共有的元 。
有 时 需 要 数 据。subtract(other) 数 收 RDD 作 数,
由 存 第 RDD 中而 存 第 RDD 中的所有元 成的RDD。和
intersection() ,它 需要数据 。
计算 RDD 的 , 图3-5 所 。cartesian(other) 化 作会
所有 能的(a, b) , 中a RDD 中的元 ,而b 自 RDD。
所有 能的 合的 时 有用, 计算 用 的
。 RDD 自 的 , 用 用 的应用
中。 要特 的 , 大 RDD 的 开 大。
图3-5:两个 RDD 的笛卡儿积
3-2 和 3-3 了 的RDD 化 作。
表3-2:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作
map() 数应用 RDD 中的 元
distinct() rdd.distinct() {1, 2, 3}
sample(withRe placement, fra ction, [seed])
RDD , 及 rdd.sample(false, 0.5) 定的
表3-3:对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作
union() 生成 RDD 中所有元
的RDD
rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() RDD 共同的元 的 RDD rdd.intersection(other) {3}
subtract() RDD 中的
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() { public Integer call(Integer x, Integer y) { return x + y; }
});
fold()和reduce() , 收 reduce() 收的 数 同的 数, 加 sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))) return sumCount[0] / float(sumCount[1])
3-36:Scala 中的aggregate()
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) val avg = result._1 / result._2.toDouble
3-37:Java 中的aggregate()
class AvgCount implements Serializable { public AvgCount(int total, int num) { this.total = total;
this.num = num;
}
public int total;
public int num;
public double avg() {
return total / (double) num;
} }
Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() { public AvgCount call(AvgCount a, Integer x) {
Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() { public AvgCount call(AvgCount a, AvgCount b) { a.total += b.total;
a.num += b.num;
return a;
} };
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
本RDD 的 作, 出它 的行 。count()用
元 的 数,而countByValue() 应的计数的 。 3-4
了 行 作。
表3-4:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作
collect() RDD 中的所有元 rdd.collect() {1, 2, 3, 3}
count() RDD 中的元 数 rdd.count() 4
countByValue() 元 RDD 中出 的次数 rdd.countByValue() {(1, 1),
(2, 1),
RDD 中 任 元 rdd.takeSample(false, 1) 定的
reduce(func) 行 合RDD 中 所 有 数 据
SparkContext._ 用 式 。 SparkContext 的Scala http://spark.
apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext$ 中 所列出的 式
。 式 式地 RDD , DoubleRDDFunctions 数 数据的RDD 和PairRDDFunctions RDD , 有了 mean()和
variance() 的 的 数。
式 大, 会 阅读代 的人 。 RDD 用了 mean()
的 数, 能 会 发 RDD 的Scala http://spark.apache.org/docs/latest/api/scala/
index.html#org.apache.spark.rdd.RDD 中 本 有mean() 数。 用 所 能 成 ,
式 RDD[Double] DoubleRDDFunctions。 Scala 中
数时, 要 了 的 用 中的 数。
DoubleFlatMapFunction<T> Function<T, Iterable<Double>> 用 flatMapToDouble, 生成DoubleRDD
DoubleFunction<T> Function<T, Double> 用 mapToDouble, 生成
DoubleRDD
PairFlatMapFunction<T, K, V> Function<T, Iterable<Tuple2<K, V>>> 用 flatMapToPair, 生 成PairRDD<K, V>
PairFunction<T, K, V> Function<T, Tuple2<K, V>> 用 mapToPair, 生 成 PairRDD<K, V>
3-28 生成 JavaDoubleRDD、计算RDD 中 元 的 方 的 ,
3-38 所 。 用DoubleRDD 有的 数了, mean()和variance()。 3-38:用 Java DoubleRDD
JavaDoubleRDD result = rdd.mapToDouble(
new DoubleFunction<Integer>() { public double call(Integer x) { return (double) x * x;
} });
System.out.println(result.mean());
3. Python
Python 的 API Java 和 Scala 有所 同。 Python 中,所有的 数 本的
RDD 中, 作 应的RDD 数据 , 会 运行时 。
3.6 ( )
所 ,Spark RDD 的,而有时 能 次 用同 RDD。 简 地 RDD 用行 作,Spark 次 会 算RDD 及它的所有 。 迭代算法中
大, 迭代算法 会 次 用同 数据。 3-39 RDD 作 次计
数、 RDD 出的 子。
3-39:Scala 中的 次执行
val result = input.map(x => x*x) println(result.count())
println(result.collect().mkString(","))
了 次计算同 RDD, Spark 数据进行 化。 Spark 化
存 RDD 时,计算出 RDD 的 会分 保存它 所 出的分区数据。 有 化数据的 发生 ,Spark 会 需要用 存的数据时 算 的数据分区。
的 会 的执行速 , 数据 。
出 同的 的, RDD 同的 化 3-6 所 。 Scala
3-40 和 Java 中, persist()会 数据 序列化的 式 存 JVM 的 中。 Python 中, 会 序列化要 化存 的数据,所 化
序列化 的 存 JVM 中。 数据 者 存 时,
用序列化 的数据。
表3-6:org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级 别;如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份
内
MEMORY_ONLY 高
MEMORY_ONLY_SER 高
MEMORY_AND_DISK 高 中等 分 分 数据 存中 ,
MEMORY_AND_DISK_SER 高 分 分 数据 存中 ,
。 存中存 序列化 的数据
DISK_ONLY 高
存 能, 用Tachyon http://tachyon-project.org/ 作
。 Spark 的 存 有 ,
Tachyon 运 行Spark 的 http://tachyon-project.org/Running-Spark-on-Tachyon.html 。
3-40: Scala 中 用persist() val result = input.map(x => x * x) result.persist(StorageLevel.DISK_ONLY) println(result.count())
println(result.collect().mkString(","))
, 第 次 RDD 用行 作 用了persist()方法。persist()
用本 会 发 制 。
要 存的数据 , 存中 ,Spark 会自 用 用 LRU 的 存
的分区 存中 。 数据存 存中的 存 , 次要用
的分区时, 分区 需要 新计算。 用 存 的 存 的
分区 , 的分区 会 。 , 心 的作 存
了 数据而 打 。 , 存 要的数据会 有用的数据 出 存,带
算的时 开 。
,RDD 有 方法 作unpersist(), 用 方法 化的RDD
存中 。
3.7
本 中, 了RDD 运行 及RDD 的 作。 读 了 ,
—— 学 了Spark 的所有核心 。 进行 行 合、分 等 作时,
需要 用 式的RDD。 会讲解 式的RDD 的特 作。
, 会 数据 的 出, 及 用SparkContext 的进 题。
第 4 章
键值对操作
RDD Spark 中 作所需要的 数据 。本 作
RDD。 RDD 用 进行 合计算。 要 ETL 、
化、 作 数据 化 式。 RDD 了 新的 作
计 的 , 数据中 同的分 , 同的RDD 进行分 合 等 。
本 会 用 用 制 RDD 分 的高 特 :分区。有时,
用 的分区方式 问的数据 同 , 大大 应用的
开 。 会带 的 能 。 会 用PageRank 算法 分区的作用。 分
式数据集 的分区方式和 本地数据集 合适的数据 ——
,数据的分 会 地 序的 能 。
4.1
Spark 的RDD 了 有的 作。 RDD pair RDD1。Pair
RDD 序的 成要 , 它 了 行 作 新进行数据分
的 作 。 ,pair RDD reduceByKey()方法, 分 应的数据,
有join()方法, RDD 中 同的元 合 ,合 RDD。
RDD 中 字 代 件时 、用 ID 者 的字 ,
用 字 作 pair RDD 作中的 。
1: pair RDD RDD , 发 义,译 中保 pair RDD 。——译者
4.2 Pair RDD
Spark 中有 pair RDD 的方式。第 5 会讲 , 存 的数据 式会
读 时 由 数据 成的pair RDD。 , 需要 的RDD
pair RDD 时, 用map() 数 , 的 数需要 。 会 由 本行 成的RDD 行的第 的pair RDD。
RDD 的方法 同的 中会有所 同。 Python 中, 了 的
数据能 数中 用,需要 由 元 成的RDD 4-1 。
4-1: Python 中 用第 作 出 pair RDD pairs = lines.map(lambda x: (x.split(" ")[0], x))
Scala 中, 了 的数据能 数中 用,同 需要 元
4-2 。 式 元 RDD 加的 数。
4-2: Scala 中 用第 作 出 pair RDD val pairs = lines.map(x => (x.split(" ")(0), x))
Java 有自带的 元 , Spark 的 Java API 用 用scala.Tuple2
元 。 简 :Java 用 new Tuple2(elem1, elem2) 新的 元
, ._1()和._2()方法 问 中的元 。
Java 用 需要 用 的Spark 数 pair RDD。 ,要 用mapToPair() 数
代 版的map() 数, 3.5.2 中的 Java 有 的 。
4-3 中 简 的 子。
4-3: Java 中 用第 作 出 pair RDD PairFunction<String, String, String> keyData =
new PairFunction<String, String, String>() { public Tuple2<String, String> call(String x) { return new Tuple2(x.split(" ")[0], x);
} };
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
用Scala 和 Python 存中的数据集 pair RDD 时, 需要 由 元 成
的集合 用SparkContext.parallelize()方法。而要 用Java 存数据集 pair RDD
的 , 需要 用SparkContext.parallelizePairs()。
4.3 Pair RDD
Pair RDD 用所有 RDD 的 用的 化 作。3.4 中 的所有有 数
的 同 适用 pair RDD。由 pair RDD 中 元 ,所 需要 的 数应
groupByKey() 具有 同 的 进行分 rdd.groupByKey() {(1,
[2]),
rdd.mapValues(x => x+1) {(1, 3), (3, 5), (3, 7)}
flatMapValues(func) pair RDD 中 的 应 用
迭代 的 数,
的 元 生成
应 的 记 。
用 号化
rdd.flatMapValues(x => (x to 5)) {(1, 2), (1,
sortByKey() 据 序的RDD rdd.sortByKey() {(1,
2), (3, 4), (3, 6)}
表4-2:针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})
subtractByKey RDD 中 other RDD 中的 同的元
rdd.subtractByKey(other) {(1, 2)}
join RDD 进行 rdd.join(other) {(3, (4, 9)), (3,
(6, 9))}
rightOuterJoin RDD 进行 作, 保第
RDD 的 存
rdd.rightOuterJoin(other) {(3,(Some(4),9)), (3,(Some(6),9))}
leftOuterJoin RDD 进行 作, 保第
RDD 的 存
rdd.leftOuterJoin(other) {(1,(2,None)), (3, (4,Some(9))), (3, (6,Some(9)))}
cogroup RDD 中 有 同 的数据分 rdd.cogroup(other) {(1,([2],[])), (3,
([4, 6],[9]))}
的 会 pair RDD 的 数。
Pair RDD RDD 元 Java Scala 中的 Tuple2 Python 中的元 ,
同 RDD 所 的 数。 , 中的pair RDD,
20 字 的行, 4-4 4-6 及图4-1 所 。 4-4:用 Python 第 元 进行
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20) 4-5:用 Scala 第 元 进行
pairs.filter{case (key, value) => value.length < 20}
4-6:用 Java 第 元 进行
Function<Tuple2<String, String>, Boolean> longWordFilter = new Function<Tuple2<String, String>, Boolean>() { public Boolean call(Tuple2<String, String> keyValue) { return (keyValue._2().length() < 20);
} };
JavaPairRDD<String, String> result = pairs.filter(longWordFilter);
图4-1:根据值筛选
有时, 问pair RDD 的 分, 时 作 元 。由 的
用 式, Spark 了mapValues(func) 数, 能 map{case (x, y): (x,
func(y))}。 子中 用 数。
次 pair RDD 的 作, 合 作开 。
4.3.1
数据集 式 的时 , 合具有 同 的元 进行 计 的
作。 讲解 RDD 的fold()、combine()、reduce()等行 作,pair RDD
有 应的 的 化 作。Spark 有 的 作, 合具有 同 的 。
作 RDD, 它 化 作而 行 作。
reduceByKey() reduce() 它 收 数, 用 数 进行合 。
reduceByKey()会 数据集中的 进行 行的 作, 作会 同的 合
。 数据集中 能有大量的 ,所 reduceByKey() 有 用 序
的行 作。 ,它会 由 和 应 出 的 成的新的RDD。
foldByKey() fold() 它 用 RDD 和合 数中的数据
同的 作 。 fold() ,foldByKey() 作所 用的合 数
元 进行合 , 元 。
4-7 和 4-8 所 , 用reduceByKey()和mapValues() 计算 的 应 的 图4-2 。 和 用fold()和map()计算 RDD 的 。
, 用 加 用的 数 同 的 , 会讲 。
4-7: Python 中 用reduceByKey()和mapValues()计算 应的
rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) 4-8: Scala 中 用reduceByKey()和mapValues()计算 应的
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
图4-2:求每个键平均值的数据流
MapReduce 中的合 combiner 的读者 能 ,
用reduceByKey()和foldByKey()会 计 算 的
自 台 进行本地合 。用 需要 定合 。 化的
combineByKey() 自定义合 的行 。
用 4-9 4-11 中 的方法 解决 的分 式 计数问题。
用 中讲 的flatMap() 生成 、 数字1 的pair RDD,
4-7 和 4-8 中 , 用reduceByKey() 所有的 进行计数。
4-9:用 Python 计数 rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) 4-10:用 Scala 计数
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y) 4-11:用 Java 计数
JavaRDD<String> input = sc.textFile("s3://...")
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); } });
JavaPairRDD<String, Integer> result = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); } }).reduceByKey(
new Function2<Integer, Integer, Integer>() {
new Function2<Integer, Integer, Integer>() {