• 沒有找到結果。

Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java. Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output to write your MapReduce program.[22]

Streaming is naturally suited for text processing. Map input data is passed over standard input to your map function, which processes it line by line and writes lines to standard output. A map output key-value pair is written as a single tab-delimited line. Input to the reduce function is in the same format — a tab-separated key-value pair — passed over standard input. The reduce function reads lines from standard input, which the framework guarantees are sorted by key, and writes its results to standard output.

Let’s illustrate this by rewriting our MapReduce program for finding maximum temperatures by year in Streaming.

Ruby

The map function can be expressed in Ruby as shown in Example 2-7.

Example 2-7. Map function for maximum temperature in Ruby

#!/usr/bin/env ruby

STDIN.each_line do |line|

val = line

year, temp, q = val[15,4], val[87,5], val[92,1]

puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/) end

The program iterates over lines from standard input by executing a block for each line from STDIN (a global constant of type IO). The block pulls out the relevant fields from each input line and, if the temperature is valid, writes the year and the temperature separated by a tab character, \t, to standard output (using puts).

NOTE

It’s worth drawing out a design difference between Streaming and the Java MapReduce API. The Java API is geared toward processing your map function one record at a time. The framework calls the map() method on your Mapper for each record in the input, whereas with Streaming the map program can decide how to process the input — for

example, it could easily read and process multiple lines at a time since it’s in control of the reading. The user’s Java map implementation is “pushed” records, but it’s still possible to consider multiple lines at a time by accumulating previous lines in an instance variable in the Mapper.[23] In this case, you need to implement the close() method so that you know when the last record has been read, so you can finish processing the last group of lines.

Because the script just operates on standard input and output, it’s trivial to test the script without using Hadoop, simply by using Unix pipes:

% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb 1950 +0000

1950 +0022 1950 -0011 1949 +0111 1949 +0078

The reduce function shown in Example 2-8 is a little more complex.

Example 2-8. Reduce function for maximum temperature in Ruby

#!/usr/bin/env ruby

last_key, max_val = nil, -1000000 STDIN.each_line do |line|

key, val = line.split("\t") if last_key && last_key != key puts "#{last_key}\t#{max_val}"

last_key, max_val = key, val.to_i else

last_key, max_val = key, [max_val, val.to_i].max end

end

puts "#{last_key}\t#{max_val}" if last_key

Again, the program iterates over lines from standard input, but this time we have to store some state as we process each key group. In this case, the keys are the years, and we store the last key seen and the maximum temperature seen so far for that key. The MapReduce framework ensures that the keys are ordered, so we know that if a key is different from the previous one, we have moved into a new key group. In contrast to the Java API, where you are provided an iterator over each key group, in Streaming you have to find key group boundaries in your program.

For each line, we pull out the key and value. Then, if we’ve just finished a group

(last_key && last_key != key), we write the key and the maximum temperature for that group, separated by a tab character, before resetting the maximum temperature for the new key. If we haven’t just finished a group, we just update the maximum temperature for the current key.

The last line of the program ensures that a line is written for the last key group in the input.

We can now simulate the whole MapReduce pipeline with a Unix pipeline (which is equivalent to the Unix pipeline shown in Figure 2-1):

% cat input/ncdc/sample.txt | \

ch02-mr-intro/src/main/ruby/max_temperature_map.rb | \ sort | ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb 1949 111

1950 22

The output is the same as that of the Java program, so the next step is to run it using Hadoop itself.

The hadoop command doesn’t support a Streaming option; instead, you specify the

Streaming JAR file along with the jar option. Options to the Streaming program specify the input and output paths and the map and reduce scripts. This is what it looks like:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \ -input input/ncdc/sample.txt \

-output output \

-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \ -reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

When running on a large dataset on a cluster, we should use the -combiner option to set the combiner:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \ -files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\

ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \ -input input/ncdc/all \

-output output \

-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \ -combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \ -reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

Note also the use of -files, which we use when running Streaming programs on the cluster to ship the scripts to the cluster.

Python

Streaming supports any programming language that can read from standard input and write to standard output, so for readers more familiar with Python, here’s the same example again.[24] The map script is in Example 2-9, and the reduce script is in Example 2-10.

Example 2-9. Map function for maximum temperature in Python

#!/usr/bin/env python

import re import sys

for line in sys.stdin:

val = line.strip()

(year, temp, q) = (val[15:19], val[87:92], val[92:93]) if (temp != "+9999" and re.match("[01459]", q)):

print "%s\t%s" % (year, temp)

Example 2-10. Reduce function for maximum temperature in Python

#!/usr/bin/env python import sys

(last_key, max_val) = (None, -sys.maxint) for line in sys.stdin:

(key, val) = line.strip().split("\t") if last_key and last_key != key:

print "%s\t%s" % (last_key, max_val) (last_key, max_val) = (key, int(val)) else:

(last_key, max_val) = (key, max(max_val, int(val))) if last_key:

print "%s\t%s" % (last_key, max_val)

We can test the programs and run the job in the same way we did in Ruby. For example, to run a test:

% cat input/ncdc/sample.txt | \

ch02-mr-intro/src/main/python/max_temperature_map.py | \ sort | ch02-mr-intro/src/main/python/max_temperature_reduce.py 1949 111

1950 22

[20] Functions with this property are called commutative and associative. They are also sometimes referred to as

distributive, such as by Jim Gray et al.’s “Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals,” February1995.

[21] This is a factor of seven faster than the serial run on one machine using awk. The main reason it wasn’t

proportionately faster is because the input data wasn’t evenly partitioned. For convenience, the input files were gzipped by year, resulting in large files for later years in the dataset, when the number of weather records was much higher.

[22] Hadoop Pipes is an alternative to Streaming for C++ programmers. It uses sockets to communicate with the process running the C++ map or reduce function.

[23] Alternatively, you could use “pull”-style processing in the new MapReduce API; see Appendix D.

[24] As an alternative to Streaming, Python programmers should consider Dumbo, which makes the Streaming MapReduce interface more Pythonic and easier to use.

Chapter 3. The Hadoop Distributed Filesystem

When a dataset outgrows the storage capacity of a single physical machine, it becomes necessary to partition it across a number of separate machines. Filesystems that manage the storage across a network of machines are called distributed filesystems. Since they are network based, all the complications of network programming kick in, thus making

distributed filesystems more complex than regular disk filesystems. For example, one of the biggest challenges is making the filesystem tolerate node failure without suffering data loss.

Hadoop comes with a distributed filesystem called HDFS, which stands for Hadoop Distributed Filesystem. (You may sometimes see references to “DFS” — informally or in older documentation or configurations — which is the same thing.) HDFS is Hadoop’s flagship filesystem and is the focus of this chapter, but Hadoop actually has a general-purpose filesystem abstraction, so we’ll see along the way how Hadoop integrates with other storage systems (such as the local filesystem and Amazon S3).