• 沒有找到結果。

3.4 使用 RESTful API 连接 MQS

3.4.1 Java Demo 使用说明

除了前面章节介绍的使用原生Kafka客户端,MQS实例还可以通过HTTP RESTful方式 访问,包括向指定Topic发送消息、消费消息以及确认消费。

这种方式主要用于适配原有业务系统架构,方便统一使用HTTP协议接入。

如何使用

1. 收集连接信息

包括MQS连接地址与端口、Topic名称、SASL用户名与密码。具体请参考收集连 接信息。

说明

● 若ROMA Connect实例的消息集成在开启SASL_SSL的同时,也开启了VPC内网明文访 问,则VPC内无法使用SASL方式连接消息集成的Topic。

● 使用SASL方式连接消息集成的Topic时,建议在客户端所在主机的“/etc/hosts”文件中 配置host和IP的映射关系,否则会引入时延。

其中,IP地址必须为消息集成的连接地址,host为每个实例主机的名称,可以自定义,

但不能重复。例如:

10.10.10.11 host01 10.10.10.12 host02 10.10.10.13 host03

2. 参考示例代码,组装API请求,包括对API请求的签名。

对API请求签名,指使用SASL的用户名与密码作为密钥对,将请求URL、消息头时 间戳等内容进行签名,供后端服务进行校验。点此了解签名流程

3. 使用Demo向指定Topic生产消息、消费消息和确认消息时,返回的响应消息结构 请参考生产消息接口说明、消费消息接口说明和消费确认接口说明。

示例工程搭建

本指南提供了Java语言的RESTful API请求发送示例。

示例为一个在IntelliJ IDEA工具中开发的Maven工程,因此,您如果在本地环境使用,

请先安装并配置以下环境(以Windows 10系统为例):

● Maven:

Apache Maven 3.0.3及以上版本,可至Maven官方下载页面下载。

● JDK:

Java Development Kit 1.8.111及以上版本,可至Oracle官方下载页面下载。

安装后注意配置JAVA的环境变量。

● IntelliJ IDEA工具:

IntelliJ IDEA 2018.3.5及以上版本,可至IntelliJ IDEA官方网站下载。

● Demo:

在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击右上 角的“用户指南 > 下载RESTful API Java Demo包”下载Demo。

步骤1 打开IntelliJ IDEA,在菜单栏选择“Import Project”。

弹出“Select File or Directory to Import”对话框。

步骤2 在弹出的对话框中选择解压后的RESTful API Java Demo路径,单击“OK”。

步骤3 “Import project from external model”选择“Eclipse”,单击“Next”,进入下一 页后保持默认连续单击“Next”,直到“Please select project SDK”页面。

3-7 Import Project

步骤4 单击“Finish”,完成工程导入。

3-8 Finish

步骤5 编辑rest-config.properties

文件在src/main/resources目录下。将获取到的MQS实例连接地址、Topic名称,以及 SASL信息填写到下述配置中。其中参数kafka.rest.group为消费组ID,可在客户端指 定。

# Kafka rest endpoint.

kafka.rest.endpoint=https://{MQS_Instance_IP_Addr}:9292

# Kafka topic name.

kafka.rest.topic=topic_name_demo

# Kafka consume group.

kafka.rest.group=group_id_demo

# Kafka sasl username.

kafka.rest.username=sasl_username_demo

# Kafka sasl password.

kafka.rest.password=sasl_user_passwd_demo

步骤6 编辑log4j.properties 修改日志存储目录:

log.directory=D://workspace/logs

步骤7 运行示例工程,查看消息生产与消费样例。

消息生成与消费的Main方法在RestMain.java中,以Java Application的方式运行即 可。

----结束

示例代码解读

● 工程入口:

工程入口在RestMain.java文件中。

public class RestMain

{ private static final Logger LOGGER = LoggerFactory.getLogger(RestMain.class);

public static void main(String[] args) throws InterruptedException {

//初始化请求对象。在RestServiceImpl类文件中还包含RESTful API组装,以及对请求签名 IRestService restService = new RestServiceImpl();

Base64.Decoder decoder = Base64.getDecoder();

//以下分别为生产消息、消费消息与消费确认 // Produce message

ProduceReq messages = new ProduceReq();

messages.addMessage("{[{'id': '00001', 'name': 'John'}, {'id': '00002', 'name':

'Mike'}]}").addMessage("Kafka rest client demo!");

LOGGER.debug("produce message: {}", JsonUtils.convertObject2Str(messages));

restService.produce(messages);

// Consume message

List<ConsumeResp> consumeResps = restService.consume();

CommitReq commitReq = new CommitReq();

LOGGER.info("Commit resp: success: {}, failed: {}", resp.getSuccess(), resp.getFail());

}

public List<ProduceResp> produce(ProduceReq messages) {

request.setBody(JsonUtils.convertObject2Str(messages));

//对请求内容签名,签名后,请求头部参数会新增两个参数:Authorization和X-Sdk-Date,

HttpResponse response = HttpUtils.execute(signedRequest);

if (response.getStatusLine().getStatusCode() == HttpStatus.SC_CREATED)

{

String jsonStr = EntityUtils.toString(response.getEntity(), "UTF-8");

prodResp = JsonUtils.convertStr2ListObject(jsonStr, new TypeReference<List<ProduceResp>>() { });

LOGGER.info("Produce response: {}", jsonStr);

return prodResp;

} else {

LOGGER.error("Produce message failed. statusCode: {}, error msg: {}", response.getStatusLine().getStatusCode(),

EntityUtils.toString(response.getEntity(), "UTF-8"));

} }

catch (Exception e) {

LOGGER.error("Produce message failed.");

}

return prodResp;

}