• 沒有找到結果。

3.3 使用客户端连接 MQS

3.3.6 Go 客户端使用说明

print('start consumer')

consumer = KafkaConsumer(conf['topic_name'],

bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'])

for message in consumer:

print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

print('end consumer')

3.3.6 Go 客户端使用说明

本文以Linux CentOS环境为例,介绍Go 1.16.5版本的Kafka客户端连接指导,包括 demo代码库的获取,以及生产、消费消息。

下文所有配置信息,如MQS连接地址、Topic名称、用户信息等,请参考收集连接信息 获取。

准备环境

● 执行以下命令,检查是否已安装Go。

go version

返回如下回显时,说明Go已经安装。

[root@ecs-test sarama]# go version go version go1.16.5 linux/amd64

如果未安装Go,请下载并安装。

● 执行以下命令,获取demo需要的代码库。

go get github.com/confluentinc/confluent-kafka-go/kafka

生产消息

说明

以下加粗内容需要替换为实例自有信息,请根据实际情况替换。

● SASL认证方式

package main import ( "bufio"

"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"

"log"

"os"

"os/signal"

"syscall"

) var (

brokers = "ip1:port1,ip2:port2,ip3:port3"

topics = "topic_name"

log.Println("Starting a new kafka producer") config := &kafka.ConfigMap{

"bootstrap.servers": brokers, "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": user, "sasl.password": password, "ssl.ca.location": caFile, }

producer, err := kafka.NewProducer(config) if err != nil {

// Produce messages to topic (asynchronously) fmt.Println("please enter message:")

go func() { for {

err := producer.Produce(&kafka.Message{

TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny}, Value: GetInput(),

signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select {

case <-sigterm:

log.Println("terminating: via signal") }

// Wait for message deliveries before shutting down producer.Flush(15 * 1000)

producer.Close() }

func GetInput() []byte {

reader := bufio.NewReader(os.Stdin) data, _, _ := reader.ReadLine() return data

}

● 非SASL认证方式

package main import ( "bufio"

"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"

"log"

brokers = "ip1:port1,ip2:port2,ip3:port3"

topics = "topic_name"

)

func main() {

log.Println("Starting a new kafka producer") config := &kafka.ConfigMap{

"bootstrap.servers": brokers, }

producer, err := kafka.NewProducer(config) if err != nil {

// Produce messages to topic (asynchronously) fmt.Println("please enter message:")

go func() { for {

err := producer.Produce(&kafka.Message{

TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny}, Value: GetInput(),

signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select {

case <-sigterm:

log.Println("terminating: via signal") }

// Wait for message deliveries before shutting down producer.Flush(15 * 1000)

producer.Close() }

func GetInput() []byte {

reader := bufio.NewReader(os.Stdin) data, _, _ := reader.ReadLine()

package main import ( "fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"

"log"

brokers = "ip1:port1,ip2:port2,ip3:port3"

group = "group-id"

log.Println("Starting a new kafka consumer") config := &kafka.ConfigMap{

"bootstrap.servers": brokers, "group.id": group, "auto.offset.reset": "earliest", "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": user, "sasl.password": password, "ssl.ca.location": caFile, }

consumer, err := kafka.NewConsumer(config) if err != nil {

log.Panicf("Error creating consumer: %v", err) return

}

err = consumer.SubscribeTopics([]string{topics}, nil) if err != nil {

log.Panicf("Error subscribe consumer: %v", err) return

log.Printf("Consumer error: %v (%v)", err, msg)

signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select {

case <-sigterm:

log.Println("terminating: via signal") }

package main import ( "fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"

"log"

brokers = "ip1:port1,ip2:port2,ip3:port3"

group = "group-id"

topics = "topic_name"

)

func main() {

log.Println("Starting a new kafka consumer") config := &kafka.ConfigMap{

"bootstrap.servers": brokers, "group.id": group, "auto.offset.reset": "earliest", }

consumer, err := kafka.NewConsumer(config) if err != nil {

log.Panicf("Error creating consumer: %v", err) return

}

err = consumer.SubscribeTopics([]string{topics}, nil) if err != nil {

log.Panicf("Error subscribe consumer: %v", err) return

signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

select { case <-sigterm:

log.Println("terminating: via signal") }

消息生产和消费的可靠性必须由ROMA Connect、生产者和消费者协同工作才能保 证,对使用ROMA Connect的生产者和消费者有如下的使用建议。

重视消息生产与消费的确认过程

消息生产

生产消息后,生产者需要根据ROMA Connect的返回信息确认消息是否发送成功,如 果返回失败需要重新发送。

每次生产消息,生产者都需要等待消息发送API的应答信号,以确认消息是否成功发 送。在消息传递过程中,如果发生异常,生产者没有接收到发送成功的信号,生产者 自己决策是否需要重复发送消息。如果接收到发送成功的信号,则表明该消息已经被 ROMA Connect可靠存储。

消息消费

消息消费时,消费者需要确认消息是否已被成功消费。

生产的消息被依次存储在ROMA Connect的存储介质中。消费时依次获取ROMA Connect中存储的消息。消费者获取消息后,进行消费并记录消费成功或失败的状态,

并将消费状态提交到ROMA Connect,由ROMA Connect决定消费下一批消息或回滚 重新消费消息。

在消费过程中,如果出现异常,没有提交消费确认,该批消息会在后续的消费请求中 再次被获取。

消息生产与消费的幂等传递

ROMA Connect设计了一系列可靠性保障措施,确保消息不丢失。例如使用消息同步 存储机制防止系统与服务器层面的异常重启或者掉电,使用消息确认(ACK)机制解 决消息传输过程中遇到的异常。

考虑到网络异常等极端情况,用户除了做好消息生产与消费的确认,还需要配合 ROMA Connect完成消息发送与消费的重复传输设计。

● 当无法确认消息是否已发送成功,生产者需要将消息重复发送给ROMA Connect。

● 当重复收到已处理过的消息,消费者需要告诉ROMA Connect消费成功且保证不 重复处理。