5.6 使用 DIS Flink Connector 上传与下载数据
5.6.3 自定义 Flink Streaming 作业
获取 DIS Flink Connector Demo
步骤1 这里获取“dis-flink-connector-X.X.X
.zip”压缩包。解压“dis-flink-connector-X.X.X.zip”压缩包,解压之后获得以下目录:
● “huaweicloud-dis-flink-connector-demo”目录包含一个Maven工程样例。
----结束
Intellij IDEA 中导入 Demo 工程
以IntelliJ IDEA社区版为例,说明如何编写Flink作业。请先确保在IDEA上已经正确配 置好。
● JDK 1.8+
● Scala-sdk-2.11
● Maven 3.3.*
步骤1 打开IntelliJIDEA,选择“File > New > > Project from Existing Sources...”。选择解 压至本地的huaweicloud-dis-flink-connector-demo目录,单击确认。
步骤2 选择导入Maven工程,保持默认配置,一直单击下一步即可。
步骤3 单击“New Window”,在新窗口打开此工程。
步骤4 在pom.xml上单击右键,选择“Maven > Reimport”,重新引入maven依赖库。
----结束
验证 Flink Streaming Source 作业
实际场景中,Flink Streaming作业需要提交在Flink集群上运行,但本次验证只介绍在 本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集(如 MRS集群)并提交作业验证。
步骤1 使用注册帐户登录DIS控制台。
步骤2 单击管理控制台左上角的 ,选择区域和项目。
步骤3 参考开通DIS通道申请开通DIS通道,并持续上传数据到新创建的DIS通道。本次范例上 传的内容为hello world。
步骤4 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并 保存。
步骤5 右键单击pom.xml,选择“Maven > Reimport”,重新引入依赖包。
步骤6 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSourceJavaExample'”。
步骤7 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :
DIS网关地址 Region名称 AK SK ProjectID 通道名称 起始位置 消费者标识 如在华北-北京1测试,则参数示例为
https://dis.${region}.myhuaweicloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME latest GROUP_ID
参数顺序与含义在示例代码中有,可以参考。
// DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com String endpoint;
// DIS服务所在区域ID,如 cn-north-1 String region;
// 用户的AK String ak;
// 用户的SK String sk;
// 用户的项目ID String projectId;
// DIS通道名称 String streamName;
// 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略;如果存在有效 的Checkpoint,则会从此Checkpoint开始继续消费
// 取值有: LATEST 从最新的数据开始消费,此策略会忽略通道中已有数据 // EARLIEST 从最老的数据开始消费,此策略会获取通道中所有的有效数据 String startingOffsets;
// 消费组标识,同一个消费组下的不同客户端可以同时消费同一个通道 String groupId;
说明
● 断点消费 必须指定checkpoint 或者按照如下設置自动打上消费点 。 disConfig.put(DisConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
disConfig.put(DisConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
disConfig.put(DisConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "LATEST"); //LATEST 表示 从最新的数据开始消费 。
● 如果都不设置 默认的是从lastest开始消费。
最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。
步骤8 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSourceJavaExample'”,即可启动作业。
步骤9 如果没有其他错误,将从DIS读取数据并输出到控制台,示例如下:
2> hello world 2> hello world 2> hello world
步骤10 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释 (防止以后打包会把flink依赖也打进来),然后停止数据上传程序。
----结束
验证 Flink Streaming Sink 作业
实际场景中,Flink作业需要提交在Flink集群上运行,但本次验证只介绍在本地IDE上 测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集群(如MRS集群)
并提交作业验证。
步骤1 使用注册帐户登录DIS控制台。
步骤2 单击管理控制台左上角的 ,选择区域和项目。
步骤3 参考开通DIS通道申请开通DIS通道。
步骤4 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并 保存。
步骤5 右键单击pom.xml,选择“Maven > Reimport”,重新引入依赖包。
步骤6 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSinkJavaExample'”。
步骤7 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :
DIS网关地址 Region名称 AK SK ProjectID 通道名称
https://dis.${region}.myhuaweicloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME
参数顺序与含义在示例代码中有,可以参考。
// DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com String endpoint;
// DIS服务所在区域ID,如 cn-north-1 String region;
// 用户的AK String ak;
// 用户的SK String sk;
// 用户的项目ID String projectId;
// DIS通道名称 String streamName;
最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。
步骤8 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSinkJavaExample'”,即可启动作业。
步骤9 如果没有其他错误,可以到DIS控制台通道监控页面查看数据是否上传成功。
步骤10 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释 (防止以后打包会把flink依赖也打进来),然后停止数据上传程序。
----结束