• 沒有找到結果。

2.10 使用 Spark 作业跨源访问数据源

2.10.2 对接 CSS

2.10.2.2 scala 样例代码

在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。

CSS 非安全集群

● 开发说明

– 构造依赖信息,创建SparkSession i. 导入依赖

涉及到的mvn依赖库

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

import相关依赖包

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

ii. 创建会话。

val sparkSession = SparkSession.builder().getOrCreate()

– 通过SQL API访问

i. 创建DLI跨源访问 CSS的关联表。

sparkSession.sql("create table css_table(id int, name string) using css options(

'es.nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200', 'es.nodes.wan.only'='true',

'resource' '/mytest/css')")

2-21 创建表参数

参数 说明

es.node

s CSS的连接地址,需要先创建跨源连接。具体操作请参考

《数据湖探索用户指南》。

创建经典型跨源连接后,使用经典型跨源连接中返回的连接 地址。

创建增强型跨源连接后,使用CSS提供的"内网访问地址",

格式为"IP1:PORT1,IP2:PORT2"。

参数 说明

● ES 6.X版本中,单个Index只支持唯一type,type名可以自定 义。

● ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支 持自定义。若访问ES 7.X版本时,该参数只需要填写index即

● 相同/index/type下的Document id是唯一的。如果作为 Document id的字段存在重复值,则在执行插入es时,重复id的

sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")

iii. 查询数据。

val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show()

插入数据前:

插入数据后:

iv. 删除数据表

sparkSession.sql("drop table css_table")

– 通过DataFrame API访问 i. 连接配置。

val resource = "/mytest/css"

val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"

ii. 构造schema,并添加数据。

val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))

val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))

iii. 导入数据到CSS。

val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) dataFrame_1.write

.format("css")

.option("resource", resource) .option("es.nodes", nodes) .mode(SaveMode.Append) .save()

说明

SaveMode 有四种保存类型:

● ErrorIfExis:如果已经存在数据,则抛出异常。

● Overwrite:如果已经存在数据,则覆盖原数据。

● Append:如果已经存在数据,则追加保存。

● Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在 则创建表”。

iv. 读取CSS上的数据

val dataFrameR =

sparkSession.read.format("css").option("resource",resource).option("es.nodes", nodes).load()

dataFrameR.show()

插入数据前:

插入数据后:

– 提交Spark作业

i. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索 用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

ii. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台 操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API 参考》>《创建批处理作业》。

说明

● 提交作业时,需要指定Module模块,名称为:sys.datasource.css。

● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参 数说明”表说明。

● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中

“表2-请求参数说明”关于“modules”参数的说明。

● 完整示例代码 – Maven依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

– 通过SQL API访问

import org.apache.spark.sql.SparkSession object Test_SQL_CSS {

def main(args: Array[String]): Unit = { // Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate() // Create a DLI data table for DLI-associated CSS

sparkSession.sql("create table css_table(id long, name string) using css options(

'es.nodes' = 'to-css-1174404217-QG2SwbVV.datasource.com:9200', 'es.nodes.wan.only' = 'true',

'resource' = '/mytest/css')")

//*****************************SQL model***********************************

// Insert data into the DLI data table

sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')") // Read data from DLI data table

val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show()

// drop table

sparkSession.sql("drop table css_table") sparkSession.close()

}}

– 通过DataFrame API访问

import org.apache.spark.sql.{Row, SaveMode, SparkSession};

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType};

object Test_SQL_CSS {

def main(args: Array[String]): Unit = { //Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate()

//*****************************DataFrame model***********************************

// Setting the /index/type of CSS val resource = "/mytest/css"

// Define the cross-origin connection address of the CSS cluster val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"

//Setting schema

val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))

// Construction data

val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob"))) // Create a DataFrame from RDD and schema

val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) //Write data to the CSS

dataFrame_1.write.format("css") .option("resource", resource) .option("es.nodes", nodes) .mode(SaveMode.Append) .save()

//Read data

val dataFrameR = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load()

dataFrameR.show() spardSession.close() }}

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

import相关依赖包

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

ii. 创建会话,并设置AK/SK。

val sparkSession = SparkSession.builder().getOrCreate() sparkSession.conf.set("fs.obs.access.key", ak)

sparkSession.conf.set("fs.obs.secret.key", sk) sparkSession.conf.set("fs.obs.endpoint", enpoint)

sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")

– 通过SQL API访问

i. 创建DLI跨源访问 CSS的关联表。

sparkSession.sql("create table css_table(id int, name string) using css options(

'es.nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200', 'es.nodes.wan.only'='true',

'resource'='/mytest/css', 'es.net.ssl'='true',

'es.net.ssl.keystore.location'='obs://AK:SK@桶名/path/transport-keystore.jks', 'es.net.ssl.keystore.pass'='***',

'es.net.ssl.truststore.location'='obs://AK:SK@桶名/path/truststore.jks', 'es.net.ssl.truststore.pass'='***',

'es.net.http.auth.user'='admin', 'es.net.http.auth.pass'='***')")

2-22 创建表参数

说明1. ES 6.X版本中,单个Index只支持唯一type,type名可以自 定义。

2. ES 7.X版本中,单个Index将使用“_doc”作为type名,不 再支持自定义。若访问ES 7.X版本时,该参数只需要填写

参数 说明 es.mapping.

id 指定一个字段,其值作为es中Document的id。

说明

● 相同/index/type下的Document id是唯一的。如果作为 Document id的字段存在重复值,则在执行插入es时,重 复id的Document将会被覆盖。

● 该特性可以用作容错解决方案。当插入数据执行一半时,

DLI作业失败,会有部分数据已经插入到es中,这部分为冗 余数据。如果设置了Document id,则在重新执行DLI作业 时,会覆盖上一次的冗余数据。

es.net.ssl 连接安全CSS集群,默认值为“false”。

es.net.ssl.ke ystore.locati on

安全CSS集群的证书,生成的keystore文件在OBS上的 地址。

es.net.ssl.ke

ystore.pass 安全CSS集群的证书,生成的keystore文件时的密码。

es.net.ssl.tr uststore.loc ation

安全CSS集群的证书,生成的truststore文件在OBS上的 地址。

es.net.ssl.tr uststore.pas s

安全CSS集群的证书,生成的truststore文件时的密码。

es.net.http.

auth.user 安全CSS集群的用户名。

es.net.http.

auth.pass 安全CSS集群的密码。

说明

“batch.size.entries”和“batch.size.bytes”分别对数据条数和数据量大小进行 限制。

ii. 插入数据。

sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")

iii. 查询数据。

val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show()

插入数据前:

插入数据后:

iv. 删除数据表

sparkSession.sql("drop table css_table")

– 通过DataFrame API访问 i. 连接配置。

val resource = "/mytest/css"

val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"

ii. 构造schema,并添加数据。

val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))

val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))

iii. 导入数据到CSS。

val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) dataFrame_1.write

.format("css")

.option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true")

.option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***")

.option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")

.option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .mode(SaveMode.Append)

.save()

val dataFrameR = sparkSession.read.format("css") .option("resource",resource)

.option("es.nodes", nodes) .option("es.net.ssl", "true")

.option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***")

.option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")

.option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .load()

dataFrameR.show()

插入数据前:

插入数据后:

– 提交Spark作业

i. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索 用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

ii. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台 操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API 参考》>《创建批处理作业》。

说明

● 提交作业时,需要指定Module模块,名称为:sys.datasource.css。

● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参 数说明”表说明。

● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中

“表2-请求参数说明”关于“modules”参数的说明。

● 完整示例代码 – Maven依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

– 通过SQL API访问

import org.apache.spark.sql.SparkSession object csshttpstest {

def main(args: Array[String]): Unit = { //Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate() // Create a DLI data table for DLI-associated CSS

sparkSession.sql("create table css_table(id long, name string) using css options('es.nodes' = '192.168.6.204:9200','es.nodes.wan.only' = 'false','resource' = '/

mytest','es.net.ssl'='true','es.net.ssl.keystore.location' = 'obs://xietest1/lzq/

keystore.jks','es.net.ssl.keystore.pass' = '**','es.net.ssl.truststore.location'='obs://xietest1/lzq/

truststore.jks','es.net.ssl.truststore.pass'='**','es.net.http.auth.user'='admin','es.net.http.auth.pass'='*

*')")

//*****************************SQL model***********************************

// Insert data into the DLI data table

sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')") // Read data from DLI data table

val dataFrame = sparkSession.sql("select * from css_table")

dataFrame.show() // drop table

sparkSession.sql("drop table css_table") sparkSession.close()

}}

– 通过DataFrame API访问

import org.apache.spark.sql.{Row, SaveMode, SparkSession};

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType};

object Test_SQL_CSS {

def main(args: Array[String]): Unit = { //Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate() sparkSession.conf.set("fs.obs.access.key", ak)

sparkSession.conf.set("fs.obs.secret.key", sk)

//*****************************DataFrame model***********************************

// Setting the /index/type of CSS val resource = "/mytest/css"

// Define the cross-origin connection address of the CSS cluster val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"

//Setting schema

val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))

// Construction data

val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob"))) // Create a DataFrame from RDD and schema

val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) //Write data to the CSS

dataFrame_1.write .format("css") .option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true")

.option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***")

.option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")

.option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .mode(SaveMode.Append)

.save();

//Read data

val dataFrameR = sparkSession.read.format("css") .option("resource", resource)

.option("es.nodes", nodes) .option("es.net.ssl", "true")

.option("es.net.ssl.keystore.location", "obs://AK:SK@桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***")

.option("es.net.ssl.truststore.location", "obs://AK:SK@桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")

.option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .load()

dataFrameR.show() spardSession.close() }}