• 沒有找到結果。

Hadoop 出

在文檔中 快速大数据分析 (頁 95-0)

5.2  件 式

5.2.6 Hadoop 出

了Spark 的 式 , 任 Hadoop 的 式交互。Spark 新

Hadoop 件API, 了 大的 。4 1. Hadoop

要 用新版的Hadoop API 读 件,需要 Spark 。newAPIHadoopFile

收 路 及 。 第 式 , 代 式。 的 数

hadoopFile() 用 用 的API 的Hadoop 式。第 的 ,

的 。 需要 定 的Hadoop , conf 。

KeyValueTextInputFormat 简 的Hadoop 式 , 用 本 件中读

数据 5-24 所 。 行 会 处理, 和 用制 开。

式存 Hadoop 中,所 需 工 中 加 的 能 用它。

5-24: Scala 中 用 式 API 读 KeyValueTextInputFormat()

val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{

case (x, y) => (x.toString, y.toString) }

学 了 读 本 件 加 解析 读 JSON 数据的方法。 ,

用自定义Hadoop 式 读 JSON 数据。 需要 的 ,

的 。Twitter 的 Elephant Bird https://github.com/

twitter/elephant-bird 数据 式, JSON、Lucene、Protocol Buffer 的 式等。 适用 新 Hadoop 件API。 了 Spark 中 用新式 Hadoop API, 用Lzo JsonInputFormat读 LZO 算法 的JSON 数据的

子。

5-25: Scala 中 用 Elephant Bird 读 LZO 算法 的JSON 件 val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat], classOf[LongWritable], classOf[MapWritable], conf)

// " "中的 MapWritable代 JSON

LZO 的 要 hadoop-lzo , Spark 的本地 中。

用Debian , 用spark-submit时加 --driver-library-path /

usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/

了。

用 的Hadoop API 读 件 用法 , 了需要 式InputFormat 。 Spark 自带的 的 数 sequenceFile() 用 式Hadoop API 的。

2. Hadoop

SequenceFile 有了 定的了解, Java API 中 有 用的保存 pair RDD 的

4:Hadoop 进 中增加了 新的MapReduce API, 有 用 的 。

数。 作 用 式Hadoop 式的API 的 子 5-26

新 saveAsNewAPIHadoopFile 的 用方法 的。

5-26: Java 保存 SequenceFile

public static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {

public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) { return new Tuple2(new Text(record._1), new IntWritable(record._2));

} }

JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);

JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());

result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);

3. 数据

了hadoopFile()和saveAsHadoopFile() 大 数, 用hadoopDataset/ saveAsHadoopDataSet和newAPIHadoopDataset/saveAsNewAPIHadoopDataset 问Hadoop 所

的 件 的存 式。 , HBase 和 MongoDB 的 存

了用 读 Hadoop 式的 。 Spark 中 方 地 用 式。

hadoopDataset() 数 收 Configuration , 用 问数据

所 需的Hadoop 。 要 用 Hadoop MapReduce 作 同的方式

。所 应 MapReduce 中 问 数据 的 用 ,

Spark。 ,5.5.3 了 用newAPIHadoopDataset HBase 中读 数据。

4. :protocol buffer

Protocol buffer 简 PB,https://github.com/google/protobuf 5 由Google 开发,用

的远 用 RPC , 开 。PB 化数据,它要 字 和 要 定

, 有 能需要读 的 数据 。

PB 字 定义 , 者 PB 。 string、int32、enum

等。 PB 的 , 的 , 问Protocol Buffer 的网 https://developers.google.com/protocol-buffers 了解 。

5-27 的 简 的PB 式中读 VenueResponse 。VenueResponse

字 的简 式, 字 带有 需字 、 字 及

字 的PB 。 5-27:PB 定义

message Venue {

required int32 id = 1;

message VenueResponse { repeated Venue results = 1;

}

中 用 Twitter 的 Elephant Bird 读 JSON 数据,它 PB 中读 和保

存数据。 出Venues的 , 5-28 所 。

5-28: Scala 中 用 Elephant Bird 出protocol buffer val job = new Job()

val conf = job.getConfiguration

LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf);

val dnaLounge = Places.Venue.newBuilder() dnaLounge.setId(1);

dnaLounge.setName("DNA Lounge")

dnaLounge.setType(Places.Venue.VenueType.CLUB) val data = sc.parallelize(List(dnaLounge.build())) val outputData = data.map{ pb =>

val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);

protoWritable.set(pb) (null, protoWritable) }

outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[ProtobufWritable[Places.Venue]],

classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)

的 版本 本书的 代 中 。

gzip 快 高 org.apache.hadoop.io.

compress.GzipCodec

Spark 的textFile()方 法 处 理 的 , 数 据

分 读 的方式 ,Spark 会打开splittable。 ,

要读 的 , 要 用Spark 的 ,而 用

newAPIHadoopFile 者hadoopFile, 定 的 编解 。

有 式 SequenceFile 数据中的 , 时 有

用。 式 有自 的 制: ,Twitter 的 Elephant Bird 中的 式 用LZO 算法 的数据。

5.3

Spark 读 件 , 用任 要的 件 式。

5.3.1 /“ ”

Spark 本地 件 中读 件, 它要求文件在集群中所有节点的相同路径下 都可以找到。

NFS、AFS 及MapR 的 NFS layer 的网络 件 会 件 件

的 式 用 。 的数据 中, 需要 定 file://

路 要 件 的同 路 ,Spark 会自 处理

5-29 所 。

5-29: Scala 中 本地 件 读 的 本 件 val rdd = sc.textFile("file:///home/holden/happypandas.gz")

件 有 集 中的所有 , 序中 本地读 件而

需 用 集 , 用parallelize 分发 工作 。 方式 能会

,所 的方法 件 HDFS、NFS、S3 等共 件 。

5.3.2 Amazon S3

用Amazon S3 存 大量数据 行。 计算 Amazon EC2 的时 ,

用S3 作 存 快, 需要 网 问数据时 能会 。

要 Spark 中 问 S3 数据, 应 的S3 问 据 AWS_ACCESS_KEY_ID和

AWS_SECRET_ACCESS_KEY 量。 Amazon Web Service 制台 据。

, s3n://开 的 路 s3n://bucket/path-within-bucket的 式

Spark 的 方法。和 所有 件 ,Spark 能 S3 路 中 字 , s3n://bucket/my-Files/*.txt。

Amazon S3 问权 , 保 定了 问 的 号 数据 有 read 读 和 list 列 的权 。Spark 需要列出 的 , 要读 的数据。

5.3.3 HDFS

Hadoop 分 式 件 HDFS 用的 件 ,Spark 能 地 用它。

HDFS 计 价的 件 工作,有 地应 ,同时 高 量。

Spark 和 HDFS 同 , Spark 用数据分 量

网络开 。

Spark 中 用 HDFS 需要 出路 定 hdfs://master:port/path 了。

HDFS Hadoop 版本 而 化, 用的Spark

版本的Hadoop 编译的, 读 会 。 ,Spark

Hadoop 1.0.4 编译7。 代 编译, 量中 定SPARK_

HADOOP_VERSION= 版本的Hadoop 进行编译

编译 的Spark 版本。 据运行hadoop version的

量要 的 。

5.4 Spark SQL中 数据

Spark SQL Spark 1.0 中新加 Spark 的 件, 快速成 了 Spark 中 的 作

化和 化数据的方式。 化数据 的 有结构信息的数据—— 所有的数

据记 具有 字 的集合。Spark SQL 化数据 作 ,而 由

Spark SQL 道数据的 ,它 数据 中 读出所需字 。第9

地讲解Spark SQL, 用它 数据 中读 数据。

, SQL Spark SQL, 它 数据 执行 出

字 者 字 用 数 , 由Row 成的RDD, Row

记 。 Java 和 Scala 中,Row 的 问 的。 Row 有

get()方法,会 进行 。 有 本

的 用get()方 法 getFloat()、getInt()、getLong()、getString()、getShort()、

getBoolean()等 。 Python 中, 用row[column_number] 及row.column_name

问元 。

7:自 Spark 1.4.0 ,Spark 的Hadoop 版本 2.2.0。——译者

5.4.1 Apache Hive

5-30:用 Python HiveContext 数据

from pyspark.sql import HiveContext hiveCtx = HiveContext(sc)

rows = hiveCtx.sql("SELECT name, age FROM users") firstRow = rows.first()

print firstRow.name

5-31:用 Scala HiveContext 数据 import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc) val rows = hiveCtx.sql("SELECT name, age FROM users") val firstRow = rows.first()

println(firstRow.getString(0)) // 字 0 name字 5-32:用 Java HiveContext 数据

import org.apache.spark.sql.hive.HiveContext;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SchemaRDD;

HiveContext hiveCtx = new HiveContext(sc);

SchemaRDD rows = hiveCtx.sql("SELECT name, age FROM users");

Row firstRow = rows.first();

System.out.println(firstRow.getString(0)); // 字 0 name字 会 9.3.1 地 Hive 中读 数据。

5.4.2 JSON

有记 的JSON 数据,Spark SQL 自 出它 的 ,

数据读 记 , 字 的 作 简 。要读 JSON 数

据, 需要和 用Hive HiveContext。 需要

Hive, 需要hive-site.xml 件。 用HiveContext.jsonFile方法

件中 由Row 成的RDD。 了 用 Row , RDD

册 张 , 中 出特定的字 。 , 有 的JSON 件,

式 5-33 所 , 行 记 。 5-33:JSON 中的

{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}

{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}

读 数据, 中 username 用 和text 本 字 , 5-34

5-36 所 。

5-34: Python 中 用 Spark SQL 读 JSON 数据 tweets = hiveCtx.jsonFile("tweets.json") tweets.registerTempTable("tweets")

results = hiveCtx.sql("SELECT user.name, text FROM tweets") 5-35: Scala 中 用 Spark SQL 读 JSON 数据

val tweets = hiveCtx.jsonFile("tweets.json") tweets.registerTempTable("tweets")

val results = hiveCtx.sql("SELECT user.name, text FROM tweets") 5-36: Java 中 用 Spark SQL 读 JSON 数据

SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);

tweets.registerTempTable("tweets");

SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");

会 9.3.3 用Spark SQL 读 JSON 数据 问 进行 。

,Spark SQL 的 远 读 数据, 数据、 RDD 所 的方式 的方式 合数据、 数据运行自定义 数, 第9 中讲 。

5.5 数据

数据 的Hadoop 者自定义的Spark ,Spark 问 用的

数据 。本 的 。

5.5.1 Java数据

Spark 任 Java 数 据 JDBC 的 数 据 中 读 数 据,

MySQL、Postgre 等 。 要 问 数 据, 需 要 org.apache.spark.rdd.

JdbcRDD, SparkContext 和 数 它。 5-37 了 用JdbcRDD

MySQL 数据 。

5-37:Scala 中的 JdbcRDD def createConnection() = {

Class.forName("com.mysql.jdbc.Driver").newInstance();

DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");

}

def extractValues(r: ResultSet) = { (r.getInt(1), r.getString(2)) }

val data = new JdbcRDD(sc,

createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",

lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues) println(data.collect().toList)

• 数的 数 出 java.sql.ResultSet http://docs.

oracle.com/javase/7/docs/api/java/sql/ResultSet.html 作数据有用的 式的 数。

5-37 中, 会 (Int, String) 。 数 ,Spark 会自 行

DataStax 开 用 Spark 的 Cassandra https://github.com/datastax/spark-cassandra-connector ,Spark Cassandra 的 大大 。 Spark 的 分,

需要 加 的 的 件中 能 用它。Cassandra 有 用

Spark SQL, 它会 由CassandraRow 成的RDD, 有 分方法 Spark SQL 的Row 的方法 同, 5-38 和 5-39 所 。Spark 的 Cassandra

能 Java 和 Scala 中 用。

5-38:Cassandra 的sbt

"com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0-rc5",

"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.0.0-rc5"

8: 道 有 记 , 执行 计数 , 据 决定upperBound

lowerBound的 。

5-39:Cassandra 的Maven

<dependency> <!-- Cassandra -->

<groupId>com.datastax.spark</groupId>

<artifactId>spark-cassandra-connector</artifactId>

<version>1.0.0-rc5</version>

</dependency>

<dependency> <!-- Cassandra -->

<groupId>com.datastax.spark</groupId>

<artifactId>spark-cassandra-connector-java</artifactId>

<version>1.0.0-rc5</version>

</dependency>

Elasticsearch ,Cassandra 要读 作 决定 集 。

spark.cassandra.connection.host Cassandra 集 。 有用 和 的

, 需要分 spark.cassandra.auth.username和spark.cassandra.auth.password。

定 有 Cassandra 集 要 , SparkContext 时 , 5-40 和 5-41 所 。

5-40: Scala 中 Cassandra val conf = new SparkConf(true)

.set("spark.cassandra.connection.host", "hostname") val sc = new SparkContext(conf)

5-41: Java 中 Cassandra SparkConf conf = new SparkConf(true)

.set("spark.cassandra.connection.host", cassandraHost);

JavaSparkContext sc = new JavaSparkContext(

sparkMaster, "basicquerycassandra", conf);

Datastax 的 Cassandra 用Scala 中的 式 SparkContext 和 RDD

加 数。 式 , 读 数据 5-42 所 。

5-42: Scala 中 张 读 RDD

// SparkContext和RDD 加 数的 式

import com.datastax.spark.connector._

// 张 读 RDD。 的 test的

// CREATE TABLE test.kv(key text PRIMARY KEY, value int);

val data = sc.cassandraTable("test" , "kv")

// 打印出value字 的 本 计。

import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

// 张 读 RDD。 的 test的

// CREATE TABLE test.kv(key text PRIMARY KEY, value int);

JavaRDD<CassandraRow> data = javaFunctions(sc).cassandraTable("test" , "kv");

// 打印 本 计。

System.out.println(data.mapToDouble(new DoubleFunction<CassandraRow>() { public double call(CassandraRow row) { return row.getInt("value"); } }).stats());

了读 张 , 数据集的子集。 cassandraTable()的 用中加 where 子 , 制 的数据, sc.cassandraTable(...).where("key=?", "panda")。 Cassandra 的RDD 保 存 Cassandra 中。 保 存 由

CassandraRow 成的RDD, 制数据 有用。 定列的

, 存 行的 式而 元 和列 的 式的RDD, 5-44 所 。 5-44: Scala 中保存数据 Cassandra

val rdd = sc.parallelize(List(Seq("moremagic", 1)))

rdd.saveToCassandra("test" , "kv", SomeColumns("key", "value"))

本 简 地 了Cassandra 。要了解 , 阅 的GitHub https://github.com/datastax/spark-cassandra-connector 。

5.5.3 HBase

由 org.apache.hadoop.hbase.mapreduce.TableInputFormat 的 ,Spark

Hadoop 式 问HBase。 式会 数据, 中 的 org.

apache.hadoop.hbase.io.ImmutableBytesWritable,而 的 org.apache.hadoop.hbase.

client.Result。Result 据 列 的 方 法, API https://hbase.

apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html 中有所 。

要 Spark 用 HBase, 需要 用 的 式 用SparkContext.newAPIHadoopRDD。 Scala 中的 5-45 所 。 val conf = HBaseConfiguration.create()

conf.set(TableInputFormat.INPUT_TABLE, "tablename") // 张 val rdd = sc.newAPIHadoopRDD(

conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result])

TableInputFormat 用 化 HBase 的读 的 , 制

分列中, 及 制 的时 。 TableInputFormat的API http://

hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html 中

, HBaseConfiguration中 它 , 它 Spark。

5.5.4 Elasticsearch

Spark 用Elasticsearch-Hadoop https://github.com/elastic/elasticsearch-hadoop Elasticsearch 中读 数据。Elasticsearch 开 的、 Lucene 的 。

Elasticsearch 和 的 大 ,它会 的路 ,

而 SparkContext 中 的 。Elasticsearch 的OutputFormat 有用 Spark 所 的 ,所 用saveAsHadoopDataSet 代 , 需要

5-46: Scala 中 用 Elasticsearch 出

val jobConf = new JobConf(sc.hadoopConfiguration)

jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.

mr.EsOutputFormat")

jobConf.setOutputCommitter(classOf[FileOutputCommitter])

jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets") jobConf.set(ConfigurationOptions.ES_NODES, "localhost")

FileOutputFormat.setOutputPath(jobConf, new Path("-")) output.saveAsHadoopDataset(jobConf)

5-47: Scala 中 用 Elasticsearch

def mapWritableToInput(in: MapWritable): Map[String, String] = { in.map{case (k, v) => (k.toString, v.toString)}.toMap

}

val jobConf = new JobConf(sc.hadoopConfiguration)

jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1)) jobConf.set(ConfigurationOptions.ES_NODES, args(2)) val currentTweets = sc.hadoopRDD(jobConf,

classOf[EsInputFormat[Object, MapWritable]], classOf[Object], classOf[MapWritable])

// map

// MapWritable[Text, Text] Map[String, String]

val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }

和 ,Elasticsearch 有 , 作 的 有效 。

出而 ,Elasticsearch 进行 , 尔会 出 的

数据 , 要存 字 的数据 , 定

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html 。

5.6

本 , 应 能 数据读 Spark 中, 计算 所 的方

式存 。 了数据 用的 同 式, 及它 应的数

据处理的方式。 掌握了读 和保存大 数据集的方法, 会

用 编 高效 大的Spark 序的方法。

第 6 章

Spark编程进阶

6.1

本 有 及的Spark 编 的 进 特 ,会 的共 量:

累加器 accumulator 广播变量 broadcast variable 。 加 用 进行 合,而 量用 高效分发 大的 。 有的RDD 化 作的 ,

数据 需要 大 代价的任 了 作。 了 用的工具 ,本 会

Spark 序交互的方式, 用R 编 的 本进行交互。

本 会 用 电 作者的 作 , 出 的 应用。

的 的 号。 号 由 分 的, 有自 的 号号 ,

所 据 号 应的 。有 作者的地理 ,用

定 。 6-1 了 。本书的 代 中 需要

中 进行处理的 号列 。

6-1: JSON 式的 , 中 字

{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",

"contactlat":"37.384733","contactlong":"-122.032164",

"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",

"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}

要用 的第 Spark 特 共 量。共 量 Spark 任 中 用的特

的 量。 中, 用Spark 共 量 的 进行计数,

及分发 张 大的 。

任 需要 时 进行 , 需要 数据 者 数生成 时, 数 file = sc.textFile(inputFile)

# Accumulator[Int] 化 0 blankLines = sc.accumulator(0) def extractCallSigns(line):

global blankLines # 问 量 if (line == ""):

blankLines += 1 return line.split(" ")

callSigns = file.flatMap(extractCallSigns)

callSigns.saveAsTextFile(outputDir + "/callsigns") print "Blank lines: %d" % blankLines.value

6-3: Scala 中 加 行 val sc = new SparkContext(...) val file = sc.textFile("file.txt")

val blankLines = sc.accumulator(0) // Accumulator[Int] 化 0 val callSigns = file.flatMap(line => {

if (line == "") { println("Blank lines: " + blankLines.value) 6-4: Java 中 加 行

JavaRDD<String> rdd = sc.textFile(args[1]);

final Accumulator<Integer> blankLines = sc.accumulator(0);

JavaRDD<String> callSigns = rdd.flatMap(

new FlatMapFunction<String, String>() { public Iterable<String> call(String line) { if (line.equals("")) {

blankLines.add(1);

}

return Arrays.asList(line.split(" "));

}});

callSigns.saveAsTextFile("output.txt")

System.out.println("Blank lines: "+ blankLines.value());

中, 了 作blankLines的Accumulator[Int] ,

中 行时 加1。执行 化 作 , 打印出 加 中的 。 ,

• 中 用SparkContext.accumulator(initialValue)方 法, 出 存 有

的 加 。 org.apache.spark.Accumulator[T] , 中T

initialValue的 。

的计数 时 方 , 有 需要 时, 者 需要

validSignCount = sc.accumulator(0) invalidSignCount = sc.accumulator(0) def validateSign(sign):

global validSignCount, invalidSignCount

if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):

validSignCount += 1

validSigns = callSigns.filter(validateSign)

contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x + y)

# 制 计算计数

contactCount.count()

if invalidSignCount.value < 0.1 * validSignCount.value:

contactCount.saveAsTextFile(outputDir + "/contactCount") else:

print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.

value)

加 要 处理 ,对于要在行动操作中使用的累加器,Spark

只会把每个任务对各累加器的修改应用一次。 , 要 计

算时 的 加 , 它 foreach() 的行 作中。

对于在RDD 转化操作中使用的累加器,就不能保证有这种情况了。 化 作中 加

能会发生 次 新。 子, 存 有 用的RDD 第

次 LRU 存中 新用 时, 的 次 新 会发生。 会 制

RDD 据 进行 算,而 作用 会 中的 化 作 的 加 进行

新, 次发 中。 化 作中, 加 用 的。

版本的Spark 能会 行 成 新 次 加 的 , 版本

1.2.0 会进行 次 新, 化 作中的 加 时 用。

6.2.2

, 学 了 用加法 作Spark 的 加 Accumulator[Int] 。 Spark Double、Long和Float 的 加 。 ,Spark 了自定义

加 和 合 作的API 要 加的 中的 大 ,而 加 。自

定义 加 需要 AccumulatorParam, Spark API http://spark.apache.org/docs/

latest/api/scala/index.html#package 中有所 。 要 作同时 交 和 合 ,

用任 作 代 数 的加法。 了 和, 数据的 大 。

任 的a 和 b,有 a op b = b op a,op 交 。

任 的a 和 b,有 a op b = b op a,op 交 。

在文檔中 快速大数据分析 (頁 95-0)

相關文件