• 沒有找到結果。

3.3 使用客户端连接 MQS

3.3.4 Java 客户端使用说明

T E S T S

---Running com.mqs.consumer.MqsConsumerTest the numbers of topic:0

the numbers of topic:0 the numbers of topic:6

ConsumerRecord(topic = topic-0, partition = 2, offset = 0, CreateTime = 1557059377179, serialized key size

= -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value

= The msg is 2)

ConsumerRecord(topic = topic-0, partition = 2, offset = 1, CreateTime = 1557059377195, serialized key size

= -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value

= The msg is 5)

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>2.3.0</version>

</dependency>

准备配置信息

为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了 SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法 连接。如果没有使用SASL认证,请注释掉相关配置。

● 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件)

以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添

#发送确认参数

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \

password="password";

#SASL鉴权方式 sasl.mechanism=PLAIN

#加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL

#ssl truststore文件的位置

ssl.truststore.location=E:\\temp\\client.truststore.jks

#ssl truststore文件的密码,固定,请勿修改。配置此密码是为了访问Java生成的jks文件。

ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=

● 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件)

以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添 加。所有配置信息,如MQS连接地址、Topic名称、用户信息等,请参考 收集连 接信息获取。

#如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \

password="password";

#SASL鉴权方式 sasl.mechanism=PLAIN

#加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL

#ssl truststore文件的位置

ssl.truststore.location=E:\\temp\\client.truststore.jks

#ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。

ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=

生产消息

● 测试代码:

package com.mqs.producer;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.RecordMetadata;

import org.junit.Test;

public class MqsProducerTest { @Test

public void testProducer() throws Exception {

MqsProducer<String, String> producer = new MqsProducer<String, String>();

int partiton = 0;

package com.mqs.producer;

import java.io.BufferedInputStream;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStream;

import java.net.URL;

import java.util.ArrayList;

import java.util.Enumeration;

import java.util.List;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class MqsProducer<K, V> {

//引入生产消息的配置信息,具体内容参考上文

public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties";

private Producer<K, V> producer;

MqsProducer(String path) {

Properties props = new Properties();

try {

InputStream in = new BufferedInputStream(new FileInputStream(path));

props.load(in);

producer = new KafkaProducer<K,V>(props);

props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);

}catch (IOException e)

public void produce(String topic, Integer partition, K key, V data) {

produce(topic, partition, key, data, null, (Callback)null);

}

public void produce(String topic, Integer partition, K key, V data, Long timestamp) {

produce(topic, partition, key, data, timestamp, (Callback)null);

}

public void produce(String topic, Integer partition, K key, V data, Callback callback) {

produce(topic, partition, key, data, null, callback);

}

public void produce(String topic, V data) {

*/

public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)

public void produce(ProducerRecord<K, V> kafkaRecord) {

produce(kafkaRecord, (Callback)null);

}

public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback) {

producer.send(kafkaRecord, callback);

}

* get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class

*

* @return classloader */

public static ClassLoader getCurrentClassLoader() {

ClassLoader classLoader = Thread.currentThread() .getContextClassLoader();

public static Properties loadFromClasspath(String configFileName) throws IOException {

ClassLoader classLoader = getCurrentClassLoader();

Properties config = new Properties();

List<URL> properties = new ArrayList<URL>();

Enumeration<URL> propertyResources = classLoader .getResources(configFileName);

while (propertyResources.hasMoreElements()) {

properties.add(propertyResources.nextElement());

}

}

package com.mqs.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.junit.Test;

import java.util.Arrays;

public class MqsConsumerTest { @Test

public void testConsumer() throws Exception { MqsConsumer consumer = new MqsConsumer();

consumer.consume(Arrays.asList("topic-0"));

try {

package com.mqs.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.BufferedInputStream;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStream;

import java.net.URL;

import java.util.*;

public class MqsConsumer {

public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";

private KafkaConsumer<Object, Object> consumer;

MqsConsumer(String path) {

Properties props = new Properties();

try {

InputStream in = new BufferedInputStream(new FileInputStream(path));

props.load(in);

consumer = new KafkaConsumer<Object, Object>(props);

}

MqsConsumer() {

Properties props = new Properties();

try {

props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);

}catch (IOException e) {

e.printStackTrace();

return;

}

consumer = new KafkaConsumer<Object, Object>(props);

}

public void consume(List topics) {

consumer.subscribe(topics);

}

public ConsumerRecords<Object, Object> poll(long timeout) {

* get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class

*

* @return classloader */

public static ClassLoader getCurrentClassLoader() {

ClassLoader classLoader = Thread.currentThread() .getContextClassLoader();

public static Properties loadFromClasspath(String configFileName) throws IOException {

ClassLoader classLoader = getCurrentClassLoader();

Properties config = new Properties();

List<URL> properties = new ArrayList<URL>();

Enumeration<URL> propertyResources = classLoader .getResources(configFileName);

while (propertyResources.hasMoreElements()) {

properties.add(propertyResources.nextElement());

}