2.7 自拓展生态
2.7.2 自拓展输出流
WITH (
type = "user_defined",
type_class_name = "mySourceSink.MySource", type_class_parameter = "60"
)
TIMESTAMP BY car_timestamp.rowtime;
说明
自定义source类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。
2.7.2 自拓展输出流
用户可通过编写代码实现将DLI处理之后的数据写入指定的云生态或者开源生态。
语法格式
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH (
type = "user_defined", type_class_name = "", type_class_parameter = ""
);
关键字
表2-28 关键字说明
参数 是否
必选
说明
type 是 数据源类型,"user_defined"表示数据源为用户自定义数据 源。
type_class
_name 是 用户实现获取源数据的sink类名称,注意包含完整包路径。
type_class _paramete r
是 用户自定义sink类的入参,仅支持一个string类型的参数。
注意事项
用户自定义sink类需要继承类RichSinkFunction,并指定数据类型为Row例如定义类 MySink:public class MySink extends RichSinkFunction<Row>{},重点实现其中的 open、invoke和close函数。
依赖pom:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
示例
实现数据以CSV编码写入DIS通道。
CREATE SINK STREAM user_out_data ( count INT
) WITH (
type = "user_defined",
type_class_name = "mySourceSink.MySink", type_class_parameter = ""
);
说明
自定义sink类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。
2.8 数据类型
概述
数据类型是数据的一个基本属性,用于区分不同类别的数据。不同的数据类型所占的 存储空间不同,能够进行的操作也不相同。数据库中的数据存储在数据表中。数据表 中的每一列都定义了数据类型,用户存储数据时,须遵从这些数据类型的属性,否则 可能会出错。
华为大数据平台的Flink SQL与开源社区相同,支持原生数据类型、复杂数据类型和复 杂类型嵌套。
原生数据类型
Flink SQL支持原生数据类型,请参见表2-29。
表2-29 原生数据类型
数据类型 描述 存储空间 范围
VARCHAR 可变长度的字符 -
-BOOLEAN 布尔类型 - TRUE/FALSE
TINYINT 有符号整数 1字节 -128-127 SMALLINT 有符号整数 2字节 -32768-32767
INT 有符号整数 4字节 -2147483648~
2147483647 INTEGER 有符号整数 4字节 -2147483648~
2147483647
BIGINT 有符号整数 8字节 -9223372036854775808
~9223372036854775807
REAL 单精度浮点型 4字节
-FLOAT 单精度浮点型 4字节
-DOUBLE 双精度浮点型 8字节
-DECIMAL 固定有效位数和小数位 数的数据类型
-
-DATE 日期类型,描述了特定 的年月日,以yyyy-MM-dd格式表示,例如 2014-05-29
- DATE类型不包含时间,
所表示日期的范围为 0000-01-01 to 9999-12-31
数据类型 描述 存储空间 范围 TIME 时间类型,以HH:mm:ss
表示。
例如20:17:40
-
-TIMESTAMP(3) 完整日期,包括日期和 时间。
例如:1969-07-20 20:17:40
-
-INTERVAL timeUnit [TO timeUnit]
时间间隔
例如:INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY
-
-复杂数据类型
Flink SQL支持复杂数据类型和复杂类型嵌套。复杂数据类型如表2-30所示。
表2-30 复杂数据类型
,TYPE] 变量名[key],例
如:v1[key] Map[key, value, key2, value2, key3,
value3...] as v1
ROW 一组命名的字段,
字段的数据类型可 以不同。
ROW<a1 TYPE1, a2 TYPE2>
变量名.字段名,
例如:v1.a1 Row('1',2) as v1
使用示例如下:
CREATE SOURCE STREAM car_infos ( car_id STRING,
address ROW<city STRING, province STRING, country STRING>, average_speed MAP[STRING, LONG],
speeds ARRAY[LONG]
) WITH ( type = "dis",
region = "cn-north-1" ,
channel = "dliinput", encode = "json"
);
CREATE temp STREAM car_speed_infos ( car_id STRING,
province STRING, average_speed LONG, start_speed LONG );
INSERT INTO car_speed_infos SELECT car_id,
address.province,
average_speed[address.city], speeds[1]
FROM car_infos;
复杂类型嵌套
● Json格式增强
以Source为例进行说明,Sink的使用方法相同。
– 支持配置Json_schema
配置了json_schema后,可以不声明DDL中的字段,自动从json_schema中生 成。使用示例如下:
CREATE SOURCE STREAM data_with_schema WITH ( type = "dis",
region = "cn-north-1" , channel = "dis-in", encode = "json",
json_schema = '{"definitions":{"address":{"type":"object","properties":{"street_address":
{"type":"string"},"city":{"type":"string"},"state":{"type":"string"}},"required": shipping_address_state string ) WITH (
insert into buy_infos select billing_address.city, shipping_address.state from data_with_schema;
示例数据:
{ "billing_address":
{ "street_address":"binjiang", "city":"hangzhou", "state":"zhejiang"
},
"shipping_address":
{ "street_address":"tianhe", "city":"guangzhou",
"state":"guangdong"
}}
– 支持不配置json_schema也不配置json_config。json_config使用可以参考开
源Kafka输入流样例说明。
这种情况下默认用ddl中属性名当做json key来进行解析。
测试示例数据如下,测试数据既包括嵌套json字段,如billing_address、
shipping_address,也包括非嵌套的字段id、type2。
{ "id":"1", "type2":"online", "billing_address":
{ "street_address":"binjiang", "city":"hangzhou", "state":"zhejiang"
},
"shipping_address":
{ "street_address":"tianhe", "city":"guangzhou", "state":"guangdong"
}}
具体建表和使用示例参考如下:
CREATE SOURCE STREAM car_info_data ( id STRING,
type2 STRING,
billing_address Row<street_address string, city string, state string>, shipping_address Row<street_address string, city string, state string>, optional_address Row<street_address string, city string, state string>
) WITH ( shipping_address_state string ) WITH (
insert into buy_infos select id, type2, billing_address.city, shipping_address.state from car_info_data;
● Sink序列化支持复杂类型
– 目前只有CSV、Json两种格式支持复杂类型。
– Json请参考•Json格式增强。
– 由于CSV没有标准的格式,所以目前暂不支持source解析,只支持sink。
– 输出格式:尽量和flink原生保持一致。
Map: {key1=Value1, key2=Value2}
Row: 平摊用逗号分隔属性,如Row(1, '2') => 1,'2'