• 沒有找到結果。

2.10 使用 Spark 作业跨源访问数据源

2.10.5 对接 OpenTSDB

2.10.5.1 scala 样例代码 开发说明

支持对接CloudTable的OpenTSDB和MRS的OpenTSDB。

● 前提条件

在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指 南》。

● 构造依赖信息,创建SparkSession a. 导入依赖。

涉及到mvn依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

import相关依赖包

import scala.collection.mutable

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._

b. 创建会话。

val sparkSession = SparkSession.builder().getOrCreate()

c. 创建DLI关联跨源访问 OpenTSDB的关联表。

sparkSession.sql("create table opentsdb_test using opentsdb options(

'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ctopentsdb',

'tags'='city,location')")

2-27 创建表参数

参数 说明

host OpenTSDB连接地址。

● 访问CloudTable OpenTSDB,填写OpenTSDB链接地 址,具体可以登录CloudTable控制台,单击“集群模式

> 集群名称”,在集群信息获取OpenTSDB链接地址。

● 访问MRS OpenTSDB,若使用增强型跨源连接,填写 OpenTSDB所在节点IP与端口,格式为"IP:PORT",

OpenTSDB存在多个节点时,用分号隔开,获取方式请 参考“图 MRS集群OpenTSDB IP信息”和“图 MRS集 群OpenTSDB 端口信息”。若使用经典型跨源,填写经 典型跨源返回的连接地址,管理控制台操作请参考《数 据湖探索用户指南》。

metric 所创建的dli表对应的OpenTSDB中的指标名称。

tags metric对应的标签,用于归类、过滤、快速检索等操作,

可以是1到8个,以“,”分隔,包括对应metric下的所有 tagk的值。

2-42 MRS 集群 OpenTSDB IP 信息

2-43 MRS 集群 OpenTSDB 端口信息

● 通过SQL API访问 a. 插入数据

sparkSession.sql("insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)")

b. 查询数据

sparkSession.sql("select * from opentsdb_test").show()

返回结果:

● 通过DataFrame API访问 a. 构造schema

val attrTag1Location = new StructField("location", StringType) val attrTag2Name = new StructField("name", StringType) val attrTimestamp = new StructField("timestamp", LongType) val attrValue = new StructField("value", DoubleType)

val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp, attrValue)

b. 根据schema的类型构造数据

val mutableRow: Seq[Any] = Seq("beijing", "abc", 123456L, 30.0) val rddData: RDD[Row] =

sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)

c. 导入数据到OpenTSDB

sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test")

d. 读取OpenTSDB上的数据

val map = new mutable.HashMap[String, String]() map("metric") = "ctopentsdb"

map("tags") = "city,location"

map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"

sparkSession.read.format("opentsdb").options(map.toMap).load().show()

返回结果:

● 提交Spark作业

a. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索用户 指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

b. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作 请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>

《创建批处理作业》。

说明

● 提交作业时,需要指定Module模块,名称为:sys.datasource.opentsdb。

● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说 明”表说明。

● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请 求参数说明”关于“modules”参数的说明。

完整示例代码

● Maven依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

● 通过SQL API访问

import org.apache.spark.sql.SparkSession object Test_OpenTSDB_CT {

def main(args: Array[String]): Unit = { // Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate() // Create a data table for DLI association OpenTSDB

sparkSession.sql("create table opentsdb_test using opentsdb options(

'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ctopentsdb',

'tags'='city,location')")

//*****************************SQL module***********************************

sparkSession.sql("insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)") sparkSession.sql("select * from opentsdb_test").show()

sparkSession.close() }}

● 通过DataFrame API访问

import scala.collection.mutable

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._

object Test_OpenTSDB_CT {

def main(args: Array[String]): Unit = { // Create a SparkSession session.

val sparkSession = SparkSession.builder().getOrCreate() // Create a data table for DLI association OpenTSDB

sparkSession.sql("create table opentsdb_test using opentsdb options(

'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ctopentsdb',

'tags'='city,location')")

//*****************************DataFrame model***********************************

// Setting schema

val attrTag1Location = new StructField("location", StringType) val attrTag2Name = new StructField("name", StringType) val attrTimestamp = new StructField("timestamp", LongType) val attrValue = new StructField("value", DoubleType)

val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp,attrValue) // Populate data according to the type of schema

val mutableRow: Seq[Any] = Seq("beijing", "abc", 123456L, 30.0)

val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)

//Import the constructed data into OpenTSDB

sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test") //Read data on OpenTSDB

val map = new mutable.HashMap[String, String]() map("metric") = "ctopentsdb"

map("tags") = "city,location"

map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"

sparkSession.read.format("opentsdb").options(map.toMap).load().show() sparkSession.close()

}}

2.10.5.2 pyspark 样例代码

from __future__ import print_function

from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession

b. 创建会话

sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()

c. 创建DLI跨源访问 OpenTSDB的关联表

sparkSession.sql("create table opentsdb_test using opentsdb options(

'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ct_opentsdb',

'tags'='city,location')") 说明

Host、metric、tags三个参数详情讲解可参考表2-27。

● 通过SQL API访问 a. 插入数据

sparkSession.sql("insert into opentsdb_test values('beijing', 'abc', '2021-06-30 18:00:00', 30.0)")

b. 查询数据

result = sparkSession.sql("SELECT * FROM opentsdb_test")

● 通过DataFrame API 访问 a. 构造schema

dataList = sparkSession.sparkContext.parallelize([("beijing", "abc", 123456L, 30.0)])

c. 创建DataFrame

dataFrame = sparkSession.createDataFrame(dataList, schema)

d. 导入数据到OpenTSDB

dataFrame.write.insertInto("opentsdb_test")

e. 读取OpenTSDB上的数据

jdbdDF = sparkSession.read .format("opentsdb")\

.option("Host","opentsdb-3xcl8dir15m58z3.cloudtable.com:4242")\

.option("metric","ctopentsdb")\

.option("tags","city,location")\

.load() jdbdDF.show()

f. 操作结果

● 提交Spark作业

a. 将写好的python代码文件上传至DLI中。控制台操作请参考《数据湖探索用户 指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

b. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作 请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>

《创建批处理作业》。

说明

● 提交作业时,需要指定Module模块,名称为:sys.datasource.opentsdb。

● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说 明”表说明。

● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请 求参数说明”关于“modules”参数的说明。

完整示例代码

● 通过SQL API访问MRS的OpenTSDB

# _*_ coding: utf-8 _*_

from __future__ import print_function

from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession

if __name__ == "__main__":

# Create a SparkSession session.

sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()

# Create a DLI cross-source association opentsdb data table sparkSession.sql("create table opentsdb_test using opentsdb options(\

'Host'='10.0.0.171:4242',\

'metric'='cts_opentsdb',\

'tags'='city,location')")

sparkSession.sql("insert into opentsdb_test values('beijing', 'abc', '2021-06-30 18:00:00', 30.0)") result = sparkSession.sql("SELECT * FROM opentsdb_test")

result.show() # close session sparkSession.stop()

● 通过DataFrame API访问OpenTSDB

# _*_ coding: utf-8 _*_

from __future__ import print_function

from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession

if __name__ == "__main__":

# Create a SparkSession session.

sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate() # Create a DLI cross-source association opentsdb data table

sparkSession.sql("create table opentsdb_test using opentsdb options(

'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',

'metric'='ct_opentsdb', 'tags'='city,location')")

# Create a DataFrame and initialize the DataFrame data.

dataList = sparkSession.sparkContext.parallelize([("beijing", "abc", 123456L, 30.0)]) # Setting schema

dataFrame = sparkSession.createDataFrame(dataList, schema) # Set cross-source connection parameters

metric = "ctopentsdb"

tags = "city,location"

Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"

# Write data to the cloudtable-opentsdb dataFrame.write.insertInto("opentsdb_test")

# ******* Opentsdb does not currently implement the ctas method to save data, so the save() method cannot be used.*******

# dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save()

# Read data on CloudTable-OpenTSDB jdbdDF = sparkSession.read\ jdbdDF.show() # close session sparkSession.stop()

2.10.5.3 java 样例代码 开发说明

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

import相关依赖包

import org.apache.spark.sql.SparkSession;

b. 创建会话

sparkSession = SparkSession.builder().appName("datasource-opentsdb").getOrCreate();

● 通过SQL API 访问

– 创建DLI跨源访问MRS OpenTSDB的关联表,填写连接参数。

sparkSession.sql("create table opentsdb_new_test using opentsdb

options('Host'='10.0.0.171:4242','metric'='ctopentsdb','tags'='city,location')");

说明

Host、metric、tags三个参数详情讲解可参考表2-27。

– 插入数据

sparkSession.sql("insert into opentsdb_new_test values('Penglai', 'abc', '2021-06-30 18:00:00', 30.0)");

– 查询数据

sparkSession.sql("select * from opentsdb_new_test").show();

插入数据后:

● 提交Spark作业

a. 将写好的代码文件生成jar包,上传至DLI中。控制台操作请参考《数据湖探索 用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

b. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作 请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>

《创建批处理作业》。

说明

● 提交作业时,需要指定Module模块,名称为:sys.datasource.opentsdb。

● 通过控制台提交作业请参考《数据湖探索用户指南》中的“表6-选择依赖资源参 数说明”。

● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请 求参数说明”关于“modules”参数的说明。

完整示例代码

● Maven依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

● 通过SQL API访问

import org.apache.spark.sql.SparkSession;

public class java_mrs_opentsdb {

private static SparkSession sparkSession = null;

public static void main(String[] args) { //create a SparkSession session

sparkSession = SparkSession.builder().appName("datasource-opentsdb").getOrCreate();

sparkSession.sql("create table opentsdb_new_test using opentsdb options('Host'='10.0.0.171:4242','metric'='ctopentsdb','tags'='city,location')");

//*****************************SQL module***********************************

sparkSession.sql("insert into opentsdb_new_test values('Penglai', 'abc', '2021-06-30 18:00:00', 30.0)");

System.out.println("Penglai new timestamp");

sparkSession.sql("select * from opentsdb_new_test").show();

sparkSession.close();

} }

2.10.5.4 故障处理

运行 Spark 作业,作业运行失败,作业日志中提示 No respond 错误

● 问题现象

运行Spark作业,作业运行失败,作业日志中提示No respond错误

● 解决方案

重新创建Spark作业,创建作业时需要在“Spark参数(--conf)”中添加配置:

“spark.sql.mrs.opentsdb.ssl.enabled=true”。