DTM PHP SDK 0.2.3发布,支持XA事务

yedf · 2022-08-23 11:12:34 · 1243 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2022-08-23 11:12:34 的主题,其中的信息可能已经有所发展或是发生改变。

https://github.com/dtm-labs/dtm 是一款变革性的分布式事务框架,提供了傻瓜式的使用方式,极大的降低了分布式事务的使用门槛,改变了“能不用分布式事务就不用”的行业现状,优雅的解决了服务间的数据一致性问题。

特性

  • 支持多种语言:支持Go、Java、PHP、C#、Python、Nodejs 各种语言的SDK
  • 支持多种事务模式:SAGA、TCC、XA、二阶段消息(本地消息表,事务消息)
  • 支持多种数据库事务:Mysql、Redis、MongoDB、Postgres、TDSQL等
  • 支持多种存储引擎:Mysql(常用)、Redis(高性能)、MongoDB(规划中)
  • 支持多种微服务架构:go-zero、go-kratos/kratos、polarismesh/polaris
  • 支持高可用,易水平扩展

PHP SDK 新版的主要更新

  • 完整支持XA事务
  • 修复事务管理器缺少 execute 方法
  • 修复 GRPC 下TCC,saga,XA无法执行的bug
  • 移除 DtmXaGRPCMiddleware
  • 修复 BarrierId 一直叠加的bug
  • 移除 XA 里面的 var_dump 方法

此次版本主要由PandaLIU-1111贡献,也包含了以下人员的多个PR

XA 模式简介

XA是由X/Open组织提出的分布式事务的规范,XA规范主要定义了(全局)事务管理器(TM)和(局部)资源管理器(RM)之间的接口。本地的数据库如mysql在XA中扮演的是RM角色

XA一共分为两阶段:

第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。 第二阶段 (commit/rollback):当事务管理者(TM)确认所有参与者(RM)都ready后,向所有参与者发送commit命令。

目前主流的数据库基本都支持XA事务,包括mysql、oracle、sqlserver、postgre

下面是一个成功完成的XA事物典型的时序图

XA 模式代码示例

以下展示在 Hyperf 框架中的使用方法,其它框架类似

<?php

namespace App\Controller;

use App\Grpc\GrpcClient;
use DtmClient\DbTransaction\DBTransactionInterface;
use DtmClient\TransContext;
use DtmClient\XA;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Contract\RequestInterface;
use Psr\Http\Message\ResponseInterface;

#[Controller(prefix: '/xa')]
class XAController
{

    private GrpcClient $grpcClient;

    protected string $serviceUri = 'http://127.0.0.1:9502';

    public function __construct(
        private XA $xa,
        protected ConfigInterface $config,
    ) {
        $server = $this->config->get('dtm.server', '127.0.0.1');
        $port = $this->config->get('dtm.port.grpc', 36790);
        $hostname = $server . ':' . $port;
        $this->grpcClient = new GrpcClient($hostname);
    }


    #[GetMapping(path: 'successCase')]
    public function successCase(): string
    {
        $payload = ['amount' => 50];
        // 开启Xa 全局事物
        $gid = $this->xa->generateGid();
        $this->xa->globalTransaction($gid, function () use ($payload) {
            // 调用子事物接口
            $respone = $this->xa->callBranch($this->serviceUri . '/xa/api/transIn', $payload);
            // XA http模式下获取子事物返回结构
            /* @var ResponseInterface $respone */
            $respone->getBody()->getContents();
            // 调用子事物接口
            $payload = ['amount' => 10];
            $this->xa->callBranch($this->serviceUri . '/xa/api/transOut', $payload);
        });
        // 通过 TransContext::getGid() 获得 全局事务ID 并返回
        return TransContext::getGid();
    }

    #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transIn')]
    public function transIn(RequestInterface $request): array
    {
        $content = $request->post('amount');
        $amount = $content['amount'] ?? 50;
        // 模拟分布式系统下transIn方法
        $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) {
            // 请使用 DBTransactionInterface 处理本地 Mysql 事物
            $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` + ? where id = 1', [$amount]);
        });

        return ['status' => 0, 'message' => 'ok'];
    }

    /**
     * @param RequestInterface $request
     * <a href="/user/return" title="@return">@return</a> array
     */
    #[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transOut')]
    public function transOut(RequestInterface $request): array
    {
        $content = $request->post('amount');
        $amount = $content['amount'] ?? 10;
        // 模拟分布式系统下transOut方法
        $this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) {
            // 请使用 DBTransactionInterface 处理本地 Mysql 事物
            $dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` - ? where id = 2', [$amount]);
        });

        return ['status' => 0, 'message' => 'ok'];
    }
}

上面的代码首先注册了一个全局XA事务,然后添加了两个子事务transIn、transOut。子事务全部执行成功之后,提交给dtm。dtm收到提交的xa全局事务后,会调用所有子事务的xa commit,完成整个xa事务。

可运行的例子参考: dtm-php/dtm-client


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1243 次点击  ∙  2 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传