• 沒有找到結果。

return flag;

}

relogin的代码样例

public Boolean relogin(){

boolean flag = false;

try {

UserGroupInformation.getLoginUser().reloginFromKeytab();

System.out.println("UserGroupInformation.isLoginKeytabBased(): "

+UserGroupInformation.isLoginKeytabBased());

flag = true;

} catch (IOException e) { e.printStackTrace();

}

return flag;

}

5.7.2 建议

全局使用的配置项,在 mapred-site.xml 中指定

如下给出接口所对应的mapred-site.xml中的配置项:

setMapperClass(Class <extends Mapper> cls) ->“mapreduce.job.map.class”

setReducerClass(Class<extends Reducer> cls) ->“mapreduce.job.reduce.class”

setCombinerClass(Class<extends Reducer> cls) ->“mapreduce.job.combine.class”

setInputFormatClass(Class<extends InputFormat> cls)

->“mapreduce.job.inputformat.class”

setJar(String jar) ->“mapreduce.job.jar”

setOutputFormat(Class< extends OutputFormat> theClass)

->“mapred.output.format.class”

setOutputKeyClass(Class<> theClass) ->“mapreduce.job.output.key.class”

setOutputValueClass(Class<> theClass) ->“mapreduce.job.output.value.class”

setPartitionerClass(Class<extends Partitioner> theClass)

->“mapred.partitioner.class”

setMapOutputCompressorClass(Class<extends CompressionCodec> codecClass) ->“mapreduce.map.output.compress”&“mapreduce.map.output.compress.codec

setJobPriority(JobPriority prio) ->“mapreduce.job.priority”

setQueueName(String queueName) ->“mapreduce.job.queuename”

setNumMapTasks(int n) ->“mapreduce.job.maps”

setNumReduceTasks(int n) ->“mapreduce.job.reduces”

5.7.3 示例

统计日志文件中本周末网购停留总时间超过 2 个小时的女性网民信息。

主要分为三个部分。

1. 从原文件中筛选女性网民上网时间数据信息,通过类MapperClass继承Mapper抽 象类实现。

2. 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类 ReducerClass继承Reducer抽象类实现。

3. main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群 样例1:类MapperClass定义Mapper抽象类的map()方法和setup()方法。

public static class MapperClass extends Mapper<Object, Text, Text, IntWritable> { // 分隔符。

String delim;

// 性别筛选。

String sexFilter;

private final static IntWritable timeInfo = new IntWritable(1);

private Text nameInfo = new Text();

/*** map的输入,key为原文件位置偏移量,value为原文件的一行字符数据。

* 其map的输入key,value为文件分割方法InputFormat提供,用户不设置,默认使用TextInputFormat。

*/public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

// 读取的一行字符串数据 String line = value.toString();

if (line.contains(sexFilter)) { // 获取姓名

String name = line.substring(0, line.indexOf(delim));

nameInfo.set(name);

// 获取上网停留时间

String time = line.substring(line.lastIndexOf(delim), line.length());

timeInfo.set(Integer.parseInt(time));

// map输出key,value键值对 context.write(nameInfo, timeInfo);

}

}/**

* setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次

*/public void setup(Context context) throws IOException, InterruptedException {

// 通过Context可以获得配置信息。

sexFilter = delim + context.getConfiguration().get("log.sex.filter", "female") + delim;

}}

样例2:类ReducerClass定义Reducer抽象类的reduce()方法。

public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable();

// 总时间门槛。

private int timeThreshold;

/*** @param 输入为一个key和value值集合迭代器。

* 由各个map汇总相同的key而来。reduce方法汇总相同key的个数。

* 并调用context.write(key, value)输出到指定目录。

* 其reduce的输出的key,value由Outputformat写入文件系统。

* 默认使用TextOutputFormat写入HDFS。

*/public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0;

for (IntWritable val : values) { sum += val.get();

}// 如果时间小于门槛时间,不输出结果。

if (sum < timeThreshold) { return;

*/public void setup(Context context) throws IOException, InterruptedException {

// Context可以获得配置信息。

timeThreshold = context.getConfiguration().getInt(

"log.time.threshold", 120);

}}

样例3:main()方法创建一个job,指定参数,提交作业到hadoop集群。

public static void main(String[] args) throws Exception { Configuration conf = getConfiguration();

// main方法输入参数:args[0]为样例MR作业输入路径,args[1]为样例MR作业输出路径 String[] otherArgs = new GenericOptionsParser(conf, args)

.getRemainingArgs();

if (otherArgs.length != 2) {

System.err.println("Usage: <in> <out>");

System.exit(2);

}Job job = new Job(conf, "Collect Female Info");

// 设置找到主任务所在的jar包。

job.setCombinerClass(IntSumReducerV1.class);

// 设置作业的输出类型,也可以通过配置文件指定。

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

// 设置该job的输入输出路径,也可以通过配置文件指定。

Path outputPath = new Path(otherArgs[1]);

FileSystem fs = outputPath.getFileSystem(conf);

// 如果输出路径已存在,删除该路径。

if (fs.exists(outputPath)) { fs.delete(outputPath, true);

}FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) 0 : 1);

}

6 HDFS 应用开发

6.1 概述

6.1.1 HDFS 简介

HDFS 简介

HDFS(Hadoop Distribute FileSystem)是一个适合运行在通用硬件之上,具备高度 容错特性,支持高吞吐量数据访问的分布式文件系统,适合大规模数据集应用。

HDFS适用于如下场景。

● 处理海量数据(TB或PB级别以上)

● 需要很高的吞吐量

● 需要高可靠性

● 需要很好的可扩展能力