安全认证代码
5.3.2 MapReduce 访问多组件样例程序
场景说明
该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业 访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。
该样例逻辑过程如下。
以HDFS文本文件为输入数据 log1.txt:数据输入文件
YuanJing,male,10 GuoYijun,male,5
Map阶段
1. 获取输入数据的一行并提取姓名信息。
2. 查询HBase一条数据。
3. 查询Hive一条数据。
4. 将HBase查询结果与Hive查询结果进行拼接作为Map输出。
Reduce阶段
1. 获取Map输出中的最后一条数据。
2. 将数据输出到HBase。
3. 将数据保存到HDFS。
数据规划
1. 创建HDFS数据文件。
a. 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。
b. 在HDFS上创建一个文件夹,“/tmp/examples/multi-components/
mapreduce/input/”,并上传data.txt到此目录,命令如下。
i. 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p /tmp/examples/
multi-components/mapreduce/input/
ii. 在Linux系统HDFS客户端使用命令hdfs dfs -put data.txt /tmp/
examples/multi-components/mapreduce/input/
2. 创建HBase表并插入数据。
a. 在Linux系统HBase客户端使用命令hbase shell。
b. 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令 create 'table1', 'cf'。
c. 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'。
d. 执行命令quit退出。
3. 创建Hive表并载入数据。
a. 在Linux系统Hive客户端使用命令beeline。
b. 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/
stayTime,使用命令CREATE TABLE person(name STRING, gender
STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;。
c. 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH '/tmp/
examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person;。
● main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集 群。
代码样例
下面代码片段仅为演示,具体代码请参见
com.huawei.bigdata.mapreduce.examples.MultiComponentExample类 样例1:类MultiComponentMapper定义Mapper抽象类的map方法。
private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> { Configuration conf;
@Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
setJaasInfo("krb5.conf", "jaas.conf");
LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
String zkQuorum = clientInfo.getProperty("zk.quorum");
String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
// 创建Hive鉴权信息 // Read this carefully:
// MapReduce can only use Hive through JDBC.
// Hive will submit another MapReduce Job to execute query.
// So we run Hive in MapReduce is not recommended.
final String driver = "org.apache.hive.jdbc.HiveDriver";
String sql = "select name,sum(stayTime) as "
+ "stayTime from person where name = ? group by name";
StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
// in map or reduce, use 'auth=delegationToken'
hbaseConn = ConnectionFactory.createConnection(conf);
// get table
table = hbaseConn.getTable(TableName.valueOf(HBASE_TABLE_NAME));
} catch (IOException e) {
context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
}
Configuration hbaseConfig = HBaseConfiguration.create(conf);
org.apache.hadoop.hbase.client.Connection conn = null;
try {
// 创建一个HBase Get请求实例
Get get = new Get(hbaseKey.getBytes());
// 提交Get请求
Result result = table.get(get);
hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes()));
return hbaseValue;
private int readHive(String name) {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text finalValue = new Text("");
setJaasInfo("krb5.conf", "jaas.conf");
LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
table = conn.getTable(TableName.valueOf(HBASE_TABLE_NAME));
} catch (IOException e) {
writeHBase(key.toString(), finalValue.toString());
// 将结果保存到HDFS context.write(key, finalValue);
}
样例5:结果输出到HBase的writeHBase方法。
private void writeHBase(String rowKey, String data) {
public static void main(String[] args) throws Exception {
// 清理所需目录
MultiComponentExample.cleanupBeforeRun();
// 查找Hive依赖jar包
Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");
Class thriftClass = Class.forName("org.apache.thrift.TException");
Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.TCLIService");
Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");
Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");
Class hiveShimClass = Class.forName("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge23");
// 添加Hive运行依赖到Job JarFinderUtil
.addDependencyJars(config, hiveDriverClass, thriftCLIClass, thriftClass, hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass);
//开启Kerberos认证的安全集群登录
if("kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"))){
//security mode
LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, KEYTAB);
LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
System.setProperty("java.security.krb5.conf", KRB);
LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config);
}
// 添加Hive配置文件
config.addResource("hive-site.xml");
// 添加HBase配置文件
Configuration conf = HBaseConfiguration.create(config);
// 实例化Job
Job job = Job.getInstance(conf);
job.setJarByClass(MultiComponentExample.class);
// 设置mapper&reducer类
job.setMapperClass(MultiComponentMapper.class);
job.setReducerClass(MultiComponentReducer.class);
//设置Job输入输出路径
FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));
FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
// 设置输出键值类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// HBase提供工具类添加HBase运行依赖到Job TableMapReduceUtil.addDependencyJars(job);
// 安全模式下必须要执行这个操作
// HBase添加鉴权信息到Job,map或reduce任务将会使用此处的鉴权信息 TableMapReduceUtil.initCredentials(job);
// 创建Hive鉴权信息 Properties clientInfo = null;
InputStream fileInputStream = null;
try {
clientInfo = new Properties();
File propertiesFile = new File(hiveClientProperties);
fileInputStream = new FileInputStream(propertiesFile);
clientInfo.load(fileInputStream);
String zkQuorum = clientInfo.getProperty("zk.quorum");//zookeeper节点ip和端口列表 String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
String principal = clientInfo.getProperty("principal");
String auth = clientInfo.getProperty("auth");
String sasl_qop = clientInfo.getProperty("sasl.qop");
StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=
")
.append(zooKeeperNamespace) .append(";sasl.qop=")
.append(sasl_qop) .append(";auth=") .append(auth) .append(";principal=") .append(principal) .append(";");
String url = sBuilder.toString();
Connection connection = DriverManager.getConnection(url, "", "");
String tokenStr = ((HiveConnection) connection)
.getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL);
connection.close();
Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();
hive2Token.decodeFromUrlString(tokenStr);
// 添加Hive鉴权信息到Job
job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token);
job.getCredentials().addToken(new Text(HiveAuthFactory.HS2_CLIENT_TOKEN), hive2Token);
// 提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
} 说明
样例中所有zkQuorum对象需替换为实际ZooKeeper集群节点信息。
5.4 调测程序
5.4.1 编译并运行程序
在程序代码完成开发后,可以在Linux环境中运行应用。
说明
MapReduce应用程序只支持在Linux环境下运行,不支持在Windows环境下运行。
操作步骤
步骤1 生成MapReduce应用可执行包。
执行mvn package打出jar包,在工程目录target目录下获取,比如“mapreduce-examples-1.0.jar”。
步骤2 上传生成的应用包“mapreduce-examples-1.0.jar”到Linux客户端上。例如“/opt”
目录。
步骤3 如果集群开启Kerberos,参考5.2.2-准备开发用户获得的user.keytab”、
“krb5.conf”文件需要在Linux环境上创建文件夹保存这些配置文件,例如“/opt/
conf”。并在linux环境上,在客户端路径下(/opt/client/HDFS/hadoop/etc/
hadoop/)获得core-site.xml、hdfs-site.xml文件放入上述文件夹里。
步骤4 样例程序如果指定OBS为输入输出的目标文件系统(如obs://<BucketName>/
input/),需要进行以下配置。
在$YARN_CONF_DIR/core-site.xml中添加AK配置项“fs.obs.access.key”和SK配置项
“fs.obs.secret.key”,AK/SK可登录“OBS控制台”,进入“我的凭证”页面获取。
<property>
<name>fs.obs.access.key</name>
<value>xxxxxxxxxxxxxxxx</value>
yarn jar mapreduce-examples-1.0.jar
com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector <inputPath>
<outputPath>
此命令包含了设置参数和提交job的操作,其中<inputPath>指HDFS文件系统中input 的路径,<outputPath>指HDFS文件系统中output的路径。
说明
● 在执行yarn jar mapreduce-examples-1.0.jar
com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector <inputPath>
<outputPath>命令之前,需要把log1.txt和log2.txt这两个文件上传到HDFS的<inputPath>目
录下。
● 在执行yarn jar mapreduce-examples-1.0.jar
com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector <inputPath>
<outputPath>命令之前,<outputPath>目录必须不存在,否则会报错。
● mapreduce-examples-1.0.jar适用于MRS 1.x版本。
● 在MapReduce任务运行过程中禁止重启HDFS服务,否则可能会导致任务失败。
storeKey=true debug=true;
};
说明
▪
文件内容中的[email protected]为示例,实际操作时请做相应修改。▪
“jaas_mr.conf”文件和代码中的“principal”请根据实际环境修改。例如 test@FAA12CC3_0996_432F_9D6F_E18F6F9D7F43.COM。▪
未开启Kerberos认证集群略过此步骤。c. 在Linux环境中添加样例工程运行所需的classpath,例如(以客户端安装路径 为/opt/conf为例)
export YARN_USER_CLASSPATH=/opt/conf/:/opt/client/HBase/
hbase/lib/*:/opt/client/Hive/Beeline/lib/*
说明
▪
针对MRS 1.9.x版本集群,需要在执行上述命令前或者执行上述命令后执行 mv /opt/client/Hive/Beeline/lib/derby-10.10.2.0.jar derby-10.10.2.0.jar.bak 命令。▪
命令中使用的jar包请根据集群中对应路径下的实际版本修改。d. 提交MapReduce任务,执行如下命令,运行样例工程。运行样例工程前需要 根据实际环境修改认证信息。
yarn jar mapreduce-examples-1.0.jar
com.huawei.bigdata.mapreduce.examples.MultiComponentExample ----结束
5.4.2 查看调测结果
MapReduce应用程序运行完成后,可以通过WebUI查看应用程序运行情况,也可以通 过MapReduce日志获取应用运行情况。
● 通过MapReduce服务的WebUI进行查看
登录MRS Manager,单击“服务管理 > MapReduce > JobHistoryServer”进入 Web界面后查看任务执行状态。
图5-4 JobHistory Web UI 界面
● 通过YARN服务的WebUI进行查看
登录MRS Manager,单击“服务管理 > Yarn > ResourceManager(主)”进入 Web界面后查看任务执行状态。
图5-5 ResourceManager Web UI 页面
● 查看MapReduce应用运行结果数据。
– 当用户在Linux环境下执行
yarn jar mapreduce-example.jar
命令后,可以 通过执行结果显示正在执行的应用的运行情况。例如:yarn jar mapreduce-example.jar /tmp/mapred/example/input/ /tmp/root/output/1 16/07/12 17:07:16 INFO hdfs.PeerCache: SocketCache disabled.
16/07/12 17:07:17 INFO input.FileInputFormat: Total input files to process : 2 16/07/12 17:07:18 INFO mapreduce.JobSubmitter: number of splits:2 16/07/12 17:07:18 INFO mapreduce.JobSubmitter: Submitting tokens for job:
job_1468241424339_0006
16/07/12 17:07:18 INFO impl.YarnClientImpl: Submitted application application_1468241424339_0006
16/07/12 17:07:18 INFO mapreduce.Job: The url to track the job: http://10-120-180-170:26000/
proxy/application_1468241424339_0006/
16/07/12 17:07:18 INFO mapreduce.Job: Running job: job_1468241424339_0006
16/07/12 17:07:31 INFO mapreduce.Job: Job job_1468241424339_0006 running in uber mode : false
16/07/12 17:07:31 INFO mapreduce.Job: map 0% reduce 0%
16/07/12 17:07:41 INFO mapreduce.Job: map 50% reduce 0%
16/07/12 17:07:43 INFO mapreduce.Job: map 100% reduce 0%
16/07/12 17:07:51 INFO mapreduce.Job: map 100% reduce 100%
16/07/12 17:07:51 INFO mapreduce.Job: Job job_1468241424339_0006 completed successfully 16/07/12 17:07:51 INFO mapreduce.Job: Counters: 49
File System Counters
Reduce shuffle bytes=114
– 在Linux环境下执行
yarn application -status
<ApplicationId> ,可以通过执 行结果显示正在执行的应用的运行情况。例如:yarn application -status application_1468241424339_0006 Application Report :
Application-Id : application_1468241424339_0006 Application-Name : Collect Female Info
Tracking-URL : http://10-120-180-170:26012/jobhistory/job/job_1468241424339_0006 RPC Port : 27100
AM Host : 10-120-169-46
Aggregate Resource Allocation : 172153 MB-seconds, 64 vcore-seconds Log Aggregation Status : SUCCEEDED
Diagnostics : Application finished execution.
Application Node Label Expression : <Not set>
AM container Node Label Expression : <DEFAULT_PARTITION>
● 查看MapReduce日志获取应用运行情况。
您可以查看MapReduce日志了解应用运行情况,并根据日志信息调整应用程序。