2.10 使用 Spark 作业跨源访问数据源
2.10.5 对接 OpenTSDB
2.10.5.1 scala 样例代码 开发说明
支持对接CloudTable的OpenTSDB和MRS的OpenTSDB。
● 前提条件
在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指 南》。
● 构造依赖信息,创建SparkSession a. 导入依赖。
涉及到mvn依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
import相关依赖包
import scala.collection.mutable
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._
b. 创建会话。
val sparkSession = SparkSession.builder().getOrCreate()
c. 创建DLI关联跨源访问 OpenTSDB的关联表。
sparkSession.sql("create table opentsdb_test using opentsdb options(
'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ctopentsdb',
'tags'='city,location')")
表2-27 创建表参数
参数 说明
host OpenTSDB连接地址。
● 访问CloudTable OpenTSDB,填写OpenTSDB链接地 址,具体可以登录CloudTable控制台,单击“集群模式
> 集群名称”,在集群信息获取OpenTSDB链接地址。
● 访问MRS OpenTSDB,若使用增强型跨源连接,填写 OpenTSDB所在节点IP与端口,格式为"IP:PORT",
OpenTSDB存在多个节点时,用分号隔开,获取方式请 参考“图 MRS集群OpenTSDB IP信息”和“图 MRS集 群OpenTSDB 端口信息”。若使用经典型跨源,填写经 典型跨源返回的连接地址,管理控制台操作请参考《数 据湖探索用户指南》。
metric 所创建的dli表对应的OpenTSDB中的指标名称。
tags metric对应的标签,用于归类、过滤、快速检索等操作,
可以是1到8个,以“,”分隔,包括对应metric下的所有 tagk的值。
图2-42 MRS 集群 OpenTSDB IP 信息
图2-43 MRS 集群 OpenTSDB 端口信息
● 通过SQL API访问 a. 插入数据
sparkSession.sql("insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)")
b. 查询数据
sparkSession.sql("select * from opentsdb_test").show()
返回结果:
● 通过DataFrame API访问 a. 构造schema
val attrTag1Location = new StructField("location", StringType) val attrTag2Name = new StructField("name", StringType) val attrTimestamp = new StructField("timestamp", LongType) val attrValue = new StructField("value", DoubleType)
val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp, attrValue)
b. 根据schema的类型构造数据
val mutableRow: Seq[Any] = Seq("beijing", "abc", 123456L, 30.0) val rddData: RDD[Row] =
sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
c. 导入数据到OpenTSDB
sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test")
d. 读取OpenTSDB上的数据
val map = new mutable.HashMap[String, String]() map("metric") = "ctopentsdb"
map("tags") = "city,location"
map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
sparkSession.read.format("opentsdb").options(map.toMap).load().show()
返回结果:
● 提交Spark作业
a. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索用户 指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
b. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作 请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>
《创建批处理作业》。
说明
● 提交作业时,需要指定Module模块,名称为:sys.datasource.opentsdb。
● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说 明”表说明。
● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请 求参数说明”关于“modules”参数的说明。
完整示例代码
● Maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
● 通过SQL API访问
import org.apache.spark.sql.SparkSession object Test_OpenTSDB_CT {
def main(args: Array[String]): Unit = { // Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate() // Create a data table for DLI association OpenTSDB
sparkSession.sql("create table opentsdb_test using opentsdb options(
'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ctopentsdb',
'tags'='city,location')")
//*****************************SQL module***********************************
sparkSession.sql("insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)") sparkSession.sql("select * from opentsdb_test").show()
sparkSession.close() }}
● 通过DataFrame API访问
import scala.collection.mutable
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._
object Test_OpenTSDB_CT {
def main(args: Array[String]): Unit = { // Create a SparkSession session.
val sparkSession = SparkSession.builder().getOrCreate() // Create a data table for DLI association OpenTSDB
sparkSession.sql("create table opentsdb_test using opentsdb options(
'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ctopentsdb',
'tags'='city,location')")
//*****************************DataFrame model***********************************
// Setting schema
val attrTag1Location = new StructField("location", StringType) val attrTag2Name = new StructField("name", StringType) val attrTimestamp = new StructField("timestamp", LongType) val attrValue = new StructField("value", DoubleType)
val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp,attrValue) // Populate data according to the type of schema
val mutableRow: Seq[Any] = Seq("beijing", "abc", 123456L, 30.0)
val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
//Import the constructed data into OpenTSDB
sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test") //Read data on OpenTSDB
val map = new mutable.HashMap[String, String]() map("metric") = "ctopentsdb"
map("tags") = "city,location"
map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
sparkSession.read.format("opentsdb").options(map.toMap).load().show() sparkSession.close()
}}
2.10.5.2 pyspark 样例代码
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession
b. 创建会话
sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()
c. 创建DLI跨源访问 OpenTSDB的关联表
sparkSession.sql("create table opentsdb_test using opentsdb options(
'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242', 'metric'='ct_opentsdb',
'tags'='city,location')") 说明
Host、metric、tags三个参数详情讲解可参考表2-27。
● 通过SQL API访问 a. 插入数据
sparkSession.sql("insert into opentsdb_test values('beijing', 'abc', '2021-06-30 18:00:00', 30.0)")
b. 查询数据
result = sparkSession.sql("SELECT * FROM opentsdb_test")
● 通过DataFrame API 访问 a. 构造schema
dataList = sparkSession.sparkContext.parallelize([("beijing", "abc", 123456L, 30.0)])
c. 创建DataFrame
dataFrame = sparkSession.createDataFrame(dataList, schema)
d. 导入数据到OpenTSDB
dataFrame.write.insertInto("opentsdb_test")
e. 读取OpenTSDB上的数据
jdbdDF = sparkSession.read .format("opentsdb")\
.option("Host","opentsdb-3xcl8dir15m58z3.cloudtable.com:4242")\
.option("metric","ctopentsdb")\
.option("tags","city,location")\
.load() jdbdDF.show()
f. 操作结果
● 提交Spark作业
a. 将写好的python代码文件上传至DLI中。控制台操作请参考《数据湖探索用户 指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
b. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作 请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>
《创建批处理作业》。
说明
● 提交作业时,需要指定Module模块,名称为:sys.datasource.opentsdb。
● 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说 明”表说明。
● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请 求参数说明”关于“modules”参数的说明。
完整示例代码
● 通过SQL API访问MRS的OpenTSDB
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession
if __name__ == "__main__":
# Create a SparkSession session.
sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate()
# Create a DLI cross-source association opentsdb data table sparkSession.sql("create table opentsdb_test using opentsdb options(\
'Host'='10.0.0.171:4242',\
'metric'='cts_opentsdb',\
'tags'='city,location')")
sparkSession.sql("insert into opentsdb_test values('beijing', 'abc', '2021-06-30 18:00:00', 30.0)") result = sparkSession.sql("SELECT * FROM opentsdb_test")
result.show() # close session sparkSession.stop()
● 通过DataFrame API访问OpenTSDB
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession
if __name__ == "__main__":
# Create a SparkSession session.
sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate() # Create a DLI cross-source association opentsdb data table
sparkSession.sql("create table opentsdb_test using opentsdb options(
'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
'metric'='ct_opentsdb', 'tags'='city,location')")
# Create a DataFrame and initialize the DataFrame data.
dataList = sparkSession.sparkContext.parallelize([("beijing", "abc", 123456L, 30.0)]) # Setting schema
dataFrame = sparkSession.createDataFrame(dataList, schema) # Set cross-source connection parameters
metric = "ctopentsdb"
tags = "city,location"
Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
# Write data to the cloudtable-opentsdb dataFrame.write.insertInto("opentsdb_test")
# ******* Opentsdb does not currently implement the ctas method to save data, so the save() method cannot be used.*******
# dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save()
# Read data on CloudTable-OpenTSDB jdbdDF = sparkSession.read\ jdbdDF.show() # close session sparkSession.stop()
2.10.5.3 java 样例代码 开发说明
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
▪
import相关依赖包import org.apache.spark.sql.SparkSession;
b. 创建会话
sparkSession = SparkSession.builder().appName("datasource-opentsdb").getOrCreate();
● 通过SQL API 访问
– 创建DLI跨源访问MRS OpenTSDB的关联表,填写连接参数。
sparkSession.sql("create table opentsdb_new_test using opentsdb
options('Host'='10.0.0.171:4242','metric'='ctopentsdb','tags'='city,location')");
说明
Host、metric、tags三个参数详情讲解可参考表2-27。
– 插入数据
sparkSession.sql("insert into opentsdb_new_test values('Penglai', 'abc', '2021-06-30 18:00:00', 30.0)");
– 查询数据
sparkSession.sql("select * from opentsdb_new_test").show();
插入数据后:
● 提交Spark作业
a. 将写好的代码文件生成jar包,上传至DLI中。控制台操作请参考《数据湖探索 用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
b. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作 请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>
《创建批处理作业》。
说明
● 提交作业时,需要指定Module模块,名称为:sys.datasource.opentsdb。
● 通过控制台提交作业请参考《数据湖探索用户指南》中的“表6-选择依赖资源参 数说明”。
● 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请 求参数说明”关于“modules”参数的说明。
完整示例代码
● Maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
● 通过SQL API访问
import org.apache.spark.sql.SparkSession;
public class java_mrs_opentsdb {
private static SparkSession sparkSession = null;
public static void main(String[] args) { //create a SparkSession session
sparkSession = SparkSession.builder().appName("datasource-opentsdb").getOrCreate();
sparkSession.sql("create table opentsdb_new_test using opentsdb options('Host'='10.0.0.171:4242','metric'='ctopentsdb','tags'='city,location')");
//*****************************SQL module***********************************
sparkSession.sql("insert into opentsdb_new_test values('Penglai', 'abc', '2021-06-30 18:00:00', 30.0)");
System.out.println("Penglai new timestamp");
sparkSession.sql("select * from opentsdb_new_test").show();
sparkSession.close();
} }
2.10.5.4 故障处理
运行 Spark 作业,作业运行失败,作业日志中提示 No respond 错误
● 问题现象
运行Spark作业,作业运行失败,作业日志中提示No respond错误
● 解决方案
重新创建Spark作业,创建作业时需要在“Spark参数(--conf)”中添加配置:
“spark.sql.mrs.opentsdb.ssl.enabled=true”。