• 沒有找到結果。

2.10 使用 Spark 作业跨源访问数据源

2.10.8 对接 Mongo

2.10.8.1 scala 样例代码 开发说明

mongo只支持增强型跨源。只能使用包年包月队列。

● 前提条件

在DLI管理控制台上已完成创建增强跨源连接,并绑定包年/包月队列。具体操作 请参考《数据湖探索用户指南》。

● 构造依赖信息,创建SparkSession a. 导入依赖。

涉及到mvn依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

import相关依赖包

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

创建session

val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()

● 通过SQL API访问

a. 创建DLI跨源访问 mongo的关联表

sparkSession.sql(

"create table test_mongo(id string, name string, age int) using mongo options(

'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'uri' = 'mongodb://username:pwd@host:8635/db',

'database' = 'test', 'collection' = 'test', 'user' = 'rwuser',

'password' = '######')")

2-30 创建表参数

参数 说明

url ● url的格式为:

"IP:PORT[,IP:PORT]/[DATABASE][.COLLECTION]

[AUTH_PROPERTIES]"

例如:

"192.168.4.62:8635/test?authSource=admin"

● url需要在mongo(DDS)的连接地址的截取得到。

说明获取到的mongo的连接地址格式为:"协议头://用户名:密码@访问地址:

访问端口/数据库名?authSource=admin"

例如:

mongodb://rwuser:****@192.168.4.62:8635,192.168.5.134:8635/test?

authSource=admin

uri uri的格式为:mongodb://username:pwd@host:8635/db 其中以下参数需要修改为实际值:

● “username”为创建的mongo(DDS)数据库用户名。

● “pwd”为创建的mongo(DDS)数据库用户名对应的密 码。

● “host”为创建的mongo(DDS)数据库实例IP。

● “db”为创建的mongo(DDS)数据库名称。

mongo(DDS)数据库用户创建详见:创建DDS数据库帐户。

databas

e DDS的数据库名,如果在"url"中同时指定了数据库名,则

"url"中的数据库名不生效。

collectio

n DDS中的collection名,如果在"url"中同时指定了collection,则

"url"中的collection不生效。

说明如果在DDS中已存在collection,则建表可以不指定schema信息,DLI会

根据collection中的数据自动生成schema信息。

user 访问DDS集群用户名。

passwor

d 访问DDS集群密码。

2-46 mongo 的链接地址信息

b. 插入数据

sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)")

c. 查询数据

sparkSession.sql("select * from test_mongo").show()

操作结果

● 通过DataFrame API访问 a. 设置连接参数

val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"

val uri = "mongodb://username:pwd@host:8635/db"

val user = "rwuser"

val database = "test"

val collection = "test"

val password = "######"

b. 构造schema

val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))

c. 构造DataFrame

val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32))) val dataFrame = spark.createDataFrame(rdd, schema)

d. 导入数据到mongo

dataFrame.write.format("mongo") .option("url", url)

.option("uri", uri)

.option("database", database) .option("collection", collection) .option("user", user)

.option("password", password) .mode(SaveMode.Overwrite) .save()

说明

保存类型:Overwrite、Append、ErrorIfExis、Ignore 四种。

e. 读取mongo上的数据

val jdbcDF = spark.read.format("mongo").schema(schema) .option("url", url)

.option("uri", uri)

.option("database", database) .option("collection", collection) .option("user", user)

.option("password", password) .load()

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

● 通过SQL API访问

import org.apache.spark.sql.SparkSession object TestMongoSql {

def main(args: Array[String]): Unit = {

val sparkSession = SparkSession.builder().getOrCreate() sparkSession.sql(

"create table test_mongo(id string, name string, age int) using mongo options(

'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'uri' = 'mongodb://username:pwd@host:8635/db',

'database' = 'test', 'collection' = 'test', 'user' = 'rwuser',

'password' = '######')")

sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)") sparkSession.sql("select * from test_mongo").show()

sparkSession.close() }}

● 通过DataFrame API访问

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test_Mongo_SparkSql {

def main(args: Array[String]): Unit = { // Create a SparkSession session.

val spark = SparkSession.builder().appName("mongodbTest").getOrCreate() // Set the connection configuration parameters.

val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"

val uri = "mongodb://username:pwd@host:8635/db"

val user = "rwuser"

val database = "test"

val collection = "test"

val password = "######"

// Setting up the schema

val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))

// Setting up the DataFrame

val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32))) val dataFrame = spark.createDataFrame(rdd, schema)

// Write data to mongo

dataFrame.write.format("mongo") .option("url", url)

.option("uri", uri)

.option("database", database) .option("collection", collection) .option("user", user)

.option("password", password) .mode(SaveMode.Overwrite) .save()

// Reading data from mongo

val jdbcDF = spark.read.format("mongo").schema(schema) .option("url", url)

.option("uri", uri)

.option("database", database) .option("collection", collection) .option("user", user)

.option("password", password) .load()

jdbcDF.show() spark.close() }}

2.10.8.2 pyspark 样例代码

开发说明

mongo只支持增强型跨源。只能使用包年包月队列。

● 前提条件

在DLI管理控制台上已完成创建增强跨源连接,并绑定包年/包月队列。具体操作 请参考《数据湖探索用户指南》。

● 通过DataFrame API 访问 a. import相关依赖

from __future__ import print_function

from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession

b. 创建session

sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()

c. 设置连接参数

url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"

uri = "mongodb://username:pwd@host:8635/db"

user = "rwuser"

database = "test"

collection = "test"

password = "######"

说明

详细的参数说明请参考表2-30。

d. 创建DataFrame

dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19),("2","Tom",20)]) schema = StructType([StructField("id", IntegerType(), False),

StructField("name", StringType(), False), StructField("age", IntegerType(), False)]) dataFrame = sparkSession.createDataFrame(dataList, schema)

e. 导入数据到mongo

dataFrame.write.format("mongo") .option("url", url)

.option("uri", uri) .option("user",user)

.option("password",password) .option("database",database) .option("collection",collection) .mode("Overwrite")

.save()

f. 读取Mongo上的数据

jdbcDF = sparkSession.read .format("mongo") .option("url", url) .option("uri", uri) .option("user",user)

.option("password",password) .option("database",database) .option("collection",collection) .load()

jdbcDF.show()

g. 操作结果

● 通过SQL API 访问

a. 创建DLI关联跨源访问 Mongo的关联表。

sparkSession.sql(

"create table test_mongo(id string, name string, age int) using mongo options(

'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'uri' = 'mongodb://username:pwd@host:8635/db',

'database' = 'test', 'collection' = 'test', 'user' = 'rwuser', 'password' = '######')")

说明

详细的参数说明请参考表2-30。

b. 插入数据

sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)")

c. 查询数据

sparkSession.sql("select * from test_mongo").show()

● 提交Spark作业

● 通过DataFrame API 访问

from __future__ import print_function

from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession

if __name__ == "__main__":

# Create a SparkSession session.

sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate() # Create a DataFrame and initialize the DataFrame data.

dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19),("2","Tom",20)]) # Setting schema

schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False), StructField("age", IntegerType(), False)])

# Create a DataFrame from RDD and schema

dataFrame = sparkSession.createDataFrame(dataList, schema) # Setting connection parameters

url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"

uri = "mongodb://username:pwd@host:8635/db"

user = "rwuser"

database = "test"

collection = "test"

password = "######"

# Write data to the mongodb table dataFrame.write.format("mongo") .option("url", url)

.option("uri", uri) .option("user",user)

.option("password",password) .option("database",database) .option("collection",collection) .mode("Overwrite").save()

# Read data

jdbcDF = sparkSession.read.format("mongo") .option("url", url)

.option("uri", uri) .option("user",user)

.option("password",password) .option("database",database) .option("collection",collection) .load()

jdbcDF.show() # close session sparkSession.stop()

● 通过SQL API 访问

from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__":

# Create a SparkSession session.

sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate() # Createa data table for DLI - associated mongo

sparkSession.sql(

"create table test_mongo(id string, name string, age int) using mongo options(

'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', 'uri' = 'mongodb://username:pwd@host:8635/db',

'database' = 'test', 'collection' = 'test', 'user' = 'rwuser', 'password' = '######')") # Insert data into the DLI-table

sparkSession.sql("insert into test_mongo values('3', 'zhangsan',23)") # Read data from DLI-table

sparkSession.sql("select * from test_mongo").show() # close session

sparkSession.stop()

2.10.8.3 java 样例代码 开发说明

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SQLContext;

import org.apache.spark.sql.SaveMode;

b. 创建会话

SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("datasource-mongo"));

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);

SQLContext sqlContext = new SQLContext(javaSparkContext);

● 通过DataFrame API 访问 a. 读取json数据为DataFrame

JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList("{\"id\":\"5\",\"name\":

\"zhangsan\",\"age\":\"23\"}"));

Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);

b. 设置连接参数

String url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin";

String uri = "mongodb://username:pwd@host:8635/db";

String user = "rwuser";

String database = "test";

String collection = "test";

String password = "######";

说明

详细的参数说明请参考表2-30。

c. 导入数据到mongo

dataFrame.write().format("mongo") .option("url",url)

.option("uri",uri)

.option("database",database) .option("collection",collection) .option("user",user)

.option("password",password) .mode(SaveMode.Overwrite) .save();

d. 读取mongo上的数据

sqlContext.read().format("mongo") .option("url",url)

.option("uri",uri)

.option("database",database) .option("collection",collection) .option("user",user)

.option("password",password) .load().show();