} }
public static void execDDL(Connection connection, String sql) throws SQLException {
PreparedStatement statement = null;
try {
statement = connection.prepareStatement(sql);
statement.execute();
public static void execDML(Connection connection, String sql) throws SQLException { PreparedStatement statement = null;
ResultSet resultSet = null;
ResultSetMetaData resultMetaData = null;
try { // 执行HQL
statement = connection.prepareStatement(sql);
resultSet = statement.executeQuery();
// 输出查询的列名到控制台
resultMetaData = resultSet.getMetaData();
int columnCount = resultMetaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
System.out.print(resultMetaData.getColumnLabel(i) + '\t');
}
public class HCatalogExample extends Configured implements Tool { public static class Map extends
Mapper<LongWritable, HCatRecord, IntWritable, IntWritable> {
int age;
@Override
protected void map(
LongWritable key, HCatRecord value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, HCatRecord, IntWritable, IntWritable>.Context context)
public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, HCatRecord> {
@Override
protected void reduce(
IntWritable key,
java.lang.Iterable<IntWritable> values,
org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable, IntWritable, HCatRecord>.Context context)
public int run(String[] args) throws Exception { Configuration conf = getConf();
@SuppressWarnings("deprecation") Job job = new Job(conf, "GroupByDemo");
HCatInputFormat.setInput(job, dbName, inputTableName);
job.setInputFormatClass(HCatInputFormat.class);
job.setJarByClass(HCatalogExample.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(WritableComparable.class);
job.setOutputValueClass(DefaultHCatRecord.class);
OutputJobInfo outputjobInfo = OutputJobInfo.create(dbName,outputTableName, null);
HCatOutputFormat.setOutput(job, outputjobInfo);
HCatSchema schema = outputjobInfo.getOutputSchema();
HCatOutputFormat.setSchema(job, schema);
job.setOutputFormatClass(HCatOutputFormat.class);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new HCatalogExample(), args);
System.exit(exitCode);
} }
5 MapReduce 应用开发
5.1 概述
5.1.1 MapReduce 简介
Hadoop MapReduce是一个使用简易的并行计算软件框架,基于它写出来的应用程序 能够运行在由上千个服务器组成的大型集群上,并以一种可靠容错的方式并行处理上T 级别的数据集。
一个MapReduce作业(application/job)通常会把输入的数据集切分为若干独立的数 据块,由map任务(task)以完全并行的方式来处理。框架会对map的输出先进行排 序,然后把结果输入给reduce任务,最后返回给客户端。通常作业的输入和输出都会 被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任 务。
MapReduce主要特点如下:
● 大规模并行计算
● 适用于大型数据集
● 高容错性和高可靠性
● 合理的资源调度
5.1.2 常用概念
● Hadoop shell命令
Hadoop基本shell命令,包括提交MapReduce作业,kill MapReduce作业,进行 HDFS文件系统各项操作等。
● MapReduce输入输出(InputFormat,OutputFormat)
MapReduce框架根据用户指定的InputFormat切割数据集,读取数据,并提供给 map任务多条键值对进行处理,决定并行启动的map任务数目。MapReduce框架 根据用户指定的OutputFormat,把生成的键值对输出为特定格式的数据。
map、reduce两个阶段都处理在<key,value>键值对上,也就是说,框架把作业的 输入作为一组<key,value>键值对,同样也产出一组<key,value>键值对做为作业的 输出,这两组键值对的类型可能不同。对单个map和reduce而言,对键值对的处 理为单线程串行处理。
框架需要对key和value的类(classes)进行序列化操作,因此,这些类需要实现 Writable接口。另外,为了方便框架执行排序操作,key类必须实现
WritableComparable接口。
一个MapReduce作业的输入和输出类型如下所示:
(input)<k1,v1> —> map —> <k2,v2> —> 汇总数据 —> <k2,List(v2)> —>
reduce —> <k3,v3>(output)
● 业务核心
应用程序通常只需要分别继承Mapper类和Reducer类,并重写其map和reduce方 法来实现业务逻辑,它们组成作业的核心。
● MapReduce WebUI界面
用于监控正在运行的或者历史的MapReduce作业在MapReduce框架各个阶段的细 节,以及提供日志显示,帮助用户更细粒度地去开发、配置和调优作业。
● 归档
用来保证所有映射的键值对中的每一个共享相同的键组。
● 混洗
从Map任务输出的数据到Reduce任务的输入数据的过程称为Shuffle。
● 映射
用来把一组键值对映射成一组新的键值对。
5.1.3 开发流程
开发流程中各阶段的说明如图5-1和表5-1所示。
图5-1 MapReduce 应用程序开发流程
表5-1 MapReduce 应用开发的流程说明
阶段 说明 参考文档
了解基本概
念 在开始开发应用前,需要了解MapReduce的基本
概念。 常用概念