5.3 件
5.3.3 HDFS
Hadoop 分 式 件 HDFS 用的 件 ,Spark 能 地 用它。
HDFS 计 价的 件 工作,有 地应 ,同时 高 量。
Spark 和 HDFS 同 , Spark 用数据分 量
网络开 。
Spark 中 用 HDFS 需要 出路 定 hdfs://master:port/path 了。
HDFS Hadoop 版本 而 化, 用的Spark
版本的Hadoop 编译的, 读 会 。 ,Spark
Hadoop 1.0.4 编译7。 代 编译, 量中 定SPARK_
HADOOP_VERSION= 版本的Hadoop 进行编译
编译 的Spark 版本。 据运行hadoop version的
量要 的 。
5.4 Spark SQL中 数据
Spark SQL Spark 1.0 中新加 Spark 的 件, 快速成 了 Spark 中 的 作
化和 化数据的方式。 化数据 的 有结构信息的数据—— 所有的数
据记 具有 字 的集合。Spark SQL 化数据 作 ,而 由
Spark SQL 道数据的 ,它 数据 中 读出所需字 。第9
地讲解Spark SQL, 用它 数据 中读 数据。
, SQL Spark SQL, 它 数据 执行 出
字 者 字 用 数 , 由Row 成的RDD, Row
记 。 Java 和 Scala 中,Row 的 问 的。 Row 有
get()方法,会 进行 。 有 本
的 用get()方 法 getFloat()、getInt()、getLong()、getString()、getShort()、
getBoolean()等 。 Python 中, 用row[column_number] 及row.column_name
问元 。
7:自 Spark 1.4.0 ,Spark 的Hadoop 版本 2.2.0。——译者
5.4.1 Apache Hive
5-30:用 Python HiveContext 数据from pyspark.sql import HiveContext hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users") firstRow = rows.first()
print firstRow.name
5-31:用 Scala HiveContext 数据 import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc) val rows = hiveCtx.sql("SELECT name, age FROM users") val firstRow = rows.first()
println(firstRow.getString(0)) // 字 0 name字 5-32:用 Java HiveContext 数据
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;
HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT name, age FROM users");
Row firstRow = rows.first();
System.out.println(firstRow.getString(0)); // 字 0 name字 会 9.3.1 地 Hive 中读 数据。
5.4.2 JSON
有记 的JSON 数据,Spark SQL 自 出它 的 ,
数据读 记 , 字 的 作 简 。要读 JSON 数
据, 需要和 用Hive HiveContext。 需要
Hive, 需要hive-site.xml 件。 用HiveContext.jsonFile方法
件中 由Row 成的RDD。 了 用 Row , RDD
册 张 , 中 出特定的字 。 , 有 的JSON 件,
式 5-33 所 , 行 记 。 5-33:JSON 中的
{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}
读 数据, 中 username 用 和text 本 字 , 5-34
5-36 所 。
5-34: Python 中 用 Spark SQL 读 JSON 数据 tweets = hiveCtx.jsonFile("tweets.json") tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name, text FROM tweets") 5-35: Scala 中 用 Spark SQL 读 JSON 数据
val tweets = hiveCtx.jsonFile("tweets.json") tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets") 5-36: Java 中 用 Spark SQL 读 JSON 数据
SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);
tweets.registerTempTable("tweets");
SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");
会 9.3.3 用Spark SQL 读 JSON 数据 问 进行 。
,Spark SQL 的 远 读 数据, 数据、 RDD 所 的方式 的方式 合数据、 数据运行自定义 数, 第9 中讲 。
5.5 数据
数据 的Hadoop 者自定义的Spark ,Spark 问 用的
数据 。本 的 。
5.5.1 Java数据
Spark 任 Java 数 据 JDBC 的 数 据 中 读 数 据,
MySQL、Postgre 等 。 要 问 数 据, 需 要 org.apache.spark.rdd.
JdbcRDD, SparkContext 和 数 它。 5-37 了 用JdbcRDD
MySQL 数据 。
5-37:Scala 中的 JdbcRDD def createConnection() = {
Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) = { (r.getInt(1), r.getString(2)) }
val data = new JdbcRDD(sc,
createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues) println(data.collect().toList)
• 数的 数 出 java.sql.ResultSet http://docs.
oracle.com/javase/7/docs/api/java/sql/ResultSet.html 作数据有用的 式的 数。
5-37 中, 会 (Int, String) 。 数 ,Spark 会自 行
DataStax 开 用 Spark 的 Cassandra https://github.com/datastax/spark-cassandra-connector ,Spark Cassandra 的 大大 。 Spark 的 分,
需要 加 的 的 件中 能 用它。Cassandra 有 用
Spark SQL, 它会 由CassandraRow 成的RDD, 有 分方法 Spark SQL 的Row 的方法 同, 5-38 和 5-39 所 。Spark 的 Cassandra
能 Java 和 Scala 中 用。
5-38:Cassandra 的sbt
"com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0-rc5",
"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.0.0-rc5"
8: 道 有 记 , 执行 计数 , 据 决定upperBound和
lowerBound的 。
5-39:Cassandra 的Maven
<dependency> <!-- Cassandra -->
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector</artifactId>
<version>1.0.0-rc5</version>
</dependency>
<dependency> <!-- Cassandra -->
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java</artifactId>
<version>1.0.0-rc5</version>
</dependency>
Elasticsearch ,Cassandra 要读 作 决定 集 。
spark.cassandra.connection.host Cassandra 集 。 有用 和 的
, 需要分 spark.cassandra.auth.username和spark.cassandra.auth.password。
定 有 Cassandra 集 要 , SparkContext 时 , 5-40 和 5-41 所 。
5-40: Scala 中 Cassandra val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "hostname") val sc = new SparkContext(conf)
5-41: Java 中 Cassandra SparkConf conf = new SparkConf(true)
.set("spark.cassandra.connection.host", cassandraHost);
JavaSparkContext sc = new JavaSparkContext(
sparkMaster, "basicquerycassandra", conf);
Datastax 的 Cassandra 用Scala 中的 式 SparkContext 和 RDD
加 数。 式 , 读 数据 5-42 所 。
5-42: Scala 中 张 读 RDD
// SparkContext和RDD 加 数的 式
import com.datastax.spark.connector._
// 张 读 RDD。 的 test的
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
val data = sc.cassandraTable("test" , "kv")
// 打印出value字 的 本 计。
import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;
// 张 读 RDD。 的 test的
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
JavaRDD<CassandraRow> data = javaFunctions(sc).cassandraTable("test" , "kv");
// 打印 本 计。
System.out.println(data.mapToDouble(new DoubleFunction<CassandraRow>() { public double call(CassandraRow row) { return row.getInt("value"); } }).stats());
了读 张 , 数据集的子集。 cassandraTable()的 用中加 where 子 , 制 的数据, sc.cassandraTable(...).where("key=?", "panda")。 Cassandra 的RDD 保 存 Cassandra 中。 保 存 由
CassandraRow 成的RDD, 制数据 有用。 定列的
, 存 行的 式而 元 和列 的 式的RDD, 5-44 所 。 5-44: Scala 中保存数据 Cassandra
val rdd = sc.parallelize(List(Seq("moremagic", 1)))
rdd.saveToCassandra("test" , "kv", SomeColumns("key", "value"))
本 简 地 了Cassandra 。要了解 , 阅 的GitHub https://github.com/datastax/spark-cassandra-connector 。
5.5.3 HBase
由 org.apache.hadoop.hbase.mapreduce.TableInputFormat 的 ,Spark
Hadoop 式 问HBase。 式会 数据, 中 的 org.
apache.hadoop.hbase.io.ImmutableBytesWritable,而 的 org.apache.hadoop.hbase.
client.Result。Result 据 列 的 方 法, API https://hbase.
apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html 中有所 。
要 Spark 用 HBase, 需要 用 的 式 用SparkContext.newAPIHadoopRDD。 Scala 中的 5-45 所 。 val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // 张 val rdd = sc.newAPIHadoopRDD(
conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result])
TableInputFormat 用 化 HBase 的读 的 , 制
分列中, 及 制 的时 。 TableInputFormat的API http://
hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html 中
, HBaseConfiguration中 它 , 它 Spark。
5.5.4 Elasticsearch
Spark 用Elasticsearch-Hadoop https://github.com/elastic/elasticsearch-hadoop Elasticsearch 中读 数据。Elasticsearch 开 的、 Lucene 的 。
Elasticsearch 和 的 大 ,它会 的路 ,
而 SparkContext 中 的 。Elasticsearch 的OutputFormat 有用 Spark 所 的 ,所 用saveAsHadoopDataSet 代 , 需要
5-46: Scala 中 用 Elasticsearch 出
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.
mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets") jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-")) output.saveAsHadoopDataset(jobConf)
5-47: Scala 中 用 Elasticsearch
def mapWritableToInput(in: MapWritable): Map[String, String] = { in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1)) jobConf.set(ConfigurationOptions.ES_NODES, args(2)) val currentTweets = sc.hadoopRDD(jobConf,
classOf[EsInputFormat[Object, MapWritable]], classOf[Object], classOf[MapWritable])
// map
// MapWritable[Text, Text] Map[String, String]
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }
和 ,Elasticsearch 有 , 作 的 有效 。
出而 ,Elasticsearch 进行 , 尔会 出 的
数据 , 要存 字 的数据 , 定
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html 。
5.6
本 , 应 能 数据读 Spark 中, 计算 所 的方
式存 。 了数据 用的 同 式, 及它 应的数
据处理的方式。 掌握了读 和保存大 数据集的方法, 会
用 编 高效 大的Spark 序的方法。
第 6 章
Spark编程进阶
6.1
本 有 及的Spark 编 的 进 特 ,会 的共 量:
累加器 accumulator 广播变量 broadcast variable 。 加 用 进行 合,而 量用 高效分发 大的 。 有的RDD 化 作的 ,
数据 需要 大 代价的任 了 作。 了 用的工具 ,本 会
Spark 序交互的方式, 用R 编 的 本进行交互。
本 会 用 电 作者的 作 , 出 的 应用。
的 的 号。 号 由 分 的, 有自 的 号号 ,
所 据 号 应的 。有 作者的地理 ,用
定 。 6-1 了 。本书的 代 中 需要
中 进行处理的 号列 。
6-1: JSON 式的 , 中 字
{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",
"contactlat":"37.384733","contactlong":"-122.032164",
"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",
"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}
要用 的第 Spark 特 共 量。共 量 Spark 任 中 用的特
的 量。 中, 用Spark 共 量 的 进行计数,
及分发 张 大的 。
任 需要 时 进行 , 需要 数据 者 数生成 时, 数 file = sc.textFile(inputFile)
# Accumulator[Int] 化 0 blankLines = sc.accumulator(0) def extractCallSigns(line):
global blankLines # 问 量 if (line == ""):
blankLines += 1 return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns") print "Blank lines: %d" % blankLines.value
6-3: Scala 中 加 行 val sc = new SparkContext(...) val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // Accumulator[Int] 化 0 val callSigns = file.flatMap(line => {
if (line == "") { println("Blank lines: " + blankLines.value) 6-4: Java 中 加 行
JavaRDD<String> rdd = sc.textFile(args[1]);
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(
new FlatMapFunction<String, String>() { public Iterable<String> call(String line) { if (line.equals("")) {
blankLines.add(1);
}
return Arrays.asList(line.split(" "));
}});
callSigns.saveAsTextFile("output.txt")
System.out.println("Blank lines: "+ blankLines.value());
中, 了 作blankLines的Accumulator[Int] ,
中 行时 加1。执行 化 作 , 打印出 加 中的 。 ,
• 中 用SparkContext.accumulator(initialValue)方 法, 出 存 有
的 加 。 org.apache.spark.Accumulator[T] , 中T
initialValue的 。
的计数 时 方 , 有 需要 时, 者 需要
validSignCount = sc.accumulator(0) invalidSignCount = sc.accumulator(0) def validateSign(sign):
global validSignCount, invalidSignCount
if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
validSignCount += 1
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x + y)
# 制 计算计数
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
contactCount.saveAsTextFile(outputDir + "/contactCount") else:
print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.
value)
加 要 处理 ,对于要在行动操作中使用的累加器,Spark
只会把每个任务对各累加器的修改应用一次。 , 要 计
算时 的 加 , 它 foreach() 的行 作中。
对于在RDD 转化操作中使用的累加器,就不能保证有这种情况了。 化 作中 加
能会发生 次 新。 子, 存 有 用的RDD 第
次 LRU 存中 新用 时, 的 次 新 会发生。 会 制
RDD 据 进行 算,而 作用 会 中的 化 作 的 加 进行
新, 次发 中。 化 作中, 加 用 的。
版本的Spark 能会 行 成 新 次 加 的 , 版本
1.2.0 会进行 次 新, 化 作中的 加 时 用。
6.2.2
, 学 了 用加法 作Spark 的 加 Accumulator[Int] 。 Spark Double、Long和Float 的 加 。 ,Spark 了自定义
加 和 合 作的API 要 加的 中的 大 ,而 加 。自
定义 加 需要 AccumulatorParam, Spark API http://spark.apache.org/docs/
latest/api/scala/index.html#package 中有所 。 要 作同时 交 和 合 ,
用任 作 代 数 的加法。 了 和, 数据的 大 。
任 的a 和 b,有 a op b = b op a, 作op 交 。 任 的a、b 和 c,有 (a op b) op = a op (b op ), 作op
合 。 ,sum和max 交 合 , Spark 加 中
的 用 作。
6.3
Spark 的第 共 量 广播变量,它 序高效地 所有工作 发
大的 读 , Spark 作 用。 , 的应用需要 所有 发
大的 读 , 学 算法中的 大的特 量, 量用
。
,Spark 会自 中所有 用 的 量发 工作 。 方 ,
效。 有 : , 的任 发 制 任 进行 化的 次,
能会 多个 行 作中 用同 量, Spark 会 作分 发 。
子, 要 Spark 序, 号的 应的 。 的
, 由 用 自的 号 ,所 方法 行的。 用 Spark , 代 6-6 所 。
6-6: Python 中
# RDD contactCounts中的 号的 应 。 号
# 读 代 进行
signPrefixes = loadCallSignTable()
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes) count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts
.map(processSignCount)
signPrefixes = sc.broadcast(loadCallSignTable()) def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts
.map(processSignCount)
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
val country = lookupInArray(sign, signPrefixes.value) (country, count)
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
new PairFunction<Tuple2<String, Integer>, String, Integer> (){
public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) { String sign = callSignCount._1();
String country = lookupCountry(sign, callSignInfo.value());
return new Tuple2(country, callSignCount._2());
}}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
所 , 用 量的 简 。
(1) T的 用SparkContext.broadcast 出 Broadcast[T] 。
任 序列化的 。
val theArray = broadcastArray.value; theArray(0) = newValue 的
。 工作 执行时, 行 newValue 数 的第 元 , 工作
的任 效。 用spark.serializer 序列化
化序列化 第8 中会 用Kryo 快的序列化 , 的数
据 自 的序列化方式 Java 用java.io.Externalizable 序列
urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs) #
requests = map(lambda x: (x, http.request('GET', x)), urls) #
result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
# 的
return filter(lambda x: x[1] is not None, result) def fetchCallSigns(input):
""" 号"""
return input.mapPartitions(lambda callSigns : processCallSigns(callSigns)) contactsContactList = fetchCallSigns(validSigns)
6-11: Scala 中 用共 JSON 解析
val contactsContactLists = validSigns.distinct().mapPartitions{
signs =>
val mapper = createMapper() val client = new HttpClient() client.start()
// http
signs.map {sign =>
createExchangeForSign(sign)
// 应
}.map{ case (sign, exchange) =>
(sign, readExchangeCallLog(mapper, exchange)) }.filter(x => x._2 != null) // 的
}
6-12: Java 中 用共 JSON 解析 // 用mapPartitions 用 工作
JavaPairRDD<String, CallLog[]> contactsContactLists = validCallSigns.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) { // 列出
ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
ObjectMapper mapper = createMapper();
HttpClient client = new HttpClient();
try {
client.start();
while (input.hasNext()) {
requests.add(createRequestForSign(input.next(), client));
}
for (Tuple2<String, ContentExchange> signExchange : requests) { callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
}
mapPartitionsWithIndex() 分区序号, 及 分区中
的元 的迭代
的元 的迭代 f: (Int, Itera tor[T]) → Iter ator[U]
foreachPartitions() 元 迭代 f: (Iterator[T]) →
Unit
了 的 工作, 用mapPartitions() 的开 。有时需要
同 的数据 合 。 第3 中, 计算 时, 方法
数 RDD 元 RDD, 中 所处理的元 数。 ,
分区 次 元 ,而 用 元 执行 作, 6-13 和 6-14。
6-13: Python 中 用mapPartitions() def combineCtrs(c1, c2):
return (c1[0] + c2[0], c1[1] + c2[1]) def basicAvg(nums):
"""计算 """
nums.map(lambda num: (num, 1)).reduce(combineCtrs) 6-14: Python 中 用mapPartitions()
def partitionCtr(nums):
"""计算分区的sumCounter"""
sumCount = [0, 0]
sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs) return sumCount[0] / float(sumCount[1])
6.5 道
6-15:R 的 序
#!/usr/bin/env Rscript library("Imap") f <- file("stdin") open(f)
while(length(line <- readLines(f,n=1)) > 0) { # 处理行
contents <- Map(as.numeric, strsplit(line, ",")) mydist <- gdist(contents[[1]][1], contents[[1]][2], contents[[1]][3], contents[[1]][4],
units="m", a=6378137.0, b=6356752.3142, verbose = FALSE) write(mydist, stdout())
}
distScript = "./src/R/finddistance.R"
distScriptName = "finddistance.R"
sc.addFile(distScript) def hasDistInfo(call):
""" 次 有计算 时 需的字 """
requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
return all(map(lambda f: call[f], requiredFields)) def formatCall(call):
""" 新的 式 新 R 序解析"""
return "{0},{1},{2},{3}".format(
call["mylat"], call["mylong"],
call["contactlat"], call["contactlong"]) pipeInputs = contactsContactList.values().flatMap(
lambda calls: map(formatCall, filter(hasDistInfo, calls))) distances = pipeInputs.pipe(SparkFiles.get(distScriptName)) print distances.collect()
6-17: Scala 中 用pipe() 用finddistance.R的 序
// 用 R 序计算 次 的
// 本 加 需要 本次作 中 的 件的列 中 val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x => x.map(y =>
s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")).pipe(Seq(
SparkFiles.get(distScriptName))) println(distances.collect().toList)
6-18: Java 中 用pipe() 用finddistance.R的 序
// 用 R 序计算 次 的
// 本 加 需要 本次作 中 的 件的列 中
String distScript = "./src/R/finddistance.R";
String distScriptName = "finddistance.R";
sc.addFile(distScript);
JavaRDD<String> pipeInputs = contactsContactLists.values() .map(new VerifyCallLogs()).flatMap(
new FlatMapFunction<CallLog[], String>() { public Iterable<String> call(CallLog[] calls) { ArrayList<String> latLons = new ArrayList<String>();
for (CallLog call: calls) {
JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));
System.out.println(StringUtils.join(distances.collect(), ","));
SparkContext.addFile(path), 件列 , 工作 Spark 作
中 列 中的 件。 件 自 的本地 件 子中所 的
, 者 自HDFS Hadoop 所 的 件 , 者 HTTP、HTTPS FTP 的 URI
地址。 作 中的行 作 发时, 件 会 , 工作
SparkFiles.getRootDirectory 它 。 用SparkFiles.get(Filename)
定 件。 , 保pipe()能 工作 本的方法 。
用 的远 制工具 本 件 的 。
所有 SparkContext.addFile(path) 加的 件 存 同 中,
所 有 要 用 的 字。
本 问,RDD 的pipe()方法 RDD 中的元 地 本 道。
有 版本的findDistance, 行 数的 式 收 定的SEPARATOR。
的 , 的 方法 成工作, 用第 。
• rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
• rdd.pipe(SparkFiles.get("finddistance.R") + " ,")
第 方法中, 用 定 的 数序列的 式 本
而 第 方法中, 它作 字 , Spark 会 字
解 定 的 数序列。
需要的 , pipe() 定 行 量。 需要 量 应 的
作 pipe()的第 数 进 ,Spark 会 。
应 理解了 用pipe()、 处理RDD 中的元 , 及
的 本分发 集 , 能 工作 本。
6.6 数 RDD
Spark 数 数据的RDD 了 的 计 作。 会 第11
Spark 数 数据的RDD 了 的 计 作。 会 第11