• 沒有找到結果。

2.3 创建输入流

2.3.2 DIS 输入流

数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用 程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接 入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击 流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数

据接入服务用户指南》。

语法格式

CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH (

(TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

关键字

参数 是否 必选

说明

region 是 数据所在的DIS区域。

ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭

证。

sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥 获取方式请参见我的凭证。

channel 是 数据所在的DIS通道名称。

partition_

count 否 数据所在的DIS通道分区数。该参数和partition_range参数不能 同时配置。当该参数没有配置的时候默认读取所有partition。

partition_

range 否 指定作业从DIS通道读取的分区范围。该参数和partition_count 参数不能同时配置。当该参数没有配置的时候默认读取所有 partition。

partition_range = "[2:5]"时,表示读取的分区范围是2-5,包 括分区2和分区5。

encode 是 数据编码格式,可选为“csv”、“json”、“xml”、

“email”、“blob”和“user_defined”。

● 若编码格式为“csv”,则需配置“field_delimiter”属性。

● 若编码格式为“json”,则需配置“json_config”属性。

● 若编码格式为“xml”,则需配置“xml_config”属性。

● 若编码格式为“email”,则需配置“email_key”属性。

● 若编码格式为“blob”,表示不对接收的数据进行解析,流 属性仅能有一个且数据格式为ARRAY[TINYINT]。

● 若编码格式为“user_defined”,则需配置

“encode_class_name”和“encode_class_parameter”属

● 当引用符号为双引号时,请设置quote = "\u005c\u0022"进 行转义。

g 否 当编码格式为json时,用户需要通过该参数来指定json字段和 流定义字段的映射关系,格式为“field1=data_json.field1;

field2=data_json.field2; field3=$”,其中field3=$表示field3 的内容为整个json串。

参数 是否

email_key 否 当编码格式为email时,用户需要通过该参数来指定需要提取的 信息,需要列出信息的key值,需要与流定义字段一一对应,多 个key值时以逗号分隔,例如“Message-ID, Date, Subject, body”,其中由于邮件正文没有关键字,DLI规定其关键字为

“body”。

encode_cl

ass_name 否 当encode为user_defined时,需配置该参数,指定用户自实现 解码类的类名(包含完整包路径),该类需继承类

DeserializationSchema。

encode_cl ass_para meter

否 当encode为user_defined时,可以通过配置该参数指定用户自 实现解码类的入参,仅支持一个string类型的参数。

offset 否 ● 当启动作业后再获取数据,则该参数无效。

● 当获取数据后再启动作业,用户可以根据需求设置该参数的 数值。例如当offset= "100"时,则表示DLI从DIS服务中的第100条 数据开始处理。

start_time 否 DIS数据读取起始时间。

● 当该参数配置时则从配置的时间开始读取数据,有效格式为 yyyy-MM-dd HH:mm:ss。

● 当没有配置start_time也没配置offset的时候,读取最新数 据。

● 当没有配置start_time但配置了offset的时候,则从offset开 始读取数据。

timeindic

ator 否 在流中增加时间戳,可增加“processing time”时间戳或者

“event time”时间戳。

说明

● 若设置“processing time”,则为proctime.proctime。当设置了 proctime.proctime时,会在原有属性字段基础上多增加一个 proctime系统时间戳属性,假设原有字段为3个,设置了

proctime.proctime后会变成4个,设置rowtime属性字段不会发生变 化。

● 若设置“event time”,可选择流中的某个属性来作为时间戳,格 式为attr_name.rowtime。

● 以上两者可以同时设置。

enable_ch

eckpoint 否 是否启用checkpoint功能,可配置为true(启用)或者false

(停用), 默认为false。

参数 是否 必选

说明

checkpoin

t_interval 否 DIS源算子做checkpoint的时间间隔,单位秒,默认为60。

注意事项

用来做时间戳的属性类型必须为long或者timestamp。

示例

● CSV编码格式:从DIS通道读取数据,记录为csv编码,并且以逗号为分隔符。

CREATE SOURCE STREAM car_infos ( car_id STRING,

car_owner STRING, car_age INT, average_speed INT, total_miles INT, car_timestamp LONG ) WITH (

● JSON编码格式:从DIS通道读取数据,记录为json编码。数据示例:{"car":

{"car_id":"ZJA710XC", "car_owner":"coco", "car_age":5, "average_speed":80,

"total_miles":15000, "car_timestamp":1526438880}}。

CREATE SOURCE STREAM car_infos ( car_id STRING,

car_owner STRING, car_age INT, average_speed INT, total_miles INT, car_timestamp LONG ) WITH (

type = "dis",

region = "cn-north-1" , channel = "dliinput", encode = "json",

json_config = "car_id=car.car_id;car_owner =car.car_owner;car_age=car.car_age;average_speed

=car.average_speed ;total_miles=car.total_miles;"

);

● XML编码格式:从DIS通道读取数据,记录为xml编码。

CREATE SOURCE STREAM person_infos ( pid BIGINT, channel = "dis-dli-input",

encode = "xml",

<?xml version="1.0" encodeing="utf-8"?>

<root>

<person>

<pid>362305199010025042</pid>

<pname>xiaoming</pname>

<page>28</page>

<plocation>内蒙古,乌兰察布市,集宁区,商都县</plocation>

<pbir>1990-10-02</pbir>

<phealthy>true</phealthy>

<pgrade>[A,B,C]</pgrade>

</person>

</root>

● EMAIL编码格式:从DIS通道读取数据,每条记录为一封完整邮件。

CREATE SOURCE STREAM email_infos ( Event_ID String,

Event_Time Date, Subject String, From_Email String, To_EMAIL String, CC_EMAIL Array[String], BCC_EMAIL String, MessageBody String, Mime_Version String, Content_Type String, charset String,

Content_Transfer_Encoding String ) WITH (

type = "dis", region = "cn-north-1" , channel = "dliinput", encode = "email",

email_key = "Message-ID, Date, Subject, From, To, CC, BCC, Body, Mime-Version, Content-Type, charset, Content_Transfer_Encoding"

);

email数据示例如下:

Message-ID: <[email protected]>

Date: Fri, 11 May 2001 09:54:00 -0700 (PDT) From: [email protected]

To: [email protected], [email protected] Subject: "Hello World"

Cc: [email protected], [email protected] Mime-Version: 1.0

Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit

Bcc: [email protected], [email protected] X-From: Zhang San

X-To: Li Si, Wang Wu X-cc: Li Lei, Han Mei X-bcc:

X-Folder: \Li_Si_June2001\Notes Folders\Notes inbox X-Origin: Lucy

X-FileName: sample.nsf

Dear Associate / Analyst Committee:

Hello World!

Thank you,

Associate / Analyst Program zhangsan