• 沒有找到結果。

安全认证代码

5.3.2 MapReduce 访问多组件样例程序

场景说明

该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业 访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。

该样例逻辑过程如下。

以HDFS文本文件为输入数据 log1.txt:数据输入文件

YuanJing,male,10 GuoYijun,male,5

Map阶段

1. 获取输入数据的一行并提取姓名信息。

2. 查询HBase一条数据。

3. 查询Hive一条数据。

4. 将HBase查询结果与Hive查询结果进行拼接作为Map输出。

Reduce阶段

1. 获取Map输出中的最后一条数据。

2. 将数据输出到HBase。

3. 将数据保存到HDFS。

数据规划

1. 创建HDFS数据文件。

a. 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。

b. 在HDFS上创建一个文件夹,“/tmp/examples/multi-components/

mapreduce/input/”,并上传data.txt到此目录,命令如下。

i. 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p /tmp/examples/

multi-components/mapreduce/input/

ii. 在Linux系统HDFS客户端使用命令hdfs dfs -put data.txt /tmp/

examples/multi-components/mapreduce/input/

2. 创建HBase表并插入数据。

a. 在Linux系统HBase客户端使用命令hbase shell。

b. 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令 create 'table1', 'cf'。

c. 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'。

d. 执行命令quit退出。

3. 创建Hive表并载入数据。

a. 在Linux系统Hive客户端使用命令beeline。

b. 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/

stayTime,使用命令CREATE TABLE person(name STRING, gender

STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;。

c. 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH '/tmp/

examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person;。

● main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集 群。

代码样例

下面代码片段仅为演示,具体代码请参见

com.huawei.bigdata.mapreduce.examples.MultiComponentExample类 样例1:类MultiComponentMapper定义Mapper抽象类的map方法。

private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> { Configuration conf;

@Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

setJaasInfo("krb5.conf", "jaas.conf");

LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);

LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);

String zkQuorum = clientInfo.getProperty("zk.quorum");

String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");

String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");

// 创建Hive鉴权信息 // Read this carefully:

// MapReduce can only use Hive through JDBC.

// Hive will submit another MapReduce Job to execute query.

// So we run Hive in MapReduce is not recommended.

final String driver = "org.apache.hive.jdbc.HiveDriver";

String sql = "select name,sum(stayTime) as "

+ "stayTime from person where name = ? group by name";

StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");

// in map or reduce, use 'auth=delegationToken'

hbaseConn = ConnectionFactory.createConnection(conf);

// get table

table = hbaseConn.getTable(TableName.valueOf(HBASE_TABLE_NAME));

} catch (IOException e) {

context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));

}

Configuration hbaseConfig = HBaseConfiguration.create(conf);

org.apache.hadoop.hbase.client.Connection conn = null;

try {

// 创建一个HBase Get请求实例

Get get = new Get(hbaseKey.getBytes());

// 提交Get请求

Result result = table.get(get);

hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes()));

return hbaseValue;

private int readHive(String name) {

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

Text finalValue = new Text("");

setJaasInfo("krb5.conf", "jaas.conf");

LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, "test", KEYTAB);

LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);

table = conn.getTable(TableName.valueOf(HBASE_TABLE_NAME));

} catch (IOException e) {

writeHBase(key.toString(), finalValue.toString());

// 将结果保存到HDFS context.write(key, finalValue);

}

样例5:结果输出到HBase的writeHBase方法。

private void writeHBase(String rowKey, String data) {

public static void main(String[] args) throws Exception {

// 清理所需目录

MultiComponentExample.cleanupBeforeRun();

// 查找Hive依赖jar包

Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");

Class thriftClass = Class.forName("org.apache.thrift.TException");

Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.TCLIService");

Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");

Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");

Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");

Class hiveShimClass = Class.forName("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge23");

// 添加Hive运行依赖到Job JarFinderUtil

.addDependencyJars(config, hiveDriverClass, thriftCLIClass, thriftClass, hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass);

//开启Kerberos认证的安全集群登录

if("kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"))){

//security mode

LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, KEYTAB);

LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);

System.setProperty("java.security.krb5.conf", KRB);

LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config);

}

// 添加Hive配置文件

config.addResource("hive-site.xml");

// 添加HBase配置文件

Configuration conf = HBaseConfiguration.create(config);

// 实例化Job

Job job = Job.getInstance(conf);

job.setJarByClass(MultiComponentExample.class);

// 设置mapper&reducer类

job.setMapperClass(MultiComponentMapper.class);

job.setReducerClass(MultiComponentReducer.class);

//设置Job输入输出路径

FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));

FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));

// 设置输出键值类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

// HBase提供工具类添加HBase运行依赖到Job TableMapReduceUtil.addDependencyJars(job);

// 安全模式下必须要执行这个操作

// HBase添加鉴权信息到Job,map或reduce任务将会使用此处的鉴权信息 TableMapReduceUtil.initCredentials(job);

// 创建Hive鉴权信息 Properties clientInfo = null;

InputStream fileInputStream = null;

try {

clientInfo = new Properties();

File propertiesFile = new File(hiveClientProperties);

fileInputStream = new FileInputStream(propertiesFile);

clientInfo.load(fileInputStream);

String zkQuorum = clientInfo.getProperty("zk.quorum");//zookeeper节点ip和端口列表 String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");

String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");

String principal = clientInfo.getProperty("principal");

String auth = clientInfo.getProperty("auth");

String sasl_qop = clientInfo.getProperty("sasl.qop");

StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");

sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=

")

.append(zooKeeperNamespace) .append(";sasl.qop=")

.append(sasl_qop) .append(";auth=") .append(auth) .append(";principal=") .append(principal) .append(";");

String url = sBuilder.toString();

Connection connection = DriverManager.getConnection(url, "", "");

String tokenStr = ((HiveConnection) connection)

.getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL);

connection.close();

Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();

hive2Token.decodeFromUrlString(tokenStr);

// 添加Hive鉴权信息到Job

job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token);

job.getCredentials().addToken(new Text(HiveAuthFactory.HS2_CLIENT_TOKEN), hive2Token);

// 提交作业

System.exit(job.waitForCompletion(true) ? 0 : 1);

} 说明

样例中所有zkQuorum对象需替换为实际ZooKeeper集群节点信息。

5.4 调测程序

5.4.1 编译并运行程序

在程序代码完成开发后,可以在Linux环境中运行应用。

说明

MapReduce应用程序只支持在Linux环境下运行,不支持在Windows环境下运行。

操作步骤

步骤1 生成MapReduce应用可执行包。

执行mvn package打出jar包,在工程目录target目录下获取,比如“mapreduce-examples-1.0.jar”。

步骤2 上传生成的应用包“mapreduce-examples-1.0.jar”到Linux客户端上。例如“/opt”

目录。

步骤3 如果集群开启Kerberos,参考5.2.2-准备开发用户获得的user.keytab”、

“krb5.conf”文件需要在Linux环境上创建文件夹保存这些配置文件,例如“/opt/

conf”。并在linux环境上,在客户端路径下(/opt/client/HDFS/hadoop/etc/

hadoop/)获得core-site.xml、hdfs-site.xml文件放入上述文件夹里。

步骤4 样例程序如果指定OBS为输入输出的目标文件系统(如obs://<BucketName>/

input/),需要进行以下配置。

在$YARN_CONF_DIR/core-site.xml中添加AK配置项“fs.obs.access.key”和SK配置项

“fs.obs.secret.key”,AK/SK可登录“OBS控制台”,进入“我的凭证”页面获取。

<property>

<name>fs.obs.access.key</name>

<value>xxxxxxxxxxxxxxxx</value>

yarn jar mapreduce-examples-1.0.jar

com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector <inputPath>

<outputPath>

此命令包含了设置参数和提交job的操作,其中<inputPath>指HDFS文件系统中input 的路径,<outputPath>指HDFS文件系统中output的路径。

说明

● 在执行yarn jar mapreduce-examples-1.0.jar

com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector <inputPath>

<outputPath>命令之前,需要把log1.txt和log2.txt这两个文件上传到HDFS的<inputPath>目

录下。

● 在执行yarn jar mapreduce-examples-1.0.jar

com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector <inputPath>

<outputPath>命令之前,<outputPath>目录必须不存在,否则会报错。

● mapreduce-examples-1.0.jar适用于MRS 1.x版本。

● 在MapReduce任务运行过程中禁止重启HDFS服务,否则可能会导致任务失败。

storeKey=true debug=true;

};

说明

文件内容中的[email protected]为示例,实际操作时请做相应修改。

“jaas_mr.conf”文件和代码中的“principal”请根据实际环境修改。例如 test@FAA12CC3_0996_432F_9D6F_E18F6F9D7F43.COM。

未开启Kerberos认证集群略过此步骤。

c. 在Linux环境中添加样例工程运行所需的classpath,例如(以客户端安装路径 为/opt/conf为例)

export YARN_USER_CLASSPATH=/opt/conf/:/opt/client/HBase/

hbase/lib/*:/opt/client/Hive/Beeline/lib/*

说明

针对MRS 1.9.x版本集群,需要在执行上述命令前或者执行上述命令后执行 mv /opt/client/Hive/Beeline/lib/derby-10.10.2.0.jar derby-10.10.2.0.jar.bak 命令。

命令中使用的jar包请根据集群中对应路径下的实际版本修改。

d. 提交MapReduce任务,执行如下命令,运行样例工程。运行样例工程前需要 根据实际环境修改认证信息。

yarn jar mapreduce-examples-1.0.jar

com.huawei.bigdata.mapreduce.examples.MultiComponentExample ----结束

5.4.2 查看调测结果

MapReduce应用程序运行完成后,可以通过WebUI查看应用程序运行情况,也可以通 过MapReduce日志获取应用运行情况。

● 通过MapReduce服务的WebUI进行查看

登录MRS Manager,单击“服务管理 > MapReduce > JobHistoryServer”进入 Web界面后查看任务执行状态。

5-4 JobHistory Web UI 界面

● 通过YARN服务的WebUI进行查看

登录MRS Manager,单击“服务管理 > Yarn > ResourceManager(主)”进入 Web界面后查看任务执行状态。

5-5 ResourceManager Web UI 页面

● 查看MapReduce应用运行结果数据。

– 当用户在Linux环境下执行

yarn jar mapreduce-example.jar

命令后,可以 通过执行结果显示正在执行的应用的运行情况。例如:

yarn jar mapreduce-example.jar /tmp/mapred/example/input/ /tmp/root/output/1 16/07/12 17:07:16 INFO hdfs.PeerCache: SocketCache disabled.

16/07/12 17:07:17 INFO input.FileInputFormat: Total input files to process : 2 16/07/12 17:07:18 INFO mapreduce.JobSubmitter: number of splits:2 16/07/12 17:07:18 INFO mapreduce.JobSubmitter: Submitting tokens for job:

job_1468241424339_0006

16/07/12 17:07:18 INFO impl.YarnClientImpl: Submitted application application_1468241424339_0006

16/07/12 17:07:18 INFO mapreduce.Job: The url to track the job: http://10-120-180-170:26000/

proxy/application_1468241424339_0006/

16/07/12 17:07:18 INFO mapreduce.Job: Running job: job_1468241424339_0006

16/07/12 17:07:31 INFO mapreduce.Job: Job job_1468241424339_0006 running in uber mode : false

16/07/12 17:07:31 INFO mapreduce.Job: map 0% reduce 0%

16/07/12 17:07:41 INFO mapreduce.Job: map 50% reduce 0%

16/07/12 17:07:43 INFO mapreduce.Job: map 100% reduce 0%

16/07/12 17:07:51 INFO mapreduce.Job: map 100% reduce 100%

16/07/12 17:07:51 INFO mapreduce.Job: Job job_1468241424339_0006 completed successfully 16/07/12 17:07:51 INFO mapreduce.Job: Counters: 49

File System Counters

Reduce shuffle bytes=114

– 在Linux环境下执行

yarn application -status

<ApplicationId> ,可以通过执 行结果显示正在执行的应用的运行情况。例如:

yarn application -status application_1468241424339_0006 Application Report :

Application-Id : application_1468241424339_0006 Application-Name : Collect Female Info

Tracking-URL : http://10-120-180-170:26012/jobhistory/job/job_1468241424339_0006 RPC Port : 27100

AM Host : 10-120-169-46

Aggregate Resource Allocation : 172153 MB-seconds, 64 vcore-seconds Log Aggregation Status : SUCCEEDED

Diagnostics : Application finished execution.

Application Node Label Expression : <Not set>

AM container Node Label Expression : <DEFAULT_PARTITION>

● 查看MapReduce日志获取应用运行情况。

您可以查看MapReduce日志了解应用运行情况,并根据日志信息调整应用程序。