• 沒有找到結果。

删除指定的行是在构造 Delete 对象的时候只传入行健

8.2 编程实例

• 8.2.1 使用 MapReduce 操作 HBase

• 代码参考: 8-2-1 中的 WordCountWriteToHBase.java

• 使用 HBase Shell 查看运行结果,如图 8-2 所示:

• 8.2.2 从 HBase 获取数据上传至 HDFS

将 8.2.1 生成的 HBase wordcount 表数据内容读取,并将其上传到 HDFS 中。 ReadHBaseMapper 中的 map 函数为:

map(ImmutableBytesWritable key,Result values,Context context);

该函数按行读取 HBase 表数据,其中 key 是行健, values 是单元格数据。

• 代码参考 8-2-2 中的 WordCountReadFromHBase.java

代码

代码

代码

8.2 编程实例

• 8.2.3 MapReduce 生成 HFile 入库到 HBase

如果我们一次性入库 HBase 巨量数据,处理速度慢不说,还特别占用 Region 资源,一个比较高效便捷的方法就是使用 Bulk Loading 方法,即 HBase 提供的 HFileOutputFormat 类。它是利用 HBase 的数据信息按照特定格式存储在 HDFS 内这一原理,直接生成这种 HDFS 内存储的数据 格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合 MapReduce 完成,高效便捷,而且不占用 Region 资源,增添负载。这 种方式适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况,并且 HBase 集群与 Hadoop 集群为同一集群,即 HBase 所基于 的 HDFS 为生成 HFile 的 MapReduce 的集群。

• 先通过 HBase shell 创建表 stu7

hbase(main):032:0> create 'stu7', 'info' 0 row(s) in 5.9200 seconds

=> Hbase::Table - stu7

• 代码参考 8-2-3 中的 HFileGenerator .java

• (1) 最终输出结果,无论是 Map 还是 Reduce ,输出部分 key 和 value 的类型必须是: <ImmutableBytesWritable, KeyValue> 或者 <

ImmutableBytesWritable, Put> 。

• (2) 最终输出部分, value 类型是 KeyValue 或 Put ,对应的 Sorter 分别是 KeyValueSortReducer 或 PutSortReducer 。

• (3)job.setOutputFormatClass(HFileOutputFormat.class) 中的 HFileOutputFormat 只适合对单列族组织成的 HFile 文件。

• (4)HFileOutputFormat.configureIncrementalLoad(job, table) 自动对 Job 进行配置。 SimpleTotalOrderPartitioner 是 需要先对 key 进行整体排序,然后划分到每个 Reduce 中,保证每一个 Reducer 中的的 key 最小最大值区间范围是不会有交集的。因为入库 到 HBase 的时候,作为一个整体的 Region , key 是绝对有序的。

8.2 编程实例

(5) 最后生成 HFile 存储在 HDFS 上,输出路径下的子目录是各个列族。如果对 HFile 进行入库 HBase ,相当于移动 HFile 到 HBase 的 Region 中,

HFile 子目录的列族内容没有了。

此时通过扫描 stu7 表,发现里面并没有数据,如图 8-6 所示

生成的 HFile 文件位于 /data/hbase/output3/info/f6f2ef7462c747a18835e6fd8f7dfe8b ,使用命令 hbase org.apache.hadoop.hbase .io.hfile.HFile -p –f 查看刚生成的 HFile 文件,如图 8-7 所示:

(6) HFile 入库到 HBase

代码参考: 8-2-3 HFileLoader.java

代码

代码

8.2 编程实例

• 8.2.4 同时写入多张表

通过 MapReduce 将读入的数据写入到多个表当中,数据如图 8-10 所示,第一列表示姓名,第二列表示语文成绩,第三列表示英语程序,第四列表示数 学成绩

代码参考: 8-2-4 MultipleTablesWriteToHBase.java

• 通过扫描 chinese , english 和 math 表

代码

代码

8.2 编程实例

• 8.2.5 从多个表读取数据

将 8.2.4 生成的 HBase 表中的数据读取出来,并将其写入到 HDFS 中。由于要读取多张表,因此要将 TableMapReduceUtil.initTableMapperJo b 函数中的第一个参数设置 List<Scan> 类型,并对每个 Scan 设置扫描条件和需要扫描的表

代码参考: 8-2-5 目录下 MultipleTablesReadFromHBase.java

结果如图:

代码

8.2 编程实例

• 8.2.6 通过读取 HBase 表删除 HBase 数据

通过 MapReduce 删除指定条件的数据,在 Mapper 的 map 函数中构造 Delete 对象,完成符合条件的删除,为了提高删除效率,我们可以构造一个 De lete 集合,在 map 执行最后完成批量删除。

代码参考: 8-2-6 目录下的 DropTableDemo.java

删除前

删除后

如果将 scan.setTimeStamp(new Long(timeStamp)) 去掉,表示删除所有数据

代码

代码

8.2 编程实例

• 8.2.7 通过读取 HBase 表数据复制到另外一张表

完成 HBase 表内容的复制,即在 Mapper 阶段每读取一行数据,就将结果装载到 Put 对象当中,并通过 write 函数将其写入到 HBase 表中。写入表的 一种方式是用 TableMapReduceUtil.initTableReducerJob 的方法,这里既可以在 Map 阶段输出,也能在 Reduce 阶段输出,区别是 Reduce 的 class 设置为 null 或者实际的 Reduce 。

• 首先在 HBase 中创建 stu8 表:

hbase(main):004:0> create 'stu8', 'info' 0 row(s) in 4.9200 seconds

=> Hbase::Table - stu8

• 代码参考 8-2-7 目录下 TableCopy.java

• 扫描 stu6 和 stu8 表结果如图 8-18 和 8-19 所示

代码

代码

8.2 编程实例

代码参考:目录 8-2-8 下 CreateHbaseIndex.java 结果效果

行健 列族 info

age name

rw001 Lucy 16

rw002 Linda 18

rw003 John 19

代码

代码

8.2 编程实例

• 8.2.9 将 MapReduce 输出结果到 MySQL

为了方便 MapReduce 直接访问关系型数据库 (MySQL , Oracle 等 ) , Hadoop 提供了 DBInputFormat 和 DBOutputFormat 两个类。通过 DBInp utFormat 类把数据库表数据读入到 HDFS ,然后根据 DBOutputFormat 类把 MapReduce 产生的结果集导入到数据库表中。

要将数据从 HBase 中导入 MySQL ,要实现五个类,分别是负责读取 HBase 数据的 Mapper 类,负责写入 MySQL 的 Reducer 类, Combine 类,数据 库信息读写的接口类,以及最后让程序运行起来的主类。

• 先来看下连接 MySQL 需要的操作

• 代码参考: 8-2-9 下的 WriteToMysql.java

• 在 slave2 节点执行查询结果如图 mysql> create database hbase;

Query OK, 1 row affected (0.05 sec) mysql> use hbase;

Database changed

mysql> create table stuInfo(name varchar(20), sex varchar(10), age int);

Query OK, 0 rows affected (1.00 sec)

mysql> CREATE USER hbase@localhost IDENTIFIED BY 'HBase123@';

Query OK, 0 rows affected (0.10 sec)

mysql> grant all on hbase.* to hbase@'%' identified by 'HBase123@';

Query OK, 0 rows affected, 1 warning (0.04 sec)

mysql> grant all on hbase.* to hbase@'localhost' identified by 'HBase123@';

Query OK, 0 rows affected, 1 warning (0.00 sec) mysql> flush privileges;

Query OK, 0 rows affected (0.04 sec)

代码

代码

8.2 编程实例

• 8.2.10 利用 MapReduce 完成 MySQL 数据读写

MapReduce 默认提供了 DBInputFormat 和 DBOutputFormat ,分别用于数据库的读取和数据库的写入。通过 DBInputFormat.setInput 设置从 哪个表哪些字段读取数据,在从 MySQL 读取数据的时候,以主键 ID 作为 Mapper 阶段的 key ,整条记录加分隔符作为 value 进行输出;在 Reduce 阶段则类似于 8.2.9 中的读。

• 首先在 slave2 节点执行如下 SQL 命令

• 代码参考 8-2-10 下的 MysqlToMR.java

• 执行结果

create table stu2(id int ,name varchar(20), sex varchar(10), age int);

create table stu(id int ,name varchar(20), sex varchar(10), age int);

insert into stu values(1,"lucy", "female", 16);

insert into stu values(2,"john", "male", 18);

insert into stu values(3,"llinda", "female", 17);

insert into stu values(4,"smith", "male", 20);

insert into stu values(5,"ming", "male", 11);

insert into stu values(6,"niki", "male", 13);

代码

代码

代码

代码

代码

相關文件