• 沒有找到結果。

5.6 使用 DIS Flink Connector 上传与下载数据

5.6.3 自定义 Flink Streaming 作业

获取 DIS Flink Connector Demo

步骤1 这里获取“dis-flink-connector-X.X.X

.zip”压缩包。解压“dis-flink-connector-X.X.X.zip”压缩包,解压之后获得以下目录:

● “huaweicloud-dis-flink-connector-demo”目录包含一个Maven工程样例。

----结束

Intellij IDEA 中导入 Demo 工程

以IntelliJ IDEA社区版为例,说明如何编写Flink作业。请先确保在IDEA上已经正确配 置好。

● JDK 1.8+

● Scala-sdk-2.11

● Maven 3.3.*

步骤1 打开IntelliJIDEA,选择“File > New > > Project from Existing Sources...”。选择解 压至本地的huaweicloud-dis-flink-connector-demo目录,单击确认。

步骤2 选择导入Maven工程,保持默认配置,一直单击下一步即可。

步骤3 单击“New Window”,在新窗口打开此工程。

步骤4 在pom.xml上单击右键,选择“Maven > Reimport”,重新引入maven依赖库。

----结束

验证 Flink Streaming Source 作业

实际场景中,Flink Streaming作业需要提交在Flink集群上运行,但本次验证只介绍在 本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集(如 MRS集群)并提交作业验证。

步骤1 使用注册帐户登录DIS控制台。

步骤2 单击管理控制台左上角的 ,选择区域和项目。

步骤3 参考开通DIS通道申请开通DIS通道,并持续上传数据到新创建的DIS通道。本次范例上 传的内容为hello world。

步骤4 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并 保存。

步骤5 右键单击pom.xml,选择“Maven > Reimport”,重新引入依赖包。

步骤6 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSourceJavaExample'”。

步骤7 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :

DIS网关地址 Region名称 AK SK ProjectID 通道名称 起始位置 消费者标识 如在华北-北京1测试,则参数示例为

https://dis.${region}.myhuaweicloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME latest GROUP_ID

参数顺序与含义在示例代码中有,可以参考。

// DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com String endpoint;

// DIS服务所在区域ID,如 cn-north-1 String region;

// 用户的AK String ak;

// 用户的SK String sk;

// 用户的项目ID String projectId;

// DIS通道名称 String streamName;

// 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略;如果存在有效 的Checkpoint,则会从此Checkpoint开始继续消费

// 取值有: LATEST 从最新的数据开始消费,此策略会忽略通道中已有数据 // EARLIEST 从最老的数据开始消费,此策略会获取通道中所有的有效数据 String startingOffsets;

// 消费组标识,同一个消费组下的不同客户端可以同时消费同一个通道 String groupId;

说明

● 断点消费 必须指定checkpoint 或者按照如下設置自动打上消费点 。 disConfig.put(DisConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

disConfig.put(DisConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

disConfig.put(DisConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "LATEST"); //LATEST 表示 从最新的数据开始消费 。

● 如果都不设置 默认的是从lastest开始消费。

最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。

步骤8 在DISFlinkStreamingSourceJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSourceJavaExample'”,即可启动作业。

步骤9 如果没有其他错误,将从DIS读取数据并输出到控制台,示例如下:

2> hello world 2> hello world 2> hello world

步骤10 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释 (防止以后打包会把flink依赖也打进来),然后停止数据上传程序。

----结束

验证 Flink Streaming Sink 作业

实际场景中,Flink作业需要提交在Flink集群上运行,但本次验证只介绍在本地IDE上 测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集群(如MRS集群)

并提交作业验证。

步骤1 使用注册帐户登录DIS控制台。

步骤2 单击管理控制台左上角的 ,选择区域和项目。

步骤3 参考开通DIS通道申请开通DIS通道。

步骤4 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并 保存。

步骤5 右键单击pom.xml,选择“Maven > Reimport”,重新引入依赖包。

步骤6 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Create 'DISFlinkStreamingSinkJavaExample'”。

步骤7 在打开的配置页面中,“Program arguments”中输入运行参数,格式为 :

DIS网关地址 Region名称 AK SK ProjectID 通道名称

https://dis.${region}.myhuaweicloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME

参数顺序与含义在示例代码中有,可以参考。

// DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com String endpoint;

// DIS服务所在区域ID,如 cn-north-1 String region;

// 用户的AK String ak;

// 用户的SK String sk;

// 用户的项目ID String projectId;

// DIS通道名称 String streamName;

最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。

步骤8 在DISFlinkStreamingSinkJavaExample文件内任意地方,右键选择“Run 'DISFlinkStreamingSinkJavaExample'”,即可启动作业。

步骤9 如果没有其他错误,可以到DIS控制台通道监控页面查看数据是否上传成功。

步骤10 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释 (防止以后打包会把flink依赖也打进来),然后停止数据上传程序。

----结束

相關文件