• 沒有找到結果。

3.3 使用客户端连接 MQS

3.3.3 Java 开发环境搭建

基于收集连接信息的介绍,假设您已经获取了实例连接相关的信息,以及配置好客户 端的网络环境。本章节以生产与发送消息的Demo为例,介绍Kafka客户端的环境配 置。

开发环境准备

● Maven:

获取并安装Apache Maven 3.0.3及以上版本,可至Maven官方下载页面下载。

● JDK:

获取并安装Java Development Kit 1.8.111及以上版本,可至Oracle官方下载页面 下载。

安装后注意配置Java的环境变量。

● 获取并安装2018.3.5或以上版本的IntelliJ IDEA,可至IntelliJ IDEA官方网站下 载。

操作步骤

步骤1 下载Demo包。

在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击右上角的

“用户指南 > 下载Kafka客户端 Java Demo包”下载Demo。

解压后,有如下文件:

3-3 Kafka Demo 文件清单

文件名 路径 说明

MqsConsumer.java .\src\main\java\com

\mqs\consumer 消费消息的API。

MqsProducer.java .\src\main\java\com

\mqs\producer 生产消息的API。

mqs.sdk.consumer.

properties .\src\main\resources 消费消息的配置信息。

mqs.sdk.producer.p

roperties .\src\main\resources 生产消息的配置信息。

client.truststore.jks .\src\main\resources SSL证书,用于SASL方式连接。

MqsConsumerTest.j

ava .\src\test\java\com\mqs

\consumer 消费消息的测试代码。

MqsProducerTest.ja

va .\src\test\java\com\mqs

\producer 生产消息的测试代码。

pom.xml .\ maven配置文件,包含Kafka客户

端引用。

步骤2 打开IntelliJ IDEA,导入Demo。

Demo是一个Maven构建的Java工程,因此需要配置JDK环境,以及IDEA的Maven插 件。

3-1 选择“导入工程”

3-2 选择“Maven”

3-3 选择 Java 环境

其他选项可默认或自主选择。然后单击Finish,完成Demo导入。

导入后Demo工程如下:

步骤3 配置Maven路径。

打开“File > Settings”,找到“Maven home directory”信息项,选择正确的Maven 路径,以及Maven所需的settings.xml文件。

步骤4 修改客户端配置信息。

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \

password="password";

#SASL鉴权方式 sasl.mechanism=PLAIN

#加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL

#ssl truststore文件的位置,证书文件在demo包的\src\main\resources路径下。

ssl.truststore.location=E:\\temp\\client.truststore.jks

#ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。

ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=

步骤5 打开IDEA的Terminal窗口,执行mvn test命令体验demo。

Terminal窗口默认在IDEA工具的左下角: ---Running com.mqs.producer.MqsProducerTest produce msg:The msg is 0

produce msg:The msg is 1 produce msg:The msg is 2 produce msg:The msg is 3 produce msg:The msg is 4 produce msg:The msg is 5

produce msg:The msg is 6 produce msg:The msg is 7 produce msg:The msg is 8 produce msg:The msg is 9

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 138.877 sec

消费消息会得到以下回显信息:

T E S T S

---Running com.mqs.consumer.MqsConsumerTest the numbers of topic:0

the numbers of topic:0 the numbers of topic:6

ConsumerRecord(topic = topic-0, partition = 2, offset = 0, CreateTime = 1557059377179, serialized key size

= -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value

= The msg is 2)

ConsumerRecord(topic = topic-0, partition = 2, offset = 1, CreateTime = 1557059377195, serialized key size

= -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value

= The msg is 5)

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>2.3.0</version>

</dependency>

准备配置信息

为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了 SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法 连接。如果没有使用SASL认证,请注释掉相关配置。

● 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件)

以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添