2.10 使用 Spark 作业跨源访问数据源
2.10.2 对接 CSS
2.10.2.2 scala 样例代码
在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。
CSS 非安全集群
● 开发说明
– 构造依赖信息,创建SparkSession i. 导入依赖
涉及到的mvn依赖库
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
import相关依赖包
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
ii. 创建会话。
val sparkSession = SparkSession.builder().getOrCreate()
– 通过SQL API访问
i. 创建DLI跨源访问 CSS的关联表。
sparkSession.sql("create table css_table(id int, name string) using css options(
'es.nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200', 'es.nodes.wan.only'='true',
'resource' '/mytest/css')")
表2-21 创建表参数
参数 说明
es.node
s CSS的连接地址,需要先创建跨源连接。具体操作请参考
《数据湖探索用户指南》。
创建经典型跨源连接后,使用经典型跨源连接中返回的连接 地址。
创建增强型跨源连接后,使用CSS提供的"内网访问地址",
格式为"IP1:PORT1,IP2:PORT2"。
参数 说明
● ES 6.X版本中,单个Index只支持唯一type,type名可以自定 义。
● ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支 持自定义。若访问ES 7.X版本时,该参数只需要填写index即
● 相同/index/type下的Document id是唯一的。如果作为 Document id的字段存在重复值,则在执行插入es时,重复id的
sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
iii. 查询数据。
val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show()
插入数据前:
插入数据后:
iv. 删除数据表
sparkSession.sql("drop table css_table")
– 通过DataFrame API访问 i. 连接配置。
val resource = "/mytest/css"
val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
ii. 构造schema,并添加数据。
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
iii. 导入数据到CSS。
val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) dataFrame_1.write
.format("css")
.option("resource", resource) .option("es.nodes", nodes) .mode(SaveMode.Append) .save()
说明
SaveMode 有四种保存类型:
● ErrorIfExis:如果已经存在数据,则抛出异常。
● Overwrite:如果已经存在数据,则覆盖原数据。
● Append:如果已经存在数据,则追加保存。
● Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在 则创建表”。
iv. 读取CSS上的数据
val dataFrameR =
sparkSession.read.format("css").option("resource",resource).option("es.nodes", nodes).load()
dataFrameR.show()
插入数据前:
插入数据后:
– 提交Spark作业
i. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索 用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
ii. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台 操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API 参考》>《创建批处理作业》。
说明
● 提交作业时,需要指定Module模块,名称为:sys.datasource.css。
● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参 数说明”表说明。
● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中
“表2-请求参数说明”关于“modules”参数的说明。
● 完整示例代码 – Maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
– 通过SQL API访问
import org.apache.spark.sql.SparkSession object Test_SQL_CSS {
def main(args: Array[String]): Unit = { // Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate() // Create a DLI data table for DLI-associated CSS
sparkSession.sql("create table css_table(id long, name string) using css options(
'es.nodes' = 'to-css-1174404217-QG2SwbVV.datasource.com:9200', 'es.nodes.wan.only' = 'true',
'resource' = '/mytest/css')")
//*****************************SQL model***********************************
// Insert data into the DLI data table
sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')") // Read data from DLI data table
val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show()
// drop table
sparkSession.sql("drop table css_table") sparkSession.close()
}}
– 通过DataFrame API访问
import org.apache.spark.sql.{Row, SaveMode, SparkSession};
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType};
object Test_SQL_CSS {
def main(args: Array[String]): Unit = { //Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate()
//*****************************DataFrame model***********************************
// Setting the /index/type of CSS val resource = "/mytest/css"
// Define the cross-origin connection address of the CSS cluster val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
//Setting schema
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
// Construction data
val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob"))) // Create a DataFrame from RDD and schema
val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) //Write data to the CSS
dataFrame_1.write.format("css") .option("resource", resource) .option("es.nodes", nodes) .mode(SaveMode.Append) .save()
//Read data
val dataFrameR = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load()
dataFrameR.show() spardSession.close() }}
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
import相关依赖包
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
ii. 创建会话,并设置AK/SK。
val sparkSession = SparkSession.builder().getOrCreate() sparkSession.conf.set("fs.obs.access.key", ak)
sparkSession.conf.set("fs.obs.secret.key", sk) sparkSession.conf.set("fs.obs.endpoint", enpoint)
sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
– 通过SQL API访问
i. 创建DLI跨源访问 CSS的关联表。
sparkSession.sql("create table css_table(id int, name string) using css options(
'es.nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200', 'es.nodes.wan.only'='true',
'resource'='/mytest/css', 'es.net.ssl'='true',
'es.net.ssl.keystore.location'='obs://AK:SK@桶名/path/transport-keystore.jks', 'es.net.ssl.keystore.pass'='***',
'es.net.ssl.truststore.location'='obs://AK:SK@桶名/path/truststore.jks', 'es.net.ssl.truststore.pass'='***',
'es.net.http.auth.user'='admin', 'es.net.http.auth.pass'='***')")
表2-22 创建表参数
说明1. ES 6.X版本中,单个Index只支持唯一type,type名可以自 定义。
2. ES 7.X版本中,单个Index将使用“_doc”作为type名,不 再支持自定义。若访问ES 7.X版本时,该参数只需要填写
参数 说明 es.mapping.
id 指定一个字段,其值作为es中Document的id。
说明
● 相同/index/type下的Document id是唯一的。如果作为 Document id的字段存在重复值,则在执行插入es时,重 复id的Document将会被覆盖。
● 该特性可以用作容错解决方案。当插入数据执行一半时,
DLI作业失败,会有部分数据已经插入到es中,这部分为冗 余数据。如果设置了Document id,则在重新执行DLI作业 时,会覆盖上一次的冗余数据。
es.net.ssl 连接安全CSS集群,默认值为“false”。
es.net.ssl.ke ystore.locati on
安全CSS集群的证书,生成的keystore文件在OBS上的 地址。
es.net.ssl.ke
ystore.pass 安全CSS集群的证书,生成的keystore文件时的密码。
es.net.ssl.tr uststore.loc ation
安全CSS集群的证书,生成的truststore文件在OBS上的 地址。
es.net.ssl.tr uststore.pas s
安全CSS集群的证书,生成的truststore文件时的密码。
es.net.http.
auth.user 安全CSS集群的用户名。
es.net.http.
auth.pass 安全CSS集群的密码。
说明
“batch.size.entries”和“batch.size.bytes”分别对数据条数和数据量大小进行 限制。
ii. 插入数据。
sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
iii. 查询数据。
val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show()
插入数据前:
插入数据后:
iv. 删除数据表
sparkSession.sql("drop table css_table")
– 通过DataFrame API访问 i. 连接配置。
val resource = "/mytest/css"
val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
ii. 构造schema,并添加数据。
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
iii. 导入数据到CSS。
val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) dataFrame_1.write
.format("css")
.option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true")
.option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***")
.option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")
.option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .mode(SaveMode.Append)
.save()
val dataFrameR = sparkSession.read.format("css") .option("resource",resource)
.option("es.nodes", nodes) .option("es.net.ssl", "true")
.option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***")
.option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")
.option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .load()
dataFrameR.show()
插入数据前:
插入数据后:
– 提交Spark作业
i. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索 用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
ii. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台 操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API 参考》>《创建批处理作业》。
说明
● 提交作业时,需要指定Module模块,名称为:sys.datasource.css。
● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参 数说明”表说明。
● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中
“表2-请求参数说明”关于“modules”参数的说明。
● 完整示例代码 – Maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
– 通过SQL API访问
import org.apache.spark.sql.SparkSession object csshttpstest {
def main(args: Array[String]): Unit = { //Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate() // Create a DLI data table for DLI-associated CSS
sparkSession.sql("create table css_table(id long, name string) using css options('es.nodes' = '192.168.6.204:9200','es.nodes.wan.only' = 'false','resource' = '/
mytest','es.net.ssl'='true','es.net.ssl.keystore.location' = 'obs://xietest1/lzq/
keystore.jks','es.net.ssl.keystore.pass' = '**','es.net.ssl.truststore.location'='obs://xietest1/lzq/
truststore.jks','es.net.ssl.truststore.pass'='**','es.net.http.auth.user'='admin','es.net.http.auth.pass'='*
*')")
//*****************************SQL model***********************************
// Insert data into the DLI data table
sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')") // Read data from DLI data table
val dataFrame = sparkSession.sql("select * from css_table")
dataFrame.show() // drop table
sparkSession.sql("drop table css_table") sparkSession.close()
}}
– 通过DataFrame API访问
import org.apache.spark.sql.{Row, SaveMode, SparkSession};
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType};
object Test_SQL_CSS {
def main(args: Array[String]): Unit = { //Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate() sparkSession.conf.set("fs.obs.access.key", ak)
sparkSession.conf.set("fs.obs.secret.key", sk)
//*****************************DataFrame model***********************************
// Setting the /index/type of CSS val resource = "/mytest/css"
// Define the cross-origin connection address of the CSS cluster val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
//Setting schema
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
// Construction data
val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob"))) // Create a DataFrame from RDD and schema
val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) //Write data to the CSS
dataFrame_1.write .format("css") .option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true")
.option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***")
.option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")
.option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .mode(SaveMode.Append)
.save();
//Read data
val dataFrameR = sparkSession.read.format("css") .option("resource", resource)
.option("es.nodes", nodes) .option("es.net.ssl", "true")
.option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***")
.option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")
.option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .load()
dataFrameR.show() spardSession.close() }}