• 沒有找到結果。

} }

public static void execDDL(Connection connection, String sql) throws SQLException {

PreparedStatement statement = null;

try {

statement = connection.prepareStatement(sql);

statement.execute();

public static void execDML(Connection connection, String sql) throws SQLException { PreparedStatement statement = null;

ResultSet resultSet = null;

ResultSetMetaData resultMetaData = null;

try { // 执行HQL

statement = connection.prepareStatement(sql);

resultSet = statement.executeQuery();

// 输出查询的列名到控制台

resultMetaData = resultSet.getMetaData();

int columnCount = resultMetaData.getColumnCount();

for (int i = 1; i <= columnCount; i++) {

System.out.print(resultMetaData.getColumnLabel(i) + '\t');

}

public class HCatalogExample extends Configured implements Tool { public static class Map extends

Mapper<LongWritable, HCatRecord, IntWritable, IntWritable> {

int age;

@Override

protected void map(

LongWritable key, HCatRecord value,

org.apache.hadoop.mapreduce.Mapper<LongWritable, HCatRecord, IntWritable, IntWritable>.Context context)

public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, HCatRecord> {

@Override

protected void reduce(

IntWritable key,

java.lang.Iterable<IntWritable> values,

org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable, IntWritable, HCatRecord>.Context context)

public int run(String[] args) throws Exception { Configuration conf = getConf();

@SuppressWarnings("deprecation") Job job = new Job(conf, "GroupByDemo");

HCatInputFormat.setInput(job, dbName, inputTableName);

job.setInputFormatClass(HCatInputFormat.class);

job.setJarByClass(HCatalogExample.class);

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setMapOutputKeyClass(IntWritable.class);

job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(WritableComparable.class);

job.setOutputValueClass(DefaultHCatRecord.class);

OutputJobInfo outputjobInfo = OutputJobInfo.create(dbName,outputTableName, null);

HCatOutputFormat.setOutput(job, outputjobInfo);

HCatSchema schema = outputjobInfo.getOutputSchema();

HCatOutputFormat.setSchema(job, schema);

job.setOutputFormatClass(HCatOutputFormat.class);

return (job.waitForCompletion(true) ? 0 : 1);

}

public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new HCatalogExample(), args);

System.exit(exitCode);

} }

5 MapReduce 应用开发

5.1 概述

5.1.1 MapReduce 简介

Hadoop MapReduce是一个使用简易的并行计算软件框架,基于它写出来的应用程序 能够运行在由上千个服务器组成的大型集群上,并以一种可靠容错的方式并行处理上T 级别的数据集。

一个MapReduce作业(application/job)通常会把输入的数据集切分为若干独立的数 据块,由map任务(task)以完全并行的方式来处理。框架会对map的输出先进行排 序,然后把结果输入给reduce任务,最后返回给客户端。通常作业的输入和输出都会 被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任 务。

MapReduce主要特点如下:

● 大规模并行计算

● 适用于大型数据集

● 高容错性和高可靠性

● 合理的资源调度

5.1.2 常用概念

Hadoop shell命令

Hadoop基本shell命令,包括提交MapReduce作业,kill MapReduce作业,进行 HDFS文件系统各项操作等。

MapReduce输入输出(InputFormat,OutputFormat)

MapReduce框架根据用户指定的InputFormat切割数据集,读取数据,并提供给 map任务多条键值对进行处理,决定并行启动的map任务数目。MapReduce框架 根据用户指定的OutputFormat,把生成的键值对输出为特定格式的数据。

map、reduce两个阶段都处理在<key,value>键值对上,也就是说,框架把作业的 输入作为一组<key,value>键值对,同样也产出一组<key,value>键值对做为作业的 输出,这两组键值对的类型可能不同。对单个map和reduce而言,对键值对的处 理为单线程串行处理。

框架需要对key和value的类(classes)进行序列化操作,因此,这些类需要实现 Writable接口。另外,为了方便框架执行排序操作,key类必须实现

WritableComparable接口。

一个MapReduce作业的输入和输出类型如下所示:

(input)<k1,v1> —> map —> <k2,v2> —> 汇总数据 —> <k2,List(v2)> —>

reduce —> <k3,v3>(output)

● 业务核心

应用程序通常只需要分别继承Mapper类和Reducer类,并重写其map和reduce方 法来实现业务逻辑,它们组成作业的核心。

MapReduce WebUI界面

用于监控正在运行的或者历史的MapReduce作业在MapReduce框架各个阶段的细 节,以及提供日志显示,帮助用户更细粒度地去开发、配置和调优作业。

● 归档

用来保证所有映射的键值对中的每一个共享相同的键组。

● 混洗

从Map任务输出的数据到Reduce任务的输入数据的过程称为Shuffle。

● 映射

用来把一组键值对映射成一组新的键值对。

5.1.3 开发流程

开发流程中各阶段的说明如图5-1和表5-1所示。

5-1 MapReduce 应用程序开发流程

5-1 MapReduce 应用开发的流程说明

阶段 说明 参考文档

了解基本概

念 在开始开发应用前,需要了解MapReduce的基本

概念。 常用概念