return flag;
}
relogin的代码样例
public Boolean relogin(){
boolean flag = false;
try {
UserGroupInformation.getLoginUser().reloginFromKeytab();
System.out.println("UserGroupInformation.isLoginKeytabBased(): "
+UserGroupInformation.isLoginKeytabBased());
flag = true;
} catch (IOException e) { e.printStackTrace();
}
return flag;
}
5.7.2 建议
全局使用的配置项,在 mapred-site.xml 中指定
如下给出接口所对应的mapred-site.xml中的配置项:
setMapperClass(Class <extends Mapper> cls) ->“mapreduce.job.map.class”
setReducerClass(Class<extends Reducer> cls) ->“mapreduce.job.reduce.class”
setCombinerClass(Class<extends Reducer> cls) ->“mapreduce.job.combine.class”
setInputFormatClass(Class<extends InputFormat> cls)
->“mapreduce.job.inputformat.class”
setJar(String jar) ->“mapreduce.job.jar”
setOutputFormat(Class< extends OutputFormat> theClass)
->“mapred.output.format.class”
setOutputKeyClass(Class<> theClass) ->“mapreduce.job.output.key.class”
setOutputValueClass(Class<> theClass) ->“mapreduce.job.output.value.class”
setPartitionerClass(Class<extends Partitioner> theClass)
->“mapred.partitioner.class”
setMapOutputCompressorClass(Class<extends CompressionCodec> codecClass) ->“mapreduce.map.output.compress”&“mapreduce.map.output.compress.codec
”
setJobPriority(JobPriority prio) ->“mapreduce.job.priority”
setQueueName(String queueName) ->“mapreduce.job.queuename”
setNumMapTasks(int n) ->“mapreduce.job.maps”
setNumReduceTasks(int n) ->“mapreduce.job.reduces”
5.7.3 示例
统计日志文件中本周末网购停留总时间超过 2 个小时的女性网民信息。
主要分为三个部分。
1. 从原文件中筛选女性网民上网时间数据信息,通过类MapperClass继承Mapper抽 象类实现。
2. 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类 ReducerClass继承Reducer抽象类实现。
3. main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群 样例1:类MapperClass定义Mapper抽象类的map()方法和setup()方法。
public static class MapperClass extends Mapper<Object, Text, Text, IntWritable> { // 分隔符。
String delim;
// 性别筛选。
String sexFilter;
private final static IntWritable timeInfo = new IntWritable(1);
private Text nameInfo = new Text();
/*** map的输入,key为原文件位置偏移量,value为原文件的一行字符数据。
* 其map的输入key,value为文件分割方法InputFormat提供,用户不设置,默认使用TextInputFormat。
*/public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 读取的一行字符串数据 String line = value.toString();
if (line.contains(sexFilter)) { // 获取姓名
String name = line.substring(0, line.indexOf(delim));
nameInfo.set(name);
// 获取上网停留时间
String time = line.substring(line.lastIndexOf(delim), line.length());
timeInfo.set(Integer.parseInt(time));
// map输出key,value键值对 context.write(nameInfo, timeInfo);
}
}/**
* setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次
*/public void setup(Context context) throws IOException, InterruptedException {
// 通过Context可以获得配置信息。
sexFilter = delim + context.getConfiguration().get("log.sex.filter", "female") + delim;
}}
样例2:类ReducerClass定义Reducer抽象类的reduce()方法。
public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable();
// 总时间门槛。
private int timeThreshold;
/*** @param 输入为一个key和value值集合迭代器。
* 由各个map汇总相同的key而来。reduce方法汇总相同key的个数。
* 并调用context.write(key, value)输出到指定目录。
* 其reduce的输出的key,value由Outputformat写入文件系统。
* 默认使用TextOutputFormat写入HDFS。
*/public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0;
for (IntWritable val : values) { sum += val.get();
}// 如果时间小于门槛时间,不输出结果。
if (sum < timeThreshold) { return;
*/public void setup(Context context) throws IOException, InterruptedException {
// Context可以获得配置信息。
timeThreshold = context.getConfiguration().getInt(
"log.time.threshold", 120);
}}
样例3:main()方法创建一个job,指定参数,提交作业到hadoop集群。
public static void main(String[] args) throws Exception { Configuration conf = getConfiguration();
// main方法输入参数:args[0]为样例MR作业输入路径,args[1]为样例MR作业输出路径 String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}Job job = new Job(conf, "Collect Female Info");
// 设置找到主任务所在的jar包。
job.setCombinerClass(IntSumReducerV1.class);
// 设置作业的输出类型,也可以通过配置文件指定。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置该job的输入输出路径,也可以通过配置文件指定。
Path outputPath = new Path(otherArgs[1]);
FileSystem fs = outputPath.getFileSystem(conf);
// 如果输出路径已存在,删除该路径。
if (fs.exists(outputPath)) { fs.delete(outputPath, true);
}FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) 0 : 1);
}
6 HDFS 应用开发
6.1 概述
6.1.1 HDFS 简介
HDFS 简介
HDFS(Hadoop Distribute FileSystem)是一个适合运行在通用硬件之上,具备高度 容错特性,支持高吞吐量数据访问的分布式文件系统,适合大规模数据集应用。
HDFS适用于如下场景。
● 处理海量数据(TB或PB级别以上)
● 需要很高的吞吐量
● 需要高可靠性
● 需要很好的可扩展能力