2.10 使用 Spark 作业跨源访问数据源
2.10.8 对接 Mongo
2.10.8.1 scala 样例代码 开发说明
mongo只支持增强型跨源。只能使用包年包月队列。
● 前提条件
在DLI管理控制台上已完成创建增强跨源连接,并绑定包年/包月队列。具体操作 请参考《数据湖探索用户指南》。
● 构造依赖信息,创建SparkSession a. 导入依赖。
涉及到mvn依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
import相关依赖包
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
创建session
val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
● 通过SQL API访问
a. 创建DLI跨源访问 mongo的关联表
sparkSession.sql(
"create table test_mongo(id string, name string, age int) using mongo options(
'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'uri' = 'mongodb://username:pwd@host:8635/db',
'database' = 'test', 'collection' = 'test', 'user' = 'rwuser',
'password' = '######')")
表2-30 创建表参数
参数 说明
url ● url的格式为:
"IP:PORT[,IP:PORT]/[DATABASE][.COLLECTION]
[AUTH_PROPERTIES]"
例如:
"192.168.4.62:8635/test?authSource=admin"
● url需要在mongo(DDS)的连接地址的截取得到。
说明获取到的mongo的连接地址格式为:"协议头://用户名:密码@访问地址:
访问端口/数据库名?authSource=admin"
例如:
mongodb://rwuser:****@192.168.4.62:8635,192.168.5.134:8635/test?
authSource=admin
uri uri的格式为:mongodb://username:pwd@host:8635/db 其中以下参数需要修改为实际值:
● “username”为创建的mongo(DDS)数据库用户名。
● “pwd”为创建的mongo(DDS)数据库用户名对应的密 码。
● “host”为创建的mongo(DDS)数据库实例IP。
● “db”为创建的mongo(DDS)数据库名称。
mongo(DDS)数据库用户创建详见:创建DDS数据库帐户。
databas
e DDS的数据库名,如果在"url"中同时指定了数据库名,则
"url"中的数据库名不生效。
collectio
n DDS中的collection名,如果在"url"中同时指定了collection,则
"url"中的collection不生效。
说明如果在DDS中已存在collection,则建表可以不指定schema信息,DLI会
根据collection中的数据自动生成schema信息。
user 访问DDS集群用户名。
passwor
d 访问DDS集群密码。
图2-46 mongo 的链接地址信息
b. 插入数据
sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)")
c. 查询数据
sparkSession.sql("select * from test_mongo").show()
操作结果
● 通过DataFrame API访问 a. 设置连接参数
val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
val uri = "mongodb://username:pwd@host:8635/db"
val user = "rwuser"
val database = "test"
val collection = "test"
val password = "######"
b. 构造schema
val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
c. 构造DataFrame
val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32))) val dataFrame = spark.createDataFrame(rdd, schema)
d. 导入数据到mongo
dataFrame.write.format("mongo") .option("url", url)
.option("uri", uri)
.option("database", database) .option("collection", collection) .option("user", user)
.option("password", password) .mode(SaveMode.Overwrite) .save()
说明
保存类型:Overwrite、Append、ErrorIfExis、Ignore 四种。
e. 读取mongo上的数据
val jdbcDF = spark.read.format("mongo").schema(schema) .option("url", url)
.option("uri", uri)
.option("database", database) .option("collection", collection) .option("user", user)
.option("password", password) .load()
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
● 通过SQL API访问
import org.apache.spark.sql.SparkSession object TestMongoSql {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().getOrCreate() sparkSession.sql(
"create table test_mongo(id string, name string, age int) using mongo options(
'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'uri' = 'mongodb://username:pwd@host:8635/db',
'database' = 'test', 'collection' = 'test', 'user' = 'rwuser',
'password' = '######')")
sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)") sparkSession.sql("select * from test_mongo").show()
sparkSession.close() }}
● 通过DataFrame API访问
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object Test_Mongo_SparkSql {
def main(args: Array[String]): Unit = { // Create a SparkSession session.
val spark = SparkSession.builder().appName("mongodbTest").getOrCreate() // Set the connection configuration parameters.
val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
val uri = "mongodb://username:pwd@host:8635/db"
val user = "rwuser"
val database = "test"
val collection = "test"
val password = "######"
// Setting up the schema
val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
// Setting up the DataFrame
val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32))) val dataFrame = spark.createDataFrame(rdd, schema)
// Write data to mongo
dataFrame.write.format("mongo") .option("url", url)
.option("uri", uri)
.option("database", database) .option("collection", collection) .option("user", user)
.option("password", password) .mode(SaveMode.Overwrite) .save()
// Reading data from mongo
val jdbcDF = spark.read.format("mongo").schema(schema) .option("url", url)
.option("uri", uri)
.option("database", database) .option("collection", collection) .option("user", user)
.option("password", password) .load()
jdbcDF.show() spark.close() }}
2.10.8.2 pyspark 样例代码
开发说明
mongo只支持增强型跨源。只能使用包年包月队列。
● 前提条件
在DLI管理控制台上已完成创建增强跨源连接,并绑定包年/包月队列。具体操作 请参考《数据湖探索用户指南》。
● 通过DataFrame API 访问 a. import相关依赖
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession
b. 创建session
sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
c. 设置连接参数
url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
uri = "mongodb://username:pwd@host:8635/db"
user = "rwuser"
database = "test"
collection = "test"
password = "######"
说明
详细的参数说明请参考表2-30。
d. 创建DataFrame
dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19),("2","Tom",20)]) schema = StructType([StructField("id", IntegerType(), False),
StructField("name", StringType(), False), StructField("age", IntegerType(), False)]) dataFrame = sparkSession.createDataFrame(dataList, schema)
e. 导入数据到mongo
dataFrame.write.format("mongo") .option("url", url)
.option("uri", uri) .option("user",user)
.option("password",password) .option("database",database) .option("collection",collection) .mode("Overwrite")
.save()
f. 读取Mongo上的数据
jdbcDF = sparkSession.read .format("mongo") .option("url", url) .option("uri", uri) .option("user",user)
.option("password",password) .option("database",database) .option("collection",collection) .load()
jdbcDF.show()
g. 操作结果
● 通过SQL API 访问
a. 创建DLI关联跨源访问 Mongo的关联表。
sparkSession.sql(
"create table test_mongo(id string, name string, age int) using mongo options(
'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'uri' = 'mongodb://username:pwd@host:8635/db',
'database' = 'test', 'collection' = 'test', 'user' = 'rwuser', 'password' = '######')")
说明
详细的参数说明请参考表2-30。
b. 插入数据
sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)")
c. 查询数据
sparkSession.sql("select * from test_mongo").show()
● 提交Spark作业
● 通过DataFrame API 访问
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession
if __name__ == "__main__":
# Create a SparkSession session.
sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate() # Create a DataFrame and initialize the DataFrame data.
dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19),("2","Tom",20)]) # Setting schema
schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False), StructField("age", IntegerType(), False)])
# Create a DataFrame from RDD and schema
dataFrame = sparkSession.createDataFrame(dataList, schema) # Setting connection parameters
url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
uri = "mongodb://username:pwd@host:8635/db"
user = "rwuser"
database = "test"
collection = "test"
password = "######"
# Write data to the mongodb table dataFrame.write.format("mongo") .option("url", url)
.option("uri", uri) .option("user",user)
.option("password",password) .option("database",database) .option("collection",collection) .mode("Overwrite").save()
# Read data
jdbcDF = sparkSession.read.format("mongo") .option("url", url)
.option("uri", uri) .option("user",user)
.option("password",password) .option("database",database) .option("collection",collection) .load()
jdbcDF.show() # close session sparkSession.stop()
● 通过SQL API 访问
from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__":
# Create a SparkSession session.
sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate() # Createa data table for DLI - associated mongo
sparkSession.sql(
"create table test_mongo(id string, name string, age int) using mongo options(
'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'uri' = 'mongodb://username:pwd@host:8635/db',
'database' = 'test', 'collection' = 'test', 'user' = 'rwuser', 'password' = '######')") # Insert data into the DLI-table
sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)") # Read data from DLI-table
sparkSession.sql("select * from test_mongo").show() # close session
sparkSession.stop()
2.10.8.3 java 样例代码 开发说明
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
b. 创建会话
SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("datasource-mongo"));
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
SQLContext sqlContext = new SQLContext(javaSparkContext);
● 通过DataFrame API 访问 a. 读取json数据为DataFrame
JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList("{\"id\":\"5\",\"name\":
\"zhangsan\",\"age\":\"23\"}"));
Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);
b. 设置连接参数
String url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin";
String uri = "mongodb://username:pwd@host:8635/db";
String user = "rwuser";
String database = "test";
String collection = "test";
String password = "######";
说明
详细的参数说明请参考表2-30。
c. 导入数据到mongo
dataFrame.write().format("mongo") .option("url",url)
.option("uri",uri)
.option("database",database) .option("collection",collection) .option("user",user)
.option("password",password) .mode(SaveMode.Overwrite) .save();
d. 读取mongo上的数据
sqlContext.read().format("mongo") .option("url",url)
.option("uri",uri)
.option("database",database) .option("collection",collection) .option("user",user)
.option("password",password) .load().show();