[https://github.com/dtm-labs/dtm](https://github.com/dtm-labs/dtm) 是一款变革性的分布式事务框架,提供了傻瓜式的使用方式,极大的降低了分布式事务的使用门槛,改变了“能不用分布式事务就不用”的行业现状,优雅的解决了服务间的数据一致性问题。
## 特性
- 支持多种语言:支持Go、Java、PHP、C#、Python、Nodejs 各种语言的SDK
- 支持多种事务模式:SAGA、TCC、XA、二阶段消息(本地消息表,事务消息)
- 支持多种数据库事务:Mysql、Redis、MongoDB、Postgres、TDSQL等
- 支持多种存储引擎:Mysql(常用)、Redis(高性能)、MongoDB(规划中)
- 支持多种微服务架构:go-zero、go-kratos/kratos、polarismesh/polaris
- 支持高可用,易水平扩展
## PHP SDK 新版的主要更新
- 完整支持XA事务
- 修复事务管理器缺少 execute 方法
- 修复 GRPC 下TCC,saga,XA无法执行的bug
- 移除 DtmXaGRPCMiddleware
- 修复 BarrierId 一直叠加的bug
- 移除 XA 里面的 var_dump 方法
此次版本主要由[PandaLIU-1111](https://github.com/PandaLIU-1111)贡献,也包含了以下人员的多个PR
- [kids-return](https://github.com/kids-return)
- [huangzhhui](https://github.com/huangzhhui)
## XA 模式简介
XA是由X/Open组织提出的分布式事务的规范,XA规范主要定义了(全局)事务管理器(TM)和(局部)资源管理器(RM)之间的接口。本地的数据库如mysql在XA中扮演的是RM角色
XA一共分为两阶段:
第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。 第二阶段 (commit/rollback):当事务管理者(TM)确认所有参与者(RM)都ready后,向所有参与者发送commit命令。
目前主流的数据库基本都支持XA事务,包括mysql、oracle、sqlserver、postgre
下面是一个成功完成的XA事物典型的时序图
<img src="https://dtm.pub/assets/xa_normal.5a0ce600.jpg" height=600/>
## XA 模式代码示例
以下展示在 Hyperf 框架中的使用方法,其它框架类似
```php
<?php
namespace App\Controller;
use App\Grpc\GrpcClient;
use DtmClient\DbTransaction\DBTransactionInterface;
use DtmClient\TransContext;
use DtmClient\XA;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Hyperf\HttpServer\Contract\RequestInterface;
use Psr\Http\Message\ResponseInterface;
#[Controller(prefix: '/xa')]
class XAController
{
private GrpcClient $grpcClient;
protected string $serviceUri = 'http://127.0.0.1:9502';
public function __construct(
private XA $xa,
protected ConfigInterface $config,
) {
$server = $this->config->get('dtm.server', '127.0.0.1');
$port = $this->config->get('dtm.port.grpc', 36790);
$hostname = $server . ':' . $port;
$this->grpcClient = new GrpcClient($hostname);
}
#[GetMapping(path: 'successCase')]
public function successCase(): string
{
$payload = ['amount' => 50];
// 开启Xa 全局事物
$gid = $this->xa->generateGid();
$this->xa->globalTransaction($gid, function () use ($payload) {
// 调用子事物接口
$respone = $this->xa->callBranch($this->serviceUri . '/xa/api/transIn', $payload);
// XA http模式下获取子事物返回结构
/* @var ResponseInterface $respone */
$respone->getBody()->getContents();
// 调用子事物接口
$payload = ['amount' => 10];
$this->xa->callBranch($this->serviceUri . '/xa/api/transOut', $payload);
});
// 通过 TransContext::getGid() 获得 全局事务ID 并返回
return TransContext::getGid();
}
#[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transIn')]
public function transIn(RequestInterface $request): array
{
$content = $request->post('amount');
$amount = $content['amount'] ?? 50;
// 模拟分布式系统下transIn方法
$this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) {
// 请使用 DBTransactionInterface 处理本地 Mysql 事物
$dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` + ? where id = 1', [$amount]);
});
return ['status' => 0, 'message' => 'ok'];
}
/**
* @param RequestInterface $request
* <a href="/user/return" title="@return">@return</a> array
*/
#[RequestMapping(methods: ["GET", "POST", "PUT"], path: 'api/transOut')]
public function transOut(RequestInterface $request): array
{
$content = $request->post('amount');
$amount = $content['amount'] ?? 10;
// 模拟分布式系统下transOut方法
$this->xa->localTransaction(function (DBTransactionInterface $dbTransaction) use ($amount) {
// 请使用 DBTransactionInterface 处理本地 Mysql 事物
$dbTransaction->xaExecute('UPDATE `order` set `amount` = `amount` - ? where id = 2', [$amount]);
});
return ['status' => 0, 'message' => 'ok'];
}
}
```
上面的代码首先注册了一个全局XA事务,然后添加了两个子事务transIn、transOut。子事务全部执行成功之后,提交给dtm。dtm收到提交的xa全局事务后,会调用所有子事务的xa commit,完成整个xa事务。
可运行的例子参考: [dtm-php/dtm-client](https://github.com/dtm-php/dtm-client)
有疑问加站长微信联系(非本文作者)