• 沒有找到結果。

2.3 创建输入流

2.3.7 OBS 输入流

CREATE SINK STREAM kafka_sink ( id STRING,

type2 STRING, patient_id STRING, name STRING ) WITH ( type="kafka",

kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink",

encode = "csv"

);

INSERT INTO kafka_sink select id, type2, data.patient_id, data.name from kafka_source;

2.3.7 OBS 输入流

功能描述

创建source流从对象存储服务(OBS)获取数据。DLI从OBS上读取用户存储的数据,

作为作业的输入数据。适用于大数据分析、原生云应用程序数据、静态网站托管、备 份/活跃归档、深度/冷归档等场景。

对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服 务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请 参见《对象存储服务控制台指南》。

语法格式

CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH (

) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

关键字

2-7 关键字说明

参数 是否

必选

说明

type 是 数据源类型,“obs”表示数据源为对象存储服务。

region 是 对象存储服务所在区域。

encode 否 数据的编码格式,可以为“csv”或者“json”。默认值为

“csv”。

ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的

凭证。

sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密 钥获取方式请参见我的凭证。

bucket 是 数据所在的OBS桶名。

object_na

me 是 数据所在OBS桶中的对象名。如果对象不在OBS根目录下,

则需添加文件夹名,例如:test/test.csv。对象文件格式参考

“encode”参数。

row_delimi

ter 是 行间的分隔符。

field_delim

iter 否 属性分隔符。

● 当“encode”参数为csv时,该参数必选。用户可以自定 义属性分隔符。

● 当“encode”参数为json时,该参数不需要填写。

quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属 性分隔符会被当做普通字符处理。

● 当引用符号为双引号时,请设置quote = "\u005c

\u0022"进行转义。

● 当引用符号为单引号时,则设置quote = "'"。

说明

● 目前只适用于CSV格式。

● 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符 号,否则会解析失败。

version_id 否 版本号,当obs里的桶或对象有设置版本的时候需填写,否则 不用配置该项。

参数 是否 必选

说明

timeindicat

or 否 在流中增加时间戳,可增加“processing time”时间戳或者

“event time”时间戳。

说明

● 若设置“processing time”,则为proctime.proctime。

当设置了proctime.proctime时,会在原有属性字段基础上多增加 一个proctime系统时间戳属性,假设原有字段为3个,设置了 proctime.proctime后会变成4个,设置rowtime属性字段不会发生 变化。

● 若设置“event time”,可选择流中的某个属性来作为时间戳,

格式为attr_name.rowtime。

● 以上两者可以同时设置。

注意事项

用来做时间戳的属性类型必须为long或者timestamp。

示例

● 从OBS的桶读取对象为input.csv的文件,文件以'\n'划行, 以','划列。

测试输入数据input.csv可以先通过新建input.txt复制如下文本数据,再另存为

CREATE SOURCE STREAM car_infos ( car_id STRING,

car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH (

type = "obs",

bucket = "dli-test-obs01", region = "cn-north-1" , object_name = "input.csv", row_delimiter = "\n", field_delimiter = ","

);

● 从OBS的桶读取对象为input.json的文件,文件以'\n'划行。

CREATE SOURCE STREAM obs_source ( str STRING object_name = "input.json"

);