使用DLI提交Flink SQL作业进行实时计算。基本流程如下:
步骤1:登录华为云
步骤2:准备数据源和数据输出通道 步骤3:创建OBS桶保存输出数据 步骤4:登录DLI管理控制台 步骤5:创建队列
步骤6:创建增强型跨源连接 步骤7:创建跨源认证
步骤8:配置安全组规则和测试地址连通性 步骤9:创建Flink SQL作业
样例场景需要创建一个Flink SQL作业,并且该作业有一个输入流和一个输出流。输入 流用于从DIS读取数据,输出流用于将数据写入到Kafka中。
步骤 1:登录华为云
使用DLI服务,首先要登录华为云。
1. 打开华为云首页。
2. 在登录页面输入“帐号名”和“密码”,单击“登录”。
步骤 2:准备数据源和数据输出通道
DLI Flink作业支持其他服务作为数据源和数据输出通道,具体内容请参见准备Flink作 业数据 。
本样例中,假设作业名称为“JobSample”,采用DIS服务作为数据源,开通数据接入 服务(DIS),具体操作请参见《数据接入服务用户指南》中的“开通DIS通道”章 节。采用分布式消息服务Kafka作为数据输出通道,创建Kafka专享版实例,具体操作 请参见《分布式消息服务Kafka用户指南》中的“购买实例”章节。
● 创建用于作业输入流的DIS通道:
a. 登录DIS管理控制台。
b. 在管理控制台左上角选择区域和项目。
c. 单击“购买接入通道”配置相关参数。通道信息如下:
▪
区域:选择与DLI服务相同的区域▪
通道名称:csinput▪
通道类型:普通▪
分区数量:1▪
生命周期(小时):24▪
源数据类型:BLOB▪
自动扩缩容:关闭▪
企业项目:default▪
高级配置:暂不配置d. 单击“立即购买”,进入“规格确认”页面。
e. 单击“提交”,完成通道接入。
● 创建用于作业输出流的Kafka专享版实例:
a. 在创建Kafka实例前您需要提前准备相关依赖资源,包括VPC、子网和安全 组,并配置安全组。
▪
创建VPC和子网的操作指导请参考创建虚拟私有云和子网,若需要在已 有VPC上创建和使用新的子网,请参考为虚拟私有云创建新的子网。说明
● 创建的VPC与使用的Kafka服务应在相同的区域。
● 创建VPC和子网时,如无特殊需求,配置参数使用默认配置即可。
▪
创建安全组的操作指导请参考创建安全组,为安全组添加规则的操作指 导请参考添加安全组规则。更多信息请参考《分布式消息服务Kafka用户指南》中的“准备实例依赖资 源”章节。
b. 登录分布式消息服务Kafka管理控制台。
c. 在管理控制台左上角选择区域。
d. 在“Kafka专享版”页面,单击右上角“购买Kafka实例”配置相关参数。实 例信息如下:
▪
计费模式:按需付费▪
区域:选择与DLI服务相同的区域▪
项目:默认▪
可用区:默认▪
实例名称:kafka-dliflink▪
企业项目:default▪
版本:默认▪
CPU架构:默认▪
规格:选择对应的规格▪
代理个数:默认▪
存储空间:默认▪
容量阈值策略:默认▪
虚拟私有云: vpc-dli,子网:dli-subnet▪
安全组:default▪
Manager用户名:dliflink(用于登录实例管理页面)▪
密码:****(请妥善管理密码,系统无法获取您设置的密码内容)▪
确认密码:****▪
更多配置:开启参数“Kafka SASL_SSL”,根据界面提示配置SSL认证 的用户名和密码。其他参数可暂不配置。e. 单击“立即购买”,弹出“规格确认”页面。
f. 单击“提交”,完成实例创建。
g. 在分布式消息服务Kafka管理,单击“Kafka专享版”,单击已创建的Kafka 实例名称,例如kafka-dliflink,进入实例详情页面。
h. 在“基本信息 > 高级配置 > SSL 证书”所在行,单击下载按钮。下载压缩包 到本地并解压,获取压缩包中的客户端证书文件:client.truststore.jks,给后 续步骤做准备。
步骤 3:创建 OBS 桶保存输出数据
在本样例中,需要为作业“JobSample”开通对象存储服务(OBS),为DLI Flink作业 提供Checkpoint、保存作业日志和调试测试数据的存储功能。
具体操作请参见《对象存储服务控制台指南》中的“创建桶”章节。
1. 在OBS管理控制台左侧导航栏选择“对象存储”。
2. 在页面右上角单击“创建桶”,配置桶参数。
– 区域:选择与DLI服务相同的区域 – 桶名称:smoke-test
– 存储类别:标准存储 – 桶策略:私有 – 默认加密:关闭 – 归档数据直读:关闭 – 企业项目:default – 标签:不填写 3. 单击“立即创建”。
步骤 4:登录 DLI 管理控制台
1. 在华为云官网首页的上方导航栏中,单击“产品”页签。
2. 在列表中,选择“大数据”>“大数据计算”中的“数据湖探索 DLI”。
3. 在DLI服务产品页,单击“进入控制台”,进入DLI管理控制台页面。第一次进入 数据湖探索管理控制台需要进行授权,以获取访问OBS的权限。
步骤 5:创建队列
创建DLI Flink SQL作业,不能使用系统已有的default队列,需要您创建队列,例如创 建名为“Flinktest”的队列。创建队列详细介绍请参考创建队列。
1. 在DLI管理控制台总览页,单击右上角“购买队列”进入购买队列页面。
2. 配置参数。
– 计费模式:按需计费 – 当前区域:默认区域 – 队列名称:Flinktest
– 队列类型:通用队列。勾选“专属资源模式”。
– AZ策略:单AZ – 队列规格:16CUs – 企业项目:default
– 描述:不填
– 高级配置:自定义配置
– 网段:配置的网段不能与Kafka的子网网段冲突 – 标签:不填
3. 单击“立即购买”,确认配置。
4. 配置确认无误,提交请求。
步骤 6:创建增强型跨源连接
创建DLI Flink作业,还需要创建增强型跨源连接。具体操作请参考创建增强型跨源连 接。
说明
● 增强型跨源仅支持包年包月队列和按需专属队列。
● 绑定跨源的DLI队列网段和数据源网段不能重合。
● 系统default队列不支持创建跨源连接。
● 访问跨源表需要使用已经创建跨源连接的队列。
1. 在DLI管理控制台左侧导航栏中,选择“跨源连接”。
2. 选择“增强型跨源”页签,单击左上角的“创建”按钮。配置参数:
– 连接名称:diskafka – 绑定队列:Flinktest – 虚拟私有云:vpc-dli – 子网:dli-subnet
说明
创建跨源连接的虚拟私有云和子网需要和Kafka实例保持一致。
3. 单击“确定”,完成创建增强型跨源连接。
4. 在“增强型跨源”页签,单击创建的连接名称:diskafka,查看对等连接ID及连接 状态,连接状态为“已激活”表示连接成功。
步骤 7:创建跨源认证
创建跨源认证的具体操作请参考跨源认证。
1. 将步骤2:准备数据源和数据输出通道中获取的kafka认证文件
“client.truststore.jks”上传到步骤3:创建OBS桶保存输出数据中的OBS桶
“smoke-test”下。
2. 在DLI管理控制台选择“跨源连接”。
3. 在“跨源认证”页签,单击“创建”,创建认证信息。配置参数:
– 认证信息名称:Flink – 类型:Kafka_SSL
– Truststore路径:obs://smoke-test/client.truststore.jks – Truststore密码:dms@kafka
其余参数可不用配置。
4. 单击“确定”,完成创建跨源认证。
步骤 8:配置安全组规则和测试地址连通性
1. DLI管理控制台,单击“队列管理”,选择绑定的队列,在操作列,单击“详情”
获取队列的网段信息。
2. 登录分布式消息服务Kafka管理控制台,单击“Kafka专享版”,单击已创建的 Kafka实例名称,例如kafka-dliflink,进入实例基本信息页面。
3. 在实例基本信息页面,在“连接地址”配置下的获取Kafka的连接地址和端口。
4. 在实例基本信息页面,在“网络”配置下的“安全组”,单击安全组名称,进入 安全组配置页面。
5. 在Kafka实例对应的安全组配置页面,单击“入方向规则 > 添加规则”,协议选 择“TCP”,端口选择“9093”,源地址填写DLI队列的网段。单击“确定”完成 配置。
6. 登录DLI管理控制台,选择“队列管理”,在所在Flink队列行,单击“更多 > 测 试地址连通性”,在“地址”参数中按照“IP:端口”的格式输入Kafka的连接地址 和端口,单击“测试”,返回地址可达后进行后续操作步骤。注意多个地址要分 开单独测试。
步骤 9:创建 Flink SQL 作业
准备好数据源和数据输出通道之后,就可以创建Flink SQL作业了。
1. 在DLI管理控制台的左侧导航栏中,单击“作业管理”>“Flink作业”,进入
“Flink作业”页面。
2. 在“Flink作业”页面右上角单击“创建作业”,弹出“创建作业”对话框。配置 参数:
– 类型:Flink SQL – 名称:DIS-Flink-Kafka – 描述:不填
– 模板名称:不选择 – 标签:不填
3. 单击“确定”,进入作业“编辑”页面。
4. 编辑SQL作业。
在SQL语句编辑区域,输入详细的SQL语句。具体如下,注意以下加粗的参数值都 需要根据注释提示修改。
CREATE SOURCE STREAM car_info ( a1 string,
region = "cn-north-4",//需要修改为当前DLI队列所在的region channel = "csinput",
encode = "csv", FIELD_DELIMITER = ";"
);
CREATE SINK STREAM kafka_sink ( a1 string, type="kafka",
kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093",//需要修改为kafka实例
的连接地址
kafka_topic = "testflink", // 要写入kafka的topic,进入kafka控制台,单击已创建的Kafka实例名称,在 Topic管理查看Topic名称
encode = "csv", // 编码格式,支持json/csv kafka_certificate_name = "Flink", kafka_properties_delimiter = ",",
//kafka_properties中的username和password的值xxx需要替换为步骤2中kafka创建SSL认证的用户名 和密码 kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL"
);
INSERT INTO kafka_sink SELECT * FROM car_info;
CREATE sink STREAM car_info1 ( a1 string,
region = "cn-north-4",//需要修改为当前DLI队列所在的region channel = "csinput",
encode = "csv", FIELD_DELIMITER = ";"
);
insert into car_info1 select 'id','owner','brand',1;
insert into car_info1 select 'id','owner','brand',2;
insert into car_info1 select 'id','owner','brand',3;
insert into car_info1 select 'id','owner','brand',4;
insert into car_info1 select 'id','owner','brand',5;
insert into car_info1 select 'id','owner','brand',6;
insert into car_info1 select 'id','owner','brand',7;
insert into car_info1 select 'id','owner','brand',8;
insert into car_info1 select 'id','owner','brand',9;
insert into car_info1 select 'id','owner','brand',10;
5. 单击“语义校验”,确保语义校验成功。
6. 设置作业运行参数。配置必选参数:
– CU数量:2 – 管理单元:1 – 并行数:1
– 所属队列:Flinktest – 保存作业日志:勾选
– OBS桶:选择作业日志保存的OBS桶,并进行授权。
其余参数可不用配置。
7. 单击“保存”,保存作业和相关参数。
8. 单击“启动”,进入“启动Flink作业”页面,确认作业规格和费用后,单击“立 即启动”,启动作业。
启动作业后,系统将自动跳转到Flink作业管理页面,新创建的作业将显示在作业 列表中,在“状态”列中可以查看作业状态。作业提交成功后,状态将由“提交 中”变为“运行中”。
如果作业状态为“提交失败”或“运行异常”,表示作业提交或运行失败。用户 可以在作业列表中的“状态”列中,将鼠标移动到状态图标上查看错误信息,单 击 可以复制错误信息。根据错误信息解决故障后,重新提交。
如果以上错误信息不足以定位问题,还可以参考Flink作业运行异常,如何定位,
从OBS桶中下载作业日志对问题进一步定位。
9. 作业运行完成后,可登录分布式消息服务Kafka管理控制台,查看对应的Kafka专 享实例。单击实例名称,选择“消息查询”页签,选择Flink SQL作业中写入的 kafka的Topic名称,单击“搜索”,在操作列单击“查看消息正文”查看写入的 消息内容。
后续指引
完成Flink SQL作业快速入门操作后,如果您想了解更多关于Flink SQL作业相关操作,
完成Flink SQL作业快速入门操作后,如果您想了解更多关于Flink SQL作业相关操作,