大家可以关注我微信公众号【技术闲聊吧】
# 以发工资为例使用15行代码实现一个分布式事务
开发一个分布式事务不是一件容易的事,经验丰富的老司机也往往会踩各种坑。
今天,我们就用 [octopus](https://github.com/ikenchina/octopus)来开发一个发工资的分布式事务。
- 微信原文:https://mp.weixin.qq.com/s/B149uXNgpwLIB2xrMSkCwA
- 可以关注我微信公众号,有很多优质文章 【技术闲聊吧】,或点开上面文章,关注作者即可
- 大家赏个star吧 : https://github.com/ikenchina/octopus
首先,我们明确要开发哪些角色
- 发工资的公司:我们称之为AP,也是事务发起方
- 员工的账户银行:称之为RM,事务参与方
- 银联:称为TC,事务协调者。当然事务协调者也可以是放在发工资的公司一方。
如果使用octopus,TC不需要开发,octopus已经集成了,只需要部署就可以了。
那我们来看看如何开发AP和RM代码。
我们使用SAGA类型的分布式事务来发工资,保证事务的最终一致性。
## 开发AP
我们只看核心代码,具体代码参考[demo](https://github.com/ikenchina/octopus/tree/master/demo)。
遍历所有员工,将给每个每个员工账户转账作为一个子事务,添加到分布式事务中。
```
func (app *SagaApplication) Pay(ctx context.Context, users []*saga_rm.BankAccountRecord) (*pb.SagaResponse, error) {
resp, err := saga_cli.SagaTransaction(ctx, app.tcRpcClient, time.Now().Add(1*time.Minute),
func(t *saga_cli.Transaction, gtid string) error {
for i, user := range users {
commit := "/bankservice.SagaBankService/In" // 银行gRPC服务的commit接口名
compensation := "/bankservice.SagaBankService/Out" // 银行gRPC服务的compensation接口名
t.AddGrpcBranch(i + 1, app.getUserBank(user.UserID).Address, commit, compensation, user.ToPb(), saga_cli.WithMaxRetry(1)) // 添加一个子事务
}
return nil
})
return resp, err
}
```
## 开发RM
RM需要实现两个接口, commit和compensation,分别代表提交和回滚(SAGA里称为补偿)。
**提交接口**
开发者不用考虑事务相关的逻辑(提交,回滚,幂等,异常处理等),只需要调用HandleCommit接口来实现业务逻辑即可。
```
func (rm *SagaRmBankGrpcService) In(ctx context.Context, in *pb.SagaRequest) (*pb.SagaResponse, error) {
gtid, bid := cgrpc.ParseContextMeta(ctx)
err := sagarm.HandleCommit(ctx, rm.db, gtid, bid, func(tx *sql.Tx) error {
rx, err := tx.Exec("UPDATE account SET balance=balance+$1 WHERE id=$2", in.GetAccount(), in.GetUserId())
return rm.handleErr(rx, err)
})
return &pb.SagaResponse{}, err
}
```
**补偿接口**
开发者不用考虑事务相关的逻辑(提交,回滚,幂等,异常处理等),只需要调用HandleCompensation接口来实现业务逻辑即可。
```
func (rm *SagaRmBankGrpcService) Out(ctx context.Context, in *pb.SagaRequest) (*pb.SagaResponse, error) {
gtid, bid := cgrpc.ParseContextMeta(ctx)
err := sagarm.HandleCompensation(ctx, rm.db, gtid, bid, func(tx *sql.Tx) error {
rx, err := tx.Exec("UPDATE account SET balance=balance+$1 WHERE id=$2", -1 * in.GetAccount(), in.GetUserId())
return rm.handleErr(rx, err)
})
return &pb.SagaResponse{}, err
}
```
可以看到8行代码就实现了提交和补偿接口,还是非常简单的。
我们只需要部署好TC,参考[部署文档](https://github.com/ikenchina/octopus/blob/master/doc/README_deployment.md), 然后运行AP,则可以提交一个发工资的分布式事务。
有疑问加站长微信联系(非本文作者))