2.3 创建输入流
2.3.7 OBS 输入流
CREATE SINK STREAM kafka_sink ( id STRING,
type2 STRING, patient_id STRING, name STRING ) WITH ( type="kafka",
kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink",
encode = "csv"
);
INSERT INTO kafka_sink select id, type2, data.patient_id, data.name from kafka_source;
2.3.7 OBS 输入流
功能描述
创建source流从对象存储服务(OBS)获取数据。DLI从OBS上读取用户存储的数据,
作为作业的输入数据。适用于大数据分析、原生云应用程序数据、静态网站托管、备 份/活跃归档、深度/冷归档等场景。
对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服 务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请 参见《对象存储服务控制台指南》。
语法格式
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH (
) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME
关键字
表2-7 关键字说明
参数 是否
必选
说明
type 是 数据源类型,“obs”表示数据源为对象存储服务。
region 是 对象存储服务所在区域。
encode 否 数据的编码格式,可以为“csv”或者“json”。默认值为
“csv”。
ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的
凭证。
sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密 钥获取方式请参见我的凭证。
bucket 是 数据所在的OBS桶名。
object_na
me 是 数据所在OBS桶中的对象名。如果对象不在OBS根目录下,
则需添加文件夹名,例如:test/test.csv。对象文件格式参考
“encode”参数。
row_delimi
ter 是 行间的分隔符。
field_delim
iter 否 属性分隔符。
● 当“encode”参数为csv时,该参数必选。用户可以自定 义属性分隔符。
● 当“encode”参数为json时,该参数不需要填写。
quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属 性分隔符会被当做普通字符处理。
● 当引用符号为双引号时,请设置quote = "\u005c
\u0022"进行转义。
● 当引用符号为单引号时,则设置quote = "'"。
说明
● 目前只适用于CSV格式。
● 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符 号,否则会解析失败。
version_id 否 版本号,当obs里的桶或对象有设置版本的时候需填写,否则 不用配置该项。
参数 是否 必选
说明
timeindicat
or 否 在流中增加时间戳,可增加“processing time”时间戳或者
“event time”时间戳。
说明
● 若设置“processing time”,则为proctime.proctime。
当设置了proctime.proctime时,会在原有属性字段基础上多增加 一个proctime系统时间戳属性,假设原有字段为3个,设置了 proctime.proctime后会变成4个,设置rowtime属性字段不会发生 变化。
● 若设置“event time”,可选择流中的某个属性来作为时间戳,
格式为attr_name.rowtime。
● 以上两者可以同时设置。
注意事项
用来做时间戳的属性类型必须为long或者timestamp。
示例
● 从OBS的桶读取对象为input.csv的文件,文件以'\n'划行, 以','划列。
测试输入数据input.csv可以先通过新建input.txt复制如下文本数据,再另存为
CREATE SOURCE STREAM car_infos ( car_id STRING,
car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH (
type = "obs",
bucket = "dli-test-obs01", region = "cn-north-1" , object_name = "input.csv", row_delimiter = "\n", field_delimiter = ","
);
● 从OBS的桶读取对象为input.json的文件,文件以'\n'划行。
CREATE SOURCE STREAM obs_source ( str STRING object_name = "input.json"
);