• 沒有找到結果。

1. 单击DLI控制台左侧“作业管理”,选择“Flink作业”。单击“创建作业”。

– 类型:选择作业类型为:Flink OpenSource SQL。

– 名称:自定义。

5-17 创建 Flink 作业

2. 单击“确定”,进入作业编辑作业页面,具体SQL示例如下,部分参数值需要根 据RDS和DMS对应的信息进行修改。

--********************************************************************---- 数据源:trade_order_detail_info (订单详情宽表) --********************************************************************--create table trade_order_detail (

order_id string, -- 订单ID order_channel string, -- 渠道

最佳实践 5 使用 DLI Flink SQL 进行电商实时业务数据分析

order_time string, -- 订单创建时间

"connector.type" = "kafka", "connector.version" = "0.10",

"connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka连接地址 "connector.properties.group.id" = "trade_order", -- Kafka groupID

"connector.topic" = "trade_order_detail_info", -- Kafka topic "format.type" = "json",

"connector.startup-mode" = "latest-offset"

);

--********************************************************************---- 结果表:trade_channel_collect (各渠道的销售总额实时统计) --********************************************************************--create table trade_channel_collect(

begin_time string, --统计数据的开始时间 channel_code string, -- 渠道编号 channel_name string, -- 渠道名 cur_gmv double, -- 当天GMV

cur_order_user_count bigint, -- 当天付款人数 cur_order_count bigint, -- 当天付款订单数 last_pay_time string, -- 最近结算时间 flink_current_time string,

primary key (begin_time, channel_code) not enforced ) with (

"connector.type" = "jdbc",

"connector.url" = "jdbc:mysql://xxxx:3306/xxxx", -- mysql连接地址,jdbc格式 "connector.table" = "xxxx", -- mysql表名

"connector.driver" = "com.mysql.jdbc.Driver",

"connector.username" = "xxx", -- mysql用户名 "connector.password" = "xxxx", -- mysql密码 "connector.write.flush.max-rows" = "1000",

"connector.write.flush.interval" = "1s"

);

--********************************************************************---- 临时中间表

--********************************************************************--create view tmp_order_detail

asselect *

, case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"

else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、

appShop、miniAppShop、other]

, case when t.order_channel = "webShop" then _UTF16"网页商城"

when t.order_channel = "appShop" then _UTF16"app商城" , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time

, concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time from trade_order_detail

where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方 便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)

and real_pay is not null ) twhere t.rn = 1;

-- 按渠道统计各个指标

insert into trade_channel_collect select

最佳实践 5 使用 DLI Flink SQL 进行电商实时业务数据分析

begin_time --统计数据的开始时间 , channel_code

, channel_name

, cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV , count(distinct user_id) as cur_order_user_count --当天付款人数 , count(1) as cur_order_count --当天付款订单数

, max(pay_time) as last_pay_time --最近结算时间

, cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间 from tmp_order_detail

where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") group by begin_time, channel_code, channel_name;

说明

作业逻辑说明如下:

1. 创建一个Kafka源表,用来从Kafka指定Topic中读取消费数据;

2. 创建一个结果表,用来通过JDBC向MySQL中写入结果数据。

3. 实现相应的处理逻辑,以实现各个指标的统计。

为了简化最终的处理逻辑,使用创建视图进行数据预处理。

1. 利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方 法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始 时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00 后的订单数据进行统计(为了方便模拟数据的构造,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string))。

2. 根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name 的值,从而获取了源表中的字段信息,以及begin_time、end_time和

请在官网提工单申请开通CCE队列使用权限后,再重新创建DLI CCE队列,继续后续步骤。

5-18 Flink Opensource SQL 作业

4. 等待作业状态会变为“运行中”,单击作业名称,可以查看作业详细运行情况。

最佳实践 5 使用 DLI Flink SQL 进行电商实时业务数据分析

5-19 作业运行状态

5. 使用Kafka客户端向指定topic发送数据,模拟实时数据流。具体方法请参考:

DMS - 连接实例生产消费信息。

5-20 模拟实时数据流

发送命令如下:

sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list kafka连接地址 --topic 指定topic

示例数据如下:

{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00",

"pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001",

"user_name":"Alice", "area_id":"330106"}

{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06",

"pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001",

"user_name":"Alice", "area_id":"330106"}

{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00",

"user_id":"0002", "user_name":"Bob", "area_id":"330110"}

{"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05",

"pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003",

"user_name":"Cindy", "area_id":"330108"}

{"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20",

"pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004",

"user_name":"Daisy", "area_id":"330102"}

{"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08",

"pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004",

"user_name":"Daisy", "area_id":"330102"}

{"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13",

"pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004",

"user_name":"Daisy", "area_id":"330102"}

{"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06",

"pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001",

"user_name":"Alice", "area_id":"330106"}

{"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06",

"pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002",

"user_name":"Bob", "area_id":"330110"}

{"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00",

"user_id":"0003", "user_name":"Cindy", "area_id":"330108"}

{"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06",

"pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004",

"user_name":"Daisy", "area_id":"330102"}

6. 单击DLI控制台左侧“作业管理”>“Flink作业”,单击3提交的Flink作业。在作 业详情页面,可以看到处理的数据记录数。

最佳实践 5 使用 DLI Flink SQL 进行电商实时业务数据分析

5-21 Flink 作业详情

相關文件