Seata分布式事务框架解读二:TCC模式

2024-04-18

一、概述

上一片篇文章简单介绍了Seata分布式事务框架,接下来我们通过一个案例来详细解读Seata分布式事务框架的TCC模式。

和AT(2PC)模式不同的是,TCC模式需要每个微服务的交易接口内单独实现T、C、R三个方法。

T是prepare/try方法,尝试锁定或者预定资源,如冻结余额。C是commit方法,真正执行提交事务的方法,如扣减余额。R是rollback方法,回滚此交易接口的操作。

也就是说相比于AT模式的自带全局回滚,TCC模式需要每个微服务的交易自己实现回滚。

TCC模式比AT模式相比,一个需要每个交易自己按照TCC三个方法的模式实现业务功能,代码侵入性和编程复杂度更高。但是其回滚的灵活性也更好,可以自定义回滚策略。

下面是一个SpringCloud+seata+tcc分布式事务案例,源码地址为:

springcloud-seata-demo

二、环境准备

① 首先准备基本的IDE,这里是IDEA 2024。基本的JAVA 版本和Maven版本等。笔者的Java版本是21。 高版本的Java在序列化时候可能会出现模块化之后无法反射调用部分Java类的问题。添加下面参数作为VM启动参数:

--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.rmi/sun.rmi.transport=ALL-UNNAMED
--add-opens
java.base/java.lang.reflect=ALL-UNNAMED
--add-opens
java.base/java.util=ALL-UNNAMED
--add-opens
java.base/java.math=ALL-UNNAMED

② 下载seata server和启动server(也可以自己下载源码编译和执行),这里是file模式和standalone单机模式:


sh seata-server.sh -p 8091 -h 127.0.0.1 -m file

如果是使用DB模式,需要修改对应的数据库密码和创建单独的seata数据库。

若使用Nacos,按照官方文档配置和启动Nacos。

③ SpringCloud服务的注册中心。这里默认是使用Consul,也可以使用Nacos。 ④ 为每个服务创建单独的库,并设置连接用户和密码,以Account为例:


server:
  port: 8083

spring:
  application:
    name: tcc-account-service
  cloud:
    loadbalancer:
      ribbon:
        enabled: false
    consul:
      host: 127.0.0.1
      port: 8500
      discovery:
        heartbeat:
          enabled: true
          ttl-value: 5
          ttl-unit: s
        prefer-ip-address: true

  datasource:
    url: jdbc:mysql://127.0.0.1:3306/seata_account?userSSL=false&useUnicode=true&characterEncoding=UTF8&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: mysql


# Seata 配置项,对应 SeataProperties 类
seata:
  enable-auto-data-source-proxy: false
  application-id: tcc-account-service # Seata 应用编号,默认为 ${spring.application.name}
  tx-service-group: tcc-account-service-group # Seata 事务组编号,用于 TC 集群名
  # Seata 服务配置项,对应 ServiceProperties 类
  service:
    # 虚拟组和分组的映射
    vgroup-mapping:
      tcc-account-service-group: default
    # 分组和 Seata 服务的映射
    grouplist:
      default: 127.0.0.1:8091
  # Seata 注册中心配置项,对应 RegistryProperties 类
#  registry:
#    type: nacos # 注册中心类型,默认为 file
#    nacos:
#      cluster: default # 使用的 Seata 分组
#      namespace: # Nacos 命名空间
#      serverAddr: 127.0.0.1:8848  # Nacos 服务地址

创建对应微服务的业务表,因为TCC模式不需要用到undo_log表,所以这里可以不创建undo_log表。

-- ----------------------------
-- Table structure for account
-- ----------------------------
CREATE TABLE `account`
(
    `id`          int(11) NOT NULL AUTO_INCREMENT COMMENT '用户id',
    `balance`     int(11)  DEFAULT NULL COMMENT '总余额',
    `frozen`      int(11)  DEFAULT NULL COMMENT '冻结余额',
    `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE = INNODB
  AUTO_INCREMENT = 2
  DEFAULT CHARSET = utf8;
INSERT INTO `seata_account`.`account` (`id`, `balance`, `frozen`)
VALUES ('1', '100', '0');

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`
(
    `id`            bigint(20)   NOT NULL AUTO_INCREMENT,
    `branch_id`     bigint(20)   NOT NULL,
    `xid`           varchar(100) NOT NULL,
    `context`       varchar(128) NOT NULL,
    `rollback_info` longblob     NOT NULL,
    `log_status`    int(11)      NOT NULL,
    `log_created`   datetime     NOT NULL,
    `log_modified`  datetime     NOT NULL,
    `ext`           varchar(100) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8;

然后启动Account、Order、Product这3个服务,最后启动Business服务。

注意本地启动的话,服务端口不能重复了。

服务正常启动就可以开始本地测试。

三、案例执行和解读

3.1 执行链路

这个工程是使用4个微服务组成,分别是Account、Order、Product、Business。Business作为最外层的入口微服务,启动GlobalTransaction,然后调用Account、Order、Product三个微服务的接口。

调用链路是Business->Stock/Product,和Business->Order->Account。

Business是TM,Account、Order、Product是RM。

具体执行流程上,从BusinessController的”/buy”请求路径作为入口,调用Product扣减库存,然后调用Order创建订单。OrderService调用Account扣减余额。

@Override
    @GlobalTransactional
    public String handleBusiness(BusinessDTO businessDTO) {
        String xid = RootContext.getXID();
        logger.info("[handleBusiness] 开始下单");
        logger.info("[handleBusiness] 当前 XID: {}",xid);
        BusinessActionContext actionContext = new BusinessActionContext();
        actionContext.setXid(xid);

        // 扣减库存 Try
        boolean result = productService.productTry(actionContext, businessDTO.getProductId(), businessDTO.getCount());
        if(!result){
            throw new RuntimeException("扣减库存一阶段失败");
        }

        // 查询 商品单价
        Integer price = productService.getPrice(businessDTO.getProductId());
        Integer payAmount = price * businessDTO.getCount();

        // 生成orderId
        Long orderId = IDUtils.nextId();

        // 创建订单 Try
        result = orderService.orderTry(actionContext, orderId, businessDTO.getUserId(), businessDTO.getProductId(),
                businessDTO.getCount(), payAmount);
        if(!result){
            throw new RuntimeException("创建订单一阶段失败");
        }
        logger.info("[handleBusiness] 下单成功, 订单Id: " + orderId);
        return "Place Order Success";
    }

跨微服务之间使用FeignClient从注册中心获取需要调用的server和name和IP,透明化Http调用,调用其他Service/Server的服务。

@FeignClient("tcc-product-service")
@RequestMapping("/product")
public interface ProductService {

    @PostMapping("/try")

    boolean productTry(@RequestBody BusinessActionContext actionContext,
                       @RequestParam("productId") Long productId,
                       @RequestParam("count") Integer count);

    @PostMapping("/confirm")
    boolean productConfirm(@RequestBody BusinessActionContext actionContext);

    @PostMapping("/cancel")
    boolean productCancel(@RequestBody BusinessActionContext actionContext);

    @GetMapping("/getPrice")
    Integer getPrice(@RequestParam("productId") Long productId);
}

所以上面的productTry方法会调用到tcc-product-service的这个请求路径。

 @PostMapping("/try")
    public boolean productTry(@RequestBody BusinessActionContext actionContext,
                              @RequestParam("productId") Long productId,
                              @RequestParam("count") Integer count){
        return productService.productTry(actionContext, productId, count);
    }

整体调用流程基本就是这样。

然后正常启动服务,首先测试全局成功提交的案例。

3.2 全局提交

使用HttpClient发送请求:

POST http://127.0.0.1:9999/tcc/buy
Content-Type: application/json

{
  "userId": "1",
  "productId":"1",
  "count":"2"
}

成功响应:

HTTP/1.1 200 OK
Connection: keep-alive
Content-Type: text/plain;charset=UTF-8
Content-Length: 19
Date: Thu, 18 Apr 2024 03:31:57 GMT

Place Order Success

首先检查数据状态。 Account数据状态:

1,90,0,2024-04-18 11:31:57

Product数据状态:

1,5,98,0,2024-04-18 11:31:57

Order数据状态:

1607941629149216,1,1,2,10,1,2024-04-18 11:31:57,2024-04-18 11:31:57

可以看到数据是全局成功提交的。

以Account的执行日志简单观测一下分支事务的执行流程:

2024-04-18 11:31:57.359 ERROR 796 --- [  XNIO-1 task-1] c.a.druid.pool.DruidAbstractDataSource   : discard long time none received connection. , jdbcUrl : jdbc:mysql://127.0.0.1:3306/seata_account?userSSL=false&useUnicode=true&characterEncoding=UTF8&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai, jdbcUrl : jdbc:mysql://127.0.0.1:3306/seata_account?userSSL=false&useUnicode=true&characterEncoding=UTF8&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai, lastPacketReceivedIdleMillis : 151391
2024-04-18 11:31:57.366  INFO 796 --- [  XNIO-1 task-1] c.d.s.t.a.s.impl.AccountServiceImpl      : [accountTry]: 当前 XID:172.29.16.1:8091:6927048120732393496, branchId:6927048120732393499, 用户:1, 金额:10
2024-04-18 11:31:57.367  INFO 796 --- [  XNIO-1 task-1] c.d.s.t.a.s.impl.AccountServiceImpl      : [accountTry]: 冻结 10 余额成功
2024-04-18 11:31:57.397  INFO 796 --- [h_RMROLE_1_7_48] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:xid=172.29.16.1:8091:6927048120732393496,branchId=6927048120732393499,branchType=TCC,resourceId=accountService,applicationData={"actionContext":{"action-start-time":1713411117354,"useTCCFence":false,"payAmount":10,"productId":1,"sys::prepare":"accountTry","orderId":1607941629149216,"count":2,"sys::rollback":"accountCancel","sys::commit":"accountConfirm","host-name":"172.29.16.1","userId":1,"actionName":"accountService","price":10}}
2024-04-18 11:31:57.397  INFO 796 --- [h_RMROLE_1_7_48] io.seata.rm.AbstractRMHandler            : Branch committing: 172.29.16.1:8091:6927048120732393496 6927048120732393499 accountService {"actionContext":{"action-start-time":1713411117354,"useTCCFence":false,"payAmount":10,"productId":1,"sys::prepare":"accountTry","orderId":1607941629149216,"count":2,"sys::rollback":"accountCancel","sys::commit":"accountConfirm","host-name":"172.29.16.1","userId":1,"actionName":"accountService","price":10}}
2024-04-18 11:31:57.398  INFO 796 --- [h_RMROLE_1_7_48] c.d.s.t.a.s.impl.AccountServiceImpl      : [accountConfirm]: 当前 XID:172.29.16.1:8091:6927048120732393496, branchId:6927048120732393499, 用户:1, 金额:10
2024-04-18 11:31:57.399  INFO 796 --- [h_RMROLE_1_7_48] c.d.s.t.a.s.impl.AccountServiceImpl      : [accountConfirm]: 扣减 10 余额成功
2024-04-18 11:31:57.401  INFO 796 --- [h_RMROLE_1_7_48] io.seata.rm.AbstractResourceManager      : TCC resource commit result : true, xid: 172.29.16.1:8091:6927048120732393496, branchId: 6927048120732393499, resourceId: accountService
2024-04-18 11:31:57.402  INFO 796 --- [h_RMROLE_1_7_48] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed

可以看到账户的处理流程是先冻结余额,然后二阶段的解冻余额和扣除余额,完成本地事务的提交或者回滚。 因为没有出现回滚,所以只会执行TCC的T-prepare和C-commit操作。

TCC的Service如下,注意看TwoPhaseBusinessAction注解,包含方法本身和C-accountConfirm、C-accountCancel这两个方法。C-accountConfirm、C-accountCancel这两个方法是Seata TCC模式自动调用的。

@LocalTCC
public interface AccountService {

    @TwoPhaseBusinessAction(name = "accountService", commitMethod = "accountConfirm", rollbackMethod = "accountCancel")
    boolean accountTry(BusinessActionContext actionContext,
                       @BusinessActionContextParameter(paramName = "userId") Long userId,
                       @BusinessActionContextParameter(paramName = "price") Integer price);

    boolean accountConfirm(BusinessActionContext actionContext);

    boolean accountCancel(BusinessActionContext actionContext);
}

具体的DAO接口如下:

@Mapper
@Repository
public interface AccountDao {

    /**
     * Account Try 冻结余额
     * @param userId 用户 ID
     * @param amount 冻结的余额
     * @return 影响的记录行
     */
    @Update("UPDATE account set frozen = frozen + #{amount} WHERE id = #{userId} AND balance >= frozen + #{amount}")
    int accountTry(@Param("userId") Long userId, @Param("amount") Integer amount);

    /**
     * Account Confirm 正式扣减余额,释放冻结余额
     * @param userId 用户 ID
     * @param amount 冻结的余额
     * @return 影响的记录行
     */
    @Update("UPDATE account set frozen = frozen - #{amount}, balance = balance - #{amount} WHERE id = #{userId}")
    int accountConfirm(@Param("userId") Long userId, @Param("amount") Integer amount);

    /**
     * Account Cancel 释放冻结余额
     * @param userId 用户 ID
     * @param amount 冻结的余额
     * @return 影响的记录行
     */
    @Update("UPDATE account set frozen = frozen - #{amount} WHERE id = #{userId}")
    int accountCancel(@Param("userId") Long userId, @Param("amount") Integer amount);

}

accountTry方法:尝试冻结余额,如果余额不足,则冻结失败。

accountConfirm方法:确认提交则是扣除冻结余额和实际余额,完成扣款操作。

accountCancel方法:回滚操作是扣除冻结余额,完成回滚操作。

3.3 全局回滚

手动模拟一个服务中的异常:

@Override
    @Transactional(rollbackFor = Exception.class)
    public boolean accountTry(BusinessActionContext actionContext, Long userId, Integer price) {
        String xId = actionContext.getXid();
        long branchId = actionContext.getBranchId();
        logger.info("[accountTry]: 当前 XID:{}, branchId:{}, 用户:{}, 金额:{}", xId, branchId, userId, price);
        int flag = accountDao.accountTry(userId, price);
        if(flag == 0){
            throw new RuntimeException("账户服务 Try 阶段失败.");
        }
        throw  new RuntimeException("模拟异常");
//        //事务成功,保存一个标识,供第二阶段进行判断
//        ResultHolder.setResult(getClass(), actionContext.getXid(), "p");
//        logger.info("[accountTry]: 冻结 {} 余额成功", price);
//        return true;
    }

然后重新启动Account服务。

首先检查执行前的数据状态:

Account:

| id | balance | frozen | update\_time |
| :--- | :--- | :--- | :--- |
| 1 | 80 | 0 | 2024-04-18 15:43:42 |

product:

| id | price | stock | frozen | update\_time |
| :--- | :--- | :--- | :--- | :--- |
| 1 | 5 | 96 | 0 | 2024-04-18 15:46:43 |

orders:

| id | user\_id | product\_id | count | pay\_amount | status | create\_time | update\_time |
| :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- |
| 1607941629149216 | 1 | 1 | 2 | 10 | 1 | 2024-04-18 11:31:57 | 2024-04-18 11:31:57 |
| 1607973306630176 | 1 | 1 | 2 | 10 | 1 | 2024-04-18 15:43:42 | 2024-04-18 15:43:42 |

然后执行测试:

POST http://127.0.0.1:9999/tcc/buy
Content-Type: application/json

{
  "userId": "1",
  "productId":"1",
  "count":"2"
}

测试输出:

{
  "timestamp": "2024-04-18T07:50:17.873+00:00",
  "status": 500,
  "error": "Internal Server Error",
  "message": "",
  "path": "/tcc/buy"
}

500的错误是案例没有优化和转义错误码,具体的错误是在AccountService中抛出的,如下:

2024-04-18 15:50:17.854 ERROR 22372 --- [  XNIO-1 task-1] io.undertow.request                      : UT005023: Exception handling request to /account/try

org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.RuntimeException: 模拟异常
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.3.6.jar:5.3.6]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:517) ~[jakarta.servlet-api-4.0.4.jar:4.0.4]
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]

再检查执行后的数据状态,具体如下,还是和执行前一致。说明进行了全局回滚。

Account:

| id | balance | frozen | update\_time |
| :--- | :--- | :--- | :--- |
| 1 | 80 | 0 | 2024-04-18 15:43:42 |

product:

| id | price | stock | frozen | update\_time |
| :--- | :--- | :--- | :--- | :--- |
| 1 | 5 | 96 | 0 | 2024-04-18 15:46:43 |

orders:

| id | user\_id | product\_id | count | pay\_amount | status | create\_time | update\_time |
| :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- |
| 1607941629149216 | 1 | 1 | 2 | 10 | 1 | 2024-04-18 11:31:57 | 2024-04-18 11:31:57 |
| 1607973306630176 | 1 | 1 | 2 | 10 | 1 | 2024-04-18 15:43:42 | 2024-04-18 15:43:42 |

以Account为例检查分支事务执行的日志:

2024-04-18 15:50:17.851  INFO 22372 --- [  XNIO-1 task-1] c.d.s.t.a.s.impl.AccountServiceImpl      : [accountTry]: 当前 XID:172.29.16.1:8091:6927048120732393508, branchId:6927048120732393511, 用户:1, 金额:10
2024-04-18 15:50:17.854 ERROR 22372 --- [  XNIO-1 task-1] io.undertow.request                      : UT005023: Exception handling request to /account/try

org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.RuntimeException: 模拟异常
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-5.3.6.jar:5.3.6]
	//....
Caused by: java.lang.RuntimeException: 模拟异常
	at cn.dmego.seata.tcc.account.service.impl.AccountServiceImpl.accountTry(AccountServiceImpl.java:38) ~[classes/:na]
	//......

2024-04-18 15:50:17.868  INFO 22372 --- [h_RMROLE_1_2_48] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=172.29.16.1:8091:6927048120732393508,branchId=6927048120732393511,branchType=TCC,resourceId=accountService,applicationData={"actionContext":{"action-start-time":1713426617842,"useTCCFence":false,"payAmount":10,"productId":1,"sys::prepare":"accountTry","orderId":1607974135005216,"count":2,"sys::rollback":"accountCancel","sys::commit":"accountConfirm","host-name":"172.29.16.1","userId":1,"actionName":"accountService","price":10}}
2024-04-18 15:50:17.868  INFO 22372 --- [h_RMROLE_1_2_48] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.29.16.1:8091:6927048120732393508 6927048120732393511 accountService
2024-04-18 15:50:17.869  INFO 22372 --- [h_RMROLE_1_2_48] c.d.s.t.a.s.impl.AccountServiceImpl      : [accountCancel]: 当前 XID:172.29.16.1:8091:6927048120732393508, branchId:6927048120732393511, 用户:1, 金额:10
2024-04-18 15:50:17.869  INFO 22372 --- [h_RMROLE_1_2_48] io.seata.rm.AbstractResourceManager      : TCC resource rollback result : true, xid: 172.29.16.1:8091:6927048120732393508, branchId: 6927048120732393511, resourceId: accountService
2024-04-18 15:50:17.869  INFO 22372 --- [h_RMROLE_1_2_48] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

可以看到这里的回滚操作是accountCancel方法,按照XID和branchId进行回滚,执行accountCancel方法。

整体来说和2PC有点类似的,不过回滚逻辑需要业务代码自己实现。

在TCC 中,事务的执行是分阶段的,首先尝试执行事务(Try),然后确认事务(Confirm),最后根据需要进行补偿操作(Cancel)。

TCC更具有灵活性和可扩展性,但需要开发者实现 Try、Confirm 和 Cancel 三个阶段的业务逻辑。

四. 参考资料

  1. https://seata.apache.org/zh-cn/docs/overview/what-is-seata
  2. https://github.com/apache/incubator-seata-samples.git
  3. https://github.com/dmego/springcloud-seata-demo.git