• 沒有找到結果。

2.10 使用 Spark 作业跨源访问数据源

2.10.4 对接 HBase

2.10.4.1 MRS 配置

}

2.10.4 对接 HBase

2.10.4.1 MRS 配置

DLI 跨源连接中配置 MRS 主机信息

1. 在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指 南》。

2. 对接MRS HBase需要在DLI队列的host文件中添加MRS集群节点的/etc/hosts信 息。

详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。

开启 Kerberos 认证时的相关配置文件

1. 参考《从零开始使用Kerberos认证集群》中的“创建安全集群并登录其

Manager”章节创建Kerberos认证集群。参考“创建角色和用户”章节添加用户 并赋权。

2. 参考《使用HBase客户端》使用1中创建的用户认证登录。“人机”用户第一次登 录时需修改密码。

3. 登录Manager界面,选择“系统 > 权限 > 用户”,选择新建用户,选择“更多 >

下载认证凭据”,保存后解压得到用户的keytab文件与krb5.conf文件。

创建 MRS HBase 表

创建DLI表关联MRS HBase表之前确保HBase的表是存在的。以样例代码为例,具体的 流程是:

1. 远程登录ECS,通过hbase shell命令查看表信息。其中,“hbtest”是要查询的表 名。describe 'hbtest'

2. (可选)如果不存在对应的HBase表,可以创建该表,具体的命令是:

create ‘hbtest’, ‘info’, ‘detail’

其中,“hbtest”是表名,其余为列族名。

3. 配置好连接信息。“TableName”对应HBase表的表名,“Rowkey”和“Cols”

请参考《创建DLI表关联HBase》进行配置。

2.10.4.2 scala 样例代码 开发说明

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

import相关依赖包

import scala.collection.mutable

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._

b. 创建会话。

val sparkSession = SparkSession.builder().getOrCreate()

c. 创建DLI跨源访问 HBase的关联表。

如果对接的HBase集群未开启Kerberos认证,则样例代码参考如下。

sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN,

'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (

'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1',

'RowKey'='id:5,location:6,city:7',

'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf, doublef:CF1.doublef')"

)

如果对接的HBase集群开启了Kerberos认证,则样例代码参考如下。

sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN,

'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (

'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1',

'RowKey'='id:5,location:6,city:7',

'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf, doublef:CF1.doublef',

'krb5conf'='./krb5.conf', 'keytab' = './user.keytab',

sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)")

b. 查询数据

sparkSession.sql("select * from test_hbase").show ()

返回结果:

● 通过DataFrame API访问 a. 构造schema

val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType)

val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType)

val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef)

b. 根据schema的类型构造数据

val mutableRow: Seq[Any] = Seq("12345","abc","guiyang",false,null,3,23,2.3,2.34) val rddData: RDD[Row] =

sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)

c. 导入数据到HBase

sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")

d. 读取HBase上的数据

val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1"

map("RowKey") = "id:5,location:6,city:7"

map("Cols") =

"booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.

doublef"

map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,

cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"

sparkSession.read.schema(new

StructType(attrs)).format("hbase").options(map.toMap).load().show()

返回结果:

● 提交Spark作业

a. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索用户 指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

b. 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和 user.keytab文件添加到作业的其他依赖文件中,未开启Kerberos认证该步骤 忽略。如图2-39所示:

2-39 添加依赖文件

c. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

● 通过SQL API访问

– 未开启Kerberos认证样例代码

import org.apache.spark.sql.SparkSession object Test_SparkSql_HBase {

def main(args: Array[String]): Unit = { // Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate() /**

* Create an association table for the DLI association Hbase table */

sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN,

'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (

'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1',

'RowKey'='id:5,location:6,city:7',

'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf, longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')") //*****************************SQL model***********************************

sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)") sparkSession.sql("select * from test_hbase").collect()

sparkSession.close() }}

– 开启Kerberos认证样例代码

import org.apache.spark.SparkFiles import org.apache.spark.sql.SparkSession

import java.io.{File, FileInputStream, FileOutputStream}

object Test_SparkSql_HBase_Kerberos {

def copyFile2(Input:String)(OutPut:String): Unit ={

val fis = new FileInputStream(Input) val fos = new FileOutputStream(OutPut) val buf = new Array[Byte](1024) var len = 0

while ({len = fis.read(buf);len} != -1){

def main(args: Array[String]): Unit = { // Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate() val sc = sparkSession.sparkContext

sc.addFile("krb5.conf的obs地址") sc.addFile("user.keytab的obs地址") Thread.sleep(10)

val krb5_startfile = new File(SparkFiles.get("krb5.conf")) val keytab_startfile = new File(SparkFiles.get("user.keytab")) val path_user = System.getProperty("user.dir")

val keytab_endfile = new File(path_user + "/" + keytab_startfile.getName) val krb5_endfile = new File(path_user + "/" + krb5_startfile.getName) println(keytab_endfile)

println(krb5_endfile)

var krbinput = SparkFiles.get("krb5.conf") var krboutput = path_user+"/krb5.conf"

copyFile2(krbinput)(krboutput)

var keytabinput = SparkFiles.get("user.keytab") var keytaboutput = path_user+"/user.keytab"

copyFile2(keytabinput)(keytaboutput) Thread.sleep(10)

/**

* Create an association table for the DLI association Hbase table */

sparkSession.sql("CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +

"using hbase OPTIONS(" + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') ")

//*****************************SQL model***********************************

sparkSession.sql("insert into testhbase values('newtest',true,1,2,3,4,5)") val result = sparkSession.sql("select * from testhbase")

result.show() sparkSession.close() }}

● 通过DataFrame API访问

import scala.collection.mutable

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._

object Test_SparkSql_HBase {

def main(args: Array[String]): Unit = { // Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate()

// Create an association table for the DLI association Hbase table

sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf'

BOOLEAN,

'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS ( 'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,

cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1',

'RowKey'='id:5,location:6,city:7',

'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.

doublef')")

//*****************************DataFrame model***********************************

// Setting schema

val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType)

val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType)

val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef) // Populate data according to the type of schema

val mutableRow: Seq[Any] = Seq("12345","abc","guiyang",false,null,3,23,2.3,2.34)

val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)

// Import the constructed data into Hbase

sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase") // Read data on Hbase

val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1"

map("RowKey") = "id:5,location:6,city:7"

map("Cols") =

"booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doubl ef"

map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,

cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"

sparkSession.read.schema(new

StructType(attrs)).format("hbase").options(map.toMap).load().collect() sparkSession.close()

}}

2.10.4.3 pyspark 样例代码 开发说明

from __future__ import print_function

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType

from pyspark.sql import SparkSession

b. 创建会话

sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()

● 通过SQL API访问

a. 创建DLI跨源访问HBase的关联表

如果对接的HBase集群未开启Kerberos认证,样例代码参考如下。

sparkSession.sql(

"CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\

'ZKHost' = '192.168.0.189:2181',\

'TableName' = 'hbtest',\

'RowKey' = 'id:5',\

'Cols' = 'location:info.location,city:detail.city')")

如果对接的HBase集群开启了Kerberos认证,样例代码参考如下。

sparkSession.sql(

"CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\

'ZKHost' = '192.168.0.189:2181',\

'TableName' = 'hbtest',\

'RowKey' = 'id:5',\

'Cols' = 'location:info.location,city:detail.city',\

'krb5conf' = './krb5.conf',\

'keytab'='./user.keytab',\

'principal' ='krbtest')")

与未开启kerberos认证相比,开启了kerberos认证需要多设置三个参

sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")

c. 读取HBase上的数据

sparkSession.sql("select * from testhbase").show()

● 通过DataFrame API访问

a. 创建DLI跨源访问HBase的关联表

sparkSession.sql(

"CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,

floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ( 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName' = 'table_DupRowkey1',

'RowKey' = 'id:5,location:6,city:7',

'Cols' = 'booleanf:CF1.booleanf, shortf:CF1.shortf, intf:CF1.intf, longf:CF1.longf, floatf:CF1.floatf, doublef:CF1.doublef')")

dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])

d. 创建DataFrame

dataFrame = sparkSession.createDataFrame(dataList, schema)

e. 导入数据到HBase

dataFrame.write.insertInto("test_hbase")

f. 读取HBase上的数据

// Set cross-source connection parameters TableName = "table_DupRowkey1"

RowKey = "id:5,location:6,city:7"

Cols =

"booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.

doublef"

ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,

cloudtable-cf82-zk1- WY09px9l.cloudtable.com:2181"

// select

jdbcDF = sparkSession.read.schema(schema)\

.format("hbase")\

jdbcDF.filter("id = '12333' or id='11111'").show() 说明

id、location、city:限定了长度,插入数据时须按长度给定数据值,否则查询时会发 生编码格式错误。

g. 操作结果;

● 提交Spark作业

a. 将写好的python代码文件上传至DLI中。控制台操作请参考《数据湖探索用户 指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

b. 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和 user.keytab文件添加到作业的其他依赖文件中,未开启Kerberos认证该步骤 忽略。如图2-40所示:

2-40 添加依赖文件

c. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作 请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>

《创建批处理作业》。

说明

● 提交作业时,需要指定Module模块,名称为:sys.datasource.hbase。

● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说 明”表说明。

● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请 求参数说明”关于“modules”参数的说明。

完整示例代码

● 通过SQL API访问MRS HBase – 未开启kerberos认证样例代码

# _*_ coding: utf-8 _*_

from __future__ import print_function

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType

from pyspark.sql import SparkSession if __name__ == "__main__":

# Create a SparkSession session.

sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() sparkSession.sql(

"CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\

'ZKHost' = '192.168.0.189:2181',\

'TableName' = 'hbtest',\

'RowKey' = 'id:5',\

'Cols' = 'location:info.location,city:detail.city')")

sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show()

# close session sparkSession.stop()

– 开启kerberos认证样例代码

# _*_ coding: utf-8 _*_

from __future__ import print_function from pyspark import SparkFiles from pyspark.sql import SparkSession import shutil

import time import os

if __name__ == "__main__":

# Create a SparkSession session.

sparkSession =

SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate() sc = sparkSession.sparkContext

time.sleep(10)

krb5_startfile = SparkFiles.get("krb5.conf") keytab_startfile = SparkFiles.get("user.keytab") path_user = os.getcwd()

krb5_endfile = path_user + "/" + "krb5.conf"

keytab_endfile = path_user + "/" + "user.keytab"

shutil.copy(krb5_startfile, krb5_endfile) shutil.copy(keytab_startfile, keytab_endfile) time.sleep(20)

sparkSession.sql(

"CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +

"using hbase OPTIONS(" + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') ")

sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show()

# close session sparkSession.stop()

● 通过DataFrame API访问HBase

# _*_ coding: utf-8 _*_

from __future__ import print_function

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType

from pyspark.sql import SparkSession if __name__ == "__main__":

# Create a SparkSession session.

sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() # Createa data table for DLI-associated ct

sparkSession.sql(

"CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,

floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName' = 'table_DupRowkey1',

'RowKey' = 'id:5,location:6,city:7', 'Cols' =

'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doubl ef')")

# Create a DataFrame and initialize the DataFrame data.

dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])

# Setting schema

schema = StructType([StructField("id", StringType()), StructField("location", StringType()),

StructField("city", StringType()),

StructField("booleanf", BooleanType()),

dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the cloudtable-hbase

dataFrame.write.insertInto("test_hbase") # Set cross-source connection parameters TableName = "table_DupRowkey1"

RowKey = "id:5,location:6,city:7"

Cols =

"booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doubl ef"

ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,

cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"

# Read data on CloudTable-HBase jdbcDF = sparkSession.read.schema(schema)\

.format("hbase")\

jdbcDF.filter("id = '12333' or id='11111'").show() # close session

sparkSession.stop()

2.10.4.4 java 样例代码

开发说明

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

import相关依赖包

import org.apache.spark.sql.SparkSession;

b. 创建会话

parkSession = SparkSession.builder().appName("datasource-HBase-MRS").getOrCreate();

● 通过SQL API 访问 – 未开启Kerberos认证

i. 创建DLI跨源访问MRS HBase的关联表,填写连接参数。

sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:

5','Cols'='location:info.location,city:detail.city') ");

ii. 插入数据

sparkSession.sql("insert into testhbase values('12345','abc','guiyang')");

iii. 查询数据

sparkSession.sql("select * from testhbase").show();

插入数据后:

– 开启Kerberos认证

i. 创建DLI跨源访问MRS HBase的关联表,填写连接参数。

sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:

5','Cols'='location:info.location,city:detail.city,'krb5conf'='./krb5.conf','keytab'='./

user.keytab','principal'='krbtest') ");

与未开启kerberos认证相比,开启了kerberos认证需要多设置三个参 数,如表2-26所示。

2-26 参数说明

参数名称与参数值 参数说明 'krb5conf' = './

krb5.conf' krb5.conf的地址。

'keytab'='./

user.keytab' Keytab的地址。

'principal'

='krbtest' 认证用户名。

krb5.conf和keytab文件获取请具体参考开启Kerberos认证时的相关配置 文件操作说明。

ii. 插入数据

sparkSession.sql("insert into testhbase values('95274','abc','Hongkong')");

iii. 查询数据

sparkSession.sql("select * from testhbase").show();

● 提交Spark作业

a. 将写好的代码文件生成jar包,上传至DLI中。控制台操作请参考《数据湖探索 用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

b. 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和 user.keytab文件添加到作业的依赖文件中,未开启Kerberos认证该步骤忽 略。如图2-41所示: