1.6 使用 SDK(Java)
1.6.13 下载流式数据
背景信息
下载流式数据,需要确定从分区的什么位置开始获取(即获取游标)。确定起始位置 后,再循环获取数据。
获取游标有如下五种方式:
● AT_SEQUENCE_NUMBER
● AFTER_SEQUENCE_NUMBER
● TRIM_HORIZON
● LATEST
● AT_TIMESTAMP
为更好理解游标类型,您需要了解如下几个基本概念。
● 序列号(sequenceNumber),每个记录的唯一标识符。序列号由DIS在数据生产者 调用PutRecord操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键 的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越 大。
● 每个分区的sequenceNumber从0开始持续增长,每条数据对应唯一的
sequenceNumber,超过生命周期后此sequenceNumber将过期不可用。(例如上 传一条数据到新分区,其sequenceNumber起始为0,上传100条之后,则最后一 条的sequenceNumber为99;如超过生命周期之后,0~99的数据则不可用)
● 分区的数据有效范围可以通过调用describeStream(查询通道详情)接口获取,其 sequenceNumberRange代表数据有效范围,第一个值为最老数据的
sequenceNumber,最后一个值为下一条上传数据的sequenceNumber(最新数据 的sequenceNumber为此值-1)
例如[100, 200],表示此分区总共上传了200条数据,其中第0~99条已过期,有效 的最老数据为100,最新数据为199,下一条上传数据的sequenceNumber为 始sequenceNumber 场景,例如已知需 要从哪一条数据开 始消费。
与序列号(sequenceNumb er)和分区数据有 效范围sequenceNumber Range【A,B】强 相关。
指定的sequenceNumber 应满足如下条件:
A<=sequenceNum ber<=B
AFTER_SEQUENCE
_NUMBER 从特定序列号(即
与序列号(sequenceNumb er)和分区数据有 效范围sequenceNumber Range【A,B】强 相关。
指定的sequenceNumber 应满足如下条件:
(A-1)<=sequence Number<=(B-1)
游标类型(CursorType) 说明 适用场景 备注
TRIM_HORIZON 从分区最老的数据 开始消费,即读取 分区内所有有效数 据。
例如分区数据有效 范围为[100, 200], 则会从100开始消
LATEST 从分区最新的数据 之后开始消费,即
AT_TIMESTAMP 指定一个时间戳,
会从此时间戳上传 C,则timestamp>=c 即可
● 若timestamp大 于最新一条数据
● 下载数据方式选择AT_SEQUENCE_NUMBER和AFTER_SEQUENCE_NUMBER时,
样例代码示例如下:
//初始化DIS客户端实例
DIS dic = DISClientBuilder.standard()
.withEndpoint("xxxx")
String streamName = "streamName";
// 配置数据下载分区ID
String partitionId = "shardId-0000000000";
// 配置下载数据序列号
String startingSequenceNumber = "0";
// 配置下载数据方式
// AT_SEQUENCE_NUMBER: 从指定的sequenceNumber开始获取,需要设置StartingSequenceNumber // AFTER_SEQUENCE_NUMBER: 从指定的sequenceNumber之后开始获取,需要设置StartingSequenceNumber String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();
try {
// 获取数据游标
GetPartitionCursorRequest request = new GetPartitionCursorRequest();
request.setStreamName(streamName);
request.setPartitionId(partitionId);
request.setCursorType(cursorType);
request.setStartingSequenceNumber(startingSequenceNumber);
GetPartitionCursorResult response = dic.getPartitionCursor(request);
String cursor = response.getPartitionCursor();
LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
GetRecordsRequest recordsRequest = new GetRecordsRequest();
GetRecordsResult recordResponse = null;
while (true)
catch (DISClientException e) {
LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(),
● 下载数据方式选择TRIM_HORIZON和 LATEST时,样例代码示例如下,参见加粗 代码行,基于demo注释掉startingSequenceNumber字段。
//初始化DIS客户端实例
DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") .withAk("xxxx") .withSk("xxxx") .withProjectId("xxxx")
.withRegion("xxxx") .build();
// 配置通道名称
String streamName = "streamName";
// 配置数据下载分区ID
String partitionId = "shardId-0000000000";
// 配置下载数据序列号
//String startingSequenceNumber = "0";
// 配置下载数据方式
// TRIM_HORIZON: 从最早被存储至分区的有效记录开始读取。
//LATEST:从分区中的最新记录开始读取,此设置可以保证你总是读到分区中最新记录。
String cursorType = PartitionCursorTypeEnum.TRIM_HORIZON.name();
try {
// 获取数据游标
GetPartitionCursorRequest request = new GetPartitionCursorRequest();
request.setStreamName(streamName);
request.setPartitionId(partitionId);
request.setCursorType(cursorType);
//request.setStartingSequenceNumber(startingSequenceNumber);
GetPartitionCursorResult response = dic.getPartitionCursor(request);
String cursor = response.getPartitionCursor();
LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
GetRecordsRequest recordsRequest = new GetRecordsRequest();
GetRecordsResult recordResponse = null;
while (true)
LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(),
● 下载数据方式选择TAT_TIMESTAMP时,样例代码示例如下,基于demo增加 timestamp字段,添加如下加粗行代码。
//初始化DIS客户端实例,其中,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见。
DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") .withAk("xxxx") .withSk("xxxx") .withProjectId("xxxx")
.withRegion("xxxx") .build();
// 配置通道名称
String streamName = "streamName";
// 配置数据下载分区ID
String partitionId = "shardId-0000000000";
// 配置下载数据序列号
//String startingSequenceNumber = "0";
//配置时间戳
long timestamp = 1542960693804L;
// 配置下载数据方式
// AT_TIMESTAMP: 从特定时间戳(即timestamp定义的时间戳)开始读取。
String cursorType = PartitionCursorTypeEnum.AT_TIMESTAMP.name();
try {
// 获取数据游标
GetPartitionCursorRequest request = new GetPartitionCursorRequest();
request.setStreamName(streamName);
request.setPartitionId(partitionId);
request.setCursorType(cursorType);
//request.setStartingSequenceNumber(startingSequenceNumber);
request.setTimestamp(timestamp);
GetPartitionCursorResult response = dic.getPartitionCursor(request);
String cursor = response.getPartitionCursor();
LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
GetRecordsRequest recordsRequest = new GetRecordsRequest();
GetRecordsResult recordResponse = null;
LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(),
参数说明
表1-4 参数说明
参数名 参数类型 说明
partitionId String 分区ID。
说明
请根据上传流式数据的执行结果,控制台的返回信息字段,
例如 “partitionId [shardId-0000000000]”进行定义。
startingSequen
ceNumber String 序列号。序列号是每个记录的唯一标识符。序列号由 DIS在数据生产者调用PutRecords操作以添加数据到
例如“sequenceNumber [1]”进行定义。
cursorType String 游标类型。
● AT_SEQUENCE_NUMBER:从特定序列号(即 startingSequenceNumber定义的序列号)所在的 记录开始读取数据。此类型为默认游标类型。
● AFTER_SEQUENCE_NUMBER:从特定序列号(即 startingSequenceNumber定义的序列号)后的记 录开始读取数据。
● TRIM_HORIZON:从最早被存储至分区的有效记 录开始读取。
● LATEST:从分区中的最新记录开始读取,此设置可 以保证你总是读到分区中最新记录。
● AT_TIMESTAMP:从特定时间戳(即timestamp定 义的时间戳)开始读取。
运行程序
右键选择“Run As > 1 Java Application”运行程序,若程序运行成功,可以在控制台 查看到类似如下信息:
14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader
14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader 14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2 14:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream
streamName[partitionId=0] cursor success :
eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY 3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4O TcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.],
partitionKey [964885], sequenceNumber [0].
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].
14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].
表1-5 参数说明
参数名 参数类型 说明
partition_key String 用户上传数据时设置的partition_key。
说明上传数据时,如果传了partition_key参数,则下载数据时可 返回此参数。如果上传数据时,未传partition_key参数,而 是传入partition_id,则不返回partition_key。
startingSequen
ceNumber String 序列号。序列号是每个记录的唯一标识符。序列号由 DIS在数据生产者调用PutRecords操作以添加数据到
DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx")
String streamName = "xxxx";
// 配置上传的数据
String message = "hello world.";
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(streamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>();
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
for (int i = 0; i < 3; i++) {
PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
putRecordsRequestEntry.setData(buffer);
putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));