2.10 使用 Spark 作业跨源访问数据源
2.10.4 对接 HBase
2.10.4.1 MRS 配置
}
2.10.4 对接 HBase
2.10.4.1 MRS 配置
DLI 跨源连接中配置 MRS 主机信息
1. 在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指 南》。
2. 对接MRS HBase需要在DLI队列的host文件中添加MRS集群节点的/etc/hosts信 息。
详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。
开启 Kerberos 认证时的相关配置文件
1. 参考《从零开始使用Kerberos认证集群》中的“创建安全集群并登录其
Manager”章节创建Kerberos认证集群。参考“创建角色和用户”章节添加用户 并赋权。
2. 参考《使用HBase客户端》使用1中创建的用户认证登录。“人机”用户第一次登 录时需修改密码。
3. 登录Manager界面,选择“系统 > 权限 > 用户”,选择新建用户,选择“更多 >
下载认证凭据”,保存后解压得到用户的keytab文件与krb5.conf文件。
创建 MRS HBase 表
创建DLI表关联MRS HBase表之前确保HBase的表是存在的。以样例代码为例,具体的 流程是:
1. 远程登录ECS,通过hbase shell命令查看表信息。其中,“hbtest”是要查询的表 名。describe 'hbtest'
2. (可选)如果不存在对应的HBase表,可以创建该表,具体的命令是:
create ‘hbtest’, ‘info’, ‘detail’
其中,“hbtest”是表名,其余为列族名。
3. 配置好连接信息。“TableName”对应HBase表的表名,“Rowkey”和“Cols”
请参考《创建DLI表关联HBase》进行配置。
2.10.4.2 scala 样例代码 开发说明
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
import相关依赖包
import scala.collection.mutable
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._
b. 创建会话。
val sparkSession = SparkSession.builder().getOrCreate()
c. 创建DLI跨源访问 HBase的关联表。
▪
如果对接的HBase集群未开启Kerberos认证,则样例代码参考如下。sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN,
'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (
'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1',
'RowKey'='id:5,location:6,city:7',
'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf, doublef:CF1.doublef')"
)
▪
如果对接的HBase集群开启了Kerberos认证,则样例代码参考如下。sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN,
'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (
'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1',
'RowKey'='id:5,location:6,city:7',
'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf, doublef:CF1.doublef',
'krb5conf'='./krb5.conf', 'keytab' = './user.keytab',
sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)")
b. 查询数据
sparkSession.sql("select * from test_hbase").show ()
返回结果:
● 通过DataFrame API访问 a. 构造schema
val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType)
val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType)
val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef)
b. 根据schema的类型构造数据
val mutableRow: Seq[Any] = Seq("12345","abc","guiyang",false,null,3,23,2.3,2.34) val rddData: RDD[Row] =
sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
c. 导入数据到HBase
sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase")
d. 读取HBase上的数据
val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1"
map("RowKey") = "id:5,location:6,city:7"
map("Cols") =
"booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.
doublef"
map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
sparkSession.read.schema(new
StructType(attrs)).format("hbase").options(map.toMap).load().show()
返回结果:
● 提交Spark作业
a. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索用户 指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
b. 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和 user.keytab文件添加到作业的其他依赖文件中,未开启Kerberos认证该步骤 忽略。如图2-39所示:
图2-39 添加依赖文件
c. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
● 通过SQL API访问
– 未开启Kerberos认证样例代码
import org.apache.spark.sql.SparkSession object Test_SparkSql_HBase {
def main(args: Array[String]): Unit = { // Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate() /**
* Create an association table for the DLI association Hbase table */
sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf' BOOLEAN,
'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS (
'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1',
'RowKey'='id:5,location:6,city:7',
'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf, longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')") //*****************************SQL model***********************************
sparkSession.sql("insert into test_hbase values('12345','abc','guiyang',false,null,3,23,2.3,2.34)") sparkSession.sql("select * from test_hbase").collect()
sparkSession.close() }}
– 开启Kerberos认证样例代码
import org.apache.spark.SparkFiles import org.apache.spark.sql.SparkSession
import java.io.{File, FileInputStream, FileOutputStream}
object Test_SparkSql_HBase_Kerberos {
def copyFile2(Input:String)(OutPut:String): Unit ={
val fis = new FileInputStream(Input) val fos = new FileOutputStream(OutPut) val buf = new Array[Byte](1024) var len = 0
while ({len = fis.read(buf);len} != -1){
def main(args: Array[String]): Unit = { // Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate() val sc = sparkSession.sparkContext
sc.addFile("krb5.conf的obs地址") sc.addFile("user.keytab的obs地址") Thread.sleep(10)
val krb5_startfile = new File(SparkFiles.get("krb5.conf")) val keytab_startfile = new File(SparkFiles.get("user.keytab")) val path_user = System.getProperty("user.dir")
val keytab_endfile = new File(path_user + "/" + keytab_startfile.getName) val krb5_endfile = new File(path_user + "/" + krb5_startfile.getName) println(keytab_endfile)
println(krb5_endfile)
var krbinput = SparkFiles.get("krb5.conf") var krboutput = path_user+"/krb5.conf"
copyFile2(krbinput)(krboutput)
var keytabinput = SparkFiles.get("user.keytab") var keytaboutput = path_user+"/user.keytab"
copyFile2(keytabinput)(keytaboutput) Thread.sleep(10)
/**
* Create an association table for the DLI association Hbase table */
sparkSession.sql("CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +
"using hbase OPTIONS(" + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') ")
//*****************************SQL model***********************************
sparkSession.sql("insert into testhbase values('newtest',true,1,2,3,4,5)") val result = sparkSession.sql("select * from testhbase")
result.show() sparkSession.close() }}
● 通过DataFrame API访问
import scala.collection.mutable
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._
object Test_SparkSql_HBase {
def main(args: Array[String]): Unit = { // Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate()
// Create an association table for the DLI association Hbase table
sparkSession.sql("CREATE TABLE test_hbase('id' STRING, 'location' STRING, 'city' STRING, 'booleanf'
BOOLEAN,
'shortf' SHORT, 'intf' INT, 'longf' LONG, 'floatf' FLOAT,'doublef' DOUBLE) using hbase OPTIONS ( 'ZKHost'='cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,
cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName'='table_DupRowkey1',
'RowKey'='id:5,location:6,city:7',
'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.
doublef')")
//*****************************DataFrame model***********************************
// Setting schema
val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType)
val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType)
val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef) // Populate data according to the type of schema
val mutableRow: Seq[Any] = Seq("12345","abc","guiyang",false,null,3,23,2.3,2.34)
val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
// Import the constructed data into Hbase
sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase") // Read data on Hbase
val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1"
map("RowKey") = "id:5,location:6,city:7"
map("Cols") =
"booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doubl ef"
map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
sparkSession.read.schema(new
StructType(attrs)).format("hbase").options(map.toMap).load().collect() sparkSession.close()
}}
2.10.4.3 pyspark 样例代码 开发说明
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession
b. 创建会话
sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
● 通过SQL API访问
a. 创建DLI跨源访问HBase的关联表
▪
如果对接的HBase集群未开启Kerberos认证,样例代码参考如下。sparkSession.sql(
"CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
'ZKHost' = '192.168.0.189:2181',\
'TableName' = 'hbtest',\
'RowKey' = 'id:5',\
'Cols' = 'location:info.location,city:detail.city')")
▪
如果对接的HBase集群开启了Kerberos认证,样例代码参考如下。sparkSession.sql(
"CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
'ZKHost' = '192.168.0.189:2181',\
'TableName' = 'hbtest',\
'RowKey' = 'id:5',\
'Cols' = 'location:info.location,city:detail.city',\
'krb5conf' = './krb5.conf',\
'keytab'='./user.keytab',\
'principal' ='krbtest')")
与未开启kerberos认证相比,开启了kerberos认证需要多设置三个参
sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
c. 读取HBase上的数据
sparkSession.sql("select * from testhbase").show()
● 通过DataFrame API访问
a. 创建DLI跨源访问HBase的关联表
sparkSession.sql(
"CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,
floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ( 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName' = 'table_DupRowkey1',
'RowKey' = 'id:5,location:6,city:7',
'Cols' = 'booleanf:CF1.booleanf, shortf:CF1.shortf, intf:CF1.intf, longf:CF1.longf, floatf:CF1.floatf, doublef:CF1.doublef')")
dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])
d. 创建DataFrame
dataFrame = sparkSession.createDataFrame(dataList, schema)
e. 导入数据到HBase
dataFrame.write.insertInto("test_hbase")
f. 读取HBase上的数据
// Set cross-source connection parameters TableName = "table_DupRowkey1"
RowKey = "id:5,location:6,city:7"
Cols =
"booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.
doublef"
ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
cloudtable-cf82-zk1- WY09px9l.cloudtable.com:2181"
// select
jdbcDF = sparkSession.read.schema(schema)\
.format("hbase")\
jdbcDF.filter("id = '12333' or id='11111'").show() 说明
id、location、city:限定了长度,插入数据时须按长度给定数据值,否则查询时会发 生编码格式错误。
g. 操作结果;
● 提交Spark作业
a. 将写好的python代码文件上传至DLI中。控制台操作请参考《数据湖探索用户 指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
b. 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和 user.keytab文件添加到作业的其他依赖文件中,未开启Kerberos认证该步骤 忽略。如图2-40所示:
图2-40 添加依赖文件
c. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作 请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>
《创建批处理作业》。
说明
● 提交作业时,需要指定Module模块,名称为:sys.datasource.hbase。
● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说 明”表说明。
● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请 求参数说明”关于“modules”参数的说明。
完整示例代码
● 通过SQL API访问MRS HBase – 未开启kerberos认证样例代码
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession if __name__ == "__main__":
# Create a SparkSession session.
sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() sparkSession.sql(
"CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
'ZKHost' = '192.168.0.189:2181',\
'TableName' = 'hbtest',\
'RowKey' = 'id:5',\
'Cols' = 'location:info.location,city:detail.city')")
sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show()
# close session sparkSession.stop()
– 开启kerberos认证样例代码
# _*_ coding: utf-8 _*_
from __future__ import print_function from pyspark import SparkFiles from pyspark.sql import SparkSession import shutil
import time import os
if __name__ == "__main__":
# Create a SparkSession session.
sparkSession =
SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate() sc = sparkSession.sparkContext
time.sleep(10)
krb5_startfile = SparkFiles.get("krb5.conf") keytab_startfile = SparkFiles.get("user.keytab") path_user = os.getcwd()
krb5_endfile = path_user + "/" + "krb5.conf"
keytab_endfile = path_user + "/" + "user.keytab"
shutil.copy(krb5_startfile, krb5_endfile) shutil.copy(keytab_startfile, keytab_endfile) time.sleep(20)
sparkSession.sql(
"CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +
"using hbase OPTIONS(" + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') ")
sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show()
# close session sparkSession.stop()
● 通过DataFrame API访问HBase
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession if __name__ == "__main__":
# Create a SparkSession session.
sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() # Createa data table for DLI-associated ct
sparkSession.sql(
"CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,
floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181', 'TableName' = 'table_DupRowkey1',
'RowKey' = 'id:5,location:6,city:7', 'Cols' =
'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doubl ef')")
# Create a DataFrame and initialize the DataFrame data.
dataList = sparkSession.sparkContext.parallelize([("11111", "beijin", "beijing", False, 4, 3, 23, 2.3, 2.34)])
# Setting schema
schema = StructType([StructField("id", StringType()), StructField("location", StringType()),
StructField("city", StringType()),
StructField("booleanf", BooleanType()),
dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the cloudtable-hbase
dataFrame.write.insertInto("test_hbase") # Set cross-source connection parameters TableName = "table_DupRowkey1"
RowKey = "id:5,location:6,city:7"
Cols =
"booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doubl ef"
ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
# Read data on CloudTable-HBase jdbcDF = sparkSession.read.schema(schema)\
.format("hbase")\
jdbcDF.filter("id = '12333' or id='11111'").show() # close session
sparkSession.stop()
2.10.4.4 java 样例代码
开发说明
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
▪
import相关依赖包import org.apache.spark.sql.SparkSession;
b. 创建会话
parkSession = SparkSession.builder().appName("datasource-HBase-MRS").getOrCreate();
● 通过SQL API 访问 – 未开启Kerberos认证
i. 创建DLI跨源访问MRS HBase的关联表,填写连接参数。
sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:
5','Cols'='location:info.location,city:detail.city') ");
ii. 插入数据
sparkSession.sql("insert into testhbase values('12345','abc','guiyang')");
iii. 查询数据
sparkSession.sql("select * from testhbase").show();
插入数据后:
– 开启Kerberos认证
i. 创建DLI跨源访问MRS HBase的关联表,填写连接参数。
sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:
5','Cols'='location:info.location,city:detail.city,'krb5conf'='./krb5.conf','keytab'='./
user.keytab','principal'='krbtest') ");
与未开启kerberos认证相比,开启了kerberos认证需要多设置三个参 数,如表2-26所示。
表2-26 参数说明
参数名称与参数值 参数说明 'krb5conf' = './
krb5.conf' krb5.conf的地址。
'keytab'='./
user.keytab' Keytab的地址。
'principal'
='krbtest' 认证用户名。
krb5.conf和keytab文件获取请具体参考开启Kerberos认证时的相关配置 文件操作说明。
ii. 插入数据
sparkSession.sql("insert into testhbase values('95274','abc','Hongkong')");
iii. 查询数据
sparkSession.sql("select * from testhbase").show();
● 提交Spark作业
a. 将写好的代码文件生成jar包,上传至DLI中。控制台操作请参考《数据湖探索 用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
b. 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和 user.keytab文件添加到作业的依赖文件中,未开启Kerberos认证该步骤忽 略。如图2-41所示: