3.3 使用客户端连接 MQS
3.3.4 Java 客户端使用说明
T E S T S
---Running com.mqs.consumer.MqsConsumerTest the numbers of topic:0
the numbers of topic:0 the numbers of topic:6
ConsumerRecord(topic = topic-0, partition = 2, offset = 0, CreateTime = 1557059377179, serialized key size
= -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value
= The msg is 2)
ConsumerRecord(topic = topic-0, partition = 2, offset = 1, CreateTime = 1557059377195, serialized key size
= -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value
= The msg is 5)
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
准备配置信息
为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了 SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法 连接。如果没有使用SASL认证,请注释掉相关配置。
● 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件)
以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添
#发送确认参数
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \
password="password";
#SASL鉴权方式 sasl.mechanism=PLAIN
#加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL
#ssl truststore文件的位置
ssl.truststore.location=E:\\temp\\client.truststore.jks
#ssl truststore文件的密码,固定,请勿修改。配置此密码是为了访问Java生成的jks文件。
ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
● 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件)
以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添 加。所有配置信息,如MQS连接地址、Topic名称、用户信息等,请参考 收集连 接信息获取。
#如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \
password="password";
#SASL鉴权方式 sasl.mechanism=PLAIN
#加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL
#ssl truststore文件的位置
ssl.truststore.location=E:\\temp\\client.truststore.jks
#ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。
ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm=
生产消息
● 测试代码:
package com.mqs.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;
public class MqsProducerTest { @Test
public void testProducer() throws Exception {
MqsProducer<String, String> producer = new MqsProducer<String, String>();
int partiton = 0;
package com.mqs.producer;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class MqsProducer<K, V> {
//引入生产消息的配置信息,具体内容参考上文
public static final String CONFIG_PRODUCER_FILE_NAME = "mqs.sdk.producer.properties";
private Producer<K, V> producer;
MqsProducer(String path) {
Properties props = new Properties();
try {
InputStream in = new BufferedInputStream(new FileInputStream(path));
props.load(in);
producer = new KafkaProducer<K,V>(props);
props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
}catch (IOException e)
public void produce(String topic, Integer partition, K key, V data) {
produce(topic, partition, key, data, null, (Callback)null);
}
public void produce(String topic, Integer partition, K key, V data, Long timestamp) {
produce(topic, partition, key, data, timestamp, (Callback)null);
}
public void produce(String topic, Integer partition, K key, V data, Callback callback) {
produce(topic, partition, key, data, null, callback);
}
public void produce(String topic, V data) {
*/
public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
public void produce(ProducerRecord<K, V> kafkaRecord) {
produce(kafkaRecord, (Callback)null);
}
public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback) {
producer.send(kafkaRecord, callback);
}
* get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class
*
* @return classloader */
public static ClassLoader getCurrentClassLoader() {
ClassLoader classLoader = Thread.currentThread() .getContextClassLoader();
public static Properties loadFromClasspath(String configFileName) throws IOException {
ClassLoader classLoader = getCurrentClassLoader();
Properties config = new Properties();
List<URL> properties = new ArrayList<URL>();
Enumeration<URL> propertyResources = classLoader .getResources(configFileName);
while (propertyResources.hasMoreElements()) {
properties.add(propertyResources.nextElement());
}
}
package com.mqs.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.Test;
import java.util.Arrays;
public class MqsConsumerTest { @Test
public void testConsumer() throws Exception { MqsConsumer consumer = new MqsConsumer();
consumer.consume(Arrays.asList("topic-0"));
try {
package com.mqs.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.*;
public class MqsConsumer {
public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";
private KafkaConsumer<Object, Object> consumer;
MqsConsumer(String path) {
Properties props = new Properties();
try {
InputStream in = new BufferedInputStream(new FileInputStream(path));
props.load(in);
consumer = new KafkaConsumer<Object, Object>(props);
}
MqsConsumer() {
Properties props = new Properties();
try {
props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
}catch (IOException e) {
e.printStackTrace();
return;
}
consumer = new KafkaConsumer<Object, Object>(props);
}
public void consume(List topics) {
consumer.subscribe(topics);
}
public ConsumerRecords<Object, Object> poll(long timeout) {
* get classloader from thread context if no classloader found in thread * context return the classloader which has loaded this class
*
* @return classloader */
public static ClassLoader getCurrentClassLoader() {
ClassLoader classLoader = Thread.currentThread() .getContextClassLoader();
public static Properties loadFromClasspath(String configFileName) throws IOException {
ClassLoader classLoader = getCurrentClassLoader();
Properties config = new Properties();
List<URL> properties = new ArrayList<URL>();
Enumeration<URL> propertyResources = classLoader .getResources(configFileName);
while (propertyResources.hasMoreElements()) {
properties.add(propertyResources.nextElement());
}