• 沒有找到結果。

2.4 创建输出流

2.4.15 文件系统输出流(推荐)

功能描述

创建sink流将数据输出到分布式文件系统(HDFS)或者对象存储服务(OBS)等文件系 统。数据生成后,可直接对生成的目录创建非DLI表,通过DLI SQL进行下一步处理分 析,并且输出数据目录支持分区表结构。适用于数据转储、大数据分析、备份或活跃 归档、深度或冷归档等场景。

对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服 务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请 参见《对象存储服务控制台指南》。

语法格式

CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) [PARTITIONED BY (attr_name (',' attr_name)*]

WITH (

type = "filesystem", file.path = "obs://bucket/xx", encode = "parquet", ak = "",

sk = ""

);

关键字

2-21 关键字说明

参数 是否

必选 说明

type 是 输出流类型。“type”为“filesystem”,表示输出数据到文件系 统。

file.pat

h 是 输出目录,格式为: schema://file.path。

当前schame只支持obs和hdfs。

● 当schema为obs时,表示输出到对象存储服务OBS。

● 当schema为hdfs时,表示输出到HDFS。HDFS需要配置代理用 户,具体请参考HDFS代理用户配置。

示例:hdfs://node-master1sYAx:9820/user/car_infos,其中 node-master1sYAx:9820为MRS集群NameNode所在节点信 息。

encode 是 输出数据编码格式,当前支持“parquet”格式和“csv”格式。

● 当schema为obs时,输出数据编码格式仅支持“parquet”格 式。

● 当schema为hdfs时,输出数据编码格式支持“parquet”格式 和“csv”格式。

ak 否 输出到OBS时该参数必填。用于访问OBS认证的accessKey,可使 用全局变量,屏蔽敏感信息。关于全局变量在控制台上的使用方 法,请参考《数据湖探索用户指南》。

参数 是否 必选

说明

sk 否 输出到OBS时该参数必填。用于访问OBS认证的secretKey,可使用 全局变量,屏蔽敏感信息。关于全局变量在控制台上的使用方法,

请参考《数据湖探索用户指南》。

krb_au

th 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如 果创建的MRS集群未开启kerb认证的集群,请确保在DLI队列host 文件中添加MRS集群master节点的“/etc/hosts”信息。

field_d elimite r

否 属性分隔符。

当编码格式为“csv”时,需要设置属性分隔符,用户可以自定 义,如:“,”。

注意事项

● 使用文件系统输出流的Flink作业必须开启checkpoint,保证作业的一致性。

● 为了避免数据丢失或者数据被覆盖,开启作业异常自动重启或者手动重启,需要 配置为“从checkpoint恢复”。

● checkpoint间隔设置需在输出文件实时性、文件大小和恢复时长之间进行权衡,

比如10分钟。

● checkpoint支持如下两种模式:

– AtLeastOnce:事件至少被处理一次。

– ExactlyOnce:事件仅被处理一次。

● 使用文件系统输出流写入数据到OBS时,应避免多个作业写同一个目录的情况。

– OBS对象存储桶的默认行为为覆盖写,可能导致数据丢失。

– OBS并行文件系统桶的默认行为追加写,可能导致数据混淆。

因为以上OBS桶类型行为的区别,为避免作业异常重启可能导致的数据异常问 题,请根据您的业务需求选择OBS桶类型。

HDFS 代理用户配置

1. 登录MRS管理页面。

2. 选择MRS的HDFS Namenode配置,在“自定义”中添加配置参数。

2-1 HDFS 服务配置

其中,core-site值名称“hadoop.proxyuser.myname.hosts”和

create sink stream car_infos ( carId string,

carOwner string, average_speed double, buyday string

) partitioned by (buyday) with (

type = "filesystem",

file.path = "obs://obs-sink/car_infos", encode = "parquet",

create table car_infos ( carId string,

carOwner string, average_speed double ) partitioned by (buyday string) stored as parquet

location 'obs://obs-sink/car_infos';

b. 从关联OBS路径中恢复分区信息。

alter table car_infos recover partitions;

● 示例二

该示例将car_info数据,以buyday字段为分区字段,csv为编码格式,转储数据到 HDFS。

create sink stream car_infos ( carId string,

carOwner string, average_speed double, buyday string

) partitioned by (buyday) with (

type = "filesystem",

file.path = "hdfs://node-master1sYAx:9820/user/car_infos", encode = "csv",

field_delimiter = ","

);

数据最终在HDFS中的存储目录结构为:/user/car_infos/buyday=xx/part-x-x。