• 沒有找到結果。

安全认证代码

5.3.1 MapReduce 统计样例程序

5.3 开发程序

5.3.1 MapReduce 统计样例程序

场景说明

假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发 MapReduce应用程序实现如下功能。

● 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。

● 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单 位为分钟,分隔符为“,”。

log1.txt:周六网民停留日志

LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50

Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60

log2.txt:周日网民停留日志

LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60

数据规划

首先需要把原日志文件放置在HDFS系统里。

1. 本地新建两个文本文件,将log1.txt中的内容复制保存到input_data1.txt,将 log2.txt中的内容复制保存到input_data2.txt。

2. 在HDFS上建立一个文件夹,“/tmp/input”,并上传input_data1.txt,

input_data2.txt到此目录,命令如下。

a. 在Linux系统HDFS客户端使用命令

hdfs dfs -mkdir /tmp/input

b. 在Linux系统HDFS客户端使用命令hdfs dfs -put local_filepath /tmp/input

开发思路

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。

主要分为四个部分。

● 读取原文件数据。

● 筛选女性网民上网时间数据信息。

● 汇总每个女性上网总时间。

● 筛选出停留总时间大于两个小时的女性网民信息。

功能介绍

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。

主要分为三个部分。

● 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承 Mapper抽象类实现。

● 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类 CollectionReducer继承Reducer抽象类实现。

● main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集

public static class CollectionMapper extends Mapper<Object, Text, Text, IntWritable> {

private IntWritable timeInfo = new IntWritable(1);

/**

* @throws IOException , InterruptedException */

public void setup(Context context) throws IOException, InterruptedException

{

// 通过Context可以获得配置信息。

delim = context.getConfiguration().get("log.delimiter", ",");

sexFilter = delim

+ context.getConfiguration()

.get("log.sex.filter", "female") + delim;

} }

样例2:类CollectionReducer定义Reducer抽象类的reduce()方法。

public static class CollectionReducer extends Reducer<Text, IntWritable, Text, IntWritable>

* @throws IOException , InterruptedException */

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException

* @throws IOException , InterruptedException */

public void setup(Context context) throws IOException, InterruptedException

public static void main(String[] args) throws Exception { // 初始化环境变量。

Configuration conf = new Configuration();

// 获取入参。

String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs();

if (otherArgs.length != 2) {

System.err.println("Usage: collect female info <in> <out>");

System.exit(2);

}

// 判断是否为安全模式

if("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))){

//security mode

System.setProperty("java.security.krb5.conf", KRB);

LoginUtil.login(PRINCIPAL, KEYTAB, KRB, conf);

}

// 初始化Job任务对象。

Job job = Job.getInstance(conf, "Collect Female Info");

job.setJarByClass(FemaleInfoCollector.class);

// 设置运行时执行map,reduce的类,也可以通过配置文件指定。

job.setMapperClass(CollectionMapper.class);

job.setReducerClass(CollectionReducer.class);

// 设置combiner类,默认不使用,使用时通常使用和reduce一样的类。

// Combiner类需要谨慎使用,也可以通过配置文件指定。

job.setCombinerClass(CollectionCombiner.class);

// 设置作业的输出类型。

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

// 提交任务交到远程环境上执行。

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

public static class CollectionCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { // Intermediate statistical results

private IntWritable intermediateResult = new IntWritable();

/**

* @param key Text : key after Mapper

* @param values Iterable : all results with the same key in this map task * @param context Context

* @throws IOException , InterruptedException */

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0;

for (IntWritable val : values) { sum += val.get();

}

intermediateResult.set(sum);

// In the output information, key indicates netizen information,

// and value indicates the total online time of the netizen in this map task.

context.write(key, intermediateResult);

} }