2.5.1 Spring Cloud 接入 DTM 样例指南
本章节介绍Spring Cloud 框架下MQ消息接入DTM事务的demo,使得Spring Cloud框 架下的MQ消息项目可以快速接入DTM。
其中,Spring Cloud的样例代码在导入样例工程过程中准备的dtm-demo的dtm-springcloud项目中。
前提条件
在启动样例之前,先安装、启动RocketMQ,以便样例中的服务可以注册到RocketMQ 上。
请参考RocketMQ安装和启动。
样例设计
Spring Cloud样例中使用Spring Cloud框架,样例流程如下所示:
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 136
图 2-22 MQ 消息接入样例设计流程
● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。
● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
须知
@GetMapping(value = "transfer")
@DTMTxBegin(appName = "noninvasive-transfer-mq-SpringCloud")
public String transfer(@RequestParam(value = "id") int id, @RequestParam(value = "money") int money,
@RequestParam(value = "errRate") int errRate) throws Exception { LOGGER.info("mq service start invoke bankA and bankB");
bankOperator.transfer(id, money, errRate);
return "ok";
}// bankOperator有两个实现类,下面以restTemplate的实现类为例 // com.huawei.dtm.mq.server.impl.RestOpImpl.java
@Override
public String transfer(int userId, int money, int errRate) throws Exception { LOGGER.info("Start transfer---rest");
restTemplate.getForObject(String.format(BANKA_TRANSFER, userId, money * 2, errRate), String.class);
mqTemplate.sendMsg(userId, money);
// BankB在转账的时候会概率性抛出异常
restTemplate.getForObject(String.format(BANKB_TRANSFER, userId, money, errRate), String.class);
public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {
DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");
// 设置自己的MQ地址
producer.setNamesrvAddr("http://127.0.0.1:9876");
return producer;
}
● 发送接入到全局事务的半消息。
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 138
说明
● 当全局事务提交时,这个半消息会自动commit,完成投递。
● 当全局事务回滚时,这个半消息会自动cancel,完成丢弃。
// com.huawei.dtm.mq.server.model.RocketMqTemplate.java public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。
String uuid = UUID.randomUUID().toString();
String msgBody = uuid + "__" + money + "__" + userId;
Message msg =
new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(msg, null);
}
public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");
consumer.setNamesrvAddr("http://127.0.0.1:9876");
consumer.subscribe("dtm-topic-mq", "*");
/**
* 消费的时候要保证幂等性, RocketMQ为确保能正常消费, 在少数极端场景有重复消费问 题。 */
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try {
bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));
ConsumerStore.INST.add(arr[0]);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) { e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} });
return consumer;
}//com.huawei.common.impl.BankBService.java
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
public void transferOut(int id, int money) { LOGGER.info("BankB transfer out");
jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);
}
application:
name: dtm-banka main:
allow-bean-definition-overriding: true datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}/banka?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver eureka:
client:
serviceUrl:
defaultZone: http://${eureka_ip}:8761/eureka/
instance:
prefer-ip-address: true
表 2-39 bankA application.yaml 配置文件参数详解
参数 说明
“dtmClientConfig.properties”中,
表 2-40 dtmClientConfig.properties 配置文件参数详解
参数 说明
application:
name: dtm-bankb main:
allow-bean-definition-overriding: true datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
url: jdbc:mysql://${db_ip}/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver eureka:
client:
serviceUrl:
defaultZone: http://${eureka_ip}:8761/eureka/
instance:
prefer-ip-address: true
3. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,
application:
name: dtm-bankcenter main:
allow-bean-definition-overriding: true eureka:
client:
serviceUrl:
defaultZone: http://${eureka_ip}:8761/eureka/
instance:
prefer-ip-address: true
表 2-41 bankCenter application.yaml 配置文件参数详解
参数 说明
实际修改auto-create-table-dtm-tran-info、dtm-app-name、sc-server-address
application:
name: dtm-client eureka:
client:
serviceUrl:
defaultZone: http://${eureka_ip}:8761/eureka/
instance:
prefer-ip-address: true
3. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,
application:
name: dtm-mq main:
allow-bean-definition-overriding: true eureka:
client:
serviceUrl:
defaultZone: http://${eureka_ip}:8761/eureka/
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
instance:
prefer-ip-address: true
3. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,
application:
name: dtm-mq-consumer main:
allow-bean-definition-overriding: true datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver eureka:
client:
serviceUrl:
defaultZone: http://${eureka_ip}:8761/eureka/
instance:
prefer-ip-address: true
表 2-42 mq-consumer application.yaml 配置文件参数详解
参数 说明
参数 说明
register-with-eureka: false fetch-registry: false
----结束
启动测试样例
步骤1 登录ECS,在eureka-server.jar包同级目录下,执行java -Dfile.encoding=utf-8 -jar eureka-server.jar启动eureka服务。
步骤2 打开第二个窗口登录ECS,在bankAservicespringcloud.jar包同级目录下,执行java -Dfile.encoding=utf-8 -jar bankA-service-springcloud.jar启动bankA服务。
步骤3 打开第三个窗口登录ECS,在bankBservicespringcloud.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar bankB-service-springcloud.jar启动bankB服务。
步骤4 打开第四个窗口登录ECS,在bankcenterspringcloud.jar包同级目录下,执行java -Dfile.encoding=utf-8 -jar bank-center-springcloud.jar启动bankCenter服务。
步骤5 打开第五个窗口登录ECS,在mqservicespringcloud.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-springcloud.jar启动mq-service服务。
步骤6 打开第六个窗口登录ECS,在invokeservicespringcloud.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar invoke-service-springcloud.jar启动invoke服务。
[0] 初始化数据库, 重置账号资金;
步骤7 在invoke服务中,输入命令0初始化数据库。
请输入命令执行操作:(当前远程调用/feign)
02021-06-15 21:00:32.905 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Init bankA bankB success
步骤8 输入命令1查询帐号余额,结果如下图所示。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500000000,total b 500000000,sum 1000000000
步骤9 输入命令4,执行非侵入样例DTM对接消息事务的微服务场景验证。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
| 1| 1000000| 1000000| 2000000|
[ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100 [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100
| 4| 1000000| 1000000| 2000000|
...
| 496| 1000000| 1000000| 2000000|
| 497| 1000000| 1000000| 2000000|
[ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100
| 499| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000
步骤12 打开第七个窗口登录ECS,在mqconsumerspringcloud.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-springcloud.jar启动mq-consumer服务。
步骤13 再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100的消息被消费,
最终数据保持一致,结果如下所示。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499990000,sum 1000000000
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 146
步骤14 输入命令5退出执行程序。
----结束
2.5.2 Spring Cloud Huawei 接入 DTM 样例指南
本章节介绍Spring Cloud Huawei框架下MQ消息接入DTM事务的demo,使得Spring Cloud Huawei框架下的MQ消息项目可以快速接入DTM。
其中,Spring Cloud Huawei的样例代码在导入样例工程过程中准备的dtm-demo的 dtm-springcloud-hw项目中。
前提条件
1. 已创建微服务引擎专享版,请参考创建微服务引擎专享版。
说明
DTM不支持开启了安全认证的微服务引擎专享版。
2. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以 便样例中的服务可以注册到RocketMQ上。
样例设计
Spring Cloud Huawei样例中使用Spring Cloud Huawei框架,样例流程参考图2-22。
● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。
● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。
须知
mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现 机制,会有重复消费消息的问题。
业务流程分析
● 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服 务转出100。
● 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入 的200回滚。
DTM 全局事务发起者
定义非侵入样例微服务场景事务发起端,通过微服务接口调用bankA发起银行转账,
并且投递半消息。
// com.huawei.dtm.mq.server.controller.MqController.java
@GetMapping(value = "transfer")
@DTMTxBegin(appName = "noninvasive-transfer-mq-SpringCloudHW")
public String transfer(@RequestParam(value = "id") int id, @RequestParam(value = "money") int money,
@RequestParam(value = "errRate") int errRate) throws Exception { LOGGER.info("mq service start invoke bankA and bankB");
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
bankOperator.transfer(id, money, errRate);
return "ok";
}// bankOperator有两个实现类,下面以restTemplate的实现类为例 // com.huawei.dtm.mq.server.impl.RestOpImpl.java
@Override
public String transfer(int userId, int money, int errRate) throws Exception { LOGGER.info("Start transfer---rest");
restTemplate.getForObject(String.format(BANKA_TRANSFER, userId, money * 2, errRate), String.class);
mqTemplate.sendMsg(userId, money);
// BankB在转账的时候会概率性抛出异常
restTemplate.getForObject(String.format(BANKB_TRANSFER, userId, money, errRate), String.class);
public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {
DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");
// 设置自己的MQ地址
producer.setNamesrvAddr("http://127.0.0.1:9876");
return producer; public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。
String uuid = UUID.randomUUID().toString();
String msgBody = uuid + "__" + money + "__" + userId;
Message msg =
new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(msg, null);
}
public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 148
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");
consumer.setNamesrvAddr("http://127.0.0.1:9876");
consumer.subscribe("dtm-topic-mq", "*");
/**
* 消费的时候要保证幂等性, RocketMQ 为确保能正常消费, 在少数极端场景有重复消费问 题。 */
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try {
bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));
ConsumerStore.INST.add(arr[0]);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) { e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} });
return consumer;
}//com.huawei.common.impl.BankBService.java public void transferOut(int id, int money) { LOGGER.info("BankB transfer out");
jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);
}
application:
name: dtm-banka main:
allow-bean-definition-overriding: true datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}:3306/banka?
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
表 2-43 application.yaml 配置文件参数详解
参数 说明
db_user_name 客户端MySQL数据库用户名。
db_user_pwd 客户端MySQL数据库的密码。
db_ip 客户端MySQL数据库业务数据库地
址,分别为banka和bankb的地址,根 据实际情况修改。
3. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表2-44,修改address信息。
spring:
application:
name: dtm-banka cloud:
servicecomb:
discovery:
version: 1.0.0 enabled: true
address: ${cse_address}
watch: false
app-name: dtm-demo service-name: dtm-banka
表 2-44 bootstrap.yaml 配置文件参数详解
参数 说明
cse_address 服务注册发现地址,该配置项可从
“应用管理与运维平台”控制台的
“基础设施 > 微服务引擎( CSE)”
中指定微服务引擎实例页面上得到。
4. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,
将下列配置拷贝到配置文件“dtmClientConfig.properties”中,参考表2-45, 按 实际修改auto-create-table-dtm-tran-info、dtm-app-name、sc-server-address 和rpc-ssl-switch。
auto-create-table-dtm-tran-info=on dtm-app-name=xxxx
sc-server-address=xxxx rpc-ssl-switch=off
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 150
表 2-45 dtmClientConfig.properties 配置文件参数详解
参数 说明
auto-create-table-dtm-tran-info 是否自动创建DTM事务表dtm_tran_info,用来记录事 务信息。
– on:自动创建 – off:手动创建
dtm-app-name 应用名称,该配置项可从“应用管理与运维平台”控制 台“分布式事务管理 DTM > 引擎实例”界面中得到。
sc-server-address 服务中心地址,该配置项可从“应用管理与运维平台”
控制台“分布式事务管理 DTM > 引擎实例”界面中得 到。
rpc-ssl-switch SSL开关,该配置项可从“应用管理与运维平台”控制 台“分布式事务管理 DTM > 引擎实例”界面中得到。
– on:开启SSL – off:关闭SSL
图 2-24 DTM 引擎实例
步骤3 配置bankB-service所需配置文件。
1. 登录创建好的弹性云服务器,将bankB-service-springcloudHW.jar包上传至弹性 云服务器的bankB目录,并在bankB-service-springcloudHW.jar包同级目录下新 建“application.yaml”、“bootstrap.yaml”文件和“dtm-config”文件夹。
2. 把下列配置拷贝到“application.yaml”文件中,参考表2-43,修改username、
password和url为您创建好的数据库用户名、密码和地址,拷贝完成后保存退出。
server:
port: 8033 spring:
application:
name: dtm-bankb datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}:3306/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
3. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表2-44,修改address信息。
spring:
application:
name: dtm-bankb main:
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
allow-bean-definition-overriding: true
application:
name: dtm-bankcenter main:
allow-bean-definition-overriding: true cloud:
service-name: ${spring.application.name}
表 2-46 bootstrap.yaml 配置文件参数详解
参数 说明
invoke_mode 服务调用的方式。
– rest,服务采取rest调用方式。
– feign,服务采取feign调用方式。
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 152
参数 说明
2. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表 bootstrap.yaml配置文件 参数详解,修改服务的调用方式和address信息。
application:
name: dtm-invoke main:
allow-bean-definition-overriding: true cloud:
service-name: ${spring.application.name}
3. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,
2. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表5 mq-service bootstrap
application:
name: dtm-mq main:
allow-bean-definition-overriding: true cloud:
service-name: ${spring.application.name}
表 2-47 mq-service bootstrap.yaml 配置文件参数详解
参数 说明
2. 把下列配置拷贝到“application.yaml”文件中,参考表 mq-consumer
application.yaml配置文件参数详解,修改username、password和url为您创建 好的bankB数据库用户名、密码和地址。
server:
port: 8393 spring:
datasource:
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 154
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
表 2-48 mq-consumer application.yaml 配置文件参数详解
参数 说明
3. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表 mq-service
bootstrap.yaml配置文件参数详解,修改服务的调用方式和address信息。
dtm: invoke:
mode: ${invoke_mode}
spring:
application:
name: dtm-mq-consumer main:
allow-bean-definition-overriding: true cloud:
service-name: ${spring.application.name}
4. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,
步骤1 登录ECS,在bankAservicespringcloudHW.jar包同级目录下,执行java
-Dfile.encoding=utf-8 -jar bankA-service-springcloudHW.jar启动bankA服务。
步骤2 打开第二个窗口登录ECS,在bankB-service-springcloudHW.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar bankB-service-springcloudHW.jar启动bankB服务。
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
步骤3 打开第三个窗口登录ECS,在bankCenter-service-springcloudHW.jar包同级目录下,
执行java -Dfile.encoding=utf-8 -jar bankCenter-service-springcloudHW.jar启动 bankCenter服务。
步骤4 打开第四个窗口登录ECS,在mqservicespringcloudHW.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-springcloudHW.jar启动mq-service服务 步骤5 打开第五个窗口登录ECS,在invoke-service-springcloudHW.jar包所在目录,执行java
-Dfile.encoding=utf-8 -jar invoke-service-springcloudHW.jar启动invoke服务。
[0] 初始化数据库, 重置账号资金;
2021-03-23 11:51:59.417 [main] INFO c.h.d.c.service.TransferService - Init bankA bankB success
步骤7 输入命令1查询帐号余额,结果如下所示。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500000000,total b 500000000,sum 1000000000
步骤8 输入命令4,执行非侵入样例DTM事务的微服务场景验证。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
| 1| 1000000| 1000000| 2000000|
[ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100 [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100
| 4| 1000000| 1000000| 2000000|
...
| 496| 1000000| 1000000| 2000000|
| 497| 1000000| 1000000| 2000000|
[ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100
| 499| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000
步骤11 打开第六个窗口登录ECS,在mq-consumer-springcloudHW.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-springcloudHW.jar启动mq-consumer服 务。
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 156
步骤12 再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100的消息被消费,
最终数据保持一致,结果如下所示。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
| 1| 1000000| 1000000| 2000000|
| 2| 1000200| 999800| 2000000|
| 3| 1000200| 999800| 2000000|
| 4| 1000000| 1000000| 2000000|
...
| 496| 1000000| 1000000| 2000000|
| 497| 1000000| 1000000| 2000000|
| 498| 1000200| 999800| 2000000|
| 499| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499990000,sum 1000000000
步骤13 输入命令5退出执行程序。
----结束
2.5.3 ServiceComb 接入 DTM 样例指南
本章节介绍ServiceComb 框架下MQ消息接入DTM事务的demo,使得Spring Cloud框 架下的MQ消息项目可以快速接入DTM。
其中,ServiceComb的样例代码在导入样例工程过程中准备的dtm-demo的dtm-servicecomb项目中。
前提条件
1. 已创建微服务引擎专享版,请参考创建微服务引擎专享版。
说明
DTM不支持开启了安全认证的微服务引擎专享版。
2. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以 便样例中的服务可以注册到RocketMQ上。
样例设计
ServiceComb样例中使用ServiceComb框架,样例流程参考图2-22。
● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。
● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。
须知
mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现 机制,会有重复消费消息的问题。
业务流程分析
● 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服 务转出100。
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
● 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入
@GetMapping(value = "transfer")
@DTMTxBegin(appName = "noninvasive-transfer-mq-ServiceComb")
public String transfer(@RequestParam(value = "id") int id, @RequestParam(value = "money") int money,
@RequestParam(value = "errRate") int errRate) throws Exception { LOGGER.info("mq service start invoke bankA and bankB");
bankOperator.transfer(id, money, errRate);
return "ok";
}// bankOperator有两个实现类,下面以restTemplate的实现类为例 // com.huawei.dtm.mq.server.impl.RestOpImpl.java
@Override
public String transfer(int userId, int money, int errRate) throws Exception { LOGGER.info("Start transfer---rest");
restTemplate.getForObject(String.format(BANKA_TRANSFER, userId, money * 2, errRate), String.class);
mqTemplate.sendMsg(userId, money);
// BankB在转账的时候会概率性抛出异常
restTemplate.getForObject(String.format(BANKB_TRANSFER, userId, money, errRate), String.class);
public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {
DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");
// 设置自己的MQ地址
producer.setNamesrvAddr("http://127.0.0.1:9876");
return producer; public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。
String uuid = UUID.randomUUID().toString();
String msgBody = uuid + "__" + money + "__" + userId;
Message msg =
new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(msg, null);
}
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 158
mqConsumer 服务
public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");
consumer.setNamesrvAddr("http://127.0.0.1:9876");
consumer.subscribe("dtm-topic-mq", "*");
/**
* 消费的时候要保证幂等性, RocketMQ为确保能正常消费, 在少数极端场景有重复消费问 题。 */
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try {
bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));
ConsumerStore.INST.add(arr[0]);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) { e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} });
return consumer;
}//com.huawei.common.impl.BankBService.java public void transferOut(int id, int money) { LOGGER.info("BankB transfer out");
jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);
}
datasource:
banka:
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}:3306/banka?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC
表 2-49 microservice.yaml 配置文件参数详解
参数 说明
default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:
表 2-50 配置 microservice.yaml servicecomb 参数详解
参数 说明
datasource:
bankb:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}:3306/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 160
2. 修改“dtm-servicecomb\bank-b\src\main\resources\microservice.yaml”文件中
default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:
default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:
default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:
1. 修改“dtm-servicecomb\mq-service\src\main\resources\microservice.yaml”文
default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:
default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:
3. 参考表 microservice.yaml配置文件参数详解,修改“microservice.yaml”文件 的username、password和url为您创建好的bankB数据库用户名、密码和地址。
spring:
main:
allow-bean-definition-overriding: true datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
步骤7 通过Maven执行clean、install,将工程进行打包,生成banka、bankb、
auto-create-table-dtm-tran-info=on dtm-app-name=xxxx
sc-server-address=xxxx rpc-ssl-switch=off
表 2-51 dtmClientConfig.properties 配置文件参数详解
参数 说明
auto-create-table-dtm-tran-info 是否自动创建DTM事务表dtm_tran_info,用来记录事务信 息。
● on:自动创建
● off:手动创建
dtm-app-name 应用名称,该配置项可从“应用管理与运维平台”控制台
“分布式事务管理 DTM > 引擎实例”界面中得到。
sc-server-address 服务中心地址,该配置项可从“应用管理与运维平台”控 制台“分布式事务管理 DTM > 引擎实例”界面中得到。
rpc-ssl-switch SSL开关,该配置项可从“应用管理与运维平台”控制台
“分布式事务管理 DTM > 引擎实例”界面中得到。
● on:开启SSL
● off:关闭SSL
图 2-25 DTM 引擎实例
----结束
启动测试样例
步骤1 登录ECS,在bankAserviceservicecomb.jar包同级目录下,执行java
-Dfile.encoding=utf-8 -jar bankA-service-servicecomb.jar启动bankA服务。
步骤2 打开第二个窗口登录ECS,在bankBserviceservicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar bankB-service-servicecomb.jar启动bankB服务。
步骤3 打开第三个窗口登录ECS,在bankCenter-service-servicecomb.jar包同级目录下,执行 java -Dfile.encoding=utf-8 -jar bankCenter-service-servicecomb.jar启动
bankCenter服务。
步骤4 打开第四个窗口登录ECS,在mqserviceservicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-servicecomb.jar启动mq-service服务 步骤5 打开第五个窗口登录ECS,在invokeserviceservicecomb.jar包所在目录,执行java
-Dfile.encoding=utf-8 -jar invoke-service-servicecomb.jar启动invoke服务。
[0] 初始化数据库, 重置账号资金;
[1] 查询 Bank A 和 Bank B 余额;
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
[2] 非侵入用例 -> DTM 事务 微服务场景调用;
2021-03-23 11:51:59.417 [main] INFO c.h.d.c.service.TransferService - Init bankA bankB success
步骤7 输入命令1查询帐号余额,结果如下所示。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500000000,total b 500000000,sum 1000000000
步骤8 输入命令4,执行非侵入样例DTM对接消息事务的微服务场景验证。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
| 1| 1000000| 1000000| 2000000|
[ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100 [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100
| 4| 1000000| 1000000| 2000000|
...
| 496| 1000000| 1000000| 2000000|
| 497| 1000000| 1000000| 2000000|
[ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100
| 499| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000
步骤11 打开第六个窗口登录ECS,在mqconsumerservicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-servicecomb.jar启动mq-consumer服 务。
步骤12 再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100的消息被消费,
最终数据保持一致,结果如下所示。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
| 498| 1000000| 1000000| 2000000|
| 499| 1000200| 999800| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 49990000,sum 1000000000
步骤13 输入命令5退出执行程序。
----结束
2.5.4 Dubbo 接入 DTM 样例指南
本章节介绍Dubbo框架下MQ消息接入DTM事务的demo,使得Dubbo框架下的MQ消 息项目可以快速接入DTM。
其中,Dubbo的样例代码在 导入样例工程 过程中准备的dtm-demo的dtm-dubbo项目 中。
前提条件
1. 下载安装ZooKeeper,请参考8.1下载安装Zookeeper。
2. 在启动我们提供的dtm-dubbo样例之前,请先启动ZooKeeper,以便dtm-dubbo 中的服务可以注册到ZooKeeper上。
3. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以 便样例中的服务可以注册到RocketMQ上。
样例设计
dtm-dubbo样例中使用Dubbo框架,样例流程参考图2-22。
● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。
● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。
须知
mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现 机制,会有重复消费消息的问题。
业务流程分析
● 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服 务转出100。
● 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入 的200回滚。
DTM 全局事务发起者
定义非侵入样例微服务场景事务发起端,通过微服务接口调用bankA发起银行转账,
并且投递半消息。
// com.huawei.dubbo.mq.service.controller.MqServiceController.java
@DTMTxBegin(appName = "mq-transfer-dubbo")
public String transfer(int id, int money, int errRate) throws Exception {
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
LOGGER.info("mq service start invoke bankA and bankB: {}", DTMContext.getDTMContext().getGlobalTxId());
bankAController.transfer(id, money * 2, errRate);
rocketMqTemplate.sendMsg(id, money);
bankBController.transfer(id, money, errRate);
return "ok";
public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {
DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");
// 设置自己的MQ地址
producer.setNamesrvAddr("http://127.0.0.1:9876");
return producer; public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。
String uuid = UUID.randomUUID().toString();
String msgBody = uuid + "__" + money + "__" + userId;
Message msg =
new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(msg, null);
}
public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");
consumer.setNamesrvAddr("http://127.0.0.1:9876");
consumer.subscribe("dtm-topic-mq", "*");
/**
* 消费的时候要保证幂等性, RocketMQ为确保能正常消费, 在少数极端场景有重复消费问 题。 */
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 166
try {
bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));
ConsumerStore.INST.add(arr[0]);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) { e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} });
return consumer;
}// com.huawei.dubbo.common.impl.BankBService.java public void transferOut(int id, int money) {
LOGGER.info("BankB transfer out");
jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);
}
address: zookeeper://${registry_ip}:${registry_port}
metadata-report:
address: zookeeper://${registry_ip}:${registry_port}
scan:
base-packages: com.huawei.dubbo.banka protocols:
dubbo:
datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
url: jdbc:mysql://${db_ip}:3306/banka?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
表 2-52 bankA 配置文件参数详解
application:
name: dtm-banka main:
allow-bean-definition-overriding: true cloud:
zookeeper:
enabled: true
connect-string: ${registry_ip}:${registry_port}
4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下
表 2-53 dtmClientConfig.properties 配置文件参数详解
参数 说明
参数 说明
address: zookeeper://${registry_ip}:${registry_port}
metadata-report:
address: zookeeper://${registry_ip}:${registry_port}
scan:
base-packages: com.huawei.dubbo.bankb protocols:
dubbo:
datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}:3306/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
3. 在bankB-service-dubbo.jar包同级目录下,新建配置文件“bootstrap.yaml”,
把下列配置拷贝到“bootstrap.yaml”文件中,使用ZooKeeper作为注册中心时的 注册信息参考表2-52,修改registry_ip和registry_port,完成后保存退出。
spring:
application:
name: dtm-bankb
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
main:
allow-bean-definition-overriding: true cloud:
zookeeper:
enabled: true
connect-string: ${registry_ip}:${registry_port}
4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下
address: zookeeper://${registry_ip}:${registry_port}
metadata-report:
address: zookeeper://${registry_ip}:${registry_port}
scan:
base-packages: com.huawei.dubbo.bank.center protocols:dubbo:
name: dubbo port: -1 server:
port: 8089
表 2-54 bankCenter 配置文件参数详解
参数 说明
application:
name: dtm-bank-center main:
allow-bean-definition-overriding: true cloud:
zookeeper:
enabled: true
connect-string: ${registry_ip}:${registry_port}
4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下 列配置拷贝到配置文件“dtmClientConfig.properties”中, 参考表2-53,按实际
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 170
修改auto-create-table-dtm-tran-info、dtm-app-name、sc-server-address和
address: zookeeper://${registry_ip}:${registry_port}
metadata-report:
address: zookeeper://${registry_ip}:${registry_port}
scan:
base-packages: com.huawei.dubbo.mq.service protocols:
dubbo:
application:
name: mq-service main:
allow-bean-definition-overriding: true cloud:
zookeeper:
enabled: true
connect-string: ${registry_ip}:${registry_port}
4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下
address: zookeeper://${registry_ip}:${registry_port}
metadata-report:
address: zookeeper://${registry_ip}:${registry_port}
scan:
base-packages: com.huawei.dubbo.mq.consumer protocols:
dubbo:
application:
name: mq-consumer main:
allow-bean-definition-overriding: true cloud:
zookeeper:
enabled: true
connect-string: ${registry_ip}:${registry_port}
datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}:3306/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下
address: zookeeper://${registry_ip}:${registry_port}
metadata-report:
address: zookeeper://${registry_ip}:${registry_port}
scan:
base-packages: com.huawei.dubbo.invoke protocols:
dubbo:
name: dubbo
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 172
port: -1
application:
name: dtm-invoke main:
allow-bean-definition-overriding: true cloud:
zookeeper:
enabled: true
connect-string: ${registry_ip}:${registry_port}
4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下
步骤2 登录ECS,在bankAservicedubbo.jar包同级目录下打开终端窗口,执行java -Dfile.encoding=utf-8 -jar bankA-service-dubbo.jar启动banka服务。
步骤3 打开第二个窗口登录ECS,在bankB-service-dubbo.jar包同级目录下打开一个终端窗 口,执行java -Dfile.encoding=utf-8 -jar bankB-service-dubbo.jar启动bankb服 务。
步骤4 打开第三个窗口登录ECS,在bank-center-dubbo.jar包同级目录下打开一个终端窗口,
执行java -Dfile.encoding=utf-8 -jar bank-center-dubbo.jar启动bankcenter服务。
步骤5 打开第四个窗口登录ECS,在mqservicedubbo.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-dubbo.jar启动mq-service服务。
步骤6 打开第五个窗口登录ECS,在invoke-service-mq-dubbo.jar包同级目录下打开一个终端 窗口,执行java -Dfile.encoding=utf-8 -jar invoke-service-mq-dubbo.jar启动 invoke服务。
02021-06-15 21:00:32.905 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Init bankA bankB success
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
步骤8 输入命令1查询BankA和BankB帐号余额。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
2021-03-23 14:25:29,799 [INFO] Run finish. total a 500000000,total b 500000000,sum 1000000000
步骤9 输入命令2,执行非侵入样例DTM对接消息事务的微服务场景验证。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
| 1| 1000000| 1000000| 2000000|
[ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100 [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100
| 4| 1000000| 1000000| 2000000|
...
| 496| 1000000| 1000000| 2000000|
| 497| 1000000| 1000000| 2000000|
[ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100
| 499| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000
步骤12 打开第六个窗口登录ECS,在mqconsumerdubbo.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-dubbo.jar启动mq-consumer服务。
步骤13 在invoke服务中,再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100 的消息被消费,最终数据保持一致,结果如下所示。
|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|
| 0| 1000000| 1000000| 2000000|
2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499990000,sum 1000000000
步骤14 输入命令3退出执行程序。
----结束
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 174
2.5.5 Dubbo ServiceComb 接入 DTM 样例指南
本章节介绍Dubbo ServiceComb框架下MQ消息接入DTM事务的demo,使得Dubbo ServiceComb框架下的MQ消息项目可以快速接入DTM。
其中,Dubbo ServiceComb的样例代码在导入样例工程过程中准备的dtm-demo的 dtm-dubbo-servicecomb项目中。
前提条件
1. 已创建微服务引擎专享版,请参考创建微服务引擎专享版。
说明
DTM不支持开启了安全认证的微服务引擎专享版。
2. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以 便样例中的服务可以注册到RocketMQ上。
样例设计
Dubbo ServiceComb样例中使用Dubbo ServiceComb框架,样例流程参考图2-22。
● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。
● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。
须知
mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现 机制,会有重复消费消息的问题。
业务流程分析
● 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服 务转出100。
● 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入 的200回滚。
DTM 全局事务发起者
定义非侵入样例微服务场景事务发起端,通过微服务接口调用bankA发起银行转账,
并且投递半消息。
// com.huawei.dubbo.mq.service.controller.MqServiceController.java
@DTMTxBegin(appName = "mq-transfer-dubbo-servicecomb") public String transfer(int id, int money, int errRate) throws Exception { LOGGER.info("mq service start invoke bankA and bankB: {}", DTMContext.getDTMContext().getGlobalTxId());
bankAController.transfer(id, money * 2, errRate);
rocketMqTemplate.sendMsg(id, money);
bankBController.transfer(id, money, errRate);
return "ok";
}
最佳实践 2 使用 DTM 保证银行转账业务数据一致性
mqService 服务
● 使用DTM封装好的事务生产者DtmRocketMqProducer。
// com.huawei.dubbo.mq.service.config.WebConfig.java;
@Bean
public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {
DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");
// 设置自己的MQ地址
producer.setNamesrvAddr("http://127.0.0.1:9876");
return producer; public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。
String uuid = UUID.randomUUID().toString();
String msgBody = uuid + "__" + money + "__" + userId;
Message msg =
new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(msg, null);
}
public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");
consumer.setNamesrvAddr("http://127.0.0.1:9876");
consumer.subscribe("dtm-topic-mq", "*");
/**
* 消费的时候要保证幂等性, RocketMQ为确保能正常消费, 在少数极端场景有重复消费问 题。 */
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try {
防止重复消费。
*/
bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));
ConsumerStore.INST.add(arr[0]);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) { e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} });
return consumer;
}// com.huawei.dubbo.common.impl.BankBService.java public void transferOut(int id, int money) {
LOGGER.info("BankB transfer out");
jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);
}
application:
name: dtm-banka main:
allow-bean-definition-overriding: true datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}/banka?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
表 2-55 bankA-service 配置文件参数详解
参数 说明
dubbo.servicecomb.registry.address=${registry_address}
dubbo.servicecomb.config.address=${config_address}
表 2-56 dubbo.properties 配置文件参数详解
参数 说明
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://
www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/
dubbo.xsd">
<dubbo:application name="dtm-banka"/>
<dubbo:registry address="sc://${registry_address}"/>
<dubbo:protocol name="dubbo" port="8037"/>
<bean id="bankAController"
class="com.huawei.dubbo.banka.controller.BankAController"/>
<dubbo:service interface="com.huawei.dubbo.common.intf.IBankAController"
ref="bankAController"/>
<dubbo:provider timeout="30000"/>
</beans>
application:
name: dtm-bankb main:
allow-bean-definition-overriding: true datasource:
bank:
username: ${db_user_name}
password: ${db_user_pwd}
url: jdbc:mysql://${db_ip}/bankb?
verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=
UTC driver-class-name: com.mysql.cj.jdbc.Driver
文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 178
2. 修改“dtm-dubbo-servicecomb\bankB-service\src\main\resources
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://
www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/
dubbo.xsd">
<dubbo:application name="dtm-bankb"/>
<dubbo:registry address="sc://${registry_address}"/>
<dubbo:protocol name="dubbo" port="8047"/>
<bean id="bankBController"
class="com.huawei.dubbo.bankb.controller.BankBController"/>
<dubbo:service interface="com.huawei.dubbo.common.intf.IBankBController"
ref="bankBController"/>
<dubbo:provider timeout="30000"/>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://
www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/
dubbo.xsd">
<dubbo:application name="bank-center"/>
<dubbo:registry address="sc://${registry_address}"/>
<dubbo:protocol name="dubbo" port="8031"/>
<bean id="bankCenterController"
class="com.huawei.dubbo.bank.center.controller.BankCenterController"/>
<dubbo:service interface="com.huawei.dubbo.common.intf.IBankCenterController"
ref="bankCenterController"/>
<dubbo:reference id="bankAController" check="false"
interface="com.huawei.dubbo.common.intf.IBankAController"/>
interface="com.huawei.dubbo.common.intf.IBankAController"/>