2.3 创建输入流
2.3.5 MRS Kafka 输入流
2.3.5 MRS Kafka 输入流
功能描述
创建source流从Kafka获取数据,作为作业的输入数据。
Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,
具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景 中使用。MRS基于Apache Kafka在平台部署并托管了Kafka集群。
前提条件
● Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的 hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP 请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指 南》中修改主机信息章节。
● Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。
且用户可以根据实际所需设置相应安全组规则。
如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章 节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
语法格式
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH (
type = "kafka",
kafka_bootstrap_servers = "", kafka_group_id = "", kafka_topic = "", encode = "json"
) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME
关键字
表2-5 关键字说明
参数 是否必
选 说明
type 是 数据源类型,“Kafka”表示数据源。
kafka_bootstrap
_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型 跨源开通DLI队列和Kafka集群的连接)。
kafka_group_id 否 group id。
kafka_topic 是 读取的Kafka的topic。目前只支持读取单个topic。
encode 是 数据编码格式,可选为“csv”、“json”、“blob”
和“user_defined”。
● 若编码格式为“csv”,则需配置
“field_delimiter”属性。
● 若编码格式为“json”,则需配置“json_config”
属性。
● 当编码格式为"blob"时,表示不对接收的数据进行 解析,流属性仅能有一个且为Array[TINYINT]类 型。
● 若编码格式为“user_defined”,则需配置
“encode_class_name”和
“encode_class_parameter”属性。
参数 是否必 选
说明
encode_class_n
ame 否 当encode为user_defined时,需配置该参数,指定用 户自实现解码类的类名(包含完整包路径),该类需 继承类DeserializationSchema。
encode_class_p
arameter 否 当encode为user_defined时,可以通过配置该参数指 定用户自实现解码类的入参,仅支持一个string类型的 参数。
krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配 置该参数。
说明请确保在DLI队列host文件中添加MRS集群master节点的
“/etc/hosts”信息。
json_config 否 当encode为json时,用户可以通过该参数指定json字 段和流属性字段的映射关系。
格式:"field1=json_field1;field2=json_field2"
格式说明:field1、field2为创建的表字段名称。
json_field1、json_field2为kafka输入数据json串的key 字段名称。
具体使用方法可以参考示例说明。
field_delimiter 否 当encode为csv时,用于指定csv字段分隔符,默认为 逗号。
quote 否 可以指定数据格式中的引用符号,在两个引用符号之 间的属性分隔符会被当做普通字符处理。
● 当引用符号为双引号时,请设置quote = "\u005c
\u0022"进行转义。
● 当引用符号为单引号时,则设置quote = "'"。
说明
● 目前仅适用于CSV格式。
● 设置引用符号后,必须保证每个字段中包含0个或者偶数 个引用符号,否则会解析失败。
timeindicator 否 在流中增加时间戳,可增加“processing time”时间 戳或者“event time”时间戳。
说明
● 若设置“processing time”,则为proctime.proctime。
当设置了proctime.proctime时,会在原有属性字段基础 上多增加一个proctime系统时间戳属性,假设原有字段 为3个,设置了proctime.proctime后会变成4个,设置 rowtime属性字段不会发生变化。
● 若设置“event time”,可选择流中的某个属性来作为时 间戳,格式为attr_name.rowtime。
● 以上两者可以同时设置。
参数 是否必 选
说明
start_time 否 kafka数据读取起始时间。
当该参数配置时则从配置的时间开始读取数据,有效 格式为yyyy-MM-dd HH:mm:ss。start_time要不大于 当前时间,若大于当前时间,则不会有数据读取出。
● 从Kafka名称为test的topic中读取数据。
CREATE SOURCE STREAM kafka_source ( name STRING,
age int ) WITH ( type = "kafka",
kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1",
kafka_topic = "test", encode = "json"
);
● 从Kafka读取对象为test的topic,使用json_config将json数据和表字段对应。
数据编码格式为json且不含嵌套,例如:
{"attr1": "lilei", "attr2": 18}
建表语句参考如下:
CREATE SOURCE STREAM kafka_source (name STRING, age int) WITH (
type = "kafka",
kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1",
kafka_topic = "test", encode = "json",
json_config = "name=attr1;age=attr2"
);