2.4 创建输出流
2.4.16 OBS 输出流
功能描述
创建sink流将DLI数据输出到对象存储服务(OBS)。DLI可以将作业分析结果输出到 OBS上。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、
深度/冷归档等场景。
对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服 务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请 参见《对象存储服务控制台指南》。
说明
推荐使用《文件系统输出流(推荐)》。
前提条件
OBS输出流功能仅支持输出数据到3.0版本以上的桶,请先查看桶信息确认桶的版本。
语法格式
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH (
type = "obs", region = "", encode = "", field_delimiter = "", row_delimiter = "", obs_dir = "", file_prefix = "", rolling_size = "", rolling_interval = "", quote = "", array_bracket = "", append = "",
max_record_num_per_file = "", dump_interval = "",
dis_notice_channel = "", max_record_num_cache = "", carbon_properties = ""
)
关键字
表2-22 关键字说明
参数 是否
必选
说明
type 是 输出通道类型,“obs”表示输出到对象存储服务。
region 是 对象存储服务所在区域。
ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我
的凭证。
参数 是否 必选
说明
sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问 密钥获取方式请参见我的凭证。
encode 是 编码方式。当前支持csv/json/orc/carbondata/avro/
avro_merge/parquet七种格式。
field_delimiter 否 属性分隔符。
仅当编码方式为csv时需要配置,若不配置,默认分隔符为 逗号。
row_delimiter 否 行分隔符。当编码格式为csv、json时需要设置。
json_config 否 当编码格式为json时,用户可以通过该参数来指定json字段 和流定义字段的映射关系,格式为
“field1=data_json.field1;field2=data_json.field2”。
obs_dir 是 文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/
subdir。当编码格式为csv(append为false)、json
(append为false)、avro_merge、parquet时,支持参数 化。
file_prefix 否 输出文件名前缀。生成的文件会以file_prefix.x的方式命 名,如file_prefix.1、 file_prefix.2,若没有设置,默认文件 前缀为temp。此配置项不适用于carbondata文件。
rolling_size 否 单个文件最大允许大小。
说明
● rolling_size和rolling_interval必须至少配一样或者都配置。
● 当文件大小超过设置size后,会生成新文件。
● 支持的单位包括KB/MB/GB,若没写单位,表示单位为字节 数。
● 当编码格式为orc和carbondata时不需要设置。
rolling_interva
l 否 数据保存到对应目录的时间模式。
说明
● rolling_size和rolling_interval必须至少配一样或者都配置。
● 设置后数据会按照输出时间输出到相应时间目录下。
● 支持的格式为yyyy/MM/dd/HH/mm, 最小单位只到分钟,大 小写敏感。例如配置为yyyy/MM/dd/HH, 则数据会写入对应 小时这个时间点所产生的目录下,比如2018-09-10 16:40产生 的数据就会写到{obs_dir}/2018-09-10_16目录下。
● 当rolling_size和rolling_interval都配置时,表示每个时间所对 应的目录下,单个文件超过设置大小时,另起新文件。
quote 否 修饰符,仅当编码格式为csv时可配置,配置后会在每个属 性前后各加上修饰符,建议使用不可见字符配置,如
"\u0007"。
array_bracket 否 数组括号,仅当编码格式为csv时可配置, 可选值为"()",
"{}", "[]", 例如配置了"{}", 则数组输出格式为{a1,a2}。
参数 是否 必选
说明
append 否 值为true或者false,默认为true。
当OBS不支持append模式,且编码格式为csv和json时,可 将该参数设置为false。Append为false时需要设置
max_record_num_per_file和dump_interval。
max_record_n
um_per_file 否 文件最大记录数,当编码格式为csv(append为false)、
json(append为false)、orc、carbondata、avro、
avro_merge和parquet时需配置,表明一个文件最多存储 记录数,当达到最大值,则另起新文件。
dump_interval 否 触发周期, 当编码格式为orc或者配置了DIS通知提醒时需 进行配置。
● 在orc编码方式中,该配置表示周期到达时,即使文件记 录数未达到最大个数配置,也将文件上传到OBS上。
● 在DIS通知提醒功能中,该配置表示每周期往DIS发送一 个通知提醒,表明该目录已写完。
dis_notice_cha
nnel 否 OBS目录完成通知通道。表示每周期往DIS通道中发送一条 记录,该记录内容为OBS目录路径,表明该目录已书写完 毕。
max_record_n
um_cache 否 记录最大缓存数,仅当编码格式为carbondata时,可以配 置该字段,且最小值不能小于max_record_num_per_file,
默认值为max_record_num_per_file。
carbon_proper
ties 否 carbon属性,仅当编码格式为carbondata时,可以配置该 字段,值为"k1=v1, k2=v2"的形式,支持carbon-sdk中函 数withTableProperties所支持的所有配置项,另外还支持 IN_MEMORY_FOR_SORT_DATA_IN_MB和
UNSAFE_WORKING_MEMORY_IN_MB两个配置项。
encoded_data 否 当编码格式为json(append为false)、avro_merge和 parquet时,可通过配置该参数指定真正需要编码的数据,
● 将car_infos数据输出到OBS的obs-sink桶下,输出目录为car_infos, 输出文件以 greater_30作为文件名前缀,当单个文件超过100M时新起一个文件,同时数据输 出用csv编码,使用逗号作为属性分隔符,换行符作为行分隔符。
CREATE SINK STREAM car_infos ( car_id STRING,
car_owner STRING,
car_brand STRING, car_price INT, car_timestamp LONG ) WITH (
obs_dir = "obs-sink/car_infos", file_prefix = "greater_30", rolling_size = "100m"
);
● carbondata编码格式示例
CREATE SINK STREAM car_infos ( car_id STRING,
car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH (
type = "obs",
region = "cn-north-1" , encode = "carbondata",
obs_dir = "dli-append-2/carbondata", max_record_num_per_file = "1000", max_record_num_cache = "2000", dump_interval = "60",
ROLLING_INTERVAL = "yyyy/MM/dd/HH/mm", dis_notice_channel = "dis-notice",
carbon_properties = "long_string_columns=MessageBody, IN_MEMORY_FOR_SORT_DATA_IN_MB=512"
);
● orc编码格式示例
CREATE SINK STREAM car_infos ( car_id STRING,
car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH (
type = "obs",
region = "cn-north-1" , encode = "orc",
obs_dir = "dli-append-2/obsorc", FILE_PREFIX = "es_info",
max_record_num_per_file = "100000", dump_interval = "60"
);
● parquet编码示例请参考文件系统输出流(推荐)中的示例。