2.10 Using Spark Jobs to Access Data Sources Across Sources
2.10.7 Connecting to Redis
2.10.7.1 Scala Sample Code
Development notes
● Maven dependencies:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis</artifactId>
<version>2.4.0</version>
</dependency>
Import the related dependency packages:
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import com.redislabs.provider.redis._
import scala.reflect.runtime.universe._
import org.apache.spark.{SparkConf, SparkContext}
● Access through the DataFrame API
a. Create a session
val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
b. Construct the schema
// Method one: explicit schema
val schema = StructType(Seq(StructField("name", StringType, false), StructField("age", IntegerType, false)))
val rdd = sparkSession.sparkContext.parallelize(Seq(Row("abc", 34), Row("Bob", 19)))
val dataFrame = sparkSession.createDataFrame(rdd, schema)
// // Method two: rename the default tuple columns
// val jdbcDF = sparkSession.createDataFrame(Seq(("Jack", 23)))
// val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age")
// // Method three: case class
// case class Person(name: String, age: Int)
// val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
Note
case class Person(name: String, age: Int) must be declared outside the object. See the complete code under "● Access through the DataFrame API".
c. Import data into Redis
dataFrame.write
  .format("redis")
  .option("host", "192.168.4.199")
  .option("port", "6379")
  .option("table", "person")
  .option("password", "******")
  .option("key.column", "name")
  .mode(SaveMode.Overwrite)
  .save()
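Parameter               Description
table                   Corresponds to the key or hash key in Redis.
                        ● Required when inserting data into Redis.
                        ● When querying Redis data, specify either this parameter or keys.pattern.
keys.pattern            A key pattern (Redis glob-style matching, for example person:*) that matches multiple keys or hash keys. Used only for queries. When querying Redis data, specify either this parameter or table.
scan.count              Number of records read per batch. Default: 100. If the CPU usage of the Redis cluster still has headroom during reads, you can increase this value.
iterator.grouping.size  Number of records inserted per batch. Default: 100. If the CPU usage of the Redis cluster still has headroom during inserts, you can increase this value.
timeout                 Timeout for connecting to Redis, in ms. Default: 2000 (2 seconds).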
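The rules in the parameter table (table is mandatory for writes, and a query takes exactly one of table or keys.pattern) can be captured in a small options builder. The sketch below is illustrative only. build_redis_options is a hypothetical helper and is not part of spark-redis or DLI. It is written in Python, one of this section's sample languages, and runs without Spark or Redis. It only assembles the string options that would be passed to .options(...).

```python
# Hypothetical helper: assembles the option map for the "redis" format.
# It encodes the parameter-table rules only; it never contacts Redis.
DEFAULTS = {"scan.count": 100, "iterator.grouping.size": 100, "timeout": 2000}


def build_redis_options(mode, host, port, password, table=None,
                        keys_pattern=None, **tuning):
    """Build string options for a spark-redis read or write."""
    if mode not in ("read", "write"):
        raise ValueError("mode must be 'read' or 'write'")
    if mode == "write":
        # table is mandatory for inserts; keys.pattern is query-only.
        if table is None or keys_pattern is not None:
            raise ValueError("writes need 'table' and must not set 'keys.pattern'")
    elif (table is None) == (keys_pattern is None):
        # Queries take exactly one of table / keys.pattern.
        raise ValueError("queries need exactly one of 'table' or 'keys.pattern'")
    opts = {"host": host, "port": str(port), "password": password}
    if table is not None:
        opts["table"] = table
    if keys_pattern is not None:
        opts["keys.pattern"] = keys_pattern
    # Tuning names are passed as scan_count, iterator_grouping_size or timeout
    # and mapped to the dotted option names; omitted ones keep their defaults.
    for name, value in tuning.items():
        key = name.replace("_", ".")
        if key not in DEFAULTS:
            raise ValueError(f"unknown tuning option: {key}")
        opts[key] = str(value)
    return opts
```

In PySpark, the resulting dict could be passed as spark.read.format("redis").options(**opts).load(). DEFAULTS only mirrors the table's documented defaults for validation and is never sent.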
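Note
● Four save modes are supported: Overwrite, Append, ErrorIfExists, and Ignore.
● To save a nested DataFrame, use .option("model", "binary").
● To set a data expiration time, use .option("ttl", 1000). The value is in seconds.
Figure 2-45 Obtaining the Redis IP address and port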
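The note lists the save modes and two optional write options. A helper can normalize the mode name and catch spellings such as "ErrorIfExis" before the job runs. The sketch below is illustrative only. write_settings is hypothetical and runs in plain Python. It allows only the four documented mode names, although Spark itself also accepts aliases such as "error".

```python
# Canonical spellings of the four save modes listed in the note.
SAVE_MODES = {"overwrite": "Overwrite", "append": "Append",
              "errorifexists": "ErrorIfExists", "ignore": "Ignore"}


def write_settings(mode, ttl_seconds=None, nested=False):
    """Return (save_mode, extra_options) for a spark-redis write (hypothetical)."""
    canonical = SAVE_MODES.get(mode.lower())
    if canonical is None:
        raise ValueError(f"unsupported save mode {mode!r}; "
                         f"expected one of {sorted(SAVE_MODES.values())}")
    extra = {}
    if nested:
        extra["model"] = "binary"             # nested DataFrames use the binary model
    if ttl_seconds is not None:
        extra["ttl"] = str(int(ttl_seconds))  # expiration time in seconds
    return canonical, extra
```

Usage: mode, extra = write_settings("Append", ttl_seconds=1000). The result could then be applied as dataFrame.write.format("redis").options(**extra).mode(mode).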
d. Read data from Redis
sparkSession.read
  .format("redis")
  .option("host", "192.168.4.199")
  .option("port", "6379")
  .option("table", "person")
  .option("password", "######")
  .option("key.column", "name")
  .load()
  .show()
Operation result:
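Steps c and d pass table="person" and key.column="name". Our reading of the spark-redis DataFrame documentation is as follows, and it should be verified there. In the default hash model, each row is stored as one Redis hash under the key "<table>:<key column value>". The key column is folded into that key and not stored as a hash field, and on read it is restored from the key when key.column is given. The plain-Python model below simulates that assumed layout only. Both functions are hypothetical and do not call Redis.

```python
def rows_to_redis_hashes(table, key_column, rows):
    """Map each row to redis_key -> hash fields, per the assumed layout."""
    store = {}
    for row in rows:
        # The key column becomes part of the Redis key, not a hash field;
        # remaining values are kept as strings.
        fields = {col: str(val) for col, val in row.items() if col != key_column}
        store[f"{table}:{row[key_column]}"] = fields
    return store


def hashes_to_rows(table, key_column, store):
    """Rebuild rows for one table, restoring the key column from the Redis key."""
    prefix = f"{table}:"
    rows = []
    for redis_key, fields in sorted(store.items()):
        if redis_key.startswith(prefix):
            rows.append({key_column: redis_key[len(prefix):], **fields})
    return rows


store = rows_to_redis_hashes("person", "name",
                             [{"name": "abc", "age": 34}, {"name": "Bob", "age": 19}])
print(store)  # {'person:abc': {'age': '34'}, 'person:Bob': {'age': '19'}}
```

Under this assumption, reading without key.column would return rows that lack the name column. This is why the read in step d repeats the option.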
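● RDD operations
a. Create a connection
val sparkContext = new SparkContext(new SparkConf()
  .setAppName("datasource_redis")
  .set("spark.redis.host", "192.168.4.199")
  .set("spark.redis.port", "6379")
  .set("spark.redis.auth", "######")
  .set("spark.driver.allowMultipleContexts", "true"))
Note
spark.driver.allowMultipleContexts: setting it to true lets this SparkContext be created even when another context is already running in the same JVM (Spark logs a warning instead of failing), which prevents context conflicts.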
b. Insert data
i. Save as String
val stringRedisData: RDD[(String, String)] = sparkContext.parallelize(Seq[(String, String)](("high", "111"), ("together", "333")))
sparkContext.toRedisKV(stringRedisData)
ii. Save as hash
val hashRedisData: RDD[(String, String)] = sparkContext.parallelize(Seq[(String, String)](("saprk", "123"), ("data", "222")))
sparkContext.toRedisHASH(hashRedisData, "hashRDD")
iii. Save as list
val data = List(("school", "112"), ("tom", "333"))
// data.toString() is a single string, so listRDD receives one element
val listRedisData: RDD[String] = sparkContext.parallelize(Seq[String](data.toString()))
sparkContext.toRedisLIST(listRedisData, "listRDD")
iv. Save as set
val setData = Set(("bob", "133"), ("kity", "322"))
// setData.mkString is also a single string, so setRDD receives one member
val setRedisData: RDD[String] = sparkContext.parallelize(Seq[String](setData.mkString))
sparkContext.toRedisSET(setRedisData, "setRDD")
v. Save as zset
// Each pair is (member, score)
val zsetRedisData: RDD[(String, String)] = sparkContext.parallelize(Seq[(String, String)](("whight", "234"), ("bobo", "343")))
sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")
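Each RDD call in step b fills a different Redis structure. The plain-Python model below shows which structure each call fills and what the Scala sample actually stores. FakeRedis is a hypothetical in-memory stand-in, and the append-to-list behavior for toRedisLIST is an assumption. The two hard-coded strings are what Scala's data.toString() and setData.mkString produce for the sample data. The list and the set therefore each receive a single element.

```python
class FakeRedis:
    """Hypothetical in-memory stand-in for Redis (no network access)."""

    def __init__(self):
        self.data = {}

    def set(self, key, value):            # toRedisKV: one string key per pair
        self.data[key] = value

    def hset(self, name, field, value):   # toRedisHASH: pairs become fields of one hash
        self.data.setdefault(name, {})[field] = value

    def rpush(self, name, value):         # toRedisLIST: elements appended to one list
        self.data.setdefault(name, []).append(value)

    def sadd(self, name, member):         # toRedisSET: members of one set
        self.data.setdefault(name, set()).add(member)

    def zadd(self, name, member, score):  # toRedisZSET: (member, score) pairs
        self.data.setdefault(name, {})[member] = float(score)


r = FakeRedis()
for k, v in [("high", "111"), ("together", "333")]:
    r.set(k, v)
for f, v in [("saprk", "123"), ("data", "222")]:
    r.hset("hashRDD", f, v)
# Scala: List(("school","112"), ("tom","333")).toString() -> one string
r.rpush("listRDD", "List((school,112), (tom,333))")
# Scala: Set(("bob","133"), ("kity","322")).mkString -> one string, no separator
r.sadd("setRDD", "(bob,133)(kity,322)")
for member, score in [("whight", "234"), ("bobo", "343")]:
    r.zadd("zsetRDD", member, score)
```

To store each tuple as its own list element, the Scala sample would parallelize data.map(_.toString) instead of Seq(data.toString()). This variant is a suggestion and has not been run here.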
c. Query data
i. Query by traversing keys
val keysRDD = sparkContext.fromRedisKeys(Array("high", "together", "hashRDD", "listRDD", "setRDD", "zsetRDD"), 6)
keysRDD.getKV().collect().foreach(println)
● Access through the SQL API
a. Create a DLI table associated with Redis for cross-source access.
sparkSession.sql(
  """CREATE TEMPORARY VIEW person (name STRING, age INT)
    |USING org.apache.spark.sql.redis
    |OPTIONS ('host' = '192.168.4.199', 'port' = '6379',
    |'password' = '######', 'table' = 'person')""".stripMargin)
b. Insert data
sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30), ('Peter', 45)")
c. Query data
sparkSession.sql("SELECT * FROM person").collect().foreach(println)
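The CREATE TEMPORARY VIEW statement always has the same shape: a column list, the org.apache.spark.sql.redis provider, and quoted OPTIONS. The sketch below renders it from plain data so that values such as a password containing a single quote are escaped consistently. create_redis_view_sql is a hypothetical helper written in plain Python. The escaping assumes Spark SQL's default backslash escapes inside string literals.

```python
def create_redis_view_sql(view, columns, options):
    """Render a CREATE TEMPORARY VIEW ... USING org.apache.spark.sql.redis statement."""
    def quote(value):
        # Spark SQL string literal: escape backslashes first, then single quotes.
        return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"

    cols = ", ".join(f"{name} {sql_type}" for name, sql_type in columns)
    opts = ", ".join(f"{quote(k)} = {quote(v)}" for k, v in options.items())
    return (f"CREATE TEMPORARY VIEW {view} ({cols}) "
            f"USING org.apache.spark.sql.redis OPTIONS ({opts})")


sql = create_redis_view_sql(
    "person", [("name", "STRING"), ("age", "INT")],
    {"host": "192.168.4.199", "port": "6379", "password": "******", "table": "person"})
```

The returned string would then be passed to sparkSession.sql(sql), in the same way as the hand-written statement in step a.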
● Submit the Spark job
Complete sample code
● Access through the SQL API
import org.apache.spark.sql.SparkSession

object Test_Redis_SQL {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()
    sparkSession.sql(
      """CREATE TEMPORARY VIEW person (name STRING, age INT)
        |USING org.apache.spark.sql.redis
        |OPTIONS ('host' = '192.168.4.199', 'port' = '6379',
        |'password' = '******', 'table' = 'person')""".stripMargin)
    sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30), ('Peter', 45)")
    sparkSession.sql("SELECT * FROM person").collect().foreach(println)
    sparkSession.close()
  }
}
● Access through the DataFrame API
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

object Test_Redis_SparkSql {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val sparkSession = SparkSession.builder().appName("datasource_redis").getOrCreate()

    // Set cross-source connection parameters.
    val host = "192.168.4.199"
    val port = "6379"
    val table = "person"
    val auth = "######"
    val key_column = "name"

    // ******** Set up the DataFrame ********
    // Method one
    val schema = StructType(Seq(StructField("name", StringType, false), StructField("age", IntegerType, false)))
    val rdd = sparkSession.sparkContext.parallelize(Seq(Row("huawei", 34), Row("Bob", 19)))
    val dataFrame = sparkSession.createDataFrame(rdd, schema)
    // // Method two
    // val jdbcDF = sparkSession.createDataFrame(Seq(("Jack", 23)))
    // val dataFrame = jdbcDF.withColumnRenamed("_1", "name").withColumnRenamed("_2", "age")
    // // Method three (needs the Person case class declared below, outside the object)
    // val dataFrame = sparkSession.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))

    // Write data to Redis, using the name column as the key
    dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).option("key.column", key_column).mode(SaveMode.Overwrite).save()

    // Read data from Redis, restoring the key column
    sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).option("key.column", key_column).load().show()

    // Close the session
    sparkSession.close()
  }
}

// Method three
// case class Person(name: String, age: Int)
● RDD operations
import com.redislabs.provider.redis._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test_Redis_RDD {
  def main(args: Array[String]): Unit = {
    // Create a SparkContext with the Redis connection settings.
    val sparkContext = new SparkContext(new SparkConf()
      .setAppName("datasource_redis")
      .set("spark.redis.host", "192.168.4.199")
      .set("spark.redis.port", "6379")
      .set("spark.redis.auth", "@@@@@@")
      .set("spark.driver.allowMultipleContexts", "true"))

    // ***************** Write data to Redis *****************
    // Save String type data
    val stringRedisData: RDD[(String, String)] = sparkContext.parallelize(Seq[(String, String)](("high", "111"), ("together", "333")))
    sparkContext.toRedisKV(stringRedisData)
    // Save Hash type data
    val hashRedisData: RDD[(String, String)] = sparkContext.parallelize(Seq[(String, String)](("saprk", "123"), ("data", "222")))
    sparkContext.toRedisHASH(hashRedisData, "hashRDD")
    // Save List type data (data.toString() is one string, so the list gets one element)
    val data = List(("school", "112"), ("tom", "333"))
    val listRedisData: RDD[String] = sparkContext.parallelize(Seq[String](data.toString()))
    sparkContext.toRedisLIST(listRedisData, "listRDD")
    // Save Set type data (setData.mkString is one string, so the set gets one member)
    val setData = Set(("bob", "133"), ("kity", "322"))
    val setRedisData: RDD[String] = sparkContext.parallelize(Seq[String](setData.mkString))
    sparkContext.toRedisSET(setRedisData, "setRDD")
    // Save ZSet type data; each pair is (member, score)
    val zsetRedisData: RDD[(String, String)] = sparkContext.parallelize(Seq[(String, String)](("whight", "234"), ("bobo", "343")))
    sparkContext.toRedisZSET(zsetRedisData, "zsetRDD")

    // ***************** Read data from Redis *****************
    // Traverse the specified keys and get the values of each type
    val keysRDD = sparkContext.fromRedisKeys(Array("high", "together", "hashRDD", "listRDD", "setRDD", "zsetRDD"), 6)
    keysRDD.getKV().collect().foreach(println)
    keysRDD.getHash().collect().foreach(println)
    keysRDD.getList().collect().foreach(println)
    keysRDD.getSet().collect().foreach(println)
    keysRDD.getZSet().collect().foreach(println)

    // Read String type data ("keyPattern*" is a placeholder key pattern)
    val stringRDD = sparkContext.fromRedisKV("keyPattern*")
    sparkContext.fromRedisKV(Array("high", "together")).collect().foreach(println)
    // Read Hash type data
    val hashRDD = sparkContext.fromRedisHash("keyPattern*")
    sparkContext.fromRedisHash(Array("hashRDD")).collect().foreach(println)
    // Read List type data
    val listRDD = sparkContext.fromRedisList("keyPattern*")
    sparkContext.fromRedisList(Array("listRDD")).collect().foreach(println)
    // Read Set type data
    val setRDD = sparkContext.fromRedisSet("keyPattern*")
    sparkContext.fromRedisSet(Array("setRDD")).collect().foreach(println)
    // Read ZSet type data
    val zsetRDD = sparkContext.fromRedisZSet("keyPattern*")
    sparkContext.fromRedisZSet(Array("zsetRDD")).collect().foreach(println)

    // Stop the context
    sparkContext.stop()
  }
}
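The key-based reads in the complete RDD example fix a list of candidate keys with fromRedisKeys and then call getKV(), getHash(), getList() or getSet(). Our understanding, to be checked against the spark-redis RDD documentation, is that each getX() keeps only the keys whose Redis type matches. The plain-Python model below simulates that filtering over a hypothetical snapshot of the keys written earlier. The zset reader is omitted because we have not verified its exact return shape.

```python
# Hypothetical snapshot of Redis after the writes: key -> (Redis type, value).
STORE = {
    "high": ("string", "111"),
    "together": ("string", "333"),
    "hashRDD": ("hash", {"saprk": "123", "data": "222"}),
    "listRDD": ("list", ["List((school,112), (tom,333))"]),
    "setRDD": ("set", {"(bob,133)(kity,322)"}),
    "zsetRDD": ("zset", {"whight": 234.0, "bobo": 343.0}),
}


def _values_of_type(keys, redis_type):
    """Yield (key, value) for each requested key whose Redis type matches."""
    for key in keys:
        entry = STORE.get(key)
        if entry is not None and entry[0] == redis_type:
            yield key, entry[1]


def get_kv(keys):
    """Like getKV(): (key, value) pairs for string keys only."""
    return list(_values_of_type(keys, "string"))


def get_hash(keys):
    """Like getHash(): (field, value) pairs from every hash key."""
    return [fv for _, h in _values_of_type(keys, "hash") for fv in h.items()]


def get_list(keys):
    """Like getList(): elements of every list key."""
    return [e for _, lst in _values_of_type(keys, "list") for e in lst]


def get_set(keys):
    """Like getSet(): members of every set key (sorted here for stable output)."""
    return [m for _, s in _values_of_type(keys, "set") for m in sorted(s)]


keys = ["high", "together", "hashRDD", "listRDD", "setRDD", "zsetRDD"]
print(get_kv(keys))  # [('high', '111'), ('together', '333')]
```

The model also shows why passing all six keys to fromRedisKeys is harmless: keys of other types are dropped by each getX() rather than causing an error.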
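2.10.7.2 PySpark Sample Code
Development notes
Redis supports only enhanced datasource connections, and only yearly/monthly queues can be used.
● Prerequisites
An enhanced datasource connection has been created on the DLI management console and bound to a yearly/monthly queue. For details, see the Data Lake Insight User Guide.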
● Access through the DataFrame API
a. Import the related dependencies
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
b. Create a session
sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()
c. Set connection parameters
host = "192.168.4.199"
port = "6379"
table = "person"
auth = "@@@@@@"
d. Create a DataFrame
i. Method one
dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
schema = StructType([StructField("id", IntegerType(), False),
                     StructField("name", StringType(), False),
                     StructField("age", IntegerType(), False)])
dataFrame = sparkSession.createDataFrame(dataList, schema)
ii. Method two
jdbcDF = sparkSession.createDataFrame([(3, "Jack", 23)])
dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age")
e. Import data into Redis
(dataFrame.write
    .format("redis")
    .option("host", host)
    .option("port", port)
    .option("table", table)
    .option("password", auth)
    .mode("Overwrite")
    .save())
Note
● Four save modes are supported: Overwrite, Append, ErrorIfExists, and Ignore.
● To specify the key, use .option("key.column", "name"), where name is the column name.
f. Read data from Redis
sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show()
g. Operation result
● Access through the SQL API
a. Create a DLI table associated with Redis for cross-source access.
sparkSession.sql(
    "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS ("
    "'host' = '192.168.4.199', 'port' = '6379', 'password' = '######', 'table' = 'person')")
b. Insert data
sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30), ('Peter', 45)")
c. Query data
for row in sparkSession.sql("SELECT * FROM person").collect():
    print(row)
● Submit the Spark job
Complete sample code
● Access through the DataFrame API
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession.
    sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate()
    # Set cross-source connection parameters.
    host = "192.168.4.199"
    port = "6379"
    table = "person"
    auth = "######"

    # Create a DataFrame and initialize its data.
    # ******* Method one *******
    dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False),
                         StructField("age", IntegerType(), False)])
    dataFrame = sparkSession.createDataFrame(dataList, schema)
    # ******* Method two *******
    # jdbcDF = sparkSession.createDataFrame([(3, "Jack", 23)])
    # dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age")

    # Write data to the Redis table
    dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).mode("Overwrite").save()
    # Read data
    sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show()

    # Close the session
    sparkSession.stop()
● Access through the SQL API
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession.
    sparkSession = SparkSession.builder.appName("datasource_redis").getOrCreate()
    sparkSession.sql(
        "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS ("
        "'host' = '192.168.4.199', 'port' = '6379', 'password' = '######', 'table' = 'person')")
    sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30), ('Peter', 45)")
    for row in sparkSession.sql("SELECT * FROM person").collect():
        print(row)
    # Close the session
    sparkSession.stop()
2.10.7.3 Java Sample Code
Development notes
● Maven dependency:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
● Create a connection:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("datasource-redis")
        .set("spark.redis.host", "192.168.4.199")
        .set("spark.redis.port", "6379")
        .set("spark.redis.auth", "******")
        .set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(javaSparkContext);
● Access through the DataFrame API
a. Read JSON data as a DataFrame
JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
        "{\"id\":\"1\",\"name\":\"zhangsan\",\"age\":\"18\"}",
        "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);
b. Build the Redis connection options
Map<String, String> map = new HashMap<>();
map.put("table", "person");
map.put("key.column", "id");
c. Save data to Redis
dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
d. Read data from Redis
sqlContext.read().format("redis").options(map).load().show();
e. Operation result
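● Submit the Spark job
a. Upload the Java code file to DLI. For console operations, see the Data Lake Insight User Guide. For API operations, see "Uploading a Resource Package" in the Data Lake Insight API Reference.
b. In the Spark job editor, select the corresponding module and run the Spark job. For console operations, see the Data Lake Insight User Guide. For API operations, see "Creating a Batch Processing Job" in the Data Lake Insight API Reference.
Note
● When submitting the job, specify the module sys.datasource.redis.
● For submitting a job on the console, see "Table 6 Parameters for selecting dependency resources" in the Data Lake Insight User Guide.
● For submitting a job through the API, see the description of the "modules" parameter in "Table 2 Request parameters" of "Creating a Batch Processing Job" in the Data Lake Insight API Reference.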
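When a batch job is created through the API, the note above requires the modules parameter to include sys.datasource.redis. The plain-Python sketch below shows only that one field being merged into a request body, without duplicates and without mutating the caller's dict. with_redis_module is a hypothetical helper and is not part of any DLI SDK. All other request fields are left to "Creating a Batch Processing Job".

```python
REDIS_MODULE = "sys.datasource.redis"


def with_redis_module(request_body):
    """Return a copy of a batch-job request body whose "modules" list
    contains the Redis datasource module exactly once (hypothetical)."""
    body = dict(request_body)                    # shallow copy of the body
    modules = list(body.get("modules", []))      # copy so the input list is untouched
    if REDIS_MODULE not in modules:
        modules.append(REDIS_MODULE)
    body["modules"] = modules
    return body
```

Calling the helper repeatedly is safe because it never appends the module a second time.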
Complete sample code
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class Test_Redis_DataFrame {
    public static void main(String[] args) {
        // Create a Spark context with the Redis connection settings
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("datasource-redis")
                .set("spark.redis.host", "192.168.4.199").set("spark.redis.port", "6379")
                .set("spark.redis.auth", "******").set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(javaSparkContext);
        // Read an RDD of JSON strings to create a DataFrame
        JavaRDD<String> javaRDD = javaSparkContext.parallelize(Arrays.asList(
                "{\"id\":\"1\",\"name\":\"zhangsan\",\"age\":\"18\"}", "{\"id\":\"2\",\"name\":\"lisi\",\"age\":\"21\"}"));
        Dataset<Row> dataFrame = sqlContext.read().json(javaRDD);
        Map<String, String> map = new HashMap<>();
        map.put("table", "person");
        map.put("key.column", "id");
        dataFrame.write().format("redis").options(map).mode(SaveMode.Overwrite).save();
        sqlContext.read().format("redis").options(map).load().show();
        javaSparkContext.stop();
    }
}
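2.10.7.4 Troubleshooting
Problem 1: After the code is copied directly into a .py file, the error "unexpected character after line continuation character" is reported after '\'.
● Problem
After the code is copied directly into a .py file, the error "unexpected character after line continuation character" is reported after '\'.
● Solution
Delete all indentation or spaces that follow '\'.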
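Python raises this error when a backslash used for line continuation is followed by a space or tab. The fix above can be automated. strip_after_line_continuation is a hypothetical helper, not part of any tool, and it assumes that every line ending in a backslash is meant as a continuation. It removes the spaces and tabs after such a trailing backslash.

```python
def strip_after_line_continuation(source):
    """Remove spaces/tabs that follow a trailing backslash on each line."""
    fixed = []
    for line in source.split("\n"):
        stripped = line.rstrip(" \t")
        # Only touch lines whose last visible character is a backslash.
        fixed.append(stripped if stripped.endswith("\\") else line)
    return "\n".join(fixed)


bad = "x = 1 + \\ \n    2\n"   # backslash followed by a space
good = strip_after_line_continuation(bad)
namespace = {}
exec(good, namespace)           # compiles once the trailing space is gone
```

An alternative that avoids the problem entirely is to wrap long method chains in parentheses instead of using backslashes.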