5.5 使用 DIS Spark Streaming 下载数据
5.5.3 自定义 SparkStreaming 作业
获取 DIS Spark Streaming Demo
步骤1 这里获取“dis-spark-streaming-X.X.X
.zip”压缩包。解压“dis-spark-streaming-X.X.X.zip”压缩包,解压之后获得以下目录:
“dis-spark-streaming-demo”目录包含一个Maven工程样例。
----结束
编写 SparkStreaming 作业
以IntelliJ IDEA社区版为例,说明如何编写SparkStreaming作业。请先确保在IDEA上 已经正确配置好
● JDK 1.8+
● Scala-sdk-2.11
● Maven 3.3.*
步骤1 打开IntelliJIDEA,选择“File > Open”。选择解压至本地的dis-spark-streaming-demo目录,双击pom.xml。
步骤2 当弹出如下对话框,请选择“Open as Project”,作为工程打开。
步骤3 单击“New Window”,在新窗口打开此工程。
步骤4 在新打开的IDEA窗口中,单击“File > Settings”。
步骤5 在搜索框搜索maven,找到maven的配置,请确保Maven home directory(Maven安 装路径),User settings file (settings.xml文件位置)和Local repository(本地仓库地址) 配置正确。
说明
若不正确,请修改,否则步骤2中安装的sdk无法找到。
步骤6 打开DISSparkStreamingExample文件,如果IDEA提示“No Scala SDK in
module”,请单击旁边的“Setup Scala SDK”,会显示Scala SDK列表(如果没有可 以创建一个并关联scala路径),选择2.11版本的即可。
步骤7 在pom.xml上单击右键,选择“Maven > Reimport”,重新引入maven依赖库。
步骤8 此时IDEA打开的DISSparkStreamingExample文件内没有错误即表示开发环境配置成 功,此文件的逻辑是读取DIS通道中的数据并统计每个单词出现次数。
1. DISSparkStreamingExample是一个使用Assign模式的样例,不具备停止再启动时 从上一次停止位置开始的能力。使用到的SDK构造方法如下:
ConsumerStrategies.Assign[String, String](streamName, params, startingOffsets)
● streamName为DIS通道名称。
● params为参数Map集合,至少包括endpoint(DIS网关地址),region(区域),
ak(用户ak),sk(用户sk),projectId(用户项目ID)。
● startingOffsets为读取DIS数据的起始位置,LATEST表示从最新的数据开始读取;
EARLIEST表示从最旧的数据开始读取;如果要指定每个分区的精确起始位置,则 可以写为json字符串,例如{"0":23,"1":-1,"2":-2}表示第0分区起始位置是23,第1 分区从最新数据的位置开始,第2分区从最老数据的位置开始,如果有分区没有指 定位置,则默认从最新数据位置开始。
2. DISSparkStreamingSubscribeExample是一个使用Subscribe模式的样例,具备停 止再启动时从上一次停止位置开始的能力。使用到的SDK构造方法如下:
ConsumerStrategies.Subscribe[String, String](Array(streamName), params)
● streamName为DIS通道名称
● params为参数Map集合,至少包括endpoint(DIS网关地址),region(区域),
ak(用户ak),sk(用户sk),projectId(用户项目ID),group.id(app名称,表 示某一个消费组);还可以包含auto.offset.reset,参数含义同Assign模式下的 startingOffsets;另外一个参数enable.auto.commit,设置为true会自动每隔 5000ms(可通过设置auto.commit.interval.ms修改)提交一次offset,设置为false 则不自动提交,用户可以手动调用commitAsync提交,参见示例代码中如下部分
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// commit offset to DIS async.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }
----结束
验证 sparkStreaming 作业
实际场景中,SparkStreming作业需要提交在Spark集群上运行,但本次验证只介绍在 本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集群(如 MRS集群)并提交作业验证。
步骤1 使用注册帐户登录DIS控制台。
步骤2 单击管理控制台左上角的 ,选择区域和项目。
步骤3 参考开通DIS通道申请开通DIS通道,并持续上传数据到新创建的DIS通道。本次范例上 传的内容为hello world。
步骤4 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并 保存。
步骤5 右键单击pom.xml,选择“Maven > Reimport”,重新引入依赖包。
步骤6 在DISSparkStreamingExample文件内任意地方,右键选择“Create 'DISSparkStreamingExample'”。
步骤7 在打开的配置页面中,“VM options”中输入-Dspark.master=local[*],表示用 local模式运行spark作业;“Program arguments”中输入运行参数,格式为 :
DIS网关地址Region名称AK SK ProjectID 通道名称起始位置Streaming批次时间 如在华北-北京1测试,则参数示例为
https://dis.${region}.myhuaweicloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME latest 10
参数顺序与含义在示例代码中有,可以参考。
最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。
步骤8 在DISSparkStreamingExample文件内任意地方,右键选择“Run 'DISSparkStreamingExample'”,即可启动作业。
步骤9 启动过程中会报一个hadoop binary path的错误,这个可以忽略。
18/08/28 10:26:10 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
步骤10 如果没有其他错误,则作业会每隔duration运行一个批次,从DIS读取此批次内的数据 并输出结果,示例如下:
---Time: 1535423650000 ms
---(hello,30)
(world.,30)
步骤11 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释 (防止以后打包会把spark依赖也打进来),然后停止数据上传程序。
----结束