一、概述
上一片篇文章简单介绍了Seata分布式事务框架,接下来我们通过一个案例来详细解读Seata分布式事务框架的TCC模式。
和AT(2PC)模式不同的是,TCC模式需要每个微服务的交易接口内单独实现T、C、R三个方法。
T是prepare/try方法,尝试锁定或者预定资源,如冻结余额。C是commit方法,真正执行提交事务的方法,如扣减余额。R是rollback方法,回滚此交易接口的操作。
也就是说相比于AT模式的自带全局回滚,TCC模式需要每个微服务的交易自己实现回滚。
TCC模式比AT模式相比,一个需要每个交易自己按照TCC三个方法的模式实现业务功能,代码侵入性和编程复杂度更高。但是其回滚的灵活性也更好,可以自定义回滚策略。
下面是一个SpringCloud+seata+tcc分布式事务案例,源码地址为:
二、环境准备
① 首先准备基本的IDE,这里是IDEA 2024。基本的JAVA 版本和Maven版本等。笔者的Java版本是21。 高版本的Java在序列化时候可能会出现模块化之后无法反射调用部分Java类的问题。添加下面参数作为VM启动参数:
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.rmi/sun.rmi.transport=ALL-UNNAMED
--add-opens
java.base/java.lang.reflect=ALL-UNNAMED
--add-opens
java.base/java.util=ALL-UNNAMED
--add-opens
java.base/java.math=ALL-UNNAMED
② 下载seata server和启动server(也可以自己下载源码编译和执行),这里是file模式和standalone单机模式:
sh seata-server.sh -p 8091 -h 127.0.0.1 -m file
如果是使用DB模式,需要修改对应的数据库密码和创建单独的seata数据库。
若使用Nacos,按照官方文档配置和启动Nacos。
③ SpringCloud服务的注册中心。这里默认是使用Consul,也可以使用Nacos。 ④ 为每个服务创建单独的库,并设置连接用户和密码,以Account为例:
server:
port: 8083
spring:
application:
name: tcc-account-service
cloud:
loadbalancer:
ribbon:
enabled: false
consul:
host: 127.0.0.1
port: 8500
discovery:
heartbeat:
enabled: true
ttl-value: 5
ttl-unit: s
prefer-ip-address: true
datasource:
url: jdbc:mysql://127.0.0.1:3306/seata_account?userSSL=false&useUnicode=true&characterEncoding=UTF8&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: mysql
# Seata 配置项,对应 SeataProperties 类
seata:
enable-auto-data-source-proxy: false
application-id: tcc-account-service # Seata 应用编号,默认为 ${spring.application.name}
tx-service-group: tcc-account-service-group # Seata 事务组编号,用于 TC 集群名
# Seata 服务配置项,对应 ServiceProperties 类
service:
# 虚拟组和分组的映射
vgroup-mapping:
tcc-account-service-group: default
# 分组和 Seata 服务的映射
grouplist:
default: 127.0.0.1:8091
# Seata 注册中心配置项,对应 RegistryProperties 类
# registry:
# type: nacos # 注册中心类型,默认为 file
# nacos:
# cluster: default # 使用的 Seata 分组
# namespace: # Nacos 命名空间
# serverAddr: 127.0.0.1:8848 # Nacos 服务地址
创建对应微服务的业务表,因为TCC模式不需要用到undo_log表,所以这里可以不创建undo_log表。
-- ----------------------------
-- Table structure for account
-- ----------------------------
CREATE TABLE `account`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户id',
`balance` int(11) DEFAULT NULL COMMENT '总余额',
`frozen` int(11) DEFAULT NULL COMMENT '冻结余额',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE = INNODB
AUTO_INCREMENT = 2
DEFAULT CHARSET = utf8;
INSERT INTO `seata_account`.`account` (`id`, `balance`, `frozen`)
VALUES ('1', '100', '0');
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
然后启动Account、Order、Product这3个服务,最后启动Business服务。
注意本地启动的话,服务端口不能重复了。
服务正常启动就可以开始本地测试。
三、案例执行和解读
3.1 执行链路
这个工程是使用4个微服务组成,分别是Account、Order、Product、Business。Business作为最外层的入口微服务,启动GlobalTransaction,然后调用Account、Order、Product三个微服务的接口。
调用链路是Business->Stock/Product,和Business->Order->Account。
Business是TM,Account、Order、Product是RM。
具体执行流程上,从BusinessController的”/buy”请求路径作为入口,调用Product扣减库存,然后调用Order创建订单。OrderService调用Account扣减余额。
@Override
@GlobalTransactional
public String handleBusiness(BusinessDTO businessDTO) {
String xid = RootContext.getXID();
logger.info("[handleBusiness] 开始下单");
logger.info("[handleBusiness] 当前 XID: {}",xid);
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
// 扣减库存 Try
boolean result = productService.productTry(actionContext, businessDTO.getProductId(), businessDTO.getCount());
if(!result){
throw new RuntimeException("扣减库存一阶段失败");
}
// 查询 商品单价
Integer price = productService.getPrice(businessDTO.getProductId());
Integer payAmount = price * businessDTO.getCount();
// 生成orderId
Long orderId = IDUtils.nextId();
// 创建订单 Try
result = orderService.orderTry(actionContext, orderId, businessDTO.getUserId(), businessDTO.getProductId(),
businessDTO.getCount(), payAmount);
if(!result){
throw new RuntimeException("创建订单一阶段失败");
}
logger.info("[handleBusiness] 下单成功, 订单Id: " + orderId);
return "Place Order Success";
}
跨微服务之间使用FeignClient从注册中心获取需要调用的server和name和IP,透明化Http调用,调用其他Service/Server的服务。
@FeignClient("tcc-product-service")
@RequestMapping("/product")
public interface ProductService {
@PostMapping("/try")
boolean productTry(@RequestBody BusinessActionContext actionContext,
@RequestParam("productId") Long productId,
@RequestParam("count") Integer count);
@PostMapping("/confirm")
boolean productConfirm(@RequestBody BusinessActionContext actionContext);
@PostMapping("/cancel")
boolean productCancel(@RequestBody BusinessActionContext actionContext);
@GetMapping("/getPrice")
Integer getPrice(@RequestParam("productId") Long productId);
}
所以上面的productTry方法会调用到tcc-product-service的这个请求路径。
@PostMapping("/try")
public boolean productTry(@RequestBody BusinessActionContext actionContext,
@RequestParam("productId") Long productId,
@RequestParam("count") Integer count){
return productService.productTry(actionContext, productId, count);
}
整体调用流程基本就是这样。
然后正常启动服务,首先测试全局成功提交的案例。
3.2 全局提交
使用HttpClient发送请求:
POST http://127.0.0.1:9999/tcc/buy
Content-Type: application/json
{
"userId": "1",
"productId":"1",
"count":"2"
}
成功响应:
HTTP/1.1 200 OK
Connection: keep-alive
Content-Type: text/plain;charset=UTF-8
Content-Length: 19
Date: Thu, 18 Apr 2024 03:31:57 GMT
Place Order Success
首先检查数据状态。 Account数据状态:
1,90,0,2024-04-18 11:31:57
Product数据状态:
1,5,98,0,2024-04-18 11:31:57
Order数据状态:
1607941629149216,1,1,2,10,1,2024-04-18 11:31:57,2024-04-18 11:31:57
可以看到数据是全局成功提交的。
以Account的执行日志简单观测一下分支事务的执行流程:
2024-04-18 11:31:57.359 ERROR 796 --- [ XNIO-1 task-1] c.a.druid.pool.DruidAbstractDataSource : discard long time none received connection. , jdbcUrl : jdbc:mysql://127.0.0.1:3306/seata_account?userSSL=false&useUnicode=true&characterEncoding=UTF8&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai, jdbcUrl : jdbc:mysql://127.0.0.1:3306/seata_account?userSSL=false&useUnicode=true&characterEncoding=UTF8&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai, lastPacketReceivedIdleMillis : 151391
2024-04-18 11:31:57.366 INFO 796 --- [ XNIO-1 task-1] c.d.s.t.a.s.impl.AccountServiceImpl : [accountTry]: 当前 XID:172.29.16.1:8091:6927048120732393496, branchId:6927048120732393499, 用户:1, 金额:10
2024-04-18 11:31:57.367 INFO 796 --- [ XNIO-1 task-1] c.d.s.t.a.s.impl.AccountServiceImpl : [accountTry]: 冻结 10 余额成功
2024-04-18 11:31:57.397 INFO 796 --- [h_RMROLE_1_7_48] i.s.c.r.p.c.RmBranchCommitProcessor : rm client handle branch commit process:xid=172.29.16.1:8091:6927048120732393496,branchId=6927048120732393499,branchType=TCC,resourceId=accountService,applicationData={"actionContext":{"action-start-time":1713411117354,"useTCCFence":false,"payAmount":10,"productId":1,"sys::prepare":"accountTry","orderId":1607941629149216,"count":2,"sys::rollback":"accountCancel","sys::commit":"accountConfirm","host-name":"172.29.16.1","userId":1,"actionName":"accountService","price":10}}
2024-04-18 11:31:57.397 INFO 796 --- [h_RMROLE_1_7_48] io.seata.rm.AbstractRMHandler : Branch committing: 172.29.16.1:8091:6927048120732393496 6927048120732393499 accountService {"actionContext":{"action-start-time":1713411117354,"useTCCFence":false,"payAmount":10,"productId":1,"sys::prepare":"accountTry","orderId":1607941629149216,"count":2,"sys::rollback":"accountCancel","sys::commit":"accountConfirm","host-name":"172.29.16.1","userId":1,"actionName":"accountService","price":10}}
2024-04-18 11:31:57.398 INFO 796 --- [h_RMROLE_1_7_48] c.d.s.t.a.s.impl.AccountServiceImpl : [accountConfirm]: 当前 XID:172.29.16.1:8091:6927048120732393496, branchId:6927048120732393499, 用户:1, 金额:10
2024-04-18 11:31:57.399 INFO 796 --- [h_RMROLE_1_7_48] c.d.s.t.a.s.impl.AccountServiceImpl : [accountConfirm]: 扣减 10 余额成功
2024-04-18 11:31:57.401 INFO 796 --- [h_RMROLE_1_7_48] io.seata.rm.AbstractResourceManager : TCC resource commit result : true, xid: 172.29.16.1:8091:6927048120732393496, branchId: 6927048120732393499, resourceId: accountService
2024-04-18 11:31:57.402 INFO 796 --- [h_RMROLE_1_7_48] io.seata.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
可以看到账户的处理流程是先冻结余额,然后二阶段的解冻余额和扣除余额,完成本地事务的提交或者回滚。 因为没有出现回滚,所以只会执行TCC的T-prepare和C-commit操作。
TCC的Service如下,注意看TwoPhaseBusinessAction注解,包含方法本身和C-accountConfirm、C-accountCancel这两个方法。C-accountConfirm、C-accountCancel这两个方法是Seata TCC模式自动调用的。
@LocalTCC
public interface AccountService {
@TwoPhaseBusinessAction(name = "accountService", commitMethod = "accountConfirm", rollbackMethod = "accountCancel")
boolean accountTry(BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "price") Integer price);
boolean accountConfirm(BusinessActionContext actionContext);
boolean accountCancel(BusinessActionContext actionContext);
}
具体的DAO接口如下:
@Mapper
@Repository
public interface AccountDao {
/**
* Account Try 冻结余额
* @param userId 用户 ID
* @param amount 冻结的余额
* @return 影响的记录行
*/
@Update("UPDATE account set frozen = frozen + #{amount} WHERE id = #{userId} AND balance >= frozen + #{amount}")
int accountTry(@Param("userId") Long userId, @Param("amount") Integer amount);
/**
* Account Confirm 正式扣减余额,释放冻结余额
* @param userId 用户 ID
* @param amount 冻结的余额
* @return 影响的记录行
*/
@Update("UPDATE account set frozen = frozen - #{amount}, balance = balance - #{amount} WHERE id = #{userId}")
int accountConfirm(@Param("userId") Long userId, @Param("amount") Integer amount);
/**
* Account Cancel 释放冻结余额
* @param userId 用户 ID
* @param amount 冻结的余额
* @return 影响的记录行
*/
@Update("UPDATE account set frozen = frozen - #{amount} WHERE id = #{userId}")
int accountCancel(@Param("userId") Long userId, @Param("amount") Integer amount);
}
accountTry方法:尝试冻结余额,如果余额不足,则冻结失败。
accountConfirm方法:确认提交则是扣除冻结余额和实际余额,完成扣款操作。
accountCancel方法:回滚操作是扣除冻结余额,完成回滚操作。
3.3 全局回滚
手动模拟一个服务中的异常:
@Override
@Transactional(rollbackFor = Exception.class)
public boolean accountTry(BusinessActionContext actionContext, Long userId, Integer price) {
String xId = actionContext.getXid();
long branchId = actionContext.getBranchId();
logger.info("[accountTry]: 当前 XID:{}, branchId:{}, 用户:{}, 金额:{}", xId, branchId, userId, price);
int flag = accountDao.accountTry(userId, price);
if(flag == 0){
throw new RuntimeException("账户服务 Try 阶段失败.");
}
throw new RuntimeException("模拟异常");
// //事务成功,保存一个标识,供第二阶段进行判断
// ResultHolder.setResult(getClass(), actionContext.getXid(), "p");
// logger.info("[accountTry]: 冻结 {} 余额成功", price);
// return true;
}
然后重新启动Account服务。
首先检查执行前的数据状态:
Account:
| id | balance | frozen | update\_time |
| :--- | :--- | :--- | :--- |
| 1 | 80 | 0 | 2024-04-18 15:43:42 |
product:
| id | price | stock | frozen | update\_time |
| :--- | :--- | :--- | :--- | :--- |
| 1 | 5 | 96 | 0 | 2024-04-18 15:46:43 |
orders:
| id | user\_id | product\_id | count | pay\_amount | status | create\_time | update\_time |
| :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- |
| 1607941629149216 | 1 | 1 | 2 | 10 | 1 | 2024-04-18 11:31:57 | 2024-04-18 11:31:57 |
| 1607973306630176 | 1 | 1 | 2 | 10 | 1 | 2024-04-18 15:43:42 | 2024-04-18 15:43:42 |
然后执行测试:
POST http://127.0.0.1:9999/tcc/buy
Content-Type: application/json
{
"userId": "1",
"productId":"1",
"count":"2"
}
测试输出:
{
"timestamp": "2024-04-18T07:50:17.873+00:00",
"status": 500,
"error": "Internal Server Error",
"message": "",
"path": "/tcc/buy"
}
500的错误是案例没有优化和转义错误码,具体的错误是在AccountService中抛出的,如下:
2024-04-18 15:50:17.854 ERROR 22372 --- [ XNIO-1 task-1] io.undertow.request : UT005023: Exception handling request to /account/try
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.RuntimeException: 模拟异常
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-5.3.6.jar:5.3.6]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.3.6.jar:5.3.6]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:517) ~[jakarta.servlet-api-4.0.4.jar:4.0.4]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]
再检查执行后的数据状态,具体如下,还是和执行前一致。说明进行了全局回滚。
Account:
| id | balance | frozen | update\_time |
| :--- | :--- | :--- | :--- |
| 1 | 80 | 0 | 2024-04-18 15:43:42 |
product:
| id | price | stock | frozen | update\_time |
| :--- | :--- | :--- | :--- | :--- |
| 1 | 5 | 96 | 0 | 2024-04-18 15:46:43 |
orders:
| id | user\_id | product\_id | count | pay\_amount | status | create\_time | update\_time |
| :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- |
| 1607941629149216 | 1 | 1 | 2 | 10 | 1 | 2024-04-18 11:31:57 | 2024-04-18 11:31:57 |
| 1607973306630176 | 1 | 1 | 2 | 10 | 1 | 2024-04-18 15:43:42 | 2024-04-18 15:43:42 |
以Account为例检查分支事务执行的日志:
2024-04-18 15:50:17.851 INFO 22372 --- [ XNIO-1 task-1] c.d.s.t.a.s.impl.AccountServiceImpl : [accountTry]: 当前 XID:172.29.16.1:8091:6927048120732393508, branchId:6927048120732393511, 用户:1, 金额:10
2024-04-18 15:50:17.854 ERROR 22372 --- [ XNIO-1 task-1] io.undertow.request : UT005023: Exception handling request to /account/try
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.RuntimeException: 模拟异常
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-5.3.6.jar:5.3.6]
//....
Caused by: java.lang.RuntimeException: 模拟异常
at cn.dmego.seata.tcc.account.service.impl.AccountServiceImpl.accountTry(AccountServiceImpl.java:38) ~[classes/:na]
//......
2024-04-18 15:50:17.868 INFO 22372 --- [h_RMROLE_1_2_48] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:xid=172.29.16.1:8091:6927048120732393508,branchId=6927048120732393511,branchType=TCC,resourceId=accountService,applicationData={"actionContext":{"action-start-time":1713426617842,"useTCCFence":false,"payAmount":10,"productId":1,"sys::prepare":"accountTry","orderId":1607974135005216,"count":2,"sys::rollback":"accountCancel","sys::commit":"accountConfirm","host-name":"172.29.16.1","userId":1,"actionName":"accountService","price":10}}
2024-04-18 15:50:17.868 INFO 22372 --- [h_RMROLE_1_2_48] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.29.16.1:8091:6927048120732393508 6927048120732393511 accountService
2024-04-18 15:50:17.869 INFO 22372 --- [h_RMROLE_1_2_48] c.d.s.t.a.s.impl.AccountServiceImpl : [accountCancel]: 当前 XID:172.29.16.1:8091:6927048120732393508, branchId:6927048120732393511, 用户:1, 金额:10
2024-04-18 15:50:17.869 INFO 22372 --- [h_RMROLE_1_2_48] io.seata.rm.AbstractResourceManager : TCC resource rollback result : true, xid: 172.29.16.1:8091:6927048120732393508, branchId: 6927048120732393511, resourceId: accountService
2024-04-18 15:50:17.869 INFO 22372 --- [h_RMROLE_1_2_48] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
可以看到这里的回滚操作是accountCancel方法,按照XID和branchId进行回滚,执行accountCancel方法。
整体来说和2PC有点类似的,不过回滚逻辑需要业务代码自己实现。
在TCC 中,事务的执行是分阶段的,首先尝试执行事务(Try),然后确认事务(Confirm),最后根据需要进行补偿操作(Cancel)。
TCC更具有灵活性和可扩展性,但需要开发者实现 Try、Confirm 和 Cancel 三个阶段的业务逻辑。
四. 参考资料
- https://seata.apache.org/zh-cn/docs/overview/what-is-seata
- https://github.com/apache/incubator-seata-samples.git
- https://github.com/dmego/springcloud-seata-demo.git