• 沒有找到結果。

1.6 使用 SDK(Java)

1.6.13 下载流式数据

背景信息

下载流式数据,需要确定从分区的什么位置开始获取(即获取游标)。确定起始位置 后,再循环获取数据。

获取游标有如下五种方式:

● AT_SEQUENCE_NUMBER

● AFTER_SEQUENCE_NUMBER

● TRIM_HORIZON

● LATEST

● AT_TIMESTAMP

为更好理解游标类型,您需要了解如下几个基本概念。

● 序列号(sequenceNumber),每个记录的唯一标识符。序列号由DIS在数据生产者 调用PutRecord操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键 的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越 大。

● 每个分区的sequenceNumber从0开始持续增长,每条数据对应唯一的

sequenceNumber,超过生命周期后此sequenceNumber将过期不可用。(例如上 传一条数据到新分区,其sequenceNumber起始为0,上传100条之后,则最后一 条的sequenceNumber为99;如超过生命周期之后,0~99的数据则不可用)

● 分区的数据有效范围可以通过调用describeStream(查询通道详情)接口获取,其 sequenceNumberRange代表数据有效范围,第一个值为最老数据的

sequenceNumber,最后一个值为下一条上传数据的sequenceNumber(最新数据 的sequenceNumber为此值-1)

例如[100, 200],表示此分区总共上传了200条数据,其中第0~99条已过期,有效 的最老数据为100,最新数据为199,下一条上传数据的sequenceNumber为 始sequenceNumber 场景,例如已知需 要从哪一条数据开 始消费。

与序列号(sequenceNumb er)和分区数据有 效范围sequenceNumber Range【A,B】强 相关。

指定的sequenceNumber 应满足如下条件:

A<=sequenceNum ber<=B

AFTER_SEQUENCE

_NUMBER 从特定序列号(即

与序列号(sequenceNumb er)和分区数据有 效范围sequenceNumber Range【A,B】强 相关。

指定的sequenceNumber 应满足如下条件:

(A-1)<=sequence Number<=(B-1)

游标类型(CursorType) 说明 适用场景 备注

TRIM_HORIZON 从分区最老的数据 开始消费,即读取 分区内所有有效数 据。

例如分区数据有效 范围为[100, 200], 则会从100开始消

LATEST 从分区最新的数据 之后开始消费,即

AT_TIMESTAMP 指定一个时间戳,

会从此时间戳上传 C,则timestamp>=c 即可

● 若timestamp大 于最新一条数据

● 下载数据方式选择AT_SEQUENCE_NUMBER和AFTER_SEQUENCE_NUMBER时,

样例代码示例如下:

//初始化DIS客户端实例

DIS dic = DISClientBuilder.standard()

.withEndpoint("xxxx")

String streamName = "streamName";

// 配置数据下载分区ID

String partitionId = "shardId-0000000000";

// 配置下载数据序列号

String startingSequenceNumber = "0";

// 配置下载数据方式

// AT_SEQUENCE_NUMBER: 从指定的sequenceNumber开始获取,需要设置StartingSequenceNumber // AFTER_SEQUENCE_NUMBER: 从指定的sequenceNumber之后开始获取,需要设置StartingSequenceNumber String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();

try {

// 获取数据游标

GetPartitionCursorRequest request = new GetPartitionCursorRequest();

request.setStreamName(streamName);

request.setPartitionId(partitionId);

request.setCursorType(cursorType);

request.setStartingSequenceNumber(startingSequenceNumber);

GetPartitionCursorResult response = dic.getPartitionCursor(request);

String cursor = response.getPartitionCursor();

LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

GetRecordsRequest recordsRequest = new GetRecordsRequest();

GetRecordsResult recordResponse = null;

while (true)

catch (DISClientException e) {

LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(),

● 下载数据方式选择TRIM_HORIZON和 LATEST时,样例代码示例如下,参见加粗 代码行,基于demo注释掉startingSequenceNumber字段。

//初始化DIS客户端实例

DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") .withAk("xxxx") .withSk("xxxx") .withProjectId("xxxx")

.withRegion("xxxx") .build();

// 配置通道名称

String streamName = "streamName";

// 配置数据下载分区ID

String partitionId = "shardId-0000000000";

// 配置下载数据序列号

//String startingSequenceNumber = "0";

// 配置下载数据方式

// TRIM_HORIZON: 从最早被存储至分区的有效记录开始读取。

//LATEST:从分区中的最新记录开始读取,此设置可以保证你总是读到分区中最新记录。

String cursorType = PartitionCursorTypeEnum.TRIM_HORIZON.name();

try {

// 获取数据游标

GetPartitionCursorRequest request = new GetPartitionCursorRequest();

request.setStreamName(streamName);

request.setPartitionId(partitionId);

request.setCursorType(cursorType);

//request.setStartingSequenceNumber(startingSequenceNumber);

GetPartitionCursorResult response = dic.getPartitionCursor(request);

String cursor = response.getPartitionCursor();

LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

GetRecordsRequest recordsRequest = new GetRecordsRequest();

GetRecordsResult recordResponse = null;

while (true)

LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(),

● 下载数据方式选择TAT_TIMESTAMP时,样例代码示例如下,基于demo增加 timestamp字段,添加如下加粗行代码。

//初始化DIS客户端实例,其中,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见。

DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") .withAk("xxxx") .withSk("xxxx") .withProjectId("xxxx")

.withRegion("xxxx") .build();

// 配置通道名称

String streamName = "streamName";

// 配置数据下载分区ID

String partitionId = "shardId-0000000000";

// 配置下载数据序列号

//String startingSequenceNumber = "0";

//配置时间戳

long timestamp = 1542960693804L;

// 配置下载数据方式

// AT_TIMESTAMP: 从特定时间戳(即timestamp定义的时间戳)开始读取。

String cursorType = PartitionCursorTypeEnum.AT_TIMESTAMP.name();

try {

// 获取数据游标

GetPartitionCursorRequest request = new GetPartitionCursorRequest();

request.setStreamName(streamName);

request.setPartitionId(partitionId);

request.setCursorType(cursorType);

//request.setStartingSequenceNumber(startingSequenceNumber);

request.setTimestamp(timestamp);

GetPartitionCursorResult response = dic.getPartitionCursor(request);

String cursor = response.getPartitionCursor();

LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

GetRecordsRequest recordsRequest = new GetRecordsRequest();

GetRecordsResult recordResponse = null;

LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(),

参数说明

1-4 参数说明

参数名 参数类型 说明

partitionId String 分区ID。

说明

请根据上传流式数据的执行结果,控制台的返回信息字段,

例如 “partitionId [shardId-0000000000]”进行定义。

startingSequen

ceNumber String 序列号。序列号是每个记录的唯一标识符。序列号由 DIS在数据生产者调用PutRecords操作以添加数据到

例如“sequenceNumber [1]”进行定义。

cursorType String 游标类型。

● AT_SEQUENCE_NUMBER:从特定序列号(即 startingSequenceNumber定义的序列号)所在的 记录开始读取数据。此类型为默认游标类型。

● AFTER_SEQUENCE_NUMBER:从特定序列号(即 startingSequenceNumber定义的序列号)后的记 录开始读取数据。

● TRIM_HORIZON:从最早被存储至分区的有效记 录开始读取。

● LATEST:从分区中的最新记录开始读取,此设置可 以保证你总是读到分区中最新记录。

● AT_TIMESTAMP:从特定时间戳(即timestamp定 义的时间戳)开始读取。

运行程序

右键选择“Run As > 1 Java Application”运行程序,若程序运行成功,可以在控制台 查看到类似如下信息:

14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader

14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader 14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2 14:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream

streamName[partitionId=0] cursor success :

eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY 3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4O TcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ

14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.],

partitionKey [964885], sequenceNumber [0].

14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].

14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].

1-5 参数说明

参数名 参数类型 说明

partition_key String 用户上传数据时设置的partition_key。

说明上传数据时,如果传了partition_key参数,则下载数据时可 返回此参数。如果上传数据时,未传partition_key参数,而 是传入partition_id,则不返回partition_key。

startingSequen

ceNumber String 序列号。序列号是每个记录的唯一标识符。序列号由 DIS在数据生产者调用PutRecords操作以添加数据到

DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx")

String streamName = "xxxx";

// 配置上传的数据

String message = "hello world.";

PutRecordsRequest putRecordsRequest = new PutRecordsRequest();

putRecordsRequest.setStreamName(streamName);

List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>();

ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());

for (int i = 0; i < 3; i++) {

PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();

putRecordsRequestEntry.setData(buffer);

putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));

相關文件