• 沒有找到結果。

PageRank

在文檔中 快速大数据分析 (頁 80-0)

4.5  数据分区 进

4.5.4 PageRank

PageRank RDD 分区中 的 的算法, 它 进行分析。PageRank 算法 Google 的 · Larry Page 的 字 的,用 据

的 , 集合中 的 要 量 。 算法 用 网 进行

序, , 用 序 技 社交网络中有 的用 。

PageRank 执行 次 的 迭代算法, 它 RDD 分区 作的 的用 。

算法会 数据集: 由(pageID, linkList)的元 成, 的

的列 由(pageID, rank)元 成, 的 序 。它

进行计算。

(1) 的 序 化 1.0。

val links = sc.objectFile[(String, Seq[String])]("links") .partitionBy(new HashPartitioner(100)) .persist()

// 的 序 化 1.0 由 用mapValues,生成的RDD

// 的分区方式会和"links"的

var ranks = links.mapValues(v => 1.0) // 运行10 PageRank迭代

for(i <- 0 until 10) {

val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) =>

links.map(dest => (dest, rank / links.size)) }

ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v) }

序 0.15 + 0.85 * contributionsReceived。

代 本 简 , 序 了 保RDD 高效的方式进

化 PageRank 的 的MapReduce 了 的网

络 开 。

(2) 出 同 的 , 用links的persist()方法, 它保 存中 次迭代 用。

(3) 第 次 ranks时, 用mapValues()而 map() 保 RDD links

的分区方式, 它进行的第 次 作 会开 。

(4) 中, reduceByKey() 用mapValues() reduceByKey()的

分区的了, , 次 中 作的 次 links进行

作时 会 加高效。

了 大化分区 化的 作用, 应 需 元 的 时 量

用mapValues() flatMapValues()。

4.5.5 分

Spark 的HashPartitioner RangePartitioner 能 大 数 用 , Spark 自定义的Partitioner 制RDD 的分区方式。

用领 进 开 。

子, 要 网 的集合 运行 中的PageRank 算法。 ,

的ID RDD 中的 的URL。 用简 的 数进行分区时,

有 的URL 的 http://www.cnn.com/WORLD 和 http://www.cnn.com/US 能

会 分 同的 。 而, 道 同 的网 有 能 互 。

由 PageRank 需要 次迭代中 它所有 的 发 ,

分 同 分区中会 。 用自定义的分区 据 而

URL 分区。

要 自定义的分区 , 需要 org.apache.spark.Partitioner 方法。

• numPartitions: Int: 出 的分区数。

• getPartition(key: Any): Int: 定 的分区编号 0 numPartitions-1 。

• equals():Java 等 的 方法。 方法的 要,Spark 需要用

方法 的分区 和 分区 同, Spark

RDD 的分区方式 同。

有 问题需要 , 的算法 Java 的hashCode()方法时, 方法有 能会

数。 需要 分 , 保getPartition() 远 数。

4-26 了 编 的 的分区 , 分区 URL 中的

分 。

4-26:Scala 自定义分区方式

class DomainNamePartitioner(numParts: Int) extends Partitioner { override def numPartitions: Int = numParts

override def getPartition(key: Any): Int = {

val domain = new Java.net.URL(key.toString).getHost() val code = (domain.hashCode % numPartitions)

if(code < 0) {

override def equals(other: Any): Boolean = other match { case dnp: DomainNamePartitioner =>

dnp.numPartitions == numPartitions

DomainNamePartitioner, 成 时自 进行 和Java 中的instanceof()

的。

用自定义的Partitioner 的: 要 它 partitionBy()方法 。Spark 中

有 数据 的方法, join()和groupByKey(),它 收 的

return hash(urlparse.urlparse(url).netloc) rdd.partitionBy(20, hash_domain) # 20 分区

, 所 的 数会 RDD 的分区 数区分开 。 要

RDD 用 同的分区方式, 应 用同 数 , 数,而

RDD 新的 数 。

4.6

本 学 了 用Spark 的 的 数 作 数据。第3 中讲 的技

同 适用 pair RDD。 中 会 读 和保存数据。

第 5 章

数据读取与保存

本 工 和数据 学 用。工 会了解 的 出 式,有

适合用 处理 序的 式。数据 学 能 心数据的 有的 式。

5.1

学了 Spark 中 分发的数据执行的 作。 ,所 的

本地集合 者 件中进行数据读 和保存的。 有时 ,数据量 能大 法

台 中, 时 需要 的数据读 和保存的方法了。

Spark 出 。 分 Spark 本 Hadoop 生 而 ,特

Spark Hadoop MapReduce 所 用的InputFormat和OutputFormat 问 数据,而大 分 的 件 式 存 S3、HDFS、Cassandra、HBase 等

1 5.2.6 了 用 式。

, 出的高 API 会 用。 运的 ,Spark 及 生

了 方 。本 会 的数据 。

• 文件格式与文件系统

存 本地 件 分 式 件 NFS、HDFS、Amazon S3 等 中的 数据,Spark 问 同的 件 式, 本 件、JSON、SequenceFile,

及protocol buffer。 会 式的用法, 及Spark 同 件

的 和 。

1:InputFormatOutputFormat MapReduce 中用 数据 的Java API。

• Spark SQL中的结构化数据源

第9 会 Spark SQL ,它 JSON 和 Apache Hive 的 化数据

, 了 加简 高效的API。 处会 地 用Spark

SQL,而大 分 第9 讲解。

• 数据库与键值存储

本 会 Spark 自带的 和 第 方 ,它 用 Cassandra、HBase、

Elasticsearch 及JDBC 。

的大 数方法 Spark 所 的 编 , 有

Java 和 Scala。 会 出。

5.2

Spark 件 式的读 和保存方式 简 。 本 件的 化的 件,

JSON 式的 化的 件, SequenceFile 的 化的 件,Spark 5-1 。Spark 会 据 件 应的处理方式。

的, 用 。

表5-1:Spark支持的一些常见格式

本 件 的 本 件, 行 记

JSON 化 的 本的 式, 化 大 数 要 行 记

CSV 的 本的 式, 电子 应用中 用

SequenceFiles 用 数据的 Hadoop 件 式

Protocol buffers 快速、 的 式

件 用 Spark 作 中的数据存 共 的代 读 。 的时

它会 效, 它 Java 序列化

了Spark 中 的 出 制, 数据 成 数据 用Hadoop 的新

件API。由 Hadoop 要 用 数据,所 能 用, 有 式

了 。 会 的 式, 用 的 null 。

5.2.1

Spark 中读 本 件 。 本 件读 RDD 时, 的 行

会成 RDD 的 元 。 的 本 件 次 读 pair RDD,

中 件 , 件 。

1.

需要 用 件路 作 数 用SparkContext 中的textFile() 数, 读 本 件, 5-1 5-3 所 。 要 制分区数的 , 定minPartitions。

5-1: Python 中读 本 件

input = sc.textFile("file:///home/holden/repos/spark/README.md") 5-2: Scala 中读 本 件

val input = sc.textFile("file:///home/holden/repos/spark/README.md") 5-3: Java 中读 本 件

JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")

件 数据所有 分的 的 式出 , 用 方式 处

理。 用textFile 数, 作 数, 它会 分 读 RDD

中。有时 有 要 道数据的 分分 自 件 件 中的时

数据 ,有时 同时处理 件。 件 , 用SparkContext.

wholeTextFiles()方法, 方法会 pair RDD, 中 件的 件 。

wholeTextFiles() 件 特定时 的数据时 有用。 有 同

数据的 件, 地 出 的 , 5-4 所 。

5-4: Scala 中 件的

val input = sc.wholeTextFiles("file://home/holden/salesFiles") val result = input.mapValues{y =>

val nums = y.split(" ").map(x => x.toDouble) nums.sum / nums.size.toDouble

}

Spark 读 定 中的所有 件, 及 路 中 用 字

part-*.txt 。大 数据集 存 件中, 特 有

用, 同 中存 的 件 成 记 件 的时 。

2.

出 本 件 简 。 5-5 中 的saveAsTextFile()方法 收 路 ,

RDD 中的 路 应的 件中。Spark 的路 作 ,会

出 件。 ,Spark 行 出了。 方法中,

能 制数据的 分 出 件中, 有 出 式 制。

5-5: Python 中 数据保存 本 件 result.saveAsTextFile(outputFile)

5.2.2 JSON

。Python 中 用 的 的 https://docs.python.org/2/library/json.html,

5-6 ,而 Java 和 Scala 中 会 用Jackson http://jackson.codehaus.org/, 5-7 和

5-8 。 所 , 它 能 ,而 用 简 。

解析 了大量的时 , 应 Scala http://engineering.ooyala.com/blog/

comparing-scala-json-libraries Java http://geokoder.com/java-json-libraries-comparison 中 的JSON 。

5-6: Python 中读 化的JSON import json

data = input.map(lambda x: json.loads(x))

Scala 和 Java 中, 记 读 代 的 中。 中 能 需

case class Person(name: String, lovesPandas: Boolean) //

...

// 解析 特定的case class。 用flatMap, 问题时 列 None

// 处理 ,而 有问题时 元 的列 Some(_)

val result = input.flatMap(record => { try {

Some(mapper.readValue(record, classOf[Person])) } catch {

case e: Exception => None }})

5-8: Java 中读 JSON

class ParseJson implements FlatMapFunction<Iterator<String>, Person> { public Iterable<Person> call(Iterator<String> lines) throws Exception { ArrayList<Person> people = new ArrayList<Person>();

ObjectMapper mapper = new ObjectMapper();

while (lines.hasNext()) { String line = lines.next();

try {

people.add(mapper.readValue(line, Person.class));

} catch (Exception e) {

JavaRDD<String> input = sc.textFile("file.json");

JavaRDD<Person> result = input.mapPartitions(new ParseJson());

处理 式 的记 有 能会 的问题, JSON 5-9: Python 保存 JSON

(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)) .saveAsTextFile(outputFile))

5-10: Scala 中保存 JSON

result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_)) .saveAsTextFile(outputFile)

5-11: Java 中保存 JSON

class WriteJson implements FlatMapFunction<Iterator<Person>, String> { public Iterable<String> call(Iterator<Person> people) throws Exception { ArrayList<String> text = new ArrayList<String>();

ObjectMapper mapper = new ObjectMapper();

while (people.hasNext()) { Person person = people.next();

text.add(mapper.writeValueAsString(person));

}

return text;

} }

JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(

new LikesPandas());

JavaRDD<String> formatted = result.mapPartitions(new WriteJson());

formatted.saveAsTextFile(outfile);

, 有的 作 本数据的 制和JSON , 用Spark 地读 和 保存JSON 数据了。

5.2.3 分 分

号分 CSV 件 行 有 定数 的字 ,字 用 号 开 制 分

件, TSV 件中用制 开 。记 行 , ,有时

行。CSV 件和TSV 件有时 的 , 要 处理 行 、 义

字 、 ASCII 字 、 数 等方 。CSV 生 字 ,所 需要 合

和分解特定的字 。

JSON 中的字 的 , 的 记 有 的字 , 能 应的

序号。 法 用第 行中 列的 作 字 。

1. CSV

读 CSV/TSV 数据和读 JSON 数据 , 需要 件 作 本 件 读 数

据, 数据进行处理。由 式 的 ,同 的 同版本有时 会用 同的方

式处理 数据。

JSON ,CSV 有 同的 , 中 用 。同 ,

Python 会 用自带的csv https://docs.python.org/2/library/csv.html 。 Scala 和 Java 中 用opencsv http://opencsv.sourceforge.net/ 。

Hadoop InputFormat 中的CSVInputFormat http://docs.oracle.com/cd/E27101_01/

appdev.10/e20858/oracle/hadoop/loader/examples/CSVInputFormat.html

用 Scala 和 Java 中读 CSV 数据。 它 行 的记 。

的CSV 的所有数据字 有 行 , 用textFile()读 解析数据, 5-12 5-14 所 。

5-12: Python 中 用textFile()读 CSV import csv

import StringIO

...

def loadRecord(line):

"""解析 行CSV记 """

input = StringIO.StringIO(line)

reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) return reader.next()

val input = sc.textFile(inputFile) val result = input.map{ line =>

val reader = new CSVReader(new StringReader(line));

reader.readNext();

}

5-14: Java 中 用textFile()读 CSV import au.com.bytecode.opencsv.CSVReader;

import Java.io.StringReader;

...

public static class ParseLine implements Function<String, String[]> { public String[] call(String line) throws Exception {

CSVReader reader = new CSVReader(new StringReader(line));

return reader.readNext();

} }

JavaRDD<String> csvFile1 = sc.textFile(inputFile);

JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());

字 中 有 行 , 需要 读 件, 解析 , 5-15 5-17

input = StringIO.StringIO(fileNameContents[1])

reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader

fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

5-16: Scala 中 读 CSV

case class Person(name: String, favoriteAnimal: String) val input = sc.wholeTextFiles(inputFile)

val result = input.flatMap{ case (_, txt) =>

val reader = new CSVReader(new StringReader(txt));

reader.readAll().map(x => Person(x(0), x(1))) }

5-17: Java 中 读 CSV public static class ParseLine

implements FlatMapFunction<Tuple2<String, String>, String[]> {

public Iterable<String[]> call(Tuple2<String, String> file) throws Exception { CSVReader reader = new CSVReader(new StringReader(file._2()));

return reader.readAll();

} }

JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);

JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());

有 分 件, 需要 用wholeFile()方法, 能 需要 5-18: Python 中 CSV

def writeRecords(records):

""" 出 CSV记 """

output = StringIO.StringIO()

writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"]) for record in records:

writer.writerow(record) return [output.getvalue()]

pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile) 5-19: Scala 中 CSV

pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray) .mapPartitions{people =>

val stringWriter = new StringWriter();

val csvWriter = new CSVWriter(stringWriter);

csvWriter.writeAll(people.toList) Iterator(stringWriter.toString) }.saveAsTextFile(outFile)

能 , 的 子 能 道所要 出的所有字 时 用。 而,

字 运行时由用 决定的, 要 用 的方法了。 简 的方法 所

有的数据, 同的 , 分 出。

5.2.4 SequenceFile

SequenceFile 由 有 的 件 成的 用Hadoop 式。SequenceFile

件有同 记,Spark 用它 定 件中的 , 记 的

。 Spark 用 高效地 行读 SequenceFile 件。SequenceFile Hadoop MapReduce 作 中 用的 出 式,所 用 有的Hadoop

,数据 有 能 SequenceFile 的 式 用的。

由 Hadoop 用了 自定义的序列化 , SequenceFile 由 Hadoop 的 Writable 的元 成。 5-2 列出了 的数据 及它 应的Writable 。 的 法 的 加 Writable , 它 org.apache.hadoop.

io.Writable http://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/io/Writable.html 的 子 。 法 要 出的数据 应的Writable 自定义的case class ,

org.apache.hadoop.io.Writable中的readfields和write 自 的Writable 。

Hadoop 的 RecordReader 会 记 用同 , 用RDD

的cache会 , 需要 用 简 的map() 作

存 。 有, Hadoop Writable 有 java.io.Serializable

, 了 它 能 RDD 中 用, 要用map() 它 。23

表5-2:Hadoop Writable类型对应表

Int Integer IntWritable VIntWritable2 Long Long LongWritable VLongWritable2

Float Float FloatWritable

Double Double DoubleWritable Boolean Boolean BooleanWritable Array[Byte] byte[] BytesWritable

String String Text

Array[T] T[] ArrayWritable<TW>3 List[T] List<T> ArrayWritable<TW>3 Map[A, B] Map<A, B> MapWritable<AW, BW>3

2: 和 存 定 的 式。存 数字12 据的 和存 数字2**30 据的 。

有大量的 数据, 应 用 的 VIntWritableVLongWritable,它 存

数 时 用 的 。

3: 用Writable 。

Spark 1.0 及 版本中,SequenceFile 能 Java 和 Scala 中 用, Spark 1.1 加 了 Python 中读 和保存SequenceFile 的 能。 要 , 需要 用Java Scala 自 定 义Writable 。Spark 的 Python API 能 Hadoop 中 存 的 本 Writable Python , 量 用的getter 方法处理 的 。

1. SequenceFile

Spark 有 用 读 SequenceFile 的 。 SparkContext 中, 用sequenceFile(path,

keyClass, valueClass, minPartitions)。 ,SequenceFile 用Writable ,

keyClass和valueClass 数 用 的Writable 。 子, 要

SequenceFile 中读 人员 及 所 的 数 。 子中,keyClass Text,

而valueClass IntWritable VIntWritable。 了方 , 5-20 5-22 中

用IntWritable。

5-20: Python 读 SequenceFile val data = sc.sequenceFile(inFile,

"org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable") 5-21: Scala 中读 SequenceFile

val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).

map{case (x, y) => (x.toString, y.get())}

5-22: Java 中读 SequenceFile

public static class ConvertToNativeTypes implements

PairFunction<Tuple2<Text, IntWritable>, String, Integer> {

public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) { return new Tuple2(record._1.toString(), record._2.get());

} }

JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class);

JavaPairRDD<String, Integer> result = input.mapToPair(

new ConvertToNativeTypes());

Scala 中有 方 的 数 自 Writable 应的Scala

。 用sequenceFile[Key, Value](path, minPartitions) Scala

生数据 的RDD,而 需 定keyClass和valueClass。

2. SequenceFile

Scala 中 数据 出 SequenceFile 的 法 。 , SequenceFile 存 的

,所 需要 由 出 SequenceFile 的 成的PairRDD。 进行了 Scala 的 生 Hadoop Writable 的 式 ,所 要

出的 Scala 的 生 , 用saveSequenceFile(path)保存 的PairRDD,它

会 出数据。 和 能自 Writable , 者 用

VIntWritable , 数据进行 作, 保存 进行 。

的 子 人员 及 所 的 数 , 5-23 所 。

5-23: Scala 中保存 SequenceFile

val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2))) data.saveAsSequenceFile(outputFile)

Java 中保存 SequenceFile 要 , JavaPairRDD 有saveAsSequenceFile() 方法。 要 用Spark 保存自定义 Hadoop 式的 能 。5.2.6 会 用 Java SequenceFile 保存数据。

5.2.5

件 SequenceFile 的简 ,它 存 的RDD。和

SequenceFile 的 , 件 用Java 序列化 出的。

了 的 —— 增 了 字 —— 生成的 件

读了。 件 用Java 序列化,它 同 的 同版本有 定

的 , 需要 序员 。

件 用Java 序列化有 要 的地方。 ,和 的SequenceFile 同,

同 的 , 件的 出和Hadoop 的 出 。 次, 件 式 同的

, 件 用 Spark 作 的 。 ,Java 序列化有 能 。

要保存 件, 需 RDD 用saveAsObjectFile 行了。读 件 简

:用SparkContext 中的objectFile() 数 收 路 , 应的RDD。

了解了 用 件的 , 能 道 会有人要用它。 用

件的 要 它 用 保存 任 而 需要 的工作。

件 Python 中 法 用, Python 中的 RDD 和 SparkContext saveAsPickleFile()

和pickleFile()方法作 代。 用了Python 的 pickle 序列化 。 , 件的

同 适用 pickle 件:pickle 能 , 定义 , 生 的 数据 件 能 法 读出 。

5.2.6 Hadoop

了Spark 的 式 , 任 Hadoop 的 式交互。Spark 新

Hadoop 件API, 了 大的 。4 1. Hadoop

要 用新版的Hadoop API 读 件,需要 Spark 。newAPIHadoopFile

收 路 及 。 第 式 , 代 式。 的 数

hadoopFile() 用 用 的API 的Hadoop 式。第 的 ,

的 。 需要 定 的Hadoop , conf 。

KeyValueTextInputFormat 简 的Hadoop 式 , 用 本 件中读

数据 5-24 所 。 行 会 处理, 和 用制 开。

式存 Hadoop 中,所 需 工 中 加 的 能 用它。

5-24: Scala 中 用 式 API 读 KeyValueTextInputFormat()

val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{

case (x, y) => (x.toString, y.toString) }

学 了 读 本 件 加 解析 读 JSON 数据的方法。 ,

用自定义Hadoop 式 读 JSON 数据。 需要 的 ,

的 。Twitter 的 Elephant Bird https://github.com/

的 。Twitter 的 Elephant Bird https://github.com/

在文檔中 快速大数据分析 (頁 80-0)

相關文件