• 沒有找到結果。

2.7 自拓展生态

2.7.2 自拓展输出流

WITH (

type = "user_defined",

type_class_name = "mySourceSink.MySource", type_class_parameter = "60"

)

TIMESTAMP BY car_timestamp.rowtime;

说明

自定义source类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。

2.7.2 自拓展输出流

用户可通过编写代码实现将DLI处理之后的数据写入指定的云生态或者开源生态。

语法格式

CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH (

type = "user_defined", type_class_name = "", type_class_parameter = ""

);

关键字

2-28 关键字说明

参数 是否

必选

说明

type 是 数据源类型,"user_defined"表示数据源为用户自定义数据 源。

type_class

_name 是 用户实现获取源数据的sink类名称,注意包含完整包路径。

type_class _paramete r

是 用户自定义sink类的入参,仅支持一个string类型的参数。

注意事项

用户自定义sink类需要继承类RichSinkFunction,并指定数据类型为Row例如定义类 MySink:public class MySink extends RichSinkFunction<Row>{},重点实现其中的 open、invoke和close函数。

依赖pom:

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_2.11</artifactId>

<version>${flink.version}</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-core</artifactId>

<version>${flink.version}</version>

<scope>provided</scope>

</dependency>

示例

实现数据以CSV编码写入DIS通道。

CREATE SINK STREAM user_out_data ( count INT

) WITH (

type = "user_defined",

type_class_name = "mySourceSink.MySink", type_class_parameter = ""

);

说明

自定义sink类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。

2.8 数据类型

概述

数据类型是数据的一个基本属性,用于区分不同类别的数据。不同的数据类型所占的 存储空间不同,能够进行的操作也不相同。数据库中的数据存储在数据表中。数据表 中的每一列都定义了数据类型,用户存储数据时,须遵从这些数据类型的属性,否则 可能会出错。

华为大数据平台的Flink SQL与开源社区相同,支持原生数据类型、复杂数据类型和复 杂类型嵌套。

原生数据类型

Flink SQL支持原生数据类型,请参见表2-29。

2-29 原生数据类型

数据类型 描述 存储空间 范围

VARCHAR 可变长度的字符 -

-BOOLEAN 布尔类型 - TRUE/FALSE

TINYINT 有符号整数 1字节 -128-127 SMALLINT 有符号整数 2字节 -32768-32767

INT 有符号整数 4字节 -2147483648~

2147483647 INTEGER 有符号整数 4字节 -2147483648~

2147483647

BIGINT 有符号整数 8字节 -9223372036854775808

~9223372036854775807

REAL 单精度浮点型 4字节

-FLOAT 单精度浮点型 4字节

-DOUBLE 双精度浮点型 8字节

-DECIMAL 固定有效位数和小数位 数的数据类型

-

-DATE 日期类型,描述了特定 的年月日,以yyyy-MM-dd格式表示,例如 2014-05-29

- DATE类型不包含时间,

所表示日期的范围为 0000-01-01 to 9999-12-31

数据类型 描述 存储空间 范围 TIME 时间类型,以HH:mm:ss

表示。

例如20:17:40

-

-TIMESTAMP(3) 完整日期,包括日期和 时间。

例如:1969-07-20 20:17:40

-

-INTERVAL timeUnit [TO timeUnit]

时间间隔

例如:INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY

-

-复杂数据类型

Flink SQL支持复杂数据类型和复杂类型嵌套。复杂数据类型如表2-30所示。

2-30 复杂数据类型

,TYPE] 变量名[key],例

如:v1[key] Map[key, value, key2, value2, key3,

value3...] as v1

ROW 一组命名的字段,

字段的数据类型可 以不同。

ROW<a1 TYPE1, a2 TYPE2>

变量名.字段名,

例如:v1.a1 Row('1',2) as v1

使用示例如下:

CREATE SOURCE STREAM car_infos ( car_id STRING,

address ROW<city STRING, province STRING, country STRING>, average_speed MAP[STRING, LONG],

speeds ARRAY[LONG]

) WITH ( type = "dis",

region = "cn-north-1" ,

channel = "dliinput", encode = "json"

);

CREATE temp STREAM car_speed_infos ( car_id STRING,

province STRING, average_speed LONG, start_speed LONG );

INSERT INTO car_speed_infos SELECT car_id,

address.province,

average_speed[address.city], speeds[1]

FROM car_infos;

复杂类型嵌套

● Json格式增强

以Source为例进行说明,Sink的使用方法相同。

– 支持配置Json_schema

配置了json_schema后,可以不声明DDL中的字段,自动从json_schema中生 成。使用示例如下:

CREATE SOURCE STREAM data_with_schema WITH ( type = "dis",

region = "cn-north-1" , channel = "dis-in", encode = "json",

json_schema = '{"definitions":{"address":{"type":"object","properties":{"street_address":

{"type":"string"},"city":{"type":"string"},"state":{"type":"string"}},"required": shipping_address_state string ) WITH (

insert into buy_infos select billing_address.city, shipping_address.state from data_with_schema;

示例数据:

{ "billing_address":

{ "street_address":"binjiang", "city":"hangzhou", "state":"zhejiang"

},

"shipping_address":

{ "street_address":"tianhe", "city":"guangzhou",

"state":"guangdong"

}}

– 支持不配置json_schema也不配置json_config。json_config使用可以参考开

源Kafka输入流样例说明。

这种情况下默认用ddl中属性名当做json key来进行解析。

测试示例数据如下,测试数据既包括嵌套json字段,如billing_address、

shipping_address,也包括非嵌套的字段id、type2。

{ "id":"1", "type2":"online", "billing_address":

{ "street_address":"binjiang", "city":"hangzhou", "state":"zhejiang"

},

"shipping_address":

{ "street_address":"tianhe", "city":"guangzhou", "state":"guangdong"

}}

具体建表和使用示例参考如下:

CREATE SOURCE STREAM car_info_data ( id STRING,

type2 STRING,

billing_address Row<street_address string, city string, state string>, shipping_address Row<street_address string, city string, state string>, optional_address Row<street_address string, city string, state string>

) WITH ( shipping_address_state string ) WITH (

insert into buy_infos select id, type2, billing_address.city, shipping_address.state from car_info_data;

● Sink序列化支持复杂类型

– 目前只有CSV、Json两种格式支持复杂类型。

– Json请参考•Json格式增强。

– 由于CSV没有标准的格式,所以目前暂不支持source解析,只支持sink。

– 输出格式:尽量和flink原生保持一致。

Map: {key1=Value1, key2=Value2}

Row: 平摊用逗号分隔属性,如Row(1, '2') => 1,'2'

2.9 内置函数