• 沒有找到結果。

Java

在文檔中 快速大数据分析 (頁 52-0)

3.4  Spark 数

3.4.3 Java

def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = { // 问题:"query" "this.query", 要 "this"

rdd.map(x => x.split(query)) }

def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {

// : 需要的字 出 量中

val query_ = this.query rdd.map(x => x.split(query_)) }

}

Scala 中出 了NotSerializableException, 问题 了 序列

化的 中的 数 字 。记 , 序列化 量 中的 数 的。

3.4.3 Java

Java 中, 数需要作 了Spark 的org.apache.spark.api.java.function 中的任

数 的 。 据 同的 , 定义了 同的 。

Function2<T1, T2, R> R call(T1, T2) 收 出 ,用 aggregate()

fold()等 作中

FlatMapFunction<T, R> Iterable<R> call(T) 收 任 出,用 flatMap()

的 作中

的 数 定义 用 3-22 , 具

3-23 。

3-22: Java 中 用 进行 数

RDD<String> errors = lines.filter(new Function<String, Boolean>() { public Boolean call(String x) { return x.contains("error"); } });

3-23: Java 中 用具 进行 数

class ContainsError implements Function<String, Boolean>() { public Boolean call(String x) { return x.contains("error"); } }

RDD<String> errors = lines.filter(new ContainsError());

具 的 决 人 。 发 具 大 序时

。 用 数的 处 它 的 造 数 加 数, 3-24 所 。

3-24:带 数的 Java 数

class Contains implements Function<String, Boolean>() { private String query;

public Contains(String query) { this.query = query; } public Boolean call(String x) { return x.contains(query); } }

RDD<String> errors = lines.filter(new Contains("error"));

Java 8 中, 用lambda 式 简 地 数 。由 本书 作时,

Java 8 新, 的 用了 版本的Java, 的 法 定义

数 。 , 用lambda 式, 的 会 3-25 所 。

3-25: Java 中 用 Java 8 地 lambda 式进行 数

RDD<String> errors = lines.filter(s -> s.contains("error"));

用Java 8 的 lambda 式 , Oracle 的 http://docs.oracle.

com/javase/tutorial/java/javaOO/lambdaexpressions.html 及Databricks Spark 中 用lambda 式的 http://databricks.com/blog/2014/04/14/spark-with-java-8.html 。

和lambda 式 用方法中 的任 final 量,

Python 和 Scala 中 量 Spark。

3.5

本 会 Spark 中大 分 的 化 作和行 作。 特定数据 的RDD

加 作, ,数字 的RDD 计 数 作,而 式的

RDD 据 合数据的 作。 会 中讲

RDD , 及 应的特 作。

3.5.1 RDD

讲讲 化 作和行 作 任 数据 的RDD 。

1.

能会用 的 用的 化 作 map()和filter() 图3-2 。 化 作

map() 收 数, 数用 RDD 中的 元 , 数的 作

RDD 中 应元 的 。而 化 作filter() 收 数, RDD 中 数的 元 新的RDD 中 。

图3-2:从输入 RDD 映射与筛选得到的 RDD

用map() 的 : 的URL 集合中的 URL 应的

出 , 简 数字 方 。map()的 需要和

。 有 字 RDD, 的map() 数 用 字 解析

Double 的, 时 的 RDD RDD[String],而 出

RDD[Double]。

简 的 子,用map() RDD 中的所有数 方 3-26 3-28 所 。 3-26:Python 版计算 RDD 中 的 方

nums = sc.parallelize([1, 2, 3, 4])

squared = nums.map(lambda x: x * x).collect() for num in squared:

print "%i " % (num)

3-27:Scala 版计算 RDD 中 的 方

val input = sc.parallelize(List(1, 2, 3, 4)) val result = input.map(x => x * x)

println(result.collect().mkString(",")) 3-28:Java 版计算 RDD 中 的 方

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));

JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() { public Integer call(Integer x) { return x*x; }

});

System.out.println(StringUtils.join(result.collect(), ","));

有时 , 元 生成 出元 。 能的 作 作flatMap()。

和map() , flatMap()的 数 分 应用 了 RDD 的 元 。

的 元 ,而 序列的迭代 。 出的RDD 由迭代

成的。 的 迭代 问的所有元 的RDD。flatMap()的 简

用 的字 分 , 3-29 3-31 所 。

3-29:Python 中的flatMap() 行数据 分 lines = sc.parallelize(["hello world", "hi"]) words = lines.flatMap(lambda line: line.split(" ")) words.first() # "hello"

3-30:Scala 中的flatMap() 行数据 分

val lines = sc.parallelize(List("hello world", "hi")) val words = lines.flatMap(line => line.split(" ")) words.first() // "hello"

3-31:Java 中的flatMap() 行数据 分

JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String line) {

return Arrays.asList(line.split(" "));

} });

words.first(); // "hello"

图3-3 中 了flatMap()和map()的区 。 flatMap() 作 的迭代

, 了 由 列 中的元 成的RDD,而 由列 成的RDD。

图3-3:RDD 的flatMap()和map()的区别 2.

RDD 本 义 的集合, 它 数学 的集合 作, 合 和

交 作。图3-4 了 作。 , 作 要 作的RDD 同数据 的。

的RDD 中 的集合 元 的 , 有 的元 。

要 的元 , 用RDD.distinct() 化 作 生成 同元 的新

RDD。 需要 ,distinct() 作的开 大, 它需要 所有数据 网络进行

shuffle , 保 元 有 。第4 会 数据 , 及 数据 。

图3-4:一些简单的集合操作

简 的集合 作 union(other),它会 RDD 中所有元 的 RDD。

用 有用, 处理 自 数据 的 件。 数学中的union() 作

同的 , 的RDD 中有 数据,Spark 的union() 作 会 数据

有 要, distinct() 同的效 。

Spark 了intersection(other)方法, RDD 中 有的元 。intersection()

运 行 时 会 所 有 的 元 RDD 的 元 会 。

intersection() union()的 ,intersection()的 能 要 , 它需要

网络 数据 发 共有的元 。

有 时 需 要 数 据。subtract(other) 数 收 RDD 作 数,

由 存 第 RDD 中而 存 第 RDD 中的所有元 成的RDD。和

intersection() ,它 需要数据 。

计算 RDD 的 , 图3-5 所 。cartesian(other) 化 作会

所有 能的(a, b) , 中a RDD 中的元 ,而b 自 RDD。

所有 能的 合的 时 有用, 计算 用 的

。 RDD 自 的 , 用 用 的应用

中。 要特 的 , 大 RDD 的 开 大。

图3-5:两个 RDD 的笛卡儿积

3-2 和 3-3 了 的RDD 化 作。

表3-2:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作

map() 数应用 RDD 中的

distinct() rdd.distinct() {1, 2, 3}

sample(withRe placement, fra ction, [seed])

RDD rdd.sample(false, 0.5) 定的

表3-3:对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作

union() 生成 RDD 中所有元

RDD

rdd.union(other) {1, 2, 3, 3, 4, 5}

intersection() RDD 共同的元 的 RDD rdd.intersection(other) {3}

subtract() RDD 中的

Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() { public Integer call(Integer x, Integer y) { return x + y; }

});

fold()和reduce() , 收 reduce() 收的 数 同的 数, 加 sumCount = nums.aggregate((0, 0),

(lambda acc, value: (acc[0] + value, acc[1] + 1),

(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))) return sumCount[0] / float(sumCount[1])

3-36:Scala 中的aggregate()

val result = input.aggregate((0, 0))(

(acc, value) => (acc._1 + value, acc._2 + 1),

(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) val avg = result._1 / result._2.toDouble

3-37:Java 中的aggregate()

class AvgCount implements Serializable { public AvgCount(int total, int num) { this.total = total;

this.num = num;

}

public int total;

public int num;

public double avg() {

return total / (double) num;

} }

Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() { public AvgCount call(AvgCount a, Integer x) {

Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() { public AvgCount call(AvgCount a, AvgCount b) { a.total += b.total;

a.num += b.num;

return a;

} };

AvgCount initial = new AvgCount(0, 0);

AvgCount result = rdd.aggregate(initial, addAndCount, combine);

System.out.println(result.avg());

本RDD 的 作, 出它 的行 。count()用

元 的 数,而countByValue() 应的计数的 。 3-4

了 行 作。

表3-4:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作

collect() RDD 中的所有元 rdd.collect() {1, 2, 3, 3}

count() RDD 中的元 rdd.count() 4

countByValue() RDD 中出 的次数 rdd.countByValue() {(1, 1),

(2, 1),

RDD 中 rdd.takeSample(false, 1) 定的

reduce(func) RDD 中 所 有 数 据

SparkContext._ 用 式 。 SparkContext 的Scala http://spark.

apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext$ 中 所列出的 式

。 式 式地 RDD , DoubleRDDFunctions 数 数据的RDD 和PairRDDFunctions RDD , 有了 mean()和

variance() 的 的 数。

式 大, 会 阅读代 的人 。 RDD 用了 mean()

的 数, 能 会 发 RDD 的Scala http://spark.apache.org/docs/latest/api/scala/

index.html#org.apache.spark.rdd.RDD 中 本 有mean() 数。 用 所 能 成 ,

式 RDD[Double] DoubleRDDFunctions。 Scala 中

数时, 要 了 的 用 中的 数。

DoubleFlatMapFunction<T> Function<T, Iterable<Double>> flatMapToDouble 生成DoubleRDD

DoubleFunction<T> Function<T, Double> mapToDouble 生成

DoubleRDD

PairFlatMapFunction<T, K, V> Function<T, Iterable<Tuple2<K, V>>> flatMapToPair PairRDD<K, V>

PairFunction<T, K, V> Function<T, Tuple2<K, V>> mapToPair 生 成 PairRDD<K, V>

3-28 生成 JavaDoubleRDD、计算RDD 中 元 的 方 的 ,

3-38 所 。 用DoubleRDD 有的 数了, mean()和variance()。 3-38:用 Java DoubleRDD

JavaDoubleRDD result = rdd.mapToDouble(

new DoubleFunction<Integer>() { public double call(Integer x) { return (double) x * x;

} });

System.out.println(result.mean());

3. Python

Python 的 API Java 和 Scala 有所 同。 Python 中,所有的 数 本的

RDD 中, 作 应的RDD 数据 , 会 运行时 。

3.6 ( )

所 ,Spark RDD 的,而有时 能 次 用同 RDD。 简 地 RDD 用行 作,Spark 次 会 算RDD 及它的所有 。 迭代算法中

大, 迭代算法 会 次 用同 数据。 3-39 RDD 作 次计

数、 RDD 出的 子。

3-39:Scala 中的 次执行

val result = input.map(x => x*x) println(result.count())

println(result.collect().mkString(","))

了 次计算同 RDD, Spark 数据进行 化。 Spark 化

存 RDD 时,计算出 RDD 的 会分 保存它 所 出的分区数据。 有 化数据的 发生 ,Spark 会 需要用 存的数据时 算 的数据分区。

的 会 的执行速 , 数据 。

出 同的 的, RDD 同的 化 3-6 所 。 Scala

3-40 和 Java 中, persist()会 数据 序列化的 式 存 JVM 的 中。 Python 中, 会 序列化要 化存 的数据,所 化

序列化 的 存 JVM 中。 数据 者 存 时,

用序列化 的数据。

表3-6:org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级 别;如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份

     

MEMORY_ONLY

MEMORY_ONLY_SER

MEMORY_AND_DISK 中等 数据 存中

MEMORY_AND_DISK_SER 数据 存中

存中存 序列化 的数据

DISK_ONLY

存 能, 用Tachyon http://tachyon-project.org/ 作

。 Spark 的 存 有 ,

Tachyon 运 行Spark 的 http://tachyon-project.org/Running-Spark-on-Tachyon.html 。

3-40: Scala 中 用persist() val result = input.map(x => x * x) result.persist(StorageLevel.DISK_ONLY) println(result.count())

println(result.collect().mkString(","))

, 第 次 RDD 用行 作 用了persist()方法。persist()

用本 会 发 制 。

要 存的数据 , 存中 ,Spark 会自 用 用 LRU 的 存

的分区 存中 。 数据存 存中的 存 , 次要用

的分区时, 分区 需要 新计算。 用 存 的 存 的

分区 , 的分区 会 。 , 心 的作 存

了 数据而 打 。 , 存 要的数据会 有用的数据 出 存,带

算的时 开 。

,RDD 有 方法 作unpersist(), 用 方法 化的RDD

存中 。

3.7

本 中, 了RDD 运行 及RDD 的 作。 读 了 ,

—— 学 了Spark 的所有核心 。 进行 行 合、分 等 作时,

需要 用 式的RDD。 会讲解 式的RDD 的特 作。

, 会 数据 的 出, 及 用SparkContext 的进 题。

第 4 章

键值对操作

RDD Spark 中 作所需要的 数据 。本 作

RDD。 RDD 用 进行 合计算。 要 ETL 、

化、 作 数据 化 式。 RDD 了 新的 作

计 的 , 数据中 同的分 , 同的RDD 进行分 合 等 。

本 会 用 用 制 RDD 分 的高 特 :分区。有时,

用 的分区方式 问的数据 同 , 大大 应用的

开 。 会带 的 能 。 会 用PageRank 算法 分区的作用。 分

式数据集 的分区方式和 本地数据集 合适的数据 ——

,数据的分 会 地 序的 能 。

4.1

Spark 的RDD 了 有的 作。 RDD pair RDD1。Pair

RDD 序的 成要 , 它 了 行 作 新进行数据分

的 作 。 ,pair RDD reduceByKey()方法, 分 应的数据,

有join()方法, RDD 中 同的元 合 ,合 RDD。

RDD 中 字 代 件时 、用 ID 者 的字 ,

用 字 作 pair RDD 作中的 。

1: pair RDD RDD , 发 义,译 中保 pair RDD 。——译者

4.2 Pair RDD

Spark 中有 pair RDD 的方式。第 5 会讲 , 存 的数据 式会

读 时 由 数据 成的pair RDD。 , 需要 的RDD

pair RDD 时, 用map() 数 , 的 数需要 。 会 由 本行 成的RDD 行的第 的pair RDD。

RDD 的方法 同的 中会有所 同。 Python 中, 了 的

数据能 数中 用,需要 由 元 成的RDD 4-1 。

4-1: Python 中 用第 作 出 pair RDD pairs = lines.map(lambda x: (x.split(" ")[0], x))

Scala 中, 了 的数据能 数中 用,同 需要 元

4-2 。 式 元 RDD 加的 数。

4-2: Scala 中 用第 作 出 pair RDD val pairs = lines.map(x => (x.split(" ")(0), x))

Java 有自带的 元 , Spark 的 Java API 用 用scala.Tuple2

元 。 简 :Java 用 new Tuple2(elem1, elem2) 新的 元

, ._1()和._2()方法 问 中的元 。

Java 用 需要 用 的Spark 数 pair RDD。 ,要 用mapToPair() 数

代 版的map() 数, 3.5.2 中的 Java 有 的 。

4-3 中 简 的 子。

4-3: Java 中 用第 作 出 pair RDD PairFunction<String, String, String> keyData =

new PairFunction<String, String, String>() { public Tuple2<String, String> call(String x) { return new Tuple2(x.split(" ")[0], x);

} };

JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);

用Scala 和 Python 存中的数据集 pair RDD 时, 需要 由 元 成

的集合 用SparkContext.parallelize()方法。而要 用Java 存数据集 pair RDD

的 , 需要 用SparkContext.parallelizePairs()。

4.3 Pair RDD

Pair RDD 用所有 RDD 的 用的 化 作。3.4 中 的所有有 数

的 同 适用 pair RDD。由 pair RDD 中 元 ,所 需要 的 数应

groupByKey() 具有 同 的 进行分 rdd.groupByKey() {(1,

[2]),

rdd.mapValues(x => x+1) {(1, 3), (3, 5), (3, 7)}

flatMapValues(func) pair RDD 中 的 应 用

迭代 的 数,

的 元 生成

应 的 记 。

用 号化

rdd.flatMapValues(x => (x to 5)) {(1, 2), (1,

sortByKey() 据 序的RDD rdd.sortByKey() {(1,

2), (3, 4), (3, 6)}

表4-2:针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})

subtractByKey RDD 中 other RDD 中的 同的元

rdd.subtractByKey(other) {(1, 2)}

join RDD 进行 rdd.join(other) {(3, (4, 9)), (3,

(6, 9))}

rightOuterJoin RDD 进行 作, 保第

RDD 的 存

rdd.rightOuterJoin(other) {(3,(Some(4),9)), (3,(Some(6),9))}

leftOuterJoin RDD 进行 作, 保第

RDD 的 存

rdd.leftOuterJoin(other) {(1,(2,None)), (3, (4,Some(9))), (3, (6,Some(9)))}

cogroup RDD 中 有 同 的数据分 rdd.cogroup(other) {(1,([2],[])), (3,

([4, 6],[9]))}

的 会 pair RDD 的 数。

Pair RDD RDD 元 Java Scala 中的 Tuple2 Python 中的元 ,

同 RDD 所 的 数。 , 中的pair RDD,

20 字 的行, 4-4 4-6 及图4-1 所 。 4-4:用 Python 第 元 进行

result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20) 4-5:用 Scala 第 元 进行

pairs.filter{case (key, value) => value.length < 20}

4-6:用 Java 第 元 进行

Function<Tuple2<String, String>, Boolean> longWordFilter = new Function<Tuple2<String, String>, Boolean>() { public Boolean call(Tuple2<String, String> keyValue) { return (keyValue._2().length() < 20);

} };

JavaPairRDD<String, String> result = pairs.filter(longWordFilter);

图4-1:根据值筛选

有时, 问pair RDD 的 分, 时 作 元 。由 的

用 式, Spark 了mapValues(func) 数, 能 map{case (x, y): (x,

func(y))}。 子中 用 数。

次 pair RDD 的 作, 合 作开 。

  

4.3.1

数据集 式 的时 , 合具有 同 的元 进行 计 的

作。 讲解 RDD 的fold()、combine()、reduce()等行 作,pair RDD

有 应的 的 化 作。Spark 有 的 作, 合具有 同 的 。

作 RDD, 它 化 作而 行 作。

reduceByKey() reduce() 它 收 数, 用 数 进行合 。

reduceByKey()会 数据集中的 进行 行的 作, 作会 同的 合

。 数据集中 能有大量的 ,所 reduceByKey() 有 用 序

的行 作。 ,它会 由 和 应 出 的 成的新的RDD。

foldByKey() fold() 它 用 RDD 和合 数中的数据

同的 作 。 fold() ,foldByKey() 作所 用的合 数

元 进行合 , 元 。

4-7 和 4-8 所 , 用reduceByKey()和mapValues() 计算 的 应 的 图4-2 。 和 用fold()和map()计算 RDD 的 。

, 用 加 用的 数 同 的 , 会讲 。

4-7: Python 中 用reduceByKey()和mapValues()计算 应的

rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) 4-8: Scala 中 用reduceByKey()和mapValues()计算 应的

rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

图4-2:求每个键平均值的数据流

MapReduce 中的合 combiner 的读者 能 ,

用reduceByKey()和foldByKey()会 计 算 的

自 台 进行本地合 。用 需要 定合 。 化的

combineByKey() 自定义合 的行 。

用 4-9 4-11 中 的方法 解决 的分 式 计数问题。

用 中讲 的flatMap() 生成 、 数字1 的pair RDD,

4-7 和 4-8 中 , 用reduceByKey() 所有的 进行计数。

4-9:用 Python 计数 rdd = sc.textFile("s3://...")

words = rdd.flatMap(lambda x: x.split(" "))

result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) 4-10:用 Scala 计数

val input = sc.textFile("s3://...")

val words = input.flatMap(x => x.split(" "))

val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y) 4-11:用 Java 计数

JavaRDD<String> input = sc.textFile("s3://...")

JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); } });

JavaPairRDD<String, Integer> result = words.mapToPair(

new PairFunction<String, String, Integer>() {

public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); } }).reduceByKey(

new Function2<Integer, Integer, Integer>() {

new Function2<Integer, Integer, Integer>() {

在文檔中 快速大数据分析 (頁 52-0)

相關文件