• 沒有找到結果。

应用开发简介_MapReduce服务 MRS_开发指南(适用于2.x及之前)_OpenTSDB应用开发_概述_华为云

N/A
N/A
Protected

Academic year: 2022

Share "应用开发简介_MapReduce服务 MRS_开发指南(适用于2.x及之前)_OpenTSDB应用开发_概述_华为云"

Copied!
603
0
0

加載中.... (立即查看全文)

全文

(1)

开发指南

文档版本 16

发布日期 2021-07-27

(2)

非经本公司书面许可,任何单位和个人不得擅自摘抄、复制本文档内容的部分或全部,并不得以任何形式传 播。

商标声明

和其他华为商标均为华为技术有限公司的商标。

本文档提及的其他所有商标或注册商标,由各自的所有人拥有。

注意

您购买的产品、服务或特性等应受华为公司商业合同和条款的约束,本文档中描述的全部或部分产品、服务或 特性可能不在您的购买或使用范围之内。除非合同另有约定,华为公司对本文档内容不做任何明示或暗示的声 明或保证。

由于产品版本升级或其他原因,本文档内容会不定期进行更新。除非另有约定,本文档仅作为使用指导,本文 档中的所有陈述、信息和建议不构成任何明示或暗示的担保。

(3)

目 录

1 简介...1

2 MapReduce 服务样例工程构建方式...2

3 HBase 应用开发... 5

3.1 概述... 5

3.1.1 应用开发简介...5

3.1.2 常用概念... 6

3.1.3 开发流程... 6

3.2 环境准备...8

3.2.1 开发和运行环境简介... 8

3.2.2 准备开发用户...9

3.2.3 配置并导入样例工程... 11

3.3 开发程序... 14

3.3.1 典型场景开发思路... 14

3.3.2 创建 Configuration... 16

3.3.3 创建 Connection... 17

3.3.4 创建表... 17

3.3.5 删除表... 19

3.3.6 修改表... 20

3.3.7 插入数据... 21

3.3.8 删除数据... 23

3.3.9 使用 Get 读取数据... 24

3.3.10 使用 Scan 读取数据... 25

3.3.11 使用过滤器 Filter... 26

3.3.12 添加二级索引...27

3.3.13 启用/禁用二级索引... 29

3.3.14 查询二级索引列表... 31

3.3.15 使用二级索引读取数据... 31

3.3.16 删除二级索引...32

3.3.17 写 MOB 表... 34

3.3.18 读 MOB 数据... 36

3.3.19 Region 的多点分割... 37

3.3.20 ACL 安全配置...38

(4)

3.4 调测程序... 39

3.4.1 在 Windows 中调测程序... 39

3.4.1.1 编译并运行程序... 39

3.4.1.2 查看调测结果...42

3.4.2 在 Linux 中调测程序... 43

3.4.2.1 安装客户端时编译并运行程序... 43

3.4.2.2 未安装客户端时编译并运行程序... 44

3.4.2.3 查看调测结果...45

3.4.3 HBase Phoenix 样例代码调测...46

3.4.4 HBase python 样例代码调测... 48

3.5 更多信息... 50

3.5.1 SQL 查询... 50

3.5.2 配置 HBase 文件存储... 51

3.5.3 HFS 的 JAVA API... 52

3.6 HBase 接口... 54

3.6.1 Shell...55

3.6.2 Java API... 56

3.6.3 Phoenix...60

3.6.4 REST... 64

3.7 FAQ... 67

3.7.1 运行 HBase 应用开发程序产生异常... 67

3.7.2 bulkload 和 put 应用场景... 67

3.8 开发规范... 68

3.8.1 规则... 68

3.8.2 建议... 73

3.8.3 示例... 75

3.8.4 附录... 80

4 Hive 应用开发... 82

4.1 概述... 82

4.1.1 应用开发简介... 82

4.1.2 常用概念... 82

4.1.3 开发流程... 83

4.2 环境准备... 84

4.2.1 开发环境简介... 84

4.2.2 准备环境... 86

4.2.3 准备开发用户... 86

4.2.4 准备 JDBC 客户端开发环境... 88

4.2.5 准备 HCatalog 开发环境... 90

4.3 开发程序... 92

4.3.1 典型场景说明... 92

4.3.2 创建表... 94

4.3.3 数据加载... 96

(5)

4.3.4 数据查询... 96

4.3.5 用户自定义函数...97

4.3.6 样例程序指导... 99

4.4 调测程序... 103

4.4.1 在 Windows 中调测程序... 103

4.4.1.1 JDBC 客户端运行及结果查询... 103

4.4.2 在 Linux 中调测程序... 104

4.4.2.1 JDBC 客户端运行及结果查看... 105

4.4.2.2 HCatalog 运行及结果查看... 105

4.5 Hive 接口...107

4.5.1 JDBC... 107

4.5.2 HiveQL... 107

4.5.3 WebHCat... 107

4.6 开发规范... 134

4.6.1 规则... 134

4.6.2 建议... 138

4.6.3 示例... 140

5 MapReduce 应用开发...149

5.1 概述... 149

5.1.1 MapReduce 简介... 149

5.1.2 常用概念... 149

5.1.3 开发流程... 150

5.2 环境准备... 152

5.2.1 开发环境简介...152

5.2.2 准备开发用户...152

5.2.3 准备 Eclipse 与 JDK... 153

5.2.4 准备 Linux 客户端运行环境... 153

5.2.5 获取并导入样例工程...154

5.2.6 准备 kerberos 认证... 155

5.3 开发程序... 156

5.3.1 MapReduce 统计样例程序... 156

5.3.2 MapReduce 访问多组件样例程序... 161

5.4 调测程序... 167

5.4.1 编译并运行程序... 167

5.4.2 查看调测结果...169

5.5 MapReduce 接口... 171

5.5.1 Java API... 171

5.6 FAQ... 173

5.6.1 提交 MapReduce 任务时客户端长时间无响应...174

5.7 开发规范... 174

5.7.1 规则... 174

5.7.2 建议... 176

(6)

5.7.3 示例... 177

6 HDFS 应用开发... 180

6.1 概述... 180

6.1.1 HDFS 简介... 180

6.1.2 常用概念... 180

6.1.3 开发流程... 181

6.2 环境准备... 183

6.2.1 开发环境简介...183

6.2.2 准备开发用户...183

6.2.3 准备 Eclipse 与 JDK... 184

6.2.4 准备 Linux 客户端运行环境... 185

6.2.5 获取并导入样例工程...186

6.3 开发程序... 187

6.3.1 场景及开发思路... 187

6.3.2 HDFS 初始化...188

6.3.3 写文件...191

6.3.4 追加文件内容...191

6.3.5 读文件...192

6.3.6 删除文件... 193

6.3.7 Colocation...193

6.3.8 设置存储策略...196

6.3.9 访问 OBS...197

6.4 调测程序... 197

6.4.1 在 Linux 中调测程序... 197

6.4.1.1 安装客户端时编译并运行程序... 198

6.4.1.2 查看调测结果... 199

6.5 HDFS 接口... 199

6.5.1 Java API... 200

6.5.2 C API... 203

6.5.3 HTTP REST API... 209

6.5.4 Shell 命令介绍...220

6.6 开发规范... 221

6.6.1 规则... 221

6.6.2 建议... 225

7 Spark 应用开发...227

7.1 概述... 227

7.1.1 Spark 应用开发简介... 227

7.1.2 常用概念... 228

7.1.3 开发流程... 233

7.2 环境准备... 235

7.2.1 环境简介... 235

7.2.2 准备开发用户...236

(7)

7.2.3 准备 Java 开发环境... 237

7.2.4 准备 Scala 开发环境... 241

7.2.5 准备 Python 开发环境...248

7.2.6 准备运行环境...248

7.2.7 下载并导入样例工程...249

7.2.8 新建工程(可选)... 256

7.2.9 准备认证机制代码... 258

7.3 开发程序... 261

7.3.1 Spark Core 程序... 261

7.3.1.1 场景说明... 261

7.3.1.2 Java 样例代码... 263

7.3.1.3 Scala 样例代码...264

7.3.1.4 Python 样例代码... 265

7.3.2 Spark SQL 程序... 265

7.3.2.1 场景说明... 265

7.3.2.2 Java 样例代码... 267

7.3.2.3 Scala 样例代码...268

7.3.3 Spark Streaming 程序... 268

7.3.3.1 场景说明... 268

7.3.3.2 Java 样例代码... 270

7.3.3.3 Scala 样例代码...272

7.3.4 通过 JDBC 访问 Spark SQL 的程序...274

7.3.4.1 场景说明... 274

7.3.4.2 Java 样例代码... 275

7.3.4.3 Scala 样例代码...276

7.3.4.4 Python 样例代码... 278

7.3.5 Spark on HBase 程序...279

7.3.5.1 场景说明... 279

7.3.5.2 Java 样例代码... 280

7.3.5.3 Scala 样例代码...282

7.3.6 从 HBase 读取数据再写入 HBase... 284

7.3.6.1 场景说明... 284

7.3.6.2 Java 样例代码... 285

7.3.6.3 Scala 样例代码...287

7.3.7 从 Hive 读取数据再写入 HBase...289

7.3.7.1 场景说明... 289

7.3.7.2 Java 样例代码... 291

7.3.7.3 Scala 样例代码...292

7.3.8 Streaming 从 Kafka 读取数据再写入 HBase...294

7.3.8.1 场景说明... 294

7.3.8.2 Java 样例代码... 295

7.3.8.3 Scala 样例代码...297

(8)

7.3.9 Spark Streaming 对接 kafka0-10 程序...299

7.3.9.1 场景说明... 299

7.3.9.2 Java 样例代码... 301

7.3.9.3 Scala 样例代码...303

7.3.10 Structured Streaming 程序... 306

7.3.10.1 场景说明... 306

7.3.10.2 Java 样例代码... 307

7.3.10.3 Scala 样例代码... 308

7.4 调测程序... 309

7.4.1 编包并运行程序... 309

7.4.2 查看调测结果...321

7.5 调优程序... 322

7.5.1 Spark Core 调优... 322

7.5.1.1 数据序列化...322

7.5.1.2 配置内存... 323

7.5.1.3 设置并行度...323

7.5.1.4 使用广播变量... 324

7.5.1.5 使用 External Shuffle Service 提升性能...324

7.5.1.6 Yarn 模式下动态资源调度... 325

7.5.1.7 配置进程参数... 327

7.5.1.8 设计 DAG... 328

7.5.1.9 经验总结... 330

7.5.2 SQL 和 DataFrame 调优...331

7.5.2.1 Spark SQL join 优化... 332

7.5.2.2 INSERT...SELECT 操作调优...333

7.5.3 Spark Streaming 调优... 333

7.5.4 Spark CBO 调优... 335

7.6 Spark 接口... 336

7.6.1 Java... 336

7.6.2 Scala... 341

7.6.3 Python... 345

7.6.4 REST API... 349

7.6.5 ThriftServer 接口介绍... 355

7.6.6 常用命令介绍...357

7.7 FAQ... 359

7.7.1 如何添加自定义代码的依赖包... 359

7.7.2 如何处理自动加载的依赖包...361

7.7.3 运行 SparkStreamingKafka 样例工程时报“类不存在”问题...361

7.7.4 执行 Spark Core 应用,尝试收集大量数据到 Driver 端,当 Driver 端内存不足时,应用挂起不退出.... 362

7.7.5 Spark 应用名在使用 yarn-cluster 模式提交时不生效... 364

7.7.6 如何采用 Java 命令提交 Spark 应用... 364

7.7.7 SparkSQL UDF 功能的权限控制机制...366

(9)

7.7.8 由于 kafka 配置的限制,导致 Spark Streaming 应用运行失败... 366

7.7.9 如何使用 IDEA 远程调试... 367

7.7.10 使用 IBM JDK 产生异常,提示“Problem performing GSS wrap”信息... 370

7.7.11 Structured Streaming 的 cluster 模式,在数据处理过程中终止 ApplicationManager,应用失败... 370

7.7.12 Spark on Yarn 的 client 模式下 spark-submit 提交任务出现 FileNotFoundException 异常... 371

7.7.13 Spark 任务读取 HBase 报错“had a not serializable result”... 372

7.7.14 本地运行 Spark 程序连接 MRS 集群的 Hive、HDFS...373

7.8 开发规范... 374

7.8.1 规则... 374

7.8.2 建议... 377

8 Storm 应用开发... 381

8.1 概述... 381

8.1.1 应用开发简介...381

8.1.2 常用概念... 381

8.1.3 开发流程... 382

8.2 Linux 客户端环境准备... 383

8.3 Windows 开发环境准备... 384

8.3.1 开发环境简介...384

8.3.2 准备 Eclipse 与 JDK... 385

8.3.3 配置并导入工程... 385

8.4 开发程序... 387

8.4.1 典型场景说明...387

8.4.2 创建 Spout... 388

8.4.3 创建 Bolt... 388

8.4.4 创建 Topology... 389

8.5 运行应用... 391

8.5.1 生成示例 Jar 包... 391

8.5.2 Linux 中安装客户端时提交拓扑... 391

8.5.3 查看结果... 392

8.6 更多信息... 393

8.6.1 Storm-Kafka 开发指引... 393

8.6.2 Storm-JDBC 开发指引... 395

8.6.3 Storm-HDFS 开发指引...397

8.6.4 Storm-OBS 开发指引... 400

8.6.5 Storm-HBase 开发指引... 401

8.6.6 Flux 开发指引... 404

8.6.7 对外接口... 409

8.7 开发规范... 409

8.7.1 规则... 410

8.7.2 建议... 410

9 Kafka 应用开发...411

9.1 概述... 411

(10)

9.1.1 应用开发简介...411

9.1.2 常用概念... 411

9.1.3 开发流程... 412

9.2 环境准备... 413

9.2.1 开发环境简介...413

9.2.2 准备 Maven 和 JDK... 414

9.2.3 导入样例工程...414

9.2.4 准备安全认证...415

9.3 开发程序... 416

9.3.1 典型场景说明...416

9.3.2 Old Producer API 使用样例... 417

9.3.3 Old Consumer API 使用样例... 417

9.3.4 Producer API 使用样例... 418

9.3.5 Consumer API 使用样例... 419

9.3.6 多线程 Producer API 使用样例... 420

9.3.7 多线程 Consumer API 使用样例... 421

9.3.8 SimpleConsumer API 使用样例... 422

9.3.9 样例工程配置文件说明... 423

9.4 调测程序... 425

9.4.1 在 Linux 中调测程序... 425

9.5 Kafka 接口... 426

9.5.1 Shell 命令... 427

9.5.2 Java API... 428

9.5.3 安全接口说明...428

9.6 FAQ... 428

9.6.1 已经拥有 Topic 访问权限,但是运行 Producer.java 样例运行获取元数据失败“ERROR fetching topic metadata for topics...”的解决办法... 428

9.7 开发规范... 428

9.7.1 规则... 429

9.7.2 建议... 429

10 Presto 应用开发...430

10.1 概述... 430

10.1.1 应用开发简介... 430

10.1.2 常用概念... 430

10.1.3 开发流程... 430

10.2 环境准备... 431

10.2.1 开发环境简介... 432

10.2.2 准备环境... 433

10.2.3 准备开发用户... 434

10.2.4 准备 JDBC 客户端开发环境... 435

10.2.5 准备 HCatalog 开发环境...436

10.3 开发程序... 437

(11)

10.3.1 典型场景说明... 438

10.3.2 样例代码说明... 439

10.4 调测程序... 440

10.4.1 在 Windows 中调测程序... 441

10.4.2 在 Linux 中调测程序...442

10.5 Presto 接口...443

10.6 FAQ... 443

10.6.1 在集群外节点运行 PrestoJDBCExample 缺少证书... 444

10.6.2 在集群外节点连接开启 Kerberos 认证的集群,HTTP 在 Kerberos 数据库中无法找到相应的记录...446

11 OpenTSDB 应用开发... 450

11.1 概述... 450

11.1.1 应用开发简介... 450

11.1.2 常用概念... 450

11.1.3 开发流程... 451

11.2 环境准备... 453

11.2.1 开发环境简介... 453

11.2.2 准备环境... 453

11.2.3 准备开发用户... 454

11.2.4 配置并导入样例工程... 457

11.3 开发程序... 458

11.3.1 典型场景开发思路...458

11.3.2 配置参数... 463

11.3.3 写入数据... 464

11.3.4 查询数据... 465

11.3.5 删除数据... 466

11.4 调测程序... 467

11.4.1 在 Windows 中调测程序... 467

11.4.1.1 编译并运行程序... 468

11.4.1.2 查看调测结果... 469

11.4.2 在 Linux 中调测程序...469

11.4.2.1 编译并运行程序... 470

11.4.2.2 查看调测结果... 470

11.5 OpenTSDB 接口...471

11.5.1 CLI Tools...471

11.5.2 HTTP API...473

12 Flink 应用开发... 475

12.1 概述... 475

12.1.1 应用开发简介... 475

12.1.2 常用概念... 476

12.1.3 开发流程... 476

12.2 环境准备... 478

12.2.1 开发和运行环境简介... 478

(12)

12.2.2 准备开发用户... 479

12.2.3 安装客户端...480

12.2.4 配置并导入样例工程... 481

12.2.5 新建工程(可选)...505

12.2.6 准备安全认证... 507

12.3 开发程序... 512

12.3.1 DataStream 程序... 512

12.3.1.1 场景说明... 512

12.3.1.2 Java 样例代码... 513

12.3.1.3 Scala 样例代码... 515

12.3.2 向 Kafka 生产并消费数据程序... 516

12.3.2.1 场景说明... 516

12.3.2.2 Java 样例代码... 518

12.3.2.3 Scala 样例代码... 519

12.3.3 异步 Checkpoint 机制程序... 521

12.3.3.1 场景说明... 521

12.3.3.2 Java 样例代码... 521

12.3.3.3 Scala 样例代码... 524

12.3.4 Stream SQL Join 程序... 526

12.3.4.1 场景说明... 526

12.3.4.2 Java 样例代码... 527

12.4 调测程序... 530

12.4.1 编译并运行程序... 530

12.4.2 查看调测结果... 537

12.5 性能调优... 538

12.6 更多信息... 541

12.6.1 Savepoints CLI 介绍... 541

12.6.2 Flink Client CLI 介绍... 543

12.7 FAQ... 544

12.7.1 Savepoints 相关问题解决方案... 544

12.7.2 如何处理 checkpoint 设置 RocksDBStateBackend 方式,且当数据量大时,执行 checkpoint 会很慢的 问题?... 545

12.7.3 如何处理 blob.storage.directory 配置/home 目录时,启动 yarn-session 失败的问题?... 547

12.7.4 为什么非 static 的 KafkaPartitioner 类对象去构造 FlinkKafkaProducer010,运行时会报错?... 548

12.7.5 为什么新创建的 Flink 用户提交任务失败,报 ZooKeeper 文件目录权限不足?...549

12.7.6 为什么 Flink Web 页面无法直接连接?... 549

13 Impala 应用开发... 550

13.1 概述... 550

13.1.1 应用开发简介... 550

13.1.2 常用概念... 551

13.1.3 开发流程... 551

13.2 环境准备... 552

(13)

13.2.1 开发环境简介... 552

13.2.2 准备环境... 553

13.2.3 准备开发用户... 554

13.2.4 准备 JDBC 客户端开发环境... 555

13.3 开发程序... 557

13.3.1 典型场景说明... 557

13.3.2 创建表... 559

13.3.3 数据加载... 560

13.3.4 数据查询... 561

13.3.5 用户自定义函数... 562

13.3.6 样例程序指导... 563

13.4 调测程序... 564

13.4.1 在 Windows 中调测程序... 564

13.4.1.1 JDBC 客户端运行及结果查询... 565

13.4.2 在 Linux 中调测程序...566

13.4.2.1 JDBC 客户端运行及结果查看... 566

13.5 Impala 接口... 566

13.5.1 JDBC... 566

13.5.2 Impala SQL... 566

13.6 开发规范... 566

13.6.1 规则...567

13.6.2 建议...568

13.6.3 示例...569

14 Alluxio 应用开发... 572

14.1 概述... 572

14.1.1 应用开发简介... 572

14.1.2 常用概念... 572

14.1.3 开发流程... 573

14.2 环境准备... 575

14.2.1 开发环境简介... 575

14.2.2 准备环境... 575

14.2.3 获取并导入样例工程... 576

14.3 开发程序... 577

14.3.1 场景说明... 577

14.3.2 Alluxio 初始化...578

14.3.3 写文件... 578

14.3.4 读文件... 579

14.4 调测程序... 579

14.5 Alluxio 接口... 580

15 附录... 581

15.1 登录 MRS Manager... 581

15.2 下载 MRS 客户端... 585

(14)

15.3 修订记录... 587

(15)

1 简介

概述

本文档提供Hadoop、Spark、HBase、Hive、MapReduce、Kafka、Storm大数据组件 的应用开发的流程和操作指导,以及各个大数据组件的样例工程,同时提供了常见的 问题解答和开发规范。

开发准备

● 您已经开通了华为云MapReduce服务。

● 您已经对Hadoop、Spark、HBase、Hive、MapReduce、Kafka、Storm大数据组 件具备一定的认识。

● 您已经对Java语法具备一定的认识。

● 您已经对华为云弹性云服务器的使用方式和MapReduce服务开发组件有一定的了 解。

● 您已经对Maven构建方式具备一定的认识和使用方法有一定了解。

(16)

2 MapReduce 服务样例工程构建方式

构建流程

MapReduce服务样例工程构建流程包括三个主要步骤:

1. 下载样例工程的Maven工程源码和配置文件,请参见样例工程获取地址。

2. 配置华为云镜像站中SDK的Maven镜像仓库,请参见华为云开源镜像配置方式。

3. 根据用户自身需求,按照各个组件“环境准备”章节构建完整的Maven工程。

样例工程获取地址

● 华为云MRS服务1.8之前版本的样例工程下载地址为:http://

mapreduceservice.obs-website.cn-north-1.myhuaweicloud.com/。

● 华为云MRS服务1.8.x版本的样例工程下载地址为:https://github.com/

huaweicloud/huaweicloud-mrs-example/tree/mrs-1.8。

● 华为云MRS服务1.9.x版本的样例工程下载地址为:https://github.com/

huaweicloud/huaweicloud-mrs-example/tree/mrs-1.9。

● 华为云MRS服务2.1.x版本的样例工程下载地址为:https://github.com/

huaweicloud/huaweicloud-mrs-example/tree/mrs-2.1。

2-1 样例代码下载

华为云开源镜像配置方式

华为云提供开源镜像站(网址为https://mirrors.huaweicloud.com/),MRS服务样 例工程依赖的jar包都需要在华为开源镜像站下载,剩余所依赖的开源jar包请直接从 Maven中央库下载。

(17)

说明

本地环境使用开发工具下载依赖的jar包前,需要确认以下信息。

● 确认本地环境网络正常。

打开浏览器访问:华为提供开源镜像站(https://mirrors.huaweicloud.com/repository/

maven/huaweicloudsdk/),查看网站是否能正常访问。如果访问异常,请先开通本地网 络。

● 确认当前开发工具是否开启代理。下载jar包前需要确保开发工具代理关闭。

比如以2020.2版本的IntelliJ IDEA开发工具为例,单击“File > Settings > Appearance &

Behavior > System Settings > HTTP Proxy”,选择“No proxy”,单击“OK”保存配置。

华为云开源镜像配置方式如下所示:

步骤1 使用前请确保您已安装JDK 1.8及以上版本和Maven 3.0及以上版本。

步骤2 打开网址https://mirrors.huaweicloud.com/并登录华为开源镜像站。

步骤3 点击此处,下载华为开源镜像站提供的“settings.xml”文件,覆盖至“<本地Maven

安装目录>/conf/settings.xml”文件即可。

若无法直接下载,在华为开源镜像站搜索并单击名称为“HuaweiCloud SDK”的版 块,按照页面弹出的设置方法进行操作。

步骤4 参考以下方法手动修改“setting.xml”配置文件或者组件样例工程中的“pom.xml”

文件,配置镜像仓地址。

● 配置方法一:

手动在setting.xml配置文件的mirrors节点中添加开源镜像仓地址:

<mirror>

<id>repo2</id>

<mirrorOf>central</mirrorOf>

<url>https://repo1.maven.org/maven2/</url>

</mirror>

在setting.xml配置文件的profiles节点中添加如下镜像仓地址:

<profile>

<id>huaweicloudsdk</id>

<repositories>

<repository>

<id>huaweicloudsdk</id>

<url>https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/</url>

<releases><enabled>true</enabled></releases>

<snapshots><enabled>true</enabled></snapshots>

</repository>

</repositories>

</profile>

在setting.xml配置文件的activeProfiles节点中添加如下镜像仓地址:

<activeProfile>huaweicloudsdk</activeProfile>

说明

华为云开源镜像站不提供第三方开源jar包下载,请配置华为云开源镜像后,额外配置第三 方Maven镜像仓库地址。

● 配置方法二:

在二次开发工程样例工程中的pom.xml文件添加如下镜像仓地址:

<repositories>

<repository>

<id>huaweicloudsdk</id>

<url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url>

(18)

<releases><enabled>true</enabled></releases>

<snapshots><enabled>true</enabled></snapshots>

</repository>

<repository>

<id>central</id>

<name>Mavn Centreal</name>

<url>https://repo1.maven.org/maven2/</url>

</repository>

</repositories>

----结束

(19)

3 HBase 应用开发

3.1 概述

3.1.1 应用开发简介

HBase 简介

HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。HBase设计目标 是用来解决关系型数据库在处理海量数据时的局限性。

HBase使用场景有如下几个特点:

● 处理海量数据(TB或PB级别以上)。

● 具有高吞吐量。

● 在海量数据中实现高效的随机读取。

● 具有很好的伸缩能力。

● 能够同时处理结构化和非结构化的数据。

● 不需要完全拥有传统关系型数据库所具备的ACID特性。ACID特性指原子性

(Atomicity)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持 久性(Durability)。

● HBase中的表具有如下特点:

– 大:一个表可以有上亿行,上百万列。

– 面向列:面向列(族)的存储和权限控制,列(族)独立检索。

– 稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常 稀疏。

接口类型简介

由于HBase本身是由java语言开发出来的,且java语言具有简洁通用易懂的特性,推荐 用户使用java语言进行HBase应用程序开发。

HBase采用的接口与Apache HBase保持一致,请参见:http://hbase.apache.org/

apidocs/index.html。

(20)

HBase通过接口调用,可提供的功能如表3-1所示。

3-1 HBase 接口提供的功能

功能 说明

CRUD数据读写功能 增查改删

高级特性 过滤器、协处理器

管理功能 表管理、集群管理

3.1.2 常用概念

● 过滤器

过滤器提供了非常强大的特性来帮助用户提高HBase处理表中数据的效率。用户 不仅可以使用HBase中预定义好的过滤器,而且可以实现自定义的过滤器。

● 协处理器

允许用户执行region级的操作,并且可以使用与RDBMS中触发器类似的功能。

Client

客户端直接面向用户,可通过Java API或HBase Shell访问服务端,对HBase的表 进行读写操作。本文中的HBase客户端特指从装有HBase服务的MRS Manager上 下载的HBase client安装包,里面包含通过Java API访问HBase的样例代码。

3.1.3 开发流程

本文档主要基于Java API对HBase进行应用开发。

开发流程中各阶段的说明如图3-1和表3-2所示。

(21)

3-1 HBase 应用程序开发流程

3-2 HBase 应用开发的流程说明

阶段 说明 参考文档

了解基本概念 在开始开发应用前,需要

了解HBase的基本概念,

了解场景需求,设计表 等。

常用概念

准备开发环境和运行环境 HBase的应用程序当前推 荐使用Java语言进行开 发。可使用Eclipse工具。

HBase的运行环境即 HBase客户端,请根据指 导完成客户端的安装和配 置。

开发和运行环境简介

(22)

阶段 说明 参考文档

准备工程 HBase提供了不同场景下

的样例程序,您可以导入 样例工程进行程序学习。

或者您可以根据指导,新 建一个HBase工程。

配置并导入样例工程

根据场景开发工程 提供了Java语言的样例工 程,包含从建表、写入到 删除表全流程的样例工 程。

典型场景开发思路

编译并运行程序 指导用户将开发好的程序

编译并提交运行。

调测程序

查看程序运行结果 程序运行结果会写在用户

指定的路径下。用户还可 以通过UI查看应用运行情 况。

查看调测结果

3.2 环境准备

3.2.1 开发和运行环境简介

在进行二次开发时,要准备的开发环境如表3-3所示。同时需要准备运行调测的Linux 环境,用于验证应用程序运行正常。

3-3 开发环境

准备项 说明

操作系统 Windows系统,推荐Windows 7及以上

版本。

安装JDK 开发环境的基本配置。版本要求:1.8及

以上。

安装和配置Eclipse 用于开发HBase应用程序的工具。

安装Maven 用于编译样例工程。

网络 确保客户端与HBase服务主机在网络上互

通。

● 选择Windows开发环境下,安装Eclipse,安装JDK。

请安装JDK1.8及以上版本。Eclipse使用支持JDK1.8及以上的版本,并安装JUnit插件。

(23)

说明

● 若使用IBM JDK,请确保Eclipse中的JDK配置为IBM JDK。

● 若使用Oracle JDK,请确保Eclipse中的JDK配置为Oracle JDK。

● 不同的Eclipse不要使用相同的workspace和相同路径下的示例工程。

● 准备一个应用程序运行测试的Linux环境。

准备运行调测环境

步骤1 在弹性云服务器管理控制台,申请一个新的弹性云服务器,用于应用开发、运行、调 测。

● 弹性云服务器的安全组需要和MRS集群Master节点的安全组相同。

● 弹性云服务器的VPC需要与MRS集群在同一个VPC中。

● 弹性云服务器的网卡需要与MRS集群在同一个网段中。

步骤2 申请弹性IP,绑定新申请的ECS的IP,并配置安全组出入规则。

步骤3 下载客户端程序,请参考下载MRS客户端。

步骤4 登录存放下载的客户端的节点,再安装客户端。

1. 执行以下命令解压客户端包:

cd /opt

tar -xvf /opt/MRS_Services_Client.tar 2. 执行以下命令校验安装文件包:

sha256sum -c /opt/MRS_Services_ClientConfig.tar.sha256

MRS_Services_ClientConfig.tar:OK

3. 执行以下命令解压安装文件包:

tar -xvf /opt/MRS_Services_ClientConfig.tar

4. 执行如下命令安装客户端到指定目录(绝对路径),例如“/opt/client”。目录会 自动创建。

cd /opt/MRS_Services_ClientConfig sh install.sh /opt/client

Components client installation is complete.

----结束

3.2.2 准备开发用户

开发用户用于运行样例工程。用户需要有HBase权限,才能运行HBase样例工程。

前提条件

MRS服务集群开启了Kerberos认证,没有开启Kerberos认证的集群忽略该步骤。

操作步骤

步骤1 登录MRS Manager,请参考登录MRS Manager。

步骤2 在MRS Manager界面选择“系统设置 > 角色管理 > 添加角色”,如图 1 添加角色所 示。

(24)

3-2 添加角色

1. 填写角色的名称,例如hbaserole。

2. 编辑角色,在“权限”的表格中选择“HBase> HBase Scope”,勾选

“Admin”、“Create”、“Read”、“Write”和“Execute”,单击“确定”

保存,如图3-3所示。

3-3 编辑角色

步骤3 单击“系统设置 > 用户管理 > 添加用户”,为样例工程创建一个用户。

步骤4 填写用户名,例如hbaseuser,用户类型为“机机”用户,加入用户组supergroup,

设置其“主组”为supergroup,并绑定角色hbaserole取得权限,单击“确定”,如 图3-4所示。

(25)

3-4 添加用户

步骤5 在MRS Manager界面选择“系统设置 > 用户管理”,在用户名中选择hbaseuser,然 后在右侧“操作”列中选择“更多 >下载认证凭据”,保存后解压得到用户的

user.keytab文件与krb5.conf文件,用于在样例工程中进行安全认证,如图3-5所示。

3-5 下载认证凭据

----结束

参考信息

如果修改了组件的配置参数,需重新下载客户端配置文件并更新运行调测环境上的客 户端。

3.2.3 配置并导入样例工程

前提条件

确保本地PC的时间与MRS集群的时间差要小于5分钟。MRS集群的时间可通过MRS Manager页面右上角查看。

(26)

3-6 MRS 集群的时间

操作步骤

步骤1 在样例工程获取地址 获取HBase示例工程。

步骤2 在HBase示例工程根目录,即HBase样例工程的“pom.xml”层目录下,打开cmd命令 行窗口,执行 mvn install 进行编译。

步骤3 在步骤2中打开的cmd命令行窗口中,执行 mvn eclipse:eclipse 创建Eclipse工程。

步骤4 设置Eclipse开发环境。

1.在Eclipse的菜单栏中,选择“Window > Preferences”。

弹出“Preferences”窗口。

2.在左边导航上选择“General > Workspace”,在“Text file encoding”区域,选中

“Other”,并设置参数值为“UTF-8”,单击“Apply”,

如图 2 设置Eclipse的编码格式所示。

(27)

3-7 设置 Eclipse 的编码格式

3.在左边导航上选择“Maven > User Settings”, 在“User Settings”中点击

“Browse”导入Maven的settings.xml配置,单击“Apply”,点击“OK”, 如图3-8所示。

(28)

3-8 设置 Eclipse 的 Maven 开发环境

步骤5 在应用开发环境中,导入样例工程到Eclipse开发环境。

1. 选择“File > Import > General > Existing Projects into Workspace > Next

>Browse”。

显示“浏览文件夹”对话框。

2. 选择样例工程文件夹,单击“Finish”。

----结束

3.3 开发程序

3.3.1 典型场景开发思路

通过典型场景,我们可以快速学习和掌握HBase的开发过程,并且对关键的接口函数 有所了解。

场景说明

假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,如表3-4所 示,A业务操作流程如下:

● 创建用户信息表。

(29)

● 在用户信息中新增用户的学历、职称等信息。

● 根据用户编号查询用户姓名和地址。

● 根据用户姓名进行查询。

● 查询年龄段在[20–29]之间的用户信息。

● 数据统计,统计用户信息表的人员数、年龄最大值、年龄最小值、平均年龄。

● 用户销户,删除用户信息表中该用户的数据。

● A业务结束后,删除用户信息表。

3-4 用户信息

编号 姓名 性别 年龄 地址

120050002

01 Zhang

San Male 19 Shenzhen City, Guangdong Province 120050002

02 Li

Wanti ng

Femal

e 23 Hangzhou City, Zhejiang Province

120050002

03 Wang

Ming Male 26 Ningbo City, Zhejiang Province 120050002

04 Li

Gang Male 18 Xiangyang City, Hubei Province 120050002

05 Zhao

Enru Femal

e 21 Shangrao City, Jiangxi Province 120050002

06 Chen

Long Male 32 Zhuzhou City, Hunan Province 120050002

07 Zhou

Wei Femal

e 29 Nanyang City, Henan Province 120050002

08 Yang

Yiwen Femal

e 30 Wenzhou City, Zhejiang Province 120050002

09 Xu

Bing Male 26 Weinan City, Shaanxi Province 120050002

10 Xiao

Kai Male 25 Dalian City, Liaoning Province

数据规划

合理地设计表结构、行键、列名能充分利用HBase的优势。本样例工程以唯一编号作 为RowKey,列都存储在info列族中。

功能分解

根据上述的业务场景进行功能分解,需要开发的功能点如表3-5所示。

(30)

3-5 在 HBase 中开发的功能

序号 步骤 代码实现

1 根据表3-4中的信息创建表。 请参见创建表。

2 导入用户数据。 请参见插入数据。

3 增加“教育信息”列族,在用户信息中新增用 户的学历、职称等信息。

请参见修改表。

4 根据用户编号查询用户姓名和地址。 请参见使用Get读取数

据。

5 根据用户姓名进行查询。 请参见使用过滤器Filter。

6 用户销户,删除用户信息表中该用户的数据。 请参见删除数据。

7 A业务结束后,删除用户信息表。 请参见删除表。

关键设计原则

HBase是以RowKey为字典排序的分布式数据库系统,RowKey的设计对性能影响很 大,具体的RowKey设计请考虑与业务结合。

3.3.2 创建 Configuration

功能介绍

HBase通过加载配置文件来获取配置项,包括用户登录信息配置项。

代码样例

下面代码片段在com.huawei.bigdata.hbase.examples包中。

调用类TestMain下的init()方法会初始化Configuration对象:

private static void init() throws IOException { // load hbase client info

if(clientInfo == null) {

clientInfo = new ClientInfo(CONF_DIR + HBASE_CLIENT_PROPERTIES);

restServerInfo = clientInfo.getRestServerInfo();

}

// Default load from conf directory conf = HBaseConfiguration.create();

conf.addResource(CONF_DIR + "core-site.xml");

conf.addResource(CONF_DIR + "hdfs-site.xml");

conf.addResource(CONF_DIR + "hbase-site.xml");

}

(31)

3.3.3 创建 Connection

功能介绍

HBase通过ConnectionFactory.createConnection(configuration)方法创建Connection 对象。传递的参数为上一步创建的Configuration。

Connection封装了底层与各实际服务器的连接以及与ZooKeeper的连接。Connection 通过ConnectionFactory类实例化。创建Connection是重量级操作,Connection是线程 安全的,因此,多个客户端线程可以共享一个Connection。

典型的用法,一个客户端程序共享一个单独的Connection,每一个线程获取自己的 Admin或Table实例,然后调用Admin对象或Table对象提供的操作接口。不建议缓存或 者池化Table、Admin。Connection的生命周期由调用者维护,调用者通过调用

close(),释放资源。

代码样例

以下代码片段是创建Connection的示例:

private TableName tableName = null;

private Configuration conf = null;

private Connection conn = null;

public static final String TABLE_NAME = "hbase_sample_table";

public HBaseExample(Configuration conf) throws IOException { this.conf = conf;

this.tableName = TableName.valueOf(TABLE_NAME);

this.conn = ConnectionFactory.createConnection(conf);

}

说明

1. 样例代码中有很多的操作,如建表、查询、删表等等,这里只列举了建表testCreateTable和 删除表dropTable这2种操作。可参考对应章节样例。

2. 创建表操作所需的Admin对象是从Connection对象获取。

3. 登录代码要避免重复调用。

3.3.4 创建表

功能简介

HBase通过org.apache.hadoop.hbase.client.Admin对象的createTable方法来创建表,

并指定表名、列族名。创建表有两种方式,建议采用预分Region建表方式:

● 快速建表,即创建表后整张表只有一个Region,随着数据量的增加会自动分裂成 多个Region。

● 预分Region建表,即创建表时预先分配多个Region,此种方法建表可以提高写入 大量数据初期的数据写入速度。

说明

表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。

(32)

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的 testCreateTable方法中

MRS 3.x及以后版本请使用以下代码创建表:

public static void testCreateTable() { LOG.info("Entering testCreateTable.");

// Set the column family name.

byte [] fam = Bytes.toBytes("info");

ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam) // Set data encoding methods. HBase provides DIFF,FAST_DIFF,PREFIX

// HBase 2.0 removed `PREFIX_TREE` Data Block Encoding from column families.

.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)

// Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY

// GZ has the highest compression rate,but low compression and // decompression effeciency,fit for cold data

// SNAPPY has low compression rate, but high compression and // decompression effeciency,fit for hot data.

// it is advised to use SANPPY

.setCompressionType(Compression.Algorithm.SNAPPY) .build();

TableDescriptor htd =

TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(familyDescriptor).build();

Admin admin = null;

try {

// Instantiate an Admin object.

admin = conn.getAdmin();

if (!admin.tableExists(tableName)) { LOG.info("Creating table...");

// create table

admin.createTable(htd);

LOG.info(admin.getClusterMetrics());

LOG.info(admin.listNamespaceDescriptors());

LOG.info("Table created successfully.");

} else {

LOG.warn("table already exists");

}

} catch (IOException e) {

LOG.error("Create table failed.", e);

} finally {

if (admin != null) { try {

// Close the Admin object.

admin.close();

} catch (IOException e) {

LOG.error("Failed to close admin ", e);

} } }

LOG.info("Exiting testCreateTable.");

}MRS 3.x之前版本请使用以下代码创建表:

public static void testCreateTable() { LOG.info("Entering testCreateTable.");

// Specify the table descriptor.

HTableDescriptor htd = new HTableDescriptor(tableName);

// Set the column family name to info.

HColumnDescriptor hcd = new HColumnDescriptor("info");

// Set data encoding methods.HBase provides DIFF,FAST_DIFF,PREFIX // and PREFIX_TREE

hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);

// Set compression methods,HBase provides two default compression // methods:GZ and SNAPPY

(33)

// GZ has the highest compression rate,but low compression and // decompression effeciency,fit for cold data

// SNAPPY has low compression rate, but high compression and // decompression effeciency,fit for hot data.

// it is advised to use SANPPY

hcd.setCompressionType(Compression.Algorithm.SNAPPY);

htd.addFamily(hcd);

Admin admin = null;

try {

// Instantiate an Admin object.

admin = conn.getAdmin();

if (!admin.tableExists(tableName)) { LOG.info("Creating table...");

// create table

admin.createTable(htd);

LOG.info(admin.getClusterStatus());

LOG.info(admin.listNamespaceDescriptors());

LOG.info("Table created successfully.");

} else {

LOG.warn("table already exists");

}

} catch (IOException e) {

LOG.error("Create table failed.", e);

} finally {

if (admin != null) { try {

// Close the Admin object.

admin.close();

} catch (IOException e) {

LOG.error("Failed to close admin ", e);

} } }

LOG.info("Exiting testCreateTable.");

}

3.3.5 删除表

功能简介

HBase通过org.apache.hadoop.hbase.client.Admin的deleteTable方法来删除表。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的 dropTable方法中

public void dropTable() {

LOG.info("Entering dropTable.");

Admin admin = null;

try {

admin = conn.getAdmin();

if (admin.tableExists(tableName)) { // Disable the table before deleting it.

admin.disableTable(tableName);//注[1]

// Delete table.

admin.deleteTable(tableName);

}

LOG.info("Drop table successfully.");

} catch (IOException e) {

LOG.error("Drop table failed ", e);

} finally {

(34)

if (admin != null) { try {

// Close the Admin object.

admin.close();

} catch (IOException e) {

LOG.error("Close admin failed ", e);

} } }

LOG.info("Exiting dropTable.");

}

注意事项

注[1]只有表被disable时,才能被删除掉,所以deleteTable常与disableTable,

enableTable,tableExists,isTableEnabled,isTableDisabled结合在一起使用。

3.3.6 修改表

功能简介

HBase通过org.apache.hadoop.hbase.client.Admin的modifyTable方法修改表信息。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的 testModifyTable方法中

MRS 3.x及以后的版本使用以下代码修改表:

public void testModifyTable() {

LOG.info("Entering testModifyTable.");

// Specify the column family name.

byte[] familyName = Bytes.toBytes("education");

Admin admin = null;

try {

// Instantiate an Admin object.

admin = conn.getAdmin();

// Obtain the table descriptor.

TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName).build();

// Check whether the column family is specified before modification.

if (!htd.hasColumnFamily(familyName)) { // Create the column descriptor.

ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(familyName);

TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName)) .setColumnFamily(cfd)

.build();

// Disable the table to get the table offline before modifying // the table.

admin.disableTable(tableName);

// Submit a modifyTable request.

admin.modifyTable(td);

// Enable the table to get the table online after modifying the // table.

admin.enableTable(tableName);

}

LOG.info("Modify table successfully.");

} catch (IOException e) {

LOG.error("Modify table failed ", e);

} finally {

if (admin != null) { try {

// Close the Admin object.

admin.close();

} catch (IOException e) {

(35)

LOG.error("Close admin failed ", e);

} } }

LOG.info("Exiting testModifyTable.");

}MRS 3.x之前的版本使用以下代码修改表:

public void testModifyTable() {

LOG.info("Entering testModifyTable.");

// Specify the column family name.

byte[] familyName = Bytes.toBytes("education");

Admin admin = null;

try {

// Instantiate an Admin object.

admin = conn.getAdmin();

// Obtain the table descriptor.

HTableDescriptor htd = admin.getTableDescriptor(tableName);

// Check whether the column family is specified before modification.

if (!htd.hasFamily(familyName)) { // Create the column descriptor.

HColumnDescriptor hcd = new HColumnDescriptor(familyName);

htd.addFamily(hcd);

// Disable the table to get the table offline before modifying // the table.

admin.disableTable(tableName);//注[1]

// Submit a modifyTable request.

admin.modifyTable(tableName, htd);

// Enable the table to get the table online after modifying the // table.

admin.enableTable(tableName);

}

LOG.info("Modify table successfully.");

} catch (IOException e) {

LOG.error("Modify table failed ", e);

} finally {

if (admin != null) { try {

// Close the Admin object.

admin.close();

} catch (IOException e) {

LOG.error("Close admin failed ", e);

} } }

LOG.info("Exiting testModifyTable.");

}

说明

注[1]modifyTable只有表被disable时,才能生效。

3.3.7 插入数据

功能简介

HBase是一个面向列的数据库,一行数据,可能对应多个列族,而一个列族又可以对 应多个列。通常,写入数据的时候,我们需要指定要写入的列(含列族名称和列名 称)。HBase通过HTable的put方法来Put数据,可以是一行数据也可以是数据集。

(36)

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的 testPut方法中

public void testPut() {

LOG.info("Entering testPut.");

// Specify the column family name.

byte[] familyName = Bytes.toBytes("info");

// Specify the column name.

byte[][] qualifiers = {Bytes.toBytes("name"), Bytes.toBytes("gender"), Bytes.toBytes("age"), Bytes.toBytes("address")};

Table table = null;

try {

// Instantiate an HTable object.

table = conn.getTable(tableName);

List<Put> puts = new ArrayList<Put>();

// Instantiate a Put object.

Put put = new Put(Bytes.toBytes("012005000201"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Zhang San"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Male"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("19"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Shenzhen, Guangdong"));

puts.add(put);

put = new Put(Bytes.toBytes("012005000202"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Li Wanting"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Female"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("23"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Shijiazhuang, Hebei"));

puts.add(put);

put = new Put(Bytes.toBytes("012005000203"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Wang Ming"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Male"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("26"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Ningbo, Zhejiang"));

puts.add(put);

put = new Put(Bytes.toBytes("012005000204"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Li Gang"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Male"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("18"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Xiangyang, Hubei"));

puts.add(put);

put = new Put(Bytes.toBytes("012005000205"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Zhao Enru"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Female"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("21"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Shangrao, Jiangxi"));

puts.add(put);

put = new Put(Bytes.toBytes("012005000206"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Chen Long"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Male"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("32"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Zhuzhou, Hunan"));

puts.add(put);

put = new Put(Bytes.toBytes("012005000207"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Zhou Wei"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Female"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("29"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Nanyang, Henan"));

puts.add(put);

(37)

put = new Put(Bytes.toBytes("012005000208"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Yang Yiwen"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Female"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("30"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Kaixian, Chongqing"));

puts.add(put);

put = new Put(Bytes.toBytes("012005000209"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Xu Bing"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Male"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("26"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Weinan, Shaanxi"));

puts.add(put);

put = new Put(Bytes.toBytes("012005000210"));

put.addColumn(familyName, qualifiers[0], Bytes.toBytes("Xiao Kai"));

put.addColumn(familyName, qualifiers[1], Bytes.toBytes("Male"));

put.addColumn(familyName, qualifiers[2], Bytes.toBytes("25"));

put.addColumn(familyName, qualifiers[3], Bytes.toBytes("Dalian, Liaoning"));

puts.add(put);

// Submit a put request.

table.put(puts);

LOG.info("Put successfully.");

} catch (IOException e) { LOG.error("Put failed ", e);

} finally {

if (table != null) { try {

// Close the HTable object.

table.close();

} catch (IOException e) {

LOG.error("Close table failed ", e);

} } }

LOG.info("Exiting testPut.");

}

注意事项

不允许多个线程在同一时间共用同一个HTable实例。HTable是一个非线程安全类,因 此,同一个HTable实例,不应该被多个线程同时使用,否则可能会带来并发问题。

3.3.8 删除数据

功能简介

HBase通过Table实例的delete方法来Delete数据,可以是一行数据也可以是数据集。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的 testDelete方法中

public void testDelete() {

LOG.info("Entering testDelete.");

byte[] rowKey = Bytes.toBytes("012005000201");

Table table = null;

try {

// Instantiate an HTable object.

(38)

table = conn.getTable(tableName);

// Instantiate a Delete object.

Delete delete = new Delete(rowKey);

// Submit a delete request.

table.delete(delete);

LOG.info("Delete table successfully.");

} catch (IOException e) {

LOG.error("Delete table failed ", e);

} finally {

if (table != null) { try {

// Close the HTable object.

table.close();

} catch (IOException e) {

LOG.error("Close table failed ", e);

} } }

LOG.info("Exiting testDelete.");

}

3.3.9 使用 Get 读取数据

功能简介

要从表中读取一条数据,首先需要实例化该表对应的Table实例,然后创建一个Get对 象。也可以为Get对象设定参数值,如列族的名称和列的名称。查询到的行数据存储在 Result对象中,Result中可以存储多个Cell。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的 testGet方法中

public void testGet() {

LOG.info("Entering testGet.");

// Specify the column family name.

byte[] familyName = Bytes.toBytes("info");

// Specify the column name.

byte[][] qualifier = {Bytes.toBytes("name"), Bytes.toBytes("address")};

// Specify RowKey.

byte[] rowKey = Bytes.toBytes("012005000201");

Table table = null;

try {

// Create the Configuration instance.

table = conn.getTable(tableName);

// Instantiate a Get object.

Get get = new Get(rowKey);

// Set the column family name and column name.

get.addColumn(familyName, qualifier[0]);

get.addColumn(familyName, qualifier[1]);

// Submit a get request.

Result result = table.get(get);

// Print query results.

for (Cell cell : result.rawCells()) {

LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"

+ Bytes.toString(CellUtil.cloneFamily(cell)) + ","

(39)

+ Bytes.toString(CellUtil.cloneQualifier(cell)) + ","

+ Bytes.toString(CellUtil.cloneValue(cell)));

}

LOG.info("Get data successfully.");

} catch (IOException e) {

LOG.error("Get data failed ", e);

} finally {

if (table != null) { try {

// Close the HTable object.

table.close();

} catch (IOException e) {

LOG.error("Close table failed ", e);

} } }

LOG.info("Exiting testGet.");

}

3.3.10 使用 Scan 读取数据

功能简介

要从表中读取数据,首先需要实例化该表对应的Table实例,然后创建一个Scan对象,

并针对查询条件设置Scan对象的参数值,为了提高查询效率,最好指定StartRow和 StopRow。查询结果的多行数据保存在ResultScanner对象中,每行数据以Result对象 形式存储,Result中存储了多个Cell。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的 testScanData方法中

public void testScanData() {

LOG.info("Entering testScanData.");

Table table = null;

// Instantiate a ResultScanner object.

ResultScanner rScanner = null;

try {

// Create the Configuration instance.

table = conn.getTable(tableName);

// Instantiate a Get object.

Scan scan = new Scan();

scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));

// Set the StartRow

scan.setStartRow(Bytes.toBytes("012005000202"));//注[1]

// Set the StopRow

scan.setStopRow(Bytes.toBytes("012005000210"));//注[1]

// Set the Caching size.

scan.setCaching(1000);//注[2]

// Set the Batch size.

scan.setBatch(100);//注[2]

// Submit a scan request.

rScanner = table.getScanner(scan);

// Print query results.

for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) {

(40)

LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"

+ Bytes.toString(CellUtil.cloneFamily(cell)) + ","

+ Bytes.toString(CellUtil.cloneQualifier(cell)) + ","

+ Bytes.toString(CellUtil.cloneValue(cell)));

} }

LOG.info("Scan data successfully.");

} catch (IOException e) {

LOG.error("Scan data failed ", e);

} finally {

if (rScanner != null) { // Close the scanner object.

rScanner.close();

}

if (table != null) { try {

// Close the HTable object.

table.close();

} catch (IOException e) {

LOG.error("Close table failed ", e);

} } }

LOG.info("Exiting testScanData.");

}

注意事项

1. 建议Scan时指定StartRow和StopRow,一个有确切范围的Scan,性能会更好些。

2. 可以设置Batch和Caching关键参数。

– Batch

使用Scan调用next接口每次最大返回的记录数,与一次读取的列数有关。

– Caching

RPC请求返回next记录的最大数量,该参数与一次RPC获取的行数有关。

3.3.11 使用过滤器 Filter

功能简介

HBase Filter主要在Scan和Get过程中进行数据过滤,通过设置一些过滤条件来实现,

如设置RowKey、列名或者列值的过滤条件。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的 testFilterList方法中

public void testFilterList() {

LOG.info("Entering testFilterList.");

Table table = null;

// Instantiate a ResultScanner object.

ResultScanner rScanner = null;

try {

// Create the Configuration instance.

table = conn.getTable(tableName);

// Instantiate a Get object.

Scan scan = new Scan();

(41)

scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));

// Instantiate a FilterList object in which filters have "and"

// relationship with each other.

FilterList list = new FilterList(Operator.MUST_PASS_ALL);

// Obtain data with age of greater than or equal to 20.

//MRS 3.x及其之后版本使用CompareOperator替换CompareOp

list.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(new Long(20))));

// Obtain data with age of less than or equal to 29.

//MRS 3.x及其之后版本使用CompareOperator替换CompareOp

list.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(new Long(29))));

scan.setFilter(list);

// Submit a scan request.

rScanner = table.getScanner(scan);

// Print query results.

for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) {

LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"

+ Bytes.toString(CellUtil.cloneFamily(cell)) + ","

+ Bytes.toString(CellUtil.cloneQualifier(cell)) + ","

+ Bytes.toString(CellUtil.cloneValue(cell)));

} }

LOG.info("Filter list successfully.");

} catch (IOException e) {

LOG.error("Filter list failed ", e);

} finally {

if (rScanner != null) { // Close the scanner object.

rScanner.close();

}

if (table != null) { try {

// Close the HTable object.

table.close();

} catch (IOException e) {

LOG.error("Close table failed ", e);

} } }

LOG.info("Exiting testFilterList.");

}

3.3.12 添加二级索引

功能介绍

您可以使用org.apache.hadoop.hbase.hindex.client.HIndexAdmin中提供的方法来管 理HIndexes。 该类提供了将索引添加到现有表的方法:

根据用户是否希望在添加索引操作期间构建索引数据,有两种不同的方法可将索引添 加到表中:

● addIndicesWithData()

● addIndices()

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的 addIndicesExample方法中:

(42)

addIndices(): 将索引添加到没有数据的表中

public void addIndicesExample() { LOG.info("Entering Adding a Hindex.");

// Create index instance

TableIndices tableIndices = new TableIndices();

HIndexSpecification spec = new HIndexSpecification(indexNameToAdd);

//MRS 3.x及其以后版本推荐使用ColumnFamilyDescriptorBuilder替换HColumnDescriptor来构造添加列操 作。如下,

//

spec.addIndexColumn(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build(),"name", //ValueType.STRING, null);

spec.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.STRING);

tableIndices.addIndex(spec);

Admin admin = null;

HIndexAdmin iAdmin = null;

try {

admin = conn.getAdmin();

iAdmin = HIndexClient.newHIndexAdmin(admin);

// add index to the table

iAdmin.addIndices(tableName, tableIndices);

// Alternately, add the specified indices with data // iAdmin.addIndicesWithData(tableName, tableIndices);

LOG.info("Successfully added indices to the table " + tableName);

} catch (IOException e) {

LOG.error("Add Indices failed for table " + tableName + "." + e);

} finally {

if (iAdmin != null) { try {

// Close the HIndexAdmin object.

iAdmin.close();

} catch (IOException e) {

LOG.error("Failed to close HIndexAdmin ", e);

} }

if (admin != null) { try {

// Close the Admin object.

admin.close();

} catch (IOException e) {

LOG.error("Failed to close admin ", e);

} } }

LOG.info("Exiting Adding a Hindex.");

}

以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的 addIndicesExampleWithData方法中:

addIndicesWithData():将索引添加到具有大量预先存在数据的表中

public void addIndicesExampleWithData() { LOG.info("Entering Adding a Hindex With Data.");

// Create index instance

TableIndices tableIndices = new TableIndices();

HIndexSpecification spec = new HIndexSpecification(indexNameToAdd);

//MRS 3.x及其以后版本推荐使用ColumnFamilyDescriptorBuilder替换HColumnDescriptor来构造添加列操 作。如下,

//spec.addIndexColumn(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info")).build(),"age", //ValueType.STRING, null);

spec.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.STRING);

tableIndices.addIndex(spec);

Admin admin = null;

HIndexAdmin iAdmin = null;

try {

admin = conn.getAdmin();

iAdmin = HIndexClient.newHIndexAdmin(admin);

// add index to the table

數據

表 3-5 在 HBase 中开发的功能 序号 步骤 代码实现 1 根据表3-4中的信息创建表。 请参见创建表。 2 导入用户数据。 请参见插入数据。 3 增加“教育信息”列族,在用户信息中新增用 户的学历、职称等信息。 请参见修改表。 4 根据用户编号查询用户姓名和地址。 请参见使用Get读取数 据。 5 根据用户姓名进行查询。 请参见使用过滤器Filter。 6 用户销户,删除用户信息表中该用户的数据。 请参见删除数据。 7 A业务结束后,删除用户信息表。 请参见删除表。 关键设计原则 HBase是以R
表 3-6 org.apache.hadoop.hbase.client.replication.ReplicationAdmin
表 3-8 org.apache.hadoop.hbase.replication.ReplicationLoadSink 方法 描述 getAgeOfLastAppliedOp() 返回类型:long 返回:上次成功的应用wal编辑的持续 毫秒数 getTimeStampsOfLastAppliedOp() 返回类型:long 返回:上次成功的应用wal编辑的时间 戳 getAppliedBatches() 返回类型:long 返回:应用的数据总批数 getAppliedOps() 返回类型:long 返
表 4-4 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 步骤2 数据分析。 数据分析代码实现,请见数据查询。 ● 查看薪水支付币种为美元的雇员联系方式。 ● 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果
+5

參考文獻

相關文件

interface ITextBox : IControl// 繼承了介面 Icontrol 的方法 Paint() { void SetText(string text); }. interface IListBox : IControl// 繼承了介面 Icontrol 的方法 Paint() {

 Context level: Teacher familiarizes the students with the writing topic/ background (through videos/ pictures/ pre- task)..  Text level: Show a model consequential explanation

private void answerLB Click(object sender private void answerLB_Click(object sender,. System.EventArgs

鉴于课程发展和教学方法的研究和实践一日千里,加上教育局课程发展处多 年来透过不同途径,搜集各界对历史课程及教学等方面的意见,课程发展议会于

密碼系統中,通常將想要保護的密碼訊息稱為 plain text。而將經過加密後產生的加密訊息稱為 cipher text。在這 中間的過程,會用到可以對外供應的 Public Key 以及私人保

MASS::lda(Y~.,data) Linear discriminant analysis MASS::qda(Y~.,data) Quadratic Discriminant Analysis class::knn(X,X,Y,k,prob) k-Nearest Neighbour(X 為變數資料;Y 為分類)

打商务电话要用“您好”开头,多 说“请”字,以“谢谢”收尾。通话态 度要亲和,声音大小要适中,始终保持

All necessary information is alive in IRIS, and is contin- uously updated according to agreed procedures (PDCA) to support business processes Data Migration No analysis of