• 沒有找到結果。

2.3 创建输入流

2.3.6 开源 Kafka 输入流

功能描述

创建source流从Kafka获取数据,作为作业的输入数据。

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,

具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景 中使用。

前提条件

● Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的 hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP 请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指 南》中修改主机信息章节。

● Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。

且用户可以根据实际所需设置相应安全组规则。

如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章 节。

如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。

语法格式

CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH (

type = "kafka",

kafka_bootstrap_servers = "", kafka_group_id = "", kafka_topic = "", encode = "json", json_config=""

) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

关键字

2-6 关键字说明

参数 是否必

说明

type 是 数据源类型,“Kafka”表示数据源。

kafka_bootstrap

_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型 跨源开通DLI队列和Kafka集群的连接)。

kafka_group_id 否 group id。

kafka_topic 是 读取的Kafka的topic。目前只支持读取单个topic。

参数 是否必 选

说明

encode 是 数据编码格式,可选为“csv”、“json”、“blob”

和“user_defined”。

● 若编码格式为“csv”,则需配置

“field_delimiter”属性。

● 若编码格式为“json”,则需配置“json_config”

属性。

● 当编码格式为"blob"时,表示不对接收的数据进行 解析,流属性仅能有一个且为Array[TINYINT]类 型。

● 若编码格式为“user_defined”,则需配置

“encode_class_name”和

“encode_class_parameter”属性。

encode_class_n

ame 否 当encode为user_defined时,需配置该参数,指定用 户自实现解码类的类名(包含完整包路径),该类需 继承类DeserializationSchema。

encode_class_p

arameter 否 当encode为user_defined时,可以通过配置该参数指 定用户自实现解码类的入参,仅支持一个string类型的 参数。

json_config 否 当encode为json时,用户可以通过该参数指定json字 段和流属性字段的映射关系。

格式:"field1=json_field1;field2=json_field2"

格式说明:field1、field2为创建的表字段名称。

json_field1、json_field2为kafka输入数据json串的key 字段名称。

具体使用方法可以参考示例说明。

说明如果定义的source stream中的属性和json中的属性名称相

同,json_configs可以不用配置。

field_delimiter 否 当encode为csv时,用于指定csv字段分隔符,默认为 逗号。

quote 否 可以指定数据格式中的引用符号,在两个引用符号之 间的属性分隔符会被当做普通字符处理。

● 当引用符号为双引号时,请设置quote = "\u005c

\u0022"进行转义。

● 当引用符号为单引号时,则设置quote = "'"。

说明

● 目前仅适用于CSV格式。

● 设置引用符号后,必须保证每个字段中包含0个或者偶数 个引用符号,否则会解析失败。

参数 是否必 选

说明

timeindicator 否 在流中增加时间戳,可增加“processing time”时间 戳或者“event time”时间戳。

说明

● 若设置“processing time”,则为proctime.proctime。

当设置了proctime.proctime时,会在原有属性字段基础 上多增加一个proctime系统时间戳属性,假设原有字段 为3个,设置了proctime.proctime后会变成4个,设置 rowtime属性字段不会发生变化。

● 若设置“event time”,可选择流中的某个属性来作为时 间戳,格式为attr_name.rowtime。

● 以上两者可以同时设置。

start_time 否 kafka数据读取起始时间。

当该参数配置时则从配置的时间开始读取数据,有效 格式为yyyy-MM-dd HH:mm:ss。start_time要不大于 当前时间,若大于当前时间,则不会有数据读取出。

该参数配置后,只会读取Kafka topic在该时间点后产 生的数据。

● 从Kafka读取对象为test的topic。数据编码格式为json且不含嵌套,例如:

{"attr1": "lilei", "attr2": 18}。

CREATE SOURCE STREAM kafka_source (name STRING, age int) WITH (

type = "kafka",

kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1",

kafka_topic = "test", encode = "json",

json_config = "name=attr1;age=attr2"

);

● 从Kafka读取对象为test的topic。数据编码格式为json且包含嵌套。本示例使用了 复杂数据类型ROW,ROW使用语法可以参考数据类型。

测试数据参考如下:

{ "id":"1", "type2":"online", "data":{

CREATE SOURCE STREAM kafka_source ( id STRING,

kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1",

kafka_topic = "test", encode = "json"

);

CREATE SINK STREAM kafka_sink ( id STRING,

type2 STRING, patient_id STRING, name STRING ) WITH ( type="kafka",

kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink",

encode = "csv"

);

INSERT INTO kafka_sink select id, type2, data.patient_id, data.name from kafka_source;