• 沒有找到結果。

HDFS

在文檔中 快速大数据分析 (頁 101-0)

5.3  件

5.3.3 HDFS

Hadoop 分 式 件 HDFS 用的 件 ,Spark 能 地 用它。

HDFS 计 价的 件 工作,有 地应 ,同时 高 量。

Spark 和 HDFS 同 , Spark 用数据分 量

网络开 。

Spark 中 用 HDFS 需要 出路 定 hdfs://master:port/path 了。

HDFS Hadoop 版本 而 化, 用的Spark

版本的Hadoop 编译的, 读 会 。 ,Spark

Hadoop 1.0.4 编译7。 代 编译, 量中 定SPARK_

HADOOP_VERSION= 版本的Hadoop 进行编译

编译 的Spark 版本。 据运行hadoop version的

量要 的 。

5.4 Spark SQL中 数据

Spark SQL Spark 1.0 中新加 Spark 的 件, 快速成 了 Spark 中 的 作

化和 化数据的方式。 化数据 的 有结构信息的数据—— 所有的数

据记 具有 字 的集合。Spark SQL 化数据 作 ,而 由

Spark SQL 道数据的 ,它 数据 中 读出所需字 。第9

地讲解Spark SQL, 用它 数据 中读 数据。

, SQL Spark SQL, 它 数据 执行 出

字 者 字 用 数 , 由Row 成的RDD, Row

记 。 Java 和 Scala 中,Row 的 问 的。 Row 有

get()方法,会 进行 。 有 本

的 用get()方 法 getFloat()、getInt()、getLong()、getString()、getShort()、

getBoolean()等 。 Python 中, 用row[column_number] 及row.column_name

问元 。

7:自 Spark 1.4.0 ,Spark 的Hadoop 版本 2.2.0。——译者

5.4.1 Apache Hive

5-30:用 Python HiveContext 数据

from pyspark.sql import HiveContext hiveCtx = HiveContext(sc)

rows = hiveCtx.sql("SELECT name, age FROM users") firstRow = rows.first()

print firstRow.name

5-31:用 Scala HiveContext 数据 import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc) val rows = hiveCtx.sql("SELECT name, age FROM users") val firstRow = rows.first()

println(firstRow.getString(0)) // 字 0 name字 5-32:用 Java HiveContext 数据

import org.apache.spark.sql.hive.HiveContext;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SchemaRDD;

HiveContext hiveCtx = new HiveContext(sc);

SchemaRDD rows = hiveCtx.sql("SELECT name, age FROM users");

Row firstRow = rows.first();

System.out.println(firstRow.getString(0)); // 字 0 name字 会 9.3.1 地 Hive 中读 数据。

5.4.2 JSON

有记 的JSON 数据,Spark SQL 自 出它 的 ,

数据读 记 , 字 的 作 简 。要读 JSON 数

据, 需要和 用Hive HiveContext。 需要

Hive, 需要hive-site.xml 件。 用HiveContext.jsonFile方法

件中 由Row 成的RDD。 了 用 Row , RDD

册 张 , 中 出特定的字 。 , 有 的JSON 件,

式 5-33 所 , 行 记 。 5-33:JSON 中的

{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}

{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}

读 数据, 中 username 用 和text 本 字 , 5-34

5-36 所 。

5-34: Python 中 用 Spark SQL 读 JSON 数据 tweets = hiveCtx.jsonFile("tweets.json") tweets.registerTempTable("tweets")

results = hiveCtx.sql("SELECT user.name, text FROM tweets") 5-35: Scala 中 用 Spark SQL 读 JSON 数据

val tweets = hiveCtx.jsonFile("tweets.json") tweets.registerTempTable("tweets")

val results = hiveCtx.sql("SELECT user.name, text FROM tweets") 5-36: Java 中 用 Spark SQL 读 JSON 数据

SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);

tweets.registerTempTable("tweets");

SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");

会 9.3.3 用Spark SQL 读 JSON 数据 问 进行 。

,Spark SQL 的 远 读 数据, 数据、 RDD 所 的方式 的方式 合数据、 数据运行自定义 数, 第9 中讲 。

5.5 数据

数据 的Hadoop 者自定义的Spark ,Spark 问 用的

数据 。本 的 。

5.5.1 Java数据

Spark 任 Java 数 据 JDBC 的 数 据 中 读 数 据,

MySQL、Postgre 等 。 要 问 数 据, 需 要 org.apache.spark.rdd.

JdbcRDD, SparkContext 和 数 它。 5-37 了 用JdbcRDD

MySQL 数据 。

5-37:Scala 中的 JdbcRDD def createConnection() = {

Class.forName("com.mysql.jdbc.Driver").newInstance();

DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");

}

def extractValues(r: ResultSet) = { (r.getInt(1), r.getString(2)) }

val data = new JdbcRDD(sc,

createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",

lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues) println(data.collect().toList)

• 数的 数 出 java.sql.ResultSet http://docs.

oracle.com/javase/7/docs/api/java/sql/ResultSet.html 作数据有用的 式的 数。

5-37 中, 会 (Int, String) 。 数 ,Spark 会自 行

DataStax 开 用 Spark 的 Cassandra https://github.com/datastax/spark-cassandra-connector ,Spark Cassandra 的 大大 。 Spark 的 分,

需要 加 的 的 件中 能 用它。Cassandra 有 用

Spark SQL, 它会 由CassandraRow 成的RDD, 有 分方法 Spark SQL 的Row 的方法 同, 5-38 和 5-39 所 。Spark 的 Cassandra

能 Java 和 Scala 中 用。

5-38:Cassandra 的sbt

"com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0-rc5",

"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.0.0-rc5"

8: 道 有 记 , 执行 计数 , 据 决定upperBound

lowerBound的 。

5-39:Cassandra 的Maven

<dependency> <!-- Cassandra -->

<groupId>com.datastax.spark</groupId>

<artifactId>spark-cassandra-connector</artifactId>

<version>1.0.0-rc5</version>

</dependency>

<dependency> <!-- Cassandra -->

<groupId>com.datastax.spark</groupId>

<artifactId>spark-cassandra-connector-java</artifactId>

<version>1.0.0-rc5</version>

</dependency>

Elasticsearch ,Cassandra 要读 作 决定 集 。

spark.cassandra.connection.host Cassandra 集 。 有用 和 的

, 需要分 spark.cassandra.auth.username和spark.cassandra.auth.password。

定 有 Cassandra 集 要 , SparkContext 时 , 5-40 和 5-41 所 。

5-40: Scala 中 Cassandra val conf = new SparkConf(true)

.set("spark.cassandra.connection.host", "hostname") val sc = new SparkContext(conf)

5-41: Java 中 Cassandra SparkConf conf = new SparkConf(true)

.set("spark.cassandra.connection.host", cassandraHost);

JavaSparkContext sc = new JavaSparkContext(

sparkMaster, "basicquerycassandra", conf);

Datastax 的 Cassandra 用Scala 中的 式 SparkContext 和 RDD

加 数。 式 , 读 数据 5-42 所 。

5-42: Scala 中 张 读 RDD

// SparkContext和RDD 加 数的 式

import com.datastax.spark.connector._

// 张 读 RDD。 的 test的

// CREATE TABLE test.kv(key text PRIMARY KEY, value int);

val data = sc.cassandraTable("test" , "kv")

// 打印出value字 的 本 计。

import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

// 张 读 RDD。 的 test的

// CREATE TABLE test.kv(key text PRIMARY KEY, value int);

JavaRDD<CassandraRow> data = javaFunctions(sc).cassandraTable("test" , "kv");

// 打印 本 计。

System.out.println(data.mapToDouble(new DoubleFunction<CassandraRow>() { public double call(CassandraRow row) { return row.getInt("value"); } }).stats());

了读 张 , 数据集的子集。 cassandraTable()的 用中加 where 子 , 制 的数据, sc.cassandraTable(...).where("key=?", "panda")。 Cassandra 的RDD 保 存 Cassandra 中。 保 存 由

CassandraRow 成的RDD, 制数据 有用。 定列的

, 存 行的 式而 元 和列 的 式的RDD, 5-44 所 。 5-44: Scala 中保存数据 Cassandra

val rdd = sc.parallelize(List(Seq("moremagic", 1)))

rdd.saveToCassandra("test" , "kv", SomeColumns("key", "value"))

本 简 地 了Cassandra 。要了解 , 阅 的GitHub https://github.com/datastax/spark-cassandra-connector 。

5.5.3 HBase

由 org.apache.hadoop.hbase.mapreduce.TableInputFormat 的 ,Spark

Hadoop 式 问HBase。 式会 数据, 中 的 org.

apache.hadoop.hbase.io.ImmutableBytesWritable,而 的 org.apache.hadoop.hbase.

client.Result。Result 据 列 的 方 法, API https://hbase.

apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html 中有所 。

要 Spark 用 HBase, 需要 用 的 式 用SparkContext.newAPIHadoopRDD。 Scala 中的 5-45 所 。 val conf = HBaseConfiguration.create()

conf.set(TableInputFormat.INPUT_TABLE, "tablename") // 张 val rdd = sc.newAPIHadoopRDD(

conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result])

TableInputFormat 用 化 HBase 的读 的 , 制

分列中, 及 制 的时 。 TableInputFormat的API http://

hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html 中

, HBaseConfiguration中 它 , 它 Spark。

5.5.4 Elasticsearch

Spark 用Elasticsearch-Hadoop https://github.com/elastic/elasticsearch-hadoop Elasticsearch 中读 数据。Elasticsearch 开 的、 Lucene 的 。

Elasticsearch 和 的 大 ,它会 的路 ,

而 SparkContext 中 的 。Elasticsearch 的OutputFormat 有用 Spark 所 的 ,所 用saveAsHadoopDataSet 代 , 需要

5-46: Scala 中 用 Elasticsearch 出

val jobConf = new JobConf(sc.hadoopConfiguration)

jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.

mr.EsOutputFormat")

jobConf.setOutputCommitter(classOf[FileOutputCommitter])

jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets") jobConf.set(ConfigurationOptions.ES_NODES, "localhost")

FileOutputFormat.setOutputPath(jobConf, new Path("-")) output.saveAsHadoopDataset(jobConf)

5-47: Scala 中 用 Elasticsearch

def mapWritableToInput(in: MapWritable): Map[String, String] = { in.map{case (k, v) => (k.toString, v.toString)}.toMap

}

val jobConf = new JobConf(sc.hadoopConfiguration)

jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1)) jobConf.set(ConfigurationOptions.ES_NODES, args(2)) val currentTweets = sc.hadoopRDD(jobConf,

classOf[EsInputFormat[Object, MapWritable]], classOf[Object], classOf[MapWritable])

// map

// MapWritable[Text, Text] Map[String, String]

val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }

和 ,Elasticsearch 有 , 作 的 有效 。

出而 ,Elasticsearch 进行 , 尔会 出 的

数据 , 要存 字 的数据 , 定

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html 。

5.6

本 , 应 能 数据读 Spark 中, 计算 所 的方

式存 。 了数据 用的 同 式, 及它 应的数

据处理的方式。 掌握了读 和保存大 数据集的方法, 会

用 编 高效 大的Spark 序的方法。

第 6 章

Spark编程进阶

6.1

本 有 及的Spark 编 的 进 特 ,会 的共 量:

累加器 accumulator 广播变量 broadcast variable 。 加 用 进行 合,而 量用 高效分发 大的 。 有的RDD 化 作的 ,

数据 需要 大 代价的任 了 作。 了 用的工具 ,本 会

Spark 序交互的方式, 用R 编 的 本进行交互。

本 会 用 电 作者的 作 , 出 的 应用。

的 的 号。 号 由 分 的, 有自 的 号号 ,

所 据 号 应的 。有 作者的地理 ,用

定 。 6-1 了 。本书的 代 中 需要

中 进行处理的 号列 。

6-1: JSON 式的 , 中 字

{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",

"contactlat":"37.384733","contactlong":"-122.032164",

"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",

"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}

要用 的第 Spark 特 共 量。共 量 Spark 任 中 用的特

的 量。 中, 用Spark 共 量 的 进行计数,

及分发 张 大的 。

任 需要 时 进行 , 需要 数据 者 数生成 时, 数 file = sc.textFile(inputFile)

# Accumulator[Int] 化 0 blankLines = sc.accumulator(0) def extractCallSigns(line):

global blankLines # 问 量 if (line == ""):

blankLines += 1 return line.split(" ")

callSigns = file.flatMap(extractCallSigns)

callSigns.saveAsTextFile(outputDir + "/callsigns") print "Blank lines: %d" % blankLines.value

6-3: Scala 中 加 行 val sc = new SparkContext(...) val file = sc.textFile("file.txt")

val blankLines = sc.accumulator(0) // Accumulator[Int] 化 0 val callSigns = file.flatMap(line => {

if (line == "") { println("Blank lines: " + blankLines.value) 6-4: Java 中 加 行

JavaRDD<String> rdd = sc.textFile(args[1]);

final Accumulator<Integer> blankLines = sc.accumulator(0);

JavaRDD<String> callSigns = rdd.flatMap(

new FlatMapFunction<String, String>() { public Iterable<String> call(String line) { if (line.equals("")) {

blankLines.add(1);

}

return Arrays.asList(line.split(" "));

}});

callSigns.saveAsTextFile("output.txt")

System.out.println("Blank lines: "+ blankLines.value());

中, 了 作blankLines的Accumulator[Int] ,

中 行时 加1。执行 化 作 , 打印出 加 中的 。 ,

• 中 用SparkContext.accumulator(initialValue)方 法, 出 存 有

的 加 。 org.apache.spark.Accumulator[T] , 中T

initialValue的 。

的计数 时 方 , 有 需要 时, 者 需要

validSignCount = sc.accumulator(0) invalidSignCount = sc.accumulator(0) def validateSign(sign):

global validSignCount, invalidSignCount

if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):

validSignCount += 1

validSigns = callSigns.filter(validateSign)

contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x + y)

# 制 计算计数

contactCount.count()

if invalidSignCount.value < 0.1 * validSignCount.value:

contactCount.saveAsTextFile(outputDir + "/contactCount") else:

print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.

value)

加 要 处理 ,对于要在行动操作中使用的累加器,Spark

只会把每个任务对各累加器的修改应用一次。 , 要 计

算时 的 加 , 它 foreach() 的行 作中。

对于在RDD 转化操作中使用的累加器,就不能保证有这种情况了。 化 作中 加

能会发生 次 新。 子, 存 有 用的RDD 第

次 LRU 存中 新用 时, 的 次 新 会发生。 会 制

RDD 据 进行 算,而 作用 会 中的 化 作 的 加 进行

新, 次发 中。 化 作中, 加 用 的。

版本的Spark 能会 行 成 新 次 加 的 , 版本

1.2.0 会进行 次 新, 化 作中的 加 时 用。

6.2.2

, 学 了 用加法 作Spark 的 加 Accumulator[Int] 。 Spark Double、Long和Float 的 加 。 ,Spark 了自定义

加 和 合 作的API 要 加的 中的 大 ,而 加 。自

定义 加 需要 AccumulatorParam, Spark API http://spark.apache.org/docs/

latest/api/scala/index.html#package 中有所 。 要 作同时 交 和 合 ,

用任 作 代 数 的加法。 了 和, 数据的 大 。

任 的a 和 b,有 a op b = b op a,op 交 。 任 的a、b 和 c,有 (a op b) op = a op (b op ),op

合 。 ,sum和max 交 合 , Spark 加 中

的 用 作。

6.3

Spark 的第 共 量 广播变量,它 序高效地 所有工作 发

大的 读 , Spark 作 用。 , 的应用需要 所有 发

大的 读 , 学 算法中的 大的特 量, 量用

,Spark 会自 中所有 用 的 量发 工作 。 方 ,

效。 有 : , 的任 发 制 任 进行 化的 次,

能会 多个 行 作中 用同 量, Spark 会 作分 发 。

子, 要 Spark 序, 号的 应的 。 的

, 由 用 自的 号 ,所 方法 行的。 用 Spark , 代 6-6 所 。

6-6: Python 中

# RDD contactCounts中的 号的 应 。 号

# 读 代 进行

signPrefixes = loadCallSignTable()

def processSignCount(sign_count, signPrefixes):

country = lookupCountry(sign_count[0], signPrefixes) count = sign_count[1]

return (country, count)

countryContactCounts = (contactCounts

.map(processSignCount)

signPrefixes = sc.broadcast(loadCallSignTable()) def processSignCount(sign_count, signPrefixes):

country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1]

return (country, count)

countryContactCounts = (contactCounts

.map(processSignCount)

val signPrefixes = sc.broadcast(loadCallSignTable())

val countryContactCounts = contactCounts.map{case (sign, count) =>

val country = lookupInArray(sign, signPrefixes.value) (country, count)

final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());

JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(

new PairFunction<Tuple2<String, Integer>, String, Integer> (){

public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) { String sign = callSignCount._1();

String country = lookupCountry(sign, callSignInfo.value());

return new Tuple2(country, callSignCount._2());

}}).reduceByKey(new SumInts());

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

所 , 用 量的 简 。

(1) T的 用SparkContext.broadcast 出 Broadcast[T] 。

任 序列化的 。

val theArray = broadcastArray.value; theArray(0) = newValue 的

。 工作 执行时, 行 newValue 数 的第 元 , 工作

的任 效。 用spark.serializer 序列化

化序列化 第8 中会 用Kryo 快的序列化 , 的数

据 自 的序列化方式 Java 用java.io.Externalizable 序列

urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs) #

requests = map(lambda x: (x, http.request('GET', x)), urls) #

result = map(lambda x: (x[0], json.loads(x[1].data)), requests)

# 的

return filter(lambda x: x[1] is not None, result) def fetchCallSigns(input):

""" 号"""

return input.mapPartitions(lambda callSigns : processCallSigns(callSigns)) contactsContactList = fetchCallSigns(validSigns)

6-11: Scala 中 用共 JSON 解析

val contactsContactLists = validSigns.distinct().mapPartitions{

signs =>

val mapper = createMapper() val client = new HttpClient() client.start()

// http

signs.map {sign =>

createExchangeForSign(sign)

// 应

}.map{ case (sign, exchange) =>

(sign, readExchangeCallLog(mapper, exchange)) }.filter(x => x._2 != null) // 的

}

6-12: Java 中 用共 JSON 解析 // 用mapPartitions 用 工作

JavaPairRDD<String, CallLog[]> contactsContactLists = validCallSigns.mapPartitionsToPair(

new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {

public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) { // 列出

ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();

ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();

ObjectMapper mapper = createMapper();

HttpClient client = new HttpClient();

try {

client.start();

while (input.hasNext()) {

requests.add(createRequestForSign(input.next(), client));

}

for (Tuple2<String, ContentExchange> signExchange : requests) { callsignLogs.add(fetchResultFromRequest(mapper, signExchange));

}

mapPartitionsWithIndex() 分区序号, 及 分区中

的元 的迭代

的元 的迭代 f: (Int, Itera tor[T]) → Iter ator[U]

foreachPartitions() 元 迭代 f: (Iterator[T])

Unit

了 的 工作, 用mapPartitions() 的开 。有时需要

同 的数据 合 。 第3 中, 计算 时, 方法

数 RDD 元 RDD, 中 所处理的元 数。 ,

分区 次 元 ,而 用 元 执行 作, 6-13 和 6-14。

6-13: Python 中 用mapPartitions() def combineCtrs(c1, c2):

return (c1[0] + c2[0], c1[1] + c2[1]) def basicAvg(nums):

"""计算 """

nums.map(lambda num: (num, 1)).reduce(combineCtrs) 6-14: Python 中 用mapPartitions()

def partitionCtr(nums):

"""计算分区的sumCounter"""

sumCount = [0, 0]

sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs) return sumCount[0] / float(sumCount[1])

6.5 道

6-15:R 的 序

#!/usr/bin/env Rscript library("Imap") f <- file("stdin") open(f)

while(length(line <- readLines(f,n=1)) > 0) { # 处理行

contents <- Map(as.numeric, strsplit(line, ",")) mydist <- gdist(contents[[1]][1], contents[[1]][2], contents[[1]][3], contents[[1]][4],

units="m", a=6378137.0, b=6356752.3142, verbose = FALSE) write(mydist, stdout())

}

distScript = "./src/R/finddistance.R"

distScriptName = "finddistance.R"

sc.addFile(distScript) def hasDistInfo(call):

""" 次 有计算 时 需的字 """

requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]

return all(map(lambda f: call[f], requiredFields)) def formatCall(call):

""" 新的 式 新 R 序解析"""

return "{0},{1},{2},{3}".format(

call["mylat"], call["mylong"],

call["contactlat"], call["contactlong"]) pipeInputs = contactsContactList.values().flatMap(

lambda calls: map(formatCall, filter(hasDistInfo, calls))) distances = pipeInputs.pipe(SparkFiles.get(distScriptName)) print distances.collect()

6-17: Scala 中 用pipe() 用finddistance.R的 序

// 用 R 序计算 次 的

// 本 加 需要 本次作 中 的 件的列 中 val distScript = "./src/R/finddistance.R"

val distScriptName = "finddistance.R"

sc.addFile(distScript)

val distances = contactsContactLists.values.flatMap(x => x.map(y =>

s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")).pipe(Seq(

SparkFiles.get(distScriptName))) println(distances.collect().toList)

6-18: Java 中 用pipe() 用finddistance.R的 序

// 用 R 序计算 次 的

// 本 加 需要 本次作 中 的 件的列 中

String distScript = "./src/R/finddistance.R";

String distScriptName = "finddistance.R";

sc.addFile(distScript);

JavaRDD<String> pipeInputs = contactsContactLists.values() .map(new VerifyCallLogs()).flatMap(

new FlatMapFunction<CallLog[], String>() { public Iterable<String> call(CallLog[] calls) { ArrayList<String> latLons = new ArrayList<String>();

for (CallLog call: calls) {

JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));

System.out.println(StringUtils.join(distances.collect(), ","));

SparkContext.addFile(path), 件列 , 工作 Spark 作

中 列 中的 件。 件 自 的本地 件 子中所 的

, 者 自HDFS Hadoop 所 的 件 , 者 HTTP、HTTPS FTP 的 URI

地址。 作 中的行 作 发时, 件 会 , 工作

SparkFiles.getRootDirectory 它 。 用SparkFiles.get(Filename)

定 件。 , 保pipe()能 工作 本的方法 。

用 的远 制工具 本 件 的 。

所有 SparkContext.addFile(path) 加的 件 存 同 中,

所 有 要 用 的 字。

本 问,RDD 的pipe()方法 RDD 中的元 地 本 道。

有 版本的findDistance, 行 数的 式 收 定的SEPARATOR。

的 , 的 方法 成工作, 用第 。

• rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))

• rdd.pipe(SparkFiles.get("finddistance.R") + " ,")

第 方法中, 用 定 的 数序列的 式 本

而 第 方法中, 它作 字 , Spark 会 字

解 定 的 数序列。

需要的 , pipe() 定 行 量。 需要 量 应 的

作 pipe()的第 数 进 ,Spark 会 。

应 理解了 用pipe()、 处理RDD 中的元 , 及

的 本分发 集 , 能 工作 本。

6.6 数 RDD

Spark 数 数据的RDD 了 的 计 作。 会 第11

Spark 数 数据的RDD 了 的 计 作。 会 第11

在文檔中 快速大数据分析 (頁 101-0)

相關文件