
2.10 Using Spark Jobs to Access Data Sources over Datasource Connections

2.10.7 Connecting to Redis

}


2.10.7.1 Scala Sample Code

Development Description

Maven dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>com.redislabs</groupId>
  <artifactId>spark-redis</artifactId>
  <version>2.4.0</version>
</dependency>

Import the required packages:

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._

● Accessing Redis through the DataFrame API

a. Create a session.

val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()

b. Construct the schema and the DataFrame. Use any one of the following methods.

// Method 1: explicit schema
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)))
val rdd = sparkSession.sparkContext.parallelize(Seq(Row("abc", 34), Row("Bob", 19)))
val dataFrame = sparkSession.createDataFrame(rdd, schema)

// Method 2: tuples with renamed columns
// val jdbcDF = sparkSession.createDataFrame(Seq(("Jack", 23)))
// val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age")

// Method 3: case class
// case class Person(name: String, age: Int)
// val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))

Note

case class Person(name: String, age: Int) must be defined outside the object. For details, see "● Accessing Redis through the DataFrame API" in the complete sample code.

c. Write data to Redis.

dataFrame.write
  .format("redis")
  .option("host", "192.168.4.199")
  .option("port", "6379")
  .option("table", "person")
  .option("password", "******")
  .option("key.column", "name")
  .mode(SaveMode.Overwrite)
  .save()
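The write above stores each DataFrame row in Redis. As a rough illustration, assuming the layout described in the spark-redis documentation (each row becomes a hash whose key is "<table>:<key.column value>", with the remaining columns stored as string fields), the following pure-Python sketch computes the keys and fields the two sample rows would produce. rows_to_redis_hashes is a hypothetical helper for explanation only; it is not part of spark-redis or DLI.

```python
from typing import Dict, List, Mapping


def rows_to_redis_hashes(table: str, key_column: str,
                         rows: List[Mapping[str, object]]) -> Dict[str, Dict[str, str]]:
    """Hypothetical model of how DataFrame rows map to Redis hashes.

    Assumption: each row becomes one hash whose key is
    "<table>:<value of key_column>", and every other column is stored
    as a hash field with a string value. In this model, a later row with
    the same key value replaces the earlier one.
    """
    hashes: Dict[str, Dict[str, str]] = {}
    for row in rows:
        redis_key = f"{table}:{row[key_column]}"
        hashes[redis_key] = {col: str(val) for col, val in row.items() if col != key_column}
    return hashes


if __name__ == "__main__":
    sample = [{"name": "abc", "age": 34}, {"name": "Bob", "age": 19}]
    for key, fields in rows_to_redis_hashes("person", "name", sample).items():
        print(key, fields)
```

Without key.column, the connector has to generate key values itself, so choosing a column with unique values keeps one hash per row.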

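| Parameter | Description |
|---|---|
| table | The Redis key or hash key that the data maps to. Mandatory when inserting data into Redis. When querying Redis data, specify either this parameter or keys.pattern. |
| keys.pattern | A glob-style pattern (for example, person:*) that matches multiple keys or hash keys. Used only for queries. When querying Redis data, specify either this parameter or table. |
| scan.count | Number of records read per batch. Default value: 100. If the CPU usage of the Redis cluster still has headroom during reading, you can increase this value. |
| iterator.grouping.size | Number of records inserted per batch. Default value: 100. If the CPU usage of the Redis cluster still has headroom during insertion, you can increase this value. |
| timeout | Timeout for connecting to Redis, in milliseconds. Default value: 2000 (2 seconds). |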
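The keys.pattern, scan.count, and iterator.grouping.size parameters in the table can be pictured with the following pure-Python sketch. It is a model under stated assumptions, not the connector's implementation: fnmatchcase only approximates Redis glob-style matching (Redis writes a negated character class as [^...], fnmatch as [!...]), and the batch size is treated as an exact upper bound, whereas the COUNT argument of the Redis SCAN command is only a hint. match_keys and in_batches are hypothetical helpers.

```python
from fnmatch import fnmatchcase
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def match_keys(keys: Iterable[str], pattern: str) -> List[str]:
    """Return the keys matching a glob-style pattern such as "person:*".

    fnmatchcase is case-sensitive and supports *, ? and [...], which
    approximates (but does not exactly reproduce) Redis pattern matching.
    """
    return [key for key in keys if fnmatchcase(key, pattern)]


def in_batches(items: Iterable[T], batch_size: int = 100) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items.

    Models a per-batch limit such as scan.count (reads) or
    iterator.grouping.size (writes); both default to 100.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, possibly shorter batch


if __name__ == "__main__":
    keys = [f"person:{i}" for i in range(250)] + ["people:1"]
    selected = match_keys(keys, "person:*")
    print(len(selected), [len(b) for b in in_batches(selected, 100)])
    # prints: 250 [100, 100, 50]
```

A larger batch size means fewer round trips for the same number of records, which is why the table suggests raising it when the Redis cluster has spare CPU.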

Note

● Save modes: Overwrite, Append, ErrorIfExists, and Ignore.
● To save a nested DataFrame, use .option("model", "binary").
● To set a data expiration time, use .option("ttl", 1000). The unit is seconds.

Figure 2-45 Obtaining the Redis IP address and port
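The four save modes differ only in what happens when the target already holds data. The sketch below models them with a Python dictionary standing in for Redis; save and TableExistsError are hypothetical names, and the semantics follow Spark's general SaveMode definitions rather than anything specific to the Redis connector.

```python
from typing import Dict, List


class TableExistsError(Exception):
    """Raised in ErrorIfExists mode when the target already holds data."""


def save(store: Dict[str, List[dict]], table: str, rows: List[dict], mode: str) -> None:
    """Hypothetical model of Spark's four save modes.

    store stands in for Redis: store[table] is the list of rows already
    saved under that table name. Mode names are matched case-insensitively.
    """
    exists = bool(store.get(table))
    mode = mode.lower()
    if mode == "overwrite":
        store[table] = list(rows)  # replace any existing rows
    elif mode == "append":
        store.setdefault(table, []).extend(rows)  # keep old rows, add new ones
    elif mode == "errorifexists":
        if exists:
            raise TableExistsError(table)
        store[table] = list(rows)
    elif mode == "ignore":
        if not exists:
            store[table] = list(rows)  # do nothing when data already exists
    else:
        raise ValueError(f"unknown save mode: {mode}")


if __name__ == "__main__":
    redis_model: Dict[str, List[dict]] = {}
    save(redis_model, "person", [{"name": "abc", "age": 34}], "Overwrite")
    save(redis_model, "person", [{"name": "Bob", "age": 19}], "Append")
    save(redis_model, "person", [{"name": "Jack", "age": 23}], "Ignore")  # skipped: data exists
    print(redis_model)
```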

d. Read data from Redis.

sparkSession.read
  .format("redis")
  .option("host", "192.168.4.199")
  .option("port", "6379")
  .option("table", "person")
  .option("password", "######")
  .option("key.column", "name")
  .load()
  .show()

Result:

● Accessing Redis through RDDs

a. Create a connection.

val sparkContext = new SparkContext(new SparkConf()
  .setAppName("datasource_redis")
  .set("spark.redis.host", "192.168.4.199")
  .set("spark.redis.port", "6379")
  .set("spark.redis.auth", "######")
  .set("spark.driver.allowMultipleContexts", "true"))

Note

Setting spark.driver.allowMultipleContexts to true means that when multiple contexts are started, only the current context is used, which prevents context invocation conflicts.

b. Insert data.

i. Save String data

val stringRedisData: RDD[(String, String)] =
  sparkContext.parallelize(Seq(("high", "111"), ("together", "333")))
sparkContext.toRedisKV(stringRedisData)

ii. Save hash data

val hashRedisData: RDD[(String, String)] =
  sparkContext.parallelize(Seq(("saprk", "123"), ("data", "222")))
sparkContext.toRedisHASH(hashRedisData, "hashRDD")

iii. Save list data

val data = List(("school", "112"), ("tom", "333"))
val listRedisData: RDD[String] = sparkContext.parallelize(Seq(data.toString()))
sparkContext.toRedisLIST(listRedisData, "listRDD")

iv. Save set data

val setData = Set(("bob", "133"), ("kity", "322"))
val setRedisData: RDD[String] = sparkContext.parallelize(Seq(setData.mkString))
sparkContext.toRedisSET(setRedisData, "setRDD")

v. Save zset data

val zsetRedisData: RDD[(String, String)] =
  sparkContext.parallelize(Seq(("whight", "234"), ("bobo", "343")))
sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")
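Each toRedis* call above targets a different Redis data type. The following pure-Python sketch, a hypothetical model rather than spark-redis code, writes out what the sample would leave in Redis. It assumes Scala's default rendering of tuples and lists in toString and mkString, which means the list and set samples each store a single string element rather than one element per tuple. The zset scores are assumed to be stored as numbers.

```python
from typing import Dict, Sequence, Tuple

Pair = Tuple[str, str]


def scala_tuple_str(pair: Pair) -> str:
    """Render a pair of strings like Scala's Tuple2.toString: (a,b)."""
    return f"({pair[0]},{pair[1]})"


def scala_list_to_string(pairs: Sequence[Pair]) -> str:
    """Render like Scala's List(...).toString: elements separated by ", "."""
    return "List(" + ", ".join(scala_tuple_str(p) for p in pairs) + ")"


def scala_mk_string(pairs: Sequence[Pair]) -> str:
    """Render like Scala's mkString with no separator: elements concatenated."""
    return "".join(scala_tuple_str(p) for p in pairs)


def model_sample_writes() -> Dict[str, object]:
    """Hypothetical view of what the Scala sample leaves in Redis, by key."""
    data = [("school", "112"), ("tom", "333")]
    set_data = [("bob", "133"), ("kity", "322")]
    model: Dict[str, object] = {
        "high": "111",  # toRedisKV: each pair becomes its own string key
        "together": "333",
        "hashRDD": {"saprk": "123", "data": "222"},  # toRedisHASH: fields of one hash
        "listRDD": [scala_list_to_string(data)],  # toRedisLIST: the RDD holds ONE string
        "setRDD": {scala_mk_string(set_data)},  # toRedisSET: also a single member
    }
    # toRedisZSET: (member, score) pairs; scores assumed to be stored as numbers.
    model["zsetRDD"] = {m: float(s) for m, s in [("whight", "234"), ("bobo", "343")]}
    return model


if __name__ == "__main__":
    for key, value in model_sample_writes().items():
        print(key, value)
```

If one list element per tuple is wanted instead, one option is to parallelize the individual strings (for example, sparkContext.parallelize(data.map(_.toString))) before calling toRedisLIST.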

c. Query data.

i. Query by traversing keys

val keysRDD = sparkContext.fromRedisKeys(
  Array("high", "together", "hashRDD", "listRDD", "setRDD", "zsetRDD"), 6)
keysRDD.getKV().collect().foreach(println)

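fromRedisKeys builds an RDD over the listed keys, and getKV reads only the keys that hold plain string values. The sketch below models that filtering in Python, under the assumption that getKV skips keys of other types (hash, list, set, zset) as well as keys that do not exist; get_kv is a hypothetical helper, and the order of results from a real RDD collect is not guaranteed.

```python
from typing import Dict, List, Tuple


def get_kv(keys: List[str], store: Dict[str, object]) -> List[Tuple[str, str]]:
    """Hypothetical model of keysRDD.getKV().

    Assumption: only keys holding a plain string value are returned, as
    (key, value) pairs. Keys holding a hash, list, set or zset (modelled
    here as dict, list and set values) and missing keys are skipped.
    """
    pairs: List[Tuple[str, str]] = []
    for key in keys:
        value = store.get(key)
        if isinstance(value, str):
            pairs.append((key, value))
    return pairs


if __name__ == "__main__":
    store = {
        "high": "111",
        "together": "333",
        "hashRDD": {"saprk": "123", "data": "222"},
        "listRDD": ["List((school,112), (tom,333))"],
        "setRDD": {"(bob,133)(kity,322)"},
        "zsetRDD": {"whight": 234.0, "bobo": 343.0},
    }
    keys = ["high", "together", "hashRDD", "listRDD", "setRDD", "zsetRDD"]
    print(get_kv(keys, store))
```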
● Accessing Redis through the SQL API

a. Create a DLI table associated with Redis over the datasource connection.

sparkSession.sql(
  """CREATE TEMPORARY VIEW person (name STRING, age INT)
    |USING org.apache.spark.sql.redis
    |OPTIONS ('host' = '192.168.4.199', 'port' = '6379',
    |  'password' = '######', 'table' = 'person')""".stripMargin)

b. Insert data.

sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin)

c. Query data.

sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println)

● Submitting a Spark job

Maven dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>com.redislabs</groupId>
  <artifactId>spark-redis</artifactId>
  <version>2.4.0</version>
</dependency>

● Accessing Redis through the SQL API

import org.apache.spark.sql.SparkSession

object Test_Redis_SQL {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()

    // Create a temporary view backed by Redis.
    sparkSession.sql(
      """CREATE TEMPORARY VIEW person (name STRING, age INT)
        |USING org.apache.spark.sql.redis
        |OPTIONS ('host' = '192.168.4.199', 'port' = '6379',
        |  'password' = '******', 'table' = 'person')""".stripMargin)

    // Insert two rows, then read them back.
    sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30), ('Peter', 45)")
    sparkSession.sql("SELECT * FROM person").collect().foreach(println)

    sparkSession.close()
  }
}

● Accessing Redis through the DataFrame API

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

// Needed only for method three; a case class must be defined outside the object.
// case class Person(name: String, age: Int)

object Test_Redis_SparkSql {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()

    // Set cross-source connection parameters.
    val host = "192.168.4.199"
    val port = "6379"
    val table = "person"
    val auth = "######"
    val key_column = "name"

    // ******** setting DataFrame ********
    // method one
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)))
    val rdd = sparkSession.sparkContext.parallelize(Seq(Row("huawei", 34), Row("Bob", 19)))
    val dataFrame = sparkSession.createDataFrame(rdd, schema)

    // method two
    // val jdbcDF = sparkSession.createDataFrame(Seq(("Jack", 23)))
    // val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age")

    // method three (requires the Person case class above)
    // val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))

    // Write data to redis
    dataFrame.write.format("redis")
      .option("host", host).option("port", port)
      .option("table", table).option("password", auth)
      .option("key.column", key_column)
      .mode(SaveMode.Overwrite).save()

    // Read data from redis
    sparkSession.read.format("redis")
      .option("host", host).option("port", port)
      .option("table", table).option("password", auth)
      .option("key.column", key_column)
      .load().show()

    // Close session
    sparkSession.close()
  }
}

● Accessing Redis through RDDs

import com.redislabs.provider.redis._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test_Redis_RDD {
  def main(args: Array[String]): Unit = {
    // Create a SparkContext configured with the Redis connection.
    val sparkContext = new SparkContext(new SparkConf()
      .setAppName("datasource_redis")
      .set("spark.redis.host", "192.168.4.199")
      .set("spark.redis.port", "6379")
      .set("spark.redis.auth", "@@@@@@")
      .set("spark.driver.allowMultipleContexts", "true"))

    //***************** Write data to redis **********************
    // Save String type data
    val stringRedisData: RDD[(String, String)] =
      sparkContext.parallelize(Seq(("high", "111"), ("together", "333")))
    sparkContext.toRedisKV(stringRedisData)

    // Save Hash type data
    val hashRedisData: RDD[(String, String)] =
      sparkContext.parallelize(Seq(("saprk", "123"), ("data", "222")))
    sparkContext.toRedisHASH(hashRedisData, "hashRDD")

    // Save List type data
    val data = List(("school", "112"), ("tom", "333"))
    val listRedisData: RDD[String] = sparkContext.parallelize(Seq(data.toString()))
    sparkContext.toRedisLIST(listRedisData, "listRDD")

    // Save Set type data
    val setData = Set(("bob", "133"), ("kity", "322"))
    val setRedisData: RDD[String] = sparkContext.parallelize(Seq(setData.mkString))
    sparkContext.toRedisSET(setRedisData, "setRDD")

    // Save ZSet type data
    val zsetRedisData: RDD[(String, String)] =
      sparkContext.parallelize(Seq(("whight", "234"), ("bobo", "343")))
    sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")

    // ***************************** Read data from redis *****************************
    // Traverse the specified keys and get the values of each type
    val keysRDD = sparkContext.fromRedisKeys(
      Array("high", "together", "hashRDD", "listRDD", "setRDD", "zsetRDD"), 6)
    keysRDD.getKV().collect().foreach(println)
    keysRDD.getHash().collect().foreach(println)
    keysRDD.getList().collect().foreach(println)
    keysRDD.getSet().collect().foreach(println)
    keysRDD.getZSet().collect().foreach(println)

    // "keyPattern *" below is a placeholder; replace it with a real key pattern.
    // Read String type data
    val stringRDD = sparkContext.fromRedisKV("keyPattern *")
    sparkContext.fromRedisKV(Array("high", "together")).collect().foreach(println)

    // Read Hash type data
    val hashRDD = sparkContext.fromRedisHash("keyPattern *")
    sparkContext.fromRedisHash(Array("hashRDD")).collect().foreach(println)

    // Read List type data
    val listRDD = sparkContext.fromRedisList("keyPattern *")
    sparkContext.fromRedisList(Array("listRDD")).collect().foreach(println)

    // Read Set type data
    val setRDD = sparkContext.fromRedisSet("keyPattern *")
    sparkContext.fromRedisSet(Array("setRDD")).collect().foreach(println)

    // Read ZSet type data
    val zsetRDD = sparkContext.fromRedisZSet("keyPattern *")
    sparkContext.fromRedisZSet(Array("zsetRDD")).collect().foreach(println)

    // Close the context
    sparkContext.stop()
  }
}

2.10.7.2 PySpark Sample Code

Development Description

Redis supports only enhanced datasource connections, and only yearly/monthly queues can be used.

● Prerequisites

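An enhanced datasource connection has been created on the DLI management console and bound to a yearly/monthly queue. For details, see the Data Lake Insight User Guide.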

● Accessing Redis through the DataFrame API

a. Import dependencies.

from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

b. Create a session.

sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()

c. Set connection parameters.

host = "192.168.4.199"

port = "6379"

table = "person"

auth = "@@@@@@"

d. Create a DataFrame.

i. Method 1

dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
schema = StructType([StructField("id", IntegerType(), False),
                     StructField("name", StringType(), False),
                     StructField("age", IntegerType(), False)])
dataFrame = sparkSession.createDataFrame(dataList, schema)

ii. Method 2

jdbcDF = sparkSession.createDataFrame([(3, "Jack", 23)])
dataFrame = (jdbcDF.withColumnRenamed("_1", "id")
             .withColumnRenamed("_2", "name")
             .withColumnRenamed("_3", "age"))

e. Write data to Redis.

(dataFrame.write
    .format("redis")
    .option("host", host)
    .option("port", port)
    .option("table", table)
    .option("password", auth)
    .mode("Overwrite")
    .save())

Note

● Save modes: Overwrite, Append, ErrorIfExists, and Ignore.
● To specify the key, use .option("key.column", "name"), where name is the column name.

f. Read data from Redis.

(sparkSession.read
    .format("redis")
    .option("host", host)
    .option("port", port)
    .option("table", table)
    .option("password", auth)
    .load()
    .show())

g. Result

● Accessing Redis through the SQL API

a. Create a DLI table associated with Redis over the datasource connection.

sparkSession.sql(
    "CREATE TEMPORARY VIEW person (name STRING, age INT) "
    "USING org.apache.spark.sql.redis OPTIONS ("
    "'host' = '192.168.4.199', 'port' = '6379', "
    "'password' = '######', 'table' = 'person')")

b. Insert data.

sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30), ('Peter', 45)")

c. Query data.

for row in sparkSession.sql("SELECT * FROM person").collect():
    print(row)
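In Python, a multi-line SQL statement cannot be written as an ordinary single-quoted string spanning lines, so it is easiest to assemble it from adjacent string literals or a small helper. The following sketch, built around a hypothetical create_view_sql helper, produces the same kind of CREATE TEMPORARY VIEW statement from a column list and an options dictionary. To sidestep Spark SQL's string-escaping rules, it simply rejects option keys or values that contain a single quote or a backslash; that restriction is an assumption of this sketch, not a DLI requirement.

```python
from typing import Mapping, Sequence, Tuple


def create_view_sql(view: str, columns: Sequence[Tuple[str, str]],
                    options: Mapping[str, str]) -> str:
    """Build a CREATE TEMPORARY VIEW statement for the Redis data source.

    Hypothetical helper. Option keys and values are emitted as quoted
    string literals; any containing ' or \\ are rejected instead of escaped.
    """
    for key, value in options.items():
        for text in (key, value):
            if "'" in text or "\\" in text:
                raise ValueError(f"unsupported character in option {key!r}")
    column_sql = ", ".join(f"{name} {sql_type}" for name, sql_type in columns)
    option_sql = ", ".join(f"'{key}' = '{value}'" for key, value in options.items())
    return (f"CREATE TEMPORARY VIEW {view} ({column_sql}) "
            f"USING org.apache.spark.sql.redis OPTIONS ({option_sql})")


if __name__ == "__main__":
    print(create_view_sql(
        "person",
        [("name", "STRING"), ("age", "INT")],
        {"host": "192.168.4.199", "port": "6379", "password": "######", "table": "person"},
    ))
```

The returned string could then be passed to sparkSession.sql(...), which keeps the connection parameters in one dictionary instead of scattered through the SQL text.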

● Submitting a Spark job

● Accessing Redis through the DataFrame API

# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()

    # Set cross-source connection parameters.
    host = "192.168.4.199"
    port = "6379"
    table = "person"
    auth = "######"

    # Create a DataFrame and initialize the DataFrame data.
    # ******* method one *********
    dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False),
                         StructField("age", IntegerType(), False)])
    dataFrame = sparkSession.createDataFrame(dataList, schema)

    # ****** method two ******
    # jdbcDF = sparkSession.createDataFrame([(3, "Jack", 23)])
    # dataFrame = (jdbcDF.withColumnRenamed("_1", "id")
    #              .withColumnRenamed("_2", "name")
    #              .withColumnRenamed("_3", "age"))

    # Write data to the redis table
    (dataFrame.write.format("redis")
        .option("host", host).option("port", port)
        .option("table", table).option("password", auth)
        .mode("Overwrite").save())

    # Read data
    (sparkSession.read.format("redis")
        .option("host", host).option("port", port)
        .option("table", table).option("password", auth)
        .load().show())

    # close session
    sparkSession.stop()

● Accessing Redis through the SQL API

# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession
    sparkSession = SparkSession.builder.appName("datasource_redis").getOrCreate()

    # Create a temporary view backed by Redis.
    sparkSession.sql(
        "CREATE TEMPORARY VIEW person (name STRING, age INT) "
        "USING org.apache.spark.sql.redis OPTIONS ("
        "'host' = '192.168.4.199', 'port' = '6379', "
        "'password' = '######', 'table' = 'person')")

    # Insert two rows, then read them back.
    sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30), ('Peter', 45)")
    for row in sparkSession.sql("SELECT * FROM person").collect():
        print(row)

    # close session
    sparkSession.stop()

2.10.7.3 Java Sample Code

Development Description

Maven dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("datasource-redis")
    .set("spark.redis.host", "192.168.4.199")
    .set("spark.redis.port", "6379")
    .set("spark.redis.auth", "******")
    .set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(javaSparkContext);

● Accessing Redis through the DataFrame API

a. Read JSON data into a DataFrame.

JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
    "{\"id\":\"1\",\"name\":\"zhangsan\",\"age\":\"18\"}",
    "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);

b. Construct the Redis connection parameters.

Map<String, String> map = new HashMap<String, String>();
map.put("table", "person");
map.put("key.column", "id");

c. Save data to Redis.

dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();

d. Read data from Redis.

sqlContext.read().format("redis").options(map).load().show();

e. Result

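● Submitting a Spark job

a. Upload the Java code file to DLI. For console operations, see the Data Lake Insight User Guide. For API operations, see "Uploading a Resource Package" in the Data Lake Insight API Reference.

b. In the Spark job editor, select the corresponding module and run the Spark job. For console operations, see the Data Lake Insight User Guide. For API operations, see "Creating a Batch Processing Job" in the Data Lake Insight API Reference.

Note

● When submitting the job, specify the module sys.datasource.redis.
● To submit the job on the console, see "Table 6 Parameters for selecting dependency resources" in the Data Lake Insight User Guide.
● To submit the job through the API, see the description of the "modules" parameter in "Table 2 Request parameters" under "Creating a Batch Processing Job" in the Data Lake Insight API Reference.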

Complete sample code

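import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class Test_Redis_DataFrame {
    public static void main(String[] args) {
        // Create a SparkContext and SQLContext configured with the Redis connection
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("datasource-redis")
            .set("spark.redis.host", "192.168.4.199")
            .set("spark.redis.port", "6379")
            .set("spark.redis.auth", "******")
            .set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(javaSparkContext);

        // Read RDD in JSON format to create DataFrame
        JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
            "{\"id\":\"1\",\"name\":\"zhangsan\",\"age\":\"18\"}",
            "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
        Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);

        // Redis options: target table and the column used as the key
        Map<String, String> map = new HashMap<String, String>();
        map.put("table", "person");
        map.put("key.column", "id");

        // Write to Redis, then read the data back
        dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
        sqlContext.read().format("redis").options(map).load().show();

        // Release resources
        javaSparkContext.stop();
    }
}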

2.10.7.4 Troubleshooting

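Problem 1: After the code is copied directly into a .py file, an "unexpected character" error is reported after '\'.

● Symptom

After the code is copied directly into a .py file, an "unexpected character" error is reported after '\'.

● Solution

Delete all indentation and spaces after each '\'. A backslash used for line continuation must be the last character on its line.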
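The error comes from Python's rule that a line-continuation backslash must be the last character on its line; copying from a PDF can leave a trailing space after it. The sketch below uses the built-in compile function to check three variants of the same statement; the compiles helper and the snippet strings are illustrative only.

```python
def compiles(source: str) -> bool:
    """Return True if source compiles as Python code, False on SyntaxError."""
    try:
        compile(source, "<snippet>", "exec")
    except SyntaxError:
        return False
    return True


# Space after the backslash: the pasted-from-PDF case that fails.
BROKEN = "total = 1 + \\ \n    2\n"
# Backslash is the last character on the line: a valid continuation.
FIXED = "total = 1 + \\\n    2\n"
# Parentheses continue the expression without any backslash.
PARENS = "total = (1 +\n    2)\n"

if __name__ == "__main__":
    for name, src in [("BROKEN", BROKEN), ("FIXED", FIXED), ("PARENS", PARENS)]:
        print(name, compiles(src))
```

Wrapping a long chained expression in parentheses avoids backslashes altogether, so trailing whitespace introduced by copy and paste can no longer cause this error.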