用Java轻松完成一个分布式事务TCC,自动处理空补偿、悬挂、幂等

yedf · · 1228 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

什么是TCC,TCC是Try、Confirm、Cancel三个词语的缩写,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。 ### TCC组成 TCC分为3个阶段 - Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性) - Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源 - Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。 TCC分布式事务里,有3个角色,与经典的XA分布式事务一样: - AP/应用程序,发起全局事务,定义全局事务包含哪些事务分支 - RM/资源管理器,负责分支事务各项资源的管理 - TM/事务管理器,负责协调全局事务的正确执行,包括Confirm,Cancel的执行,并处理网络异常 如果我们要进行一个类似于银行跨行转账的业务,转出(TransOut)和转入(TransIn)分别在不同的微服务里,一个成功完成的TCC事务典型的时序图如下: ![image.png](https://static.studygolang.com/211206/34f179622c3be6fd95ac0c836ca06bc2.png) ### TCC网络异常 TCC在整个全局事务的过程中,可能发生各类网络异常情况,典型的是空回滚、幂等、悬挂。下面介绍的示例,使用了dtm的子事务屏障SDK,优雅的处理了这些异常情况。在我了解的各个开源项目,当前(2021-12-01)都需要开发人员手动处理这些异常,而我们项目的系统自动处理,属于首创,极大地降低了分布式事务的开发难度。 这里有一篇文章[分布式事务的这些常见用法都有坑,来看看正确姿势](https://segmentfault.com/a/1190000041031586),详细的讲解了这类网络异常问题,当前的技术现状与问题,及我们的解决方案。 ### TCC实践 对于前面的跨行转账操作,最简单的做法是,在Try阶段调整余额,在Cancel阶段反向调整余额,Confirm阶段则空操作。这么做带来的问题是,如果A扣款成功,金额转入B失败,最后回滚,把A的余额调整为初始值。在这个过程中如果A发现自己的余额被扣减了,但是收款方B迟迟没有收到余额,那么会对A造成困扰。 更好的做法是,Try阶段冻结A转账的金额,Confirm进行实际的扣款,Cancel进行资金解冻,这样用户在任何一个阶段,看到的数据都是清晰明了的。 下面我们进行一个TCC事务的具体开发 我们的例子采用Java语言,使用的分布式事务框架为[https://github.com/yedf/dtm](https://github.com/yedf/dtm),它对分布式事务的支持非常优雅。下面来详细讲解TCC的组成 我们首先创建用户余额表,建表语句如下: ``` create table if not exists dtm_busi.user_account( id int(11) PRIMARY KEY AUTO_INCREMENT, user_id int(11) UNIQUE, balance DECIMAL(10, 2) not null default '0', trading_balance DECIMAL(10, 2) not null default '0', create_time datetime DEFAULT now(), update_time datetime DEFAULT now(), key(create_time), key(update_time) ); ``` 表中,trading_balance记录正在交易的金额。 我们先编写核心代码,冻结/解冻资金操作,会检查约束balance+trading_balance >= 0,如果约束不成立,执行失败 ``` java public void adjustTrading(Connection connection, TransReq transReq) throws Exception { String sql = "update dtm_busi.user_account set trading_balance=trading_balance+?" + " where user_id=? and trading_balance + ? + balance >= 0"; PreparedStatement preparedStatement = null; try { preparedStatement = connection.prepareStatement(sql); preparedStatement.setInt(1, transReq.getAmount()); preparedStatement.setInt(2, transReq.getUserId()); preparedStatement.setInt(3, transReq.getAmount()); if (preparedStatement.executeUpdate() > 0) { System.out.println("交易金额更新成功"); } else { throw new FailureException("交易失败"); } } finally { if (null != preparedStatement) { preparedStatement.close(); } } } ``` 然后是调整余额 ``` java public void adjustBalance(Connection connection, TransReq transReq) throws SQLException { PreparedStatement preparedStatement = null; try { String sql = "update dtm_busi.user_account set trading_balance=trading_balance-?,balance=balance+? where user_id=?"; preparedStatement = connection.prepareStatement(sql); preparedStatement.setInt(1, transReq.getAmount()); preparedStatement.setInt(2, transReq.getAmount()); preparedStatement.setInt(3, transReq.getUserId()); if (preparedStatement.executeUpdate() > 0) { System.out.println("余额更新成功"); } } finally { if (null != preparedStatement) { preparedStatement.close(); } } } ``` 下面我们来编写具体的Try/Confirm/Cancel的处理函数 ``` java @RequestMapping("barrierTransOutTry") public Object TransOutTry(HttpServletRequest request) throws Exception { BranchBarrier branchBarrier = new BranchBarrier(request.getParameterMap()); logger.info("barrierTransOutTry branchBarrier:{}", branchBarrier); TransReq transReq = extracted(request); Connection connection = dataSourceUtil.getConnecion(); branchBarrier.call(connection, (barrier) -> { System.out.println("用户: +" + transReq.getUserId() + ",转出" + Math.abs(transReq.getAmount()) + "元准备"); this.adjustTrading(connection, transReq); }); connection.close(); return TransResponse.buildTransResponse(Constant.SUCCESS_RESULT); } @RequestMapping("barrierTransOutConfirm") public Object TransOutConfirm(HttpServletRequest request) throws Exception { BranchBarrier branchBarrier = new BranchBarrier(request.getParameterMap()); logger.info("barrierTransOutConfirm branchBarrier:{}", branchBarrier); Connection connection = dataSourceUtil.getConnecion(); TransReq transReq = extracted(request); branchBarrier.call(connection, (barrier) -> { System.out.println("用户: +" + transReq.getUserId() + ",转出" + Math.abs(transReq.getAmount()) + "元提交"); adjustBalance(connection, transReq); }); connection.close(); return TransResponse.buildTransResponse(Constant.SUCCESS_RESULT); } @RequestMapping("barrierTransOutCancel") public Object TransOutCancel(HttpServletRequest request) throws Exception { BranchBarrier branchBarrier = new BranchBarrier(request.getParameterMap()); logger.info("barrierTransOutCancel branchBarrier:{}", branchBarrier); TransReq transReq = extracted(request); Connection connection = dataSourceUtil.getConnecion(); branchBarrier.call(connection, (barrier) -> { System.out.println("用户: +" + transReq.getUserId() + ",转出" + Math.abs(transReq.getAmount()) + "元回滚"); this.adjustTrading(connection, transReq); }); connection.close(); return TransResponse.buildTransResponse(Constant.SUCCESS_RESULT); } // TransIn相关函数与TransOut类似,这里省略 ``` 到此各个子事务的处理函数已经OK了,在上面的代码中,下面这几行是子事务屏障相关代码,只要按照这个方式来调用您的业务逻辑,子事务屏障保证重复请求、悬挂、空补偿情况出现时,您的业务逻辑不会被调用,保证了正常业务的正确进行 ``` BranchBarrier branchBarrier = new BranchBarrier(request.getParameterMap()); branchBarrier.call(connection, (barrier)-> { ... }); ``` 然后是开启TCC事务,进行分支调用 ``` java @RequestMapping("tccBarrier") public String tccBarrier() { // 创建dmt client DtmClient dtmClient = new DtmClient(ipPort); //创建tcc事务 try { dtmClient.tccGlobalTransaction(dtmClient.genGid(), TccTestController::tccBarrierTrans); } catch (Exception e) { log.error("tccGlobalTransaction error", e); return "fail"; } return "success"; } public static void tccBarrierTrans(Tcc tcc) throws Exception { // 用户1 转出30元 Response outResponse = tcc .callBranch(new TransReq(1, -30), svc + "/barrierTransOutTry", svc + "/barrierTransOutConfirm", svc + "/barrierTransOutCancel"); log.info("outResponse:{}", outResponse); // 用户2 转入30元 Response inResponse = tcc .callBranch(new TransReq(2, 30), svc + "/barrierTransInTry", svc + "/barrierTransInConfirm", svc + "/barrierTransInCancel"); log.info("inResponse:{}", inResponse); } ``` 至此,一个完整的TCC分布式事务编写完成。 如果您想要完整运行一个成功的示例,那么按照[dtmcli-java-sample](https://github.com/yedf/dtmcli-java-sample)项目的说明搭建好环境启动之后,运行下面命令运行tcc的例子即可 `curl http://localhost:8081/tccBarrier` ### TCC的回滚 假如银行将金额准备转出用户2时,发现用户2的账户异常,返回失败,会怎么样?我们的例子中,可用户余额为10000,发起一笔100000的转账会触发异常而失败: `curl http://localhost:8081/tccBarrierError` 这是事务失败交互的时序图 ![image.png](https://static.studygolang.com/211206/96feca182d4cc15eb0875be7d45d0426.png) 这个跟成功的TCC差别就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的Cancel操作,保证全局事务全部回滚。 上面讲述了非常符合预期的一种回滚情况,实际运行的业务,还会有很多种回滚,例如TransIn请求还没有开始处理就异常了,或者处理结束,事务已经提交后异常了。 在子事务屏障的帮助下,用户并不需要关心这些不同的异常情况,它会自动处理。您可以尝试在TransIn的处理中,在事务提交之后,抛出异常。您可以看到这种情况下最后事务是被正确回滚的,两个用户的最终余额,和转账前是一样的。 ### 小结 在这篇文章里,我们介绍了TCC的理论知识,也通过一个例子,完整给出了编写一个TCC事务的过程,涵盖了正常成功完成,以及成功回滚的情况。相信读者通过这边文章,对TCC已经有了深入的理解。 关于分布式事务中需要处理的幂等、悬挂、空补偿,请参考另一篇文章:[分布式事务的这些常见用法都有坑,来看看正确姿势](https://segmentfault.com/a/1190000041031586) 关于分布式事务更多更全面的知识,请参考[分布式事务最经典的七种解决方案](https://segmentfault.com/a/1190000040321750) 文中使用的例子选自[yedf/dtmcli-java-sample](https://github.com/yedf/dtmcli-java-sample)。使用的分布式事务管理器为[https://github.com/yedf/dtm](https://github.com/yedf/dtm),支持多种事务模式:TCC、SAGA、XA、事务消息 跨语言支持,已支持 golang、python、PHP、nodejs等语言的客户端。提供子事务屏障功能,优雅解决幂等、悬挂、空补偿等问题。 阅读完此篇干货,欢迎大家访问[https://github.com/yedf/dtm](https://github.com/yedf/dtm)项目,给颗星星支持!

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1228 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传