• 沒有找到結果。

2.5.1 Spring Cloud 接入 DTM 样例指南

本章节介绍Spring Cloud 框架下MQ消息接入DTM事务的demo,使得Spring Cloud框 架下的MQ消息项目可以快速接入DTM。

其中,Spring Cloud的样例代码在导入样例工程过程中准备的dtm-demo的dtm-springcloud项目中。

前提条件

在启动样例之前,先安装、启动RocketMQ,以便样例中的服务可以注册到RocketMQ 上。

请参考RocketMQ安装和启动。

样例设计

Spring Cloud样例中使用Spring Cloud框架,样例流程如下所示:

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 136

2-22 MQ 消息接入样例设计流程

● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。

● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

须知

@GetMapping(value = "transfer")

@DTMTxBegin(appName = "noninvasive-transfer-mq-SpringCloud")

public String transfer(@RequestParam(value = "id") int id, @RequestParam(value = "money") int money,

@RequestParam(value = "errRate") int errRate) throws Exception { LOGGER.info("mq service start invoke bankA and bankB");

bankOperator.transfer(id, money, errRate);

return "ok";

}// bankOperator有两个实现类,下面以restTemplate的实现类为例 // com.huawei.dtm.mq.server.impl.RestOpImpl.java

@Override

public String transfer(int userId, int money, int errRate) throws Exception { LOGGER.info("Start transfer---rest");

restTemplate.getForObject(String.format(BANKA_TRANSFER, userId, money * 2, errRate), String.class);

mqTemplate.sendMsg(userId, money);

// BankB在转账的时候会概率性抛出异常

restTemplate.getForObject(String.format(BANKB_TRANSFER, userId, money, errRate), String.class);

public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {

DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");

// 设置自己的MQ地址

producer.setNamesrvAddr("http://127.0.0.1:9876");

return producer;

}

● 发送接入到全局事务的半消息。

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 138

说明

● 当全局事务提交时,这个半消息会自动commit,完成投递。

● 当全局事务回滚时,这个半消息会自动cancel,完成丢弃。

// com.huawei.dtm.mq.server.model.RocketMqTemplate.java public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。

String uuid = UUID.randomUUID().toString();

String msgBody = uuid + "__" + money + "__" + userId;

Message msg =

new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));

producer.sendMessageInTransaction(msg, null);

}

public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");

consumer.setNamesrvAddr("http://127.0.0.1:9876");

consumer.subscribe("dtm-topic-mq", "*");

/**

* 消费的时候要保证幂等性, RocketMQ为确保能正常消费, 在少数极端场景有重复消费问 题。 */

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try {

bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));

ConsumerStore.INST.add(arr[0]);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

} catch (Exception e) { e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

} });

return consumer;

}//com.huawei.common.impl.BankBService.java

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

public void transferOut(int id, int money) { LOGGER.info("BankB transfer out");

jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);

}

application:

name: dtm-banka main:

allow-bean-definition-overriding: true datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}/banka?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver eureka:

client:

serviceUrl:

defaultZone: http://${eureka_ip}:8761/eureka/

instance:

prefer-ip-address: true

2-39 bankA application.yaml 配置文件参数详解

参数 说明

“dtmClientConfig.properties”中,

2-40 dtmClientConfig.properties 配置文件参数详解

参数 说明

application:

name: dtm-bankb main:

allow-bean-definition-overriding: true datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

url: jdbc:mysql://${db_ip}/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver eureka:

client:

serviceUrl:

defaultZone: http://${eureka_ip}:8761/eureka/

instance:

prefer-ip-address: true

3. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,

application:

name: dtm-bankcenter main:

allow-bean-definition-overriding: true eureka:

client:

serviceUrl:

defaultZone: http://${eureka_ip}:8761/eureka/

instance:

prefer-ip-address: true

2-41 bankCenter application.yaml 配置文件参数详解

参数 说明

实际修改auto-create-table-dtm-tran-info、dtm-app-name、sc-server-address

application:

name: dtm-client eureka:

client:

serviceUrl:

defaultZone: http://${eureka_ip}:8761/eureka/

instance:

prefer-ip-address: true

3. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,

application:

name: dtm-mq main:

allow-bean-definition-overriding: true eureka:

client:

serviceUrl:

defaultZone: http://${eureka_ip}:8761/eureka/

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

instance:

prefer-ip-address: true

3. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,

application:

name: dtm-mq-consumer main:

allow-bean-definition-overriding: true datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver eureka:

client:

serviceUrl:

defaultZone: http://${eureka_ip}:8761/eureka/

instance:

prefer-ip-address: true

2-42 mq-consumer application.yaml 配置文件参数详解

参数 说明

参数 说明

register-with-eureka: false fetch-registry: false

----结束

启动测试样例

步骤1 登录ECS,在eureka-server.jar包同级目录下,执行java -Dfile.encoding=utf-8 -jar eureka-server.jar启动eureka服务。

步骤2 打开第二个窗口登录ECS,在bankAservicespringcloud.jar包同级目录下,执行java -Dfile.encoding=utf-8 -jar bankA-service-springcloud.jar启动bankA服务。

步骤3 打开第三个窗口登录ECS,在bankBservicespringcloud.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar bankB-service-springcloud.jar启动bankB服务。

步骤4 打开第四个窗口登录ECS,在bankcenterspringcloud.jar包同级目录下,执行java -Dfile.encoding=utf-8 -jar bank-center-springcloud.jar启动bankCenter服务。

步骤5 打开第五个窗口登录ECS,在mqservicespringcloud.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-springcloud.jar启动mq-service服务。

步骤6 打开第六个窗口登录ECS,在invokeservicespringcloud.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar invoke-service-springcloud.jar启动invoke服务。

[0] 初始化数据库, 重置账号资金;

步骤7 在invoke服务中,输入命令0初始化数据库。

请输入命令执行操作:(当前远程调用/feign)

02021-06-15 21:00:32.905 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Init bankA bankB success

步骤8 输入命令1查询帐号余额,结果如下图所示。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500000000,total b 500000000,sum 1000000000

步骤9 输入命令4,执行非侵入样例DTM对接消息事务的微服务场景验证。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

| 1| 1000000| 1000000| 2000000|

[ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100 [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100

| 4| 1000000| 1000000| 2000000|

...

| 496| 1000000| 1000000| 2000000|

| 497| 1000000| 1000000| 2000000|

[ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100

| 499| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000

步骤12 打开第七个窗口登录ECS,在mqconsumerspringcloud.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-springcloud.jar启动mq-consumer服务。

步骤13 再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100的消息被消费,

最终数据保持一致,结果如下所示。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499990000,sum 1000000000

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 146

步骤14 输入命令5退出执行程序。

----结束

2.5.2 Spring Cloud Huawei 接入 DTM 样例指南

本章节介绍Spring Cloud Huawei框架下MQ消息接入DTM事务的demo,使得Spring Cloud Huawei框架下的MQ消息项目可以快速接入DTM。

其中,Spring Cloud Huawei的样例代码在导入样例工程过程中准备的dtm-demo的 dtm-springcloud-hw项目中。

前提条件

1. 已创建微服务引擎专享版,请参考创建微服务引擎专享版。

说明

DTM不支持开启了安全认证的微服务引擎专享版。

2. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以 便样例中的服务可以注册到RocketMQ上。

样例设计

Spring Cloud Huawei样例中使用Spring Cloud Huawei框架,样例流程参考图2-22。

● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。

● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。

须知

mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现 机制,会有重复消费消息的问题。

业务流程分析

● 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服 务转出100。

● 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入 的200回滚。

DTM 全局事务发起者

定义非侵入样例微服务场景事务发起端,通过微服务接口调用bankA发起银行转账,

并且投递半消息。

// com.huawei.dtm.mq.server.controller.MqController.java

@GetMapping(value = "transfer")

@DTMTxBegin(appName = "noninvasive-transfer-mq-SpringCloudHW")

public String transfer(@RequestParam(value = "id") int id, @RequestParam(value = "money") int money,

@RequestParam(value = "errRate") int errRate) throws Exception { LOGGER.info("mq service start invoke bankA and bankB");

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

bankOperator.transfer(id, money, errRate);

return "ok";

}// bankOperator有两个实现类,下面以restTemplate的实现类为例 // com.huawei.dtm.mq.server.impl.RestOpImpl.java

@Override

public String transfer(int userId, int money, int errRate) throws Exception { LOGGER.info("Start transfer---rest");

restTemplate.getForObject(String.format(BANKA_TRANSFER, userId, money * 2, errRate), String.class);

mqTemplate.sendMsg(userId, money);

// BankB在转账的时候会概率性抛出异常

restTemplate.getForObject(String.format(BANKB_TRANSFER, userId, money, errRate), String.class);

public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {

DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");

// 设置自己的MQ地址

producer.setNamesrvAddr("http://127.0.0.1:9876");

return producer; public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。

String uuid = UUID.randomUUID().toString();

String msgBody = uuid + "__" + money + "__" + userId;

Message msg =

new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));

producer.sendMessageInTransaction(msg, null);

}

public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 148

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");

consumer.setNamesrvAddr("http://127.0.0.1:9876");

consumer.subscribe("dtm-topic-mq", "*");

/**

* 消费的时候要保证幂等性, RocketMQ 为确保能正常消费, 在少数极端场景有重复消费问 题。 */

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try {

bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));

ConsumerStore.INST.add(arr[0]);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

} catch (Exception e) { e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

} });

return consumer;

}//com.huawei.common.impl.BankBService.java public void transferOut(int id, int money) { LOGGER.info("BankB transfer out");

jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);

}

application:

name: dtm-banka main:

allow-bean-definition-overriding: true datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}:3306/banka?

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

2-43 application.yaml 配置文件参数详解

参数 说明

db_user_name 客户端MySQL数据库用户名。

db_user_pwd 客户端MySQL数据库的密码。

db_ip 客户端MySQL数据库业务数据库地

址,分别为banka和bankb的地址,根 据实际情况修改。

3. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表2-44,修改address信息。

spring:

application:

name: dtm-banka cloud:

servicecomb:

discovery:

version: 1.0.0 enabled: true

address: ${cse_address}

watch: false

app-name: dtm-demo service-name: dtm-banka

2-44 bootstrap.yaml 配置文件参数详解

参数 说明

cse_address 服务注册发现地址,该配置项可从

“应用管理与运维平台”控制台的

“基础设施 > 微服务引擎( CSE)”

中指定微服务引擎实例页面上得到。

4. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,

将下列配置拷贝到配置文件“dtmClientConfig.properties”中,参考表2-45, 按 实际修改auto-create-table-dtm-tran-info、dtm-app-name、sc-server-address 和rpc-ssl-switch。

auto-create-table-dtm-tran-info=on dtm-app-name=xxxx

sc-server-address=xxxx rpc-ssl-switch=off

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 150

2-45 dtmClientConfig.properties 配置文件参数详解

参数 说明

auto-create-table-dtm-tran-info 是否自动创建DTM事务表dtm_tran_info,用来记录事 务信息。

– on:自动创建 – off:手动创建

dtm-app-name 应用名称,该配置项可从“应用管理与运维平台”控制 台“分布式事务管理 DTM > 引擎实例”界面中得到。

sc-server-address 服务中心地址,该配置项可从“应用管理与运维平台”

控制台“分布式事务管理 DTM > 引擎实例”界面中得 到。

rpc-ssl-switch SSL开关,该配置项可从“应用管理与运维平台”控制 台“分布式事务管理 DTM > 引擎实例”界面中得到。

– on:开启SSL – off:关闭SSL

2-24 DTM 引擎实例

步骤3 配置bankB-service所需配置文件。

1. 登录创建好的弹性云服务器,将bankB-service-springcloudHW.jar包上传至弹性 云服务器的bankB目录,并在bankB-service-springcloudHW.jar包同级目录下新 建“application.yaml”、“bootstrap.yaml”文件和“dtm-config”文件夹。

2. 把下列配置拷贝到“application.yaml”文件中,参考表2-43,修改username、

password和url为您创建好的数据库用户名、密码和地址,拷贝完成后保存退出。

server:

port: 8033 spring:

application:

name: dtm-bankb datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}:3306/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

3. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表2-44,修改address信息。

spring:

application:

name: dtm-bankb main:

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

allow-bean-definition-overriding: true

application:

name: dtm-bankcenter main:

allow-bean-definition-overriding: true cloud:

service-name: ${spring.application.name}

2-46 bootstrap.yaml 配置文件参数详解

参数 说明

invoke_mode 服务调用的方式。

– rest,服务采取rest调用方式。

– feign,服务采取feign调用方式。

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 152

参数 说明

2. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表 bootstrap.yaml配置文件 参数详解,修改服务的调用方式和address信息。

application:

name: dtm-invoke main:

allow-bean-definition-overriding: true cloud:

service-name: ${spring.application.name}

3. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,

2. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表5 mq-service bootstrap

application:

name: dtm-mq main:

allow-bean-definition-overriding: true cloud:

service-name: ${spring.application.name}

2-47 mq-service bootstrap.yaml 配置文件参数详解

参数 说明

2. 把下列配置拷贝到“application.yaml”文件中,参考表 mq-consumer

application.yaml配置文件参数详解,修改username、password和url为您创建 好的bankB数据库用户名、密码和地址。

server:

port: 8393 spring:

datasource:

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 154

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

2-48 mq-consumer application.yaml 配置文件参数详解

参数 说明

3. 把下列配置拷贝到“bootstrap.yaml”文件中,参考表 mq-service

bootstrap.yaml配置文件参数详解,修改服务的调用方式和address信息。

dtm: invoke:

mode: ${invoke_mode}

spring:

application:

name: dtm-mq-consumer main:

allow-bean-definition-overriding: true cloud:

service-name: ${spring.application.name}

4. 在新建目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,

步骤1 登录ECS,在bankAservicespringcloudHW.jar包同级目录下,执行java

-Dfile.encoding=utf-8 -jar bankA-service-springcloudHW.jar启动bankA服务。

步骤2 打开第二个窗口登录ECS,在bankB-service-springcloudHW.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar bankB-service-springcloudHW.jar启动bankB服务。

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

步骤3 打开第三个窗口登录ECS,在bankCenter-service-springcloudHW.jar包同级目录下,

执行java -Dfile.encoding=utf-8 -jar bankCenter-service-springcloudHW.jar启动 bankCenter服务。

步骤4 打开第四个窗口登录ECS,在mqservicespringcloudHW.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-springcloudHW.jar启动mq-service服务 步骤5 打开第五个窗口登录ECS,在invoke-service-springcloudHW.jar包所在目录,执行java

-Dfile.encoding=utf-8 -jar invoke-service-springcloudHW.jar启动invoke服务。

[0] 初始化数据库, 重置账号资金;

2021-03-23 11:51:59.417 [main] INFO c.h.d.c.service.TransferService - Init bankA bankB success

步骤7 输入命令1查询帐号余额,结果如下所示。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500000000,total b 500000000,sum 1000000000

步骤8 输入命令4,执行非侵入样例DTM事务的微服务场景验证。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

| 1| 1000000| 1000000| 2000000|

[ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100 [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100

| 4| 1000000| 1000000| 2000000|

...

| 496| 1000000| 1000000| 2000000|

| 497| 1000000| 1000000| 2000000|

[ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100

| 499| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000

步骤11 打开第六个窗口登录ECS,在mq-consumer-springcloudHW.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-springcloudHW.jar启动mq-consumer服 务。

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 156

步骤12 再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100的消息被消费,

最终数据保持一致,结果如下所示。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

| 1| 1000000| 1000000| 2000000|

| 2| 1000200| 999800| 2000000|

| 3| 1000200| 999800| 2000000|

| 4| 1000000| 1000000| 2000000|

...

| 496| 1000000| 1000000| 2000000|

| 497| 1000000| 1000000| 2000000|

| 498| 1000200| 999800| 2000000|

| 499| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499990000,sum 1000000000

步骤13 输入命令5退出执行程序。

----结束

2.5.3 ServiceComb 接入 DTM 样例指南

本章节介绍ServiceComb 框架下MQ消息接入DTM事务的demo,使得Spring Cloud框 架下的MQ消息项目可以快速接入DTM。

其中,ServiceComb的样例代码在导入样例工程过程中准备的dtm-demo的dtm-servicecomb项目中。

前提条件

1. 已创建微服务引擎专享版,请参考创建微服务引擎专享版。

说明

DTM不支持开启了安全认证的微服务引擎专享版。

2. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以 便样例中的服务可以注册到RocketMQ上。

样例设计

ServiceComb样例中使用ServiceComb框架,样例流程参考图2-22。

● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。

● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。

须知

mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现 机制,会有重复消费消息的问题。

业务流程分析

● 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服 务转出100。

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

● 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入

@GetMapping(value = "transfer")

@DTMTxBegin(appName = "noninvasive-transfer-mq-ServiceComb")

public String transfer(@RequestParam(value = "id") int id, @RequestParam(value = "money") int money,

@RequestParam(value = "errRate") int errRate) throws Exception { LOGGER.info("mq service start invoke bankA and bankB");

bankOperator.transfer(id, money, errRate);

return "ok";

}// bankOperator有两个实现类,下面以restTemplate的实现类为例 // com.huawei.dtm.mq.server.impl.RestOpImpl.java

@Override

public String transfer(int userId, int money, int errRate) throws Exception { LOGGER.info("Start transfer---rest");

restTemplate.getForObject(String.format(BANKA_TRANSFER, userId, money * 2, errRate), String.class);

mqTemplate.sendMsg(userId, money);

// BankB在转账的时候会概率性抛出异常

restTemplate.getForObject(String.format(BANKB_TRANSFER, userId, money, errRate), String.class);

public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {

DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");

// 设置自己的MQ地址

producer.setNamesrvAddr("http://127.0.0.1:9876");

return producer; public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。

String uuid = UUID.randomUUID().toString();

String msgBody = uuid + "__" + money + "__" + userId;

Message msg =

new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));

producer.sendMessageInTransaction(msg, null);

}

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 158

mqConsumer 服务

public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");

consumer.setNamesrvAddr("http://127.0.0.1:9876");

consumer.subscribe("dtm-topic-mq", "*");

/**

* 消费的时候要保证幂等性, RocketMQ为确保能正常消费, 在少数极端场景有重复消费问 题。 */

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try {

bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));

ConsumerStore.INST.add(arr[0]);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

} catch (Exception e) { e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

} });

return consumer;

}//com.huawei.common.impl.BankBService.java public void transferOut(int id, int money) { LOGGER.info("BankB transfer out");

jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);

}

datasource:

banka:

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}:3306/banka?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC

2-49 microservice.yaml 配置文件参数详解

参数 说明

default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:

2-50 配置 microservice.yaml servicecomb 参数详解

参数 说明

datasource:

bankb:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}:3306/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 160

2. 修改“dtm-servicecomb\bank-b\src\main\resources\microservice.yaml”文件中

default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:

default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:

default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:

1. 修改“dtm-servicecomb\mq-service\src\main\resources\microservice.yaml”文

default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:

default: loadbalance,bizkeeper-consumer,dtm-consumer Provider:

3. 参考表 microservice.yaml配置文件参数详解,修改“microservice.yaml”文件 的username、password和url为您创建好的bankB数据库用户名、密码和地址。

spring:

main:

allow-bean-definition-overriding: true datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

步骤7 通过Maven执行clean、install,将工程进行打包,生成banka、bankb、

auto-create-table-dtm-tran-info=on dtm-app-name=xxxx

sc-server-address=xxxx rpc-ssl-switch=off

2-51 dtmClientConfig.properties 配置文件参数详解

参数 说明

auto-create-table-dtm-tran-info 是否自动创建DTM事务表dtm_tran_info,用来记录事务信 息。

● on:自动创建

● off:手动创建

dtm-app-name 应用名称,该配置项可从“应用管理与运维平台”控制台

“分布式事务管理 DTM > 引擎实例”界面中得到。

sc-server-address 服务中心地址,该配置项可从“应用管理与运维平台”控 制台“分布式事务管理 DTM > 引擎实例”界面中得到。

rpc-ssl-switch SSL开关,该配置项可从“应用管理与运维平台”控制台

“分布式事务管理 DTM > 引擎实例”界面中得到。

● on:开启SSL

● off:关闭SSL

2-25 DTM 引擎实例

----结束

启动测试样例

步骤1 登录ECS,在bankAserviceservicecomb.jar包同级目录下,执行java

-Dfile.encoding=utf-8 -jar bankA-service-servicecomb.jar启动bankA服务。

步骤2 打开第二个窗口登录ECS,在bankBserviceservicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar bankB-service-servicecomb.jar启动bankB服务。

步骤3 打开第三个窗口登录ECS,在bankCenter-service-servicecomb.jar包同级目录下,执行 java -Dfile.encoding=utf-8 -jar bankCenter-service-servicecomb.jar启动

bankCenter服务。

步骤4 打开第四个窗口登录ECS,在mqserviceservicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-servicecomb.jar启动mq-service服务 步骤5 打开第五个窗口登录ECS,在invokeserviceservicecomb.jar包所在目录,执行java

-Dfile.encoding=utf-8 -jar invoke-service-servicecomb.jar启动invoke服务。

[0] 初始化数据库, 重置账号资金;

[1] 查询 Bank A 和 Bank B 余额;

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

[2] 非侵入用例 -> DTM 事务 微服务场景调用;

2021-03-23 11:51:59.417 [main] INFO c.h.d.c.service.TransferService - Init bankA bankB success

步骤7 输入命令1查询帐号余额,结果如下所示。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500000000,total b 500000000,sum 1000000000

步骤8 输入命令4,执行非侵入样例DTM对接消息事务的微服务场景验证。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

| 1| 1000000| 1000000| 2000000|

[ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100 [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100

| 4| 1000000| 1000000| 2000000|

...

| 496| 1000000| 1000000| 2000000|

| 497| 1000000| 1000000| 2000000|

[ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100

| 499| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000

步骤11 打开第六个窗口登录ECS,在mqconsumerservicecomb.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-servicecomb.jar启动mq-consumer服 务。

步骤12 再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100的消息被消费,

最终数据保持一致,结果如下所示。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

| 498| 1000000| 1000000| 2000000|

| 499| 1000200| 999800| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 49990000,sum 1000000000

步骤13 输入命令5退出执行程序。

----结束

2.5.4 Dubbo 接入 DTM 样例指南

本章节介绍Dubbo框架下MQ消息接入DTM事务的demo,使得Dubbo框架下的MQ消 息项目可以快速接入DTM。

其中,Dubbo的样例代码在 导入样例工程 过程中准备的dtm-demo的dtm-dubbo项目 中。

前提条件

1. 下载安装ZooKeeper,请参考8.1下载安装Zookeeper。

2. 在启动我们提供的dtm-dubbo样例之前,请先启动ZooKeeper,以便dtm-dubbo 中的服务可以注册到ZooKeeper上。

3. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以 便样例中的服务可以注册到RocketMQ上。

样例设计

dtm-dubbo样例中使用Dubbo框架,样例流程参考图2-22。

● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。

● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。

须知

mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现 机制,会有重复消费消息的问题。

业务流程分析

● 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服 务转出100。

● 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入 的200回滚。

DTM 全局事务发起者

定义非侵入样例微服务场景事务发起端,通过微服务接口调用bankA发起银行转账,

并且投递半消息。

// com.huawei.dubbo.mq.service.controller.MqServiceController.java

@DTMTxBegin(appName = "mq-transfer-dubbo")

public String transfer(int id, int money, int errRate) throws Exception {

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

LOGGER.info("mq service start invoke bankA and bankB: {}", DTMContext.getDTMContext().getGlobalTxId());

bankAController.transfer(id, money * 2, errRate);

rocketMqTemplate.sendMsg(id, money);

bankBController.transfer(id, money, errRate);

return "ok";

public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {

DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");

// 设置自己的MQ地址

producer.setNamesrvAddr("http://127.0.0.1:9876");

return producer; public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。

String uuid = UUID.randomUUID().toString();

String msgBody = uuid + "__" + money + "__" + userId;

Message msg =

new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));

producer.sendMessageInTransaction(msg, null);

}

public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");

consumer.setNamesrvAddr("http://127.0.0.1:9876");

consumer.subscribe("dtm-topic-mq", "*");

/**

* 消费的时候要保证幂等性, RocketMQ为确保能正常消费, 在少数极端场景有重复消费问 题。 */

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 166

try {

bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));

ConsumerStore.INST.add(arr[0]);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

} catch (Exception e) { e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

} });

return consumer;

}// com.huawei.dubbo.common.impl.BankBService.java public void transferOut(int id, int money) {

LOGGER.info("BankB transfer out");

jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);

}

address: zookeeper://${registry_ip}:${registry_port}

metadata-report:

address: zookeeper://${registry_ip}:${registry_port}

scan:

base-packages: com.huawei.dubbo.banka protocols:

dubbo:

datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

url: jdbc:mysql://${db_ip}:3306/banka?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

2-52 bankA 配置文件参数详解

application:

name: dtm-banka main:

allow-bean-definition-overriding: true cloud:

zookeeper:

enabled: true

connect-string: ${registry_ip}:${registry_port}

4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下

2-53 dtmClientConfig.properties 配置文件参数详解

参数 说明

参数 说明

address: zookeeper://${registry_ip}:${registry_port}

metadata-report:

address: zookeeper://${registry_ip}:${registry_port}

scan:

base-packages: com.huawei.dubbo.bankb protocols:

dubbo:

datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}:3306/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

3. 在bankB-service-dubbo.jar包同级目录下,新建配置文件“bootstrap.yaml”,

把下列配置拷贝到“bootstrap.yaml”文件中,使用ZooKeeper作为注册中心时的 注册信息参考表2-52,修改registry_ip和registry_port,完成后保存退出。

spring:

application:

name: dtm-bankb

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

main:

allow-bean-definition-overriding: true cloud:

zookeeper:

enabled: true

connect-string: ${registry_ip}:${registry_port}

4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下

address: zookeeper://${registry_ip}:${registry_port}

metadata-report:

address: zookeeper://${registry_ip}:${registry_port}

scan:

base-packages: com.huawei.dubbo.bank.center protocols:dubbo:

name: dubbo port: -1 server:

port: 8089

2-54 bankCenter 配置文件参数详解

参数 说明

application:

name: dtm-bank-center main:

allow-bean-definition-overriding: true cloud:

zookeeper:

enabled: true

connect-string: ${registry_ip}:${registry_port}

4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下 列配置拷贝到配置文件“dtmClientConfig.properties”中, 参考表2-53,按实际

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 170

修改auto-create-table-dtm-tran-info、dtm-app-name、sc-server-address和

address: zookeeper://${registry_ip}:${registry_port}

metadata-report:

address: zookeeper://${registry_ip}:${registry_port}

scan:

base-packages: com.huawei.dubbo.mq.service protocols:

dubbo:

application:

name: mq-service main:

allow-bean-definition-overriding: true cloud:

zookeeper:

enabled: true

connect-string: ${registry_ip}:${registry_port}

4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下

address: zookeeper://${registry_ip}:${registry_port}

metadata-report:

address: zookeeper://${registry_ip}:${registry_port}

scan:

base-packages: com.huawei.dubbo.mq.consumer protocols:

dubbo:

application:

name: mq-consumer main:

allow-bean-definition-overriding: true cloud:

zookeeper:

enabled: true

connect-string: ${registry_ip}:${registry_port}

datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}:3306/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下

address: zookeeper://${registry_ip}:${registry_port}

metadata-report:

address: zookeeper://${registry_ip}:${registry_port}

scan:

base-packages: com.huawei.dubbo.invoke protocols:

dubbo:

name: dubbo

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 172

port: -1

application:

name: dtm-invoke main:

allow-bean-definition-overriding: true cloud:

zookeeper:

enabled: true

connect-string: ${registry_ip}:${registry_port}

4. 在目录“dtm-config”下,新建配置文件“dtmClientConfig.properties”,把下

步骤2 登录ECS,在bankAservicedubbo.jar包同级目录下打开终端窗口,执行java -Dfile.encoding=utf-8 -jar bankA-service-dubbo.jar启动banka服务。

步骤3 打开第二个窗口登录ECS,在bankB-service-dubbo.jar包同级目录下打开一个终端窗 口,执行java -Dfile.encoding=utf-8 -jar bankB-service-dubbo.jar启动bankb服 务。

步骤4 打开第三个窗口登录ECS,在bank-center-dubbo.jar包同级目录下打开一个终端窗口,

执行java -Dfile.encoding=utf-8 -jar bank-center-dubbo.jar启动bankcenter服务。

步骤5 打开第四个窗口登录ECS,在mqservicedubbo.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-service-dubbo.jar启动mq-service服务。

步骤6 打开第五个窗口登录ECS,在invoke-service-mq-dubbo.jar包同级目录下打开一个终端 窗口,执行java -Dfile.encoding=utf-8 -jar invoke-service-mq-dubbo.jar启动 invoke服务。

02021-06-15 21:00:32.905 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Init bankA bankB success

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

步骤8 输入命令1查询BankA和BankB帐号余额。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

2021-03-23 14:25:29,799 [INFO] Run finish. total a 500000000,total b 500000000,sum 1000000000

步骤9 输入命令2,执行非侵入样例DTM对接消息事务的微服务场景验证。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

| 1| 1000000| 1000000| 2000000|

[ERROR] user id: 2, bankA: 1000200, bankB: 999900, total: 2000100 [ERROR] user id: 3, bankA: 1000200, bankB: 999900, total: 2000100

| 4| 1000000| 1000000| 2000000|

...

| 496| 1000000| 1000000| 2000000|

| 497| 1000000| 1000000| 2000000|

[ERROR] user id: 498, bankA: 1000200, bankB: 999900, total: 2000100

| 499| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499995000,sum 1000005000

步骤12 打开第六个窗口登录ECS,在mqconsumerdubbo.jar包所在目录,执行java -Dfile.encoding=utf-8 -jar mq-consumer-dubbo.jar启动mq-consumer服务。

步骤13 在invoke服务中,再输入命令1查询帐号余额,此时通过MQ消息传递的bankB转出100 的消息被消费,最终数据保持一致,结果如下所示。

|--- userId ---|--- bankA-money ---|--- bankB-money ---|---- sum ----|

| 0| 1000000| 1000000| 2000000|

2021-06-15 21:01:41.243 INFO 16772 --- [main] c.h.dtm.invoke.service.TransferService: Run finish. total a 500010000,total b 499990000,sum 1000000000

步骤14 输入命令3退出执行程序。

----结束

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 174

2.5.5 Dubbo ServiceComb 接入 DTM 样例指南

本章节介绍Dubbo ServiceComb框架下MQ消息接入DTM事务的demo,使得Dubbo ServiceComb框架下的MQ消息项目可以快速接入DTM。

其中,Dubbo ServiceComb的样例代码在导入样例工程过程中准备的dtm-demo的 dtm-dubbo-servicecomb项目中。

前提条件

1. 已创建微服务引擎专享版,请参考创建微服务引擎专享版。

说明

DTM不支持开启了安全认证的微服务引擎专享版。

2. 请参考RocketMQ安装和启动,在启动样例之前,先安装、启动RocketMQ,以 便样例中的服务可以注册到RocketMQ上。

样例设计

Dubbo ServiceComb样例中使用Dubbo ServiceComb框架,样例流程参考图2-22。

● 组成:bankA服务、bankB服务、mqService服务、mqConsumer服务。

● 功能:mqService服务和mqConsumer服务会对接消息RocketMQ,保证事务的一 致性。

须知

mqConsumer服务消息消费方,需要保证消费的幂等性。RocketMQ本身的消息实现 机制,会有重复消费消息的问题。

业务流程分析

● 正常场景:BankA转入200,BankB第一次转出100,第二次通过mqConsumer服 务转出100。

● 异常场景:mqService服务主动cancel投递的半消息,转账消息丢弃,BankA转入 的200回滚。

DTM 全局事务发起者

定义非侵入样例微服务场景事务发起端,通过微服务接口调用bankA发起银行转账,

并且投递半消息。

// com.huawei.dubbo.mq.service.controller.MqServiceController.java

@DTMTxBegin(appName = "mq-transfer-dubbo-servicecomb") public String transfer(int id, int money, int errRate) throws Exception { LOGGER.info("mq service start invoke bankA and bankB: {}", DTMContext.getDTMContext().getGlobalTxId());

bankAController.transfer(id, money * 2, errRate);

rocketMqTemplate.sendMsg(id, money);

bankBController.transfer(id, money, errRate);

return "ok";

}

最佳实践 2 使用 DTM 保证银行转账业务数据一致性

mqService 服务

● 使用DTM封装好的事务生产者DtmRocketMqProducer。

// com.huawei.dubbo.mq.service.config.WebConfig.java;

@Bean

public DtmRocketMqProducer dtmRocketMqProducer() throws Exception {

DtmRocketMqProducer producer = new DtmRocketMqProducer("dtm-rocket-mq");

// 设置自己的MQ地址

producer.setNamesrvAddr("http://127.0.0.1:9876");

return producer; public void sendMsg(int userId, int money) throws Exception { // 为了保证消费幂等性,每一个消息带一个唯一的UUID。

String uuid = UUID.randomUUID().toString();

String msgBody = uuid + "__" + money + "__" + userId;

Message msg =

new Message("dtm-topic-mq", "tag", msgBody.getBytes(StandardCharsets.UTF_8));

producer.sendMessageInTransaction(msg, null);

}

public DefaultMQPushConsumer dtmConsumer(BankBService bankBService) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dtm-rocket-mq");

consumer.setNamesrvAddr("http://127.0.0.1:9876");

consumer.subscribe("dtm-topic-mq", "*");

/**

* 消费的时候要保证幂等性, RocketMQ为确保能正常消费, 在少数极端场景有重复消费问 题。 */

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try {

防止重复消费。

*/

bankBService.transferOut(Integer.parseInt(arr[2]), Integer.parseInt(arr[1]));

ConsumerStore.INST.add(arr[0]);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

} catch (Exception e) { e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

} });

return consumer;

}// com.huawei.dubbo.common.impl.BankBService.java public void transferOut(int id, int money) {

LOGGER.info("BankB transfer out");

jdbcTemplate.update(DtmConst.TransferSql.TRANSFER_OUT_SQL, money, id);

}

application:

name: dtm-banka main:

allow-bean-definition-overriding: true datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}/banka?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

2-55 bankA-service 配置文件参数详解

参数 说明

dubbo.servicecomb.registry.address=${registry_address}

dubbo.servicecomb.config.address=${config_address}

2-56 dubbo.properties 配置文件参数详解

参数 说明

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"

xmlns="http://www.springframework.org/schema/beans"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://

www.springframework.org/schema/beans/spring-beans-4.3.xsd

http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/

dubbo.xsd">

<dubbo:application name="dtm-banka"/>

<dubbo:registry address="sc://${registry_address}"/>

<dubbo:protocol name="dubbo" port="8037"/>

<bean id="bankAController"

class="com.huawei.dubbo.banka.controller.BankAController"/>

<dubbo:service interface="com.huawei.dubbo.common.intf.IBankAController"

ref="bankAController"/>

<dubbo:provider timeout="30000"/>

</beans>

application:

name: dtm-bankb main:

allow-bean-definition-overriding: true datasource:

bank:

username: ${db_user_name}

password: ${db_user_pwd}

url: jdbc:mysql://${db_ip}/bankb?

verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=

UTC driver-class-name: com.mysql.cj.jdbc.Driver

文档版本 01 (2021-12-28) 版权所有 © 华为技术有限公司 178

2. 修改“dtm-dubbo-servicecomb\bankB-service\src\main\resources

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"

xmlns="http://www.springframework.org/schema/beans"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://

www.springframework.org/schema/beans/spring-beans-4.3.xsd

http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/

dubbo.xsd">

<dubbo:application name="dtm-bankb"/>

<dubbo:registry address="sc://${registry_address}"/>

<dubbo:protocol name="dubbo" port="8047"/>

<bean id="bankBController"

class="com.huawei.dubbo.bankb.controller.BankBController"/>

<dubbo:service interface="com.huawei.dubbo.common.intf.IBankBController"

ref="bankBController"/>

<dubbo:provider timeout="30000"/>

</beans>

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"

xmlns="http://www.springframework.org/schema/beans"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://

www.springframework.org/schema/beans/spring-beans-4.3.xsd

http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/

dubbo.xsd">

<dubbo:application name="bank-center"/>

<dubbo:registry address="sc://${registry_address}"/>

<dubbo:protocol name="dubbo" port="8031"/>

<bean id="bankCenterController"

class="com.huawei.dubbo.bank.center.controller.BankCenterController"/>

<dubbo:service interface="com.huawei.dubbo.common.intf.IBankCenterController"

ref="bankCenterController"/>

<dubbo:reference id="bankAController" check="false"

interface="com.huawei.dubbo.common.intf.IBankAController"/>

interface="com.huawei.dubbo.common.intf.IBankAController"/>

相關文件