【生产BUG】一个生产Bug到GORM框架分析

ltp5343 · · 592 次点击 · · 开始浏览    

目录:

1.Bug背景介绍

2.Bug分析过程

   2.1. 百度搜索解决方案

   2.2. 根据关键错误信息分析源代码:database/sql(Golang版本:1.14.3)

   2.3. 分析gorm的Begin/Commit/Rollback/Find源代码(Golang版本:1.14.3)

   2.4. 分析业务代码调用Gorm的逻辑关系

3.Gorm/database/mysql架构分析

  3.1. Gorm/database/mysql分层架构

  3.2. Gorm/database/mysql分层架构与DDD分层架构思想对比

 

1. Bug背景介绍

BUG:线上服务micro-supplier-service的资料(案例、产品、基础信息、品牌、企业风采等)带数据库事务的接口返回如下错误时,

            相关的其他查询接口也会返回如下错误信息,直接导致对应的功能无法使用。 案例保存接口返回如下错误:

```
     {"success":false,"message":"sql: transaction has already been committed or rolled back","error_code":2,"data":null}
```

 

影响范围:

2. Bug分析过程

2.1. 百度搜索解决方案

     a)搜索错误信息:“sql: transaction has already been committed or rolled back”,给出了一些解决方案:https://studygolang.com/articles/12566 。

     b)验证方法:增加接口响应时长,测试grpc中断造成的影响。

          1)在案例保存接口进入事务后,增加测试代码(sleep(5));

          2)使用grpc工具测试,命令未执行完成之前中断命令执行;

          3)检查接口返回错误:

          关键错误信息:“response err: rpc error: code = Unknown desc = context canceled, details: [{\"code\":20010000,\"message\":\"context canceled\"}]"} “

```
    {"level":"info","app":"micro-supplier-service","grpc.metadata":{":authority":["127.0.0.1:31003"],"authorization":["bearer 9404f3f08af62110f30f3bdfc628288eedb71cc814b1461f04cc741b3558d992.{\"app_id\":\"micro-supplier-service\",\"time_stamp\":123456789}"],"content-type":["application/grpc"],"stark-service-name":["micro-supplier-service"],"stark-service-type":["1"],"stark-service-version":["4.0.3"],"user-agent":["grpcurl/dev-build (no version set) grpc-go/1.33.1"]},"span.id":"","trace.id":"8de9a6c9ad4a1748ed5599724acd5bb8","transaction.id":"8de9a6c9ad4a1748","time":"2021-09-07 19:57:52.260","tag":"grpc.track","msg":"access response, grpc method: /micro_supplier.SupplierProjectService/SaveProject, response err: rpc error: code = Unknown desc = context canceled, details: [{\"code\":20010000,\"message\":\"context canceled\"}]"} 
```

2.2. 根据关键错误信息分析源代码:database/sql(Golang版本:1.14.3)

       a)go/src/database/sql/sql.go的2017行:

              关键变量:var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")

```
	// ErrTxDone is returned by any operation that is performed on a transaction
	// that has already been committed or rolled back.
	var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")    
```

       

       b)这个“ErrTxDone”在结构(type Tx struct)的三个方法中用到:

             1)Commit方法(go/src/database/sql/sql.go文件2078行)

              关键代码段:if atomic.LoadInt32(&tx.done) == 1 { return ErrTxDone }

```
	// Commit commits the transaction.
	func (tx *Tx) Commit() error {
		// Check context first to avoid transaction leak.
		// If put it behind tx.done CompareAndSwap statement, we can't ensure
		// the consistency between tx.done and the real COMMIT operation.
		select {
		default:
		case <-tx.ctx.Done():
			if atomic.LoadInt32(&tx.done) == 1 {
				return ErrTxDone
			}
			return tx.ctx.Err()
		}
		if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
			return ErrTxDone
		}
		var err error
		withLock(tx.dc, func() {
			err = tx.txi.Commit()
		})
		if err != driver.ErrBadConn {
			tx.closePrepared()
		}
		tx.close(err)
		return err
	}
```

 

              2)Rollback方法(go/src/database/sql/sql.go文件2108行)

              关键代码行:if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) { return ErrTxDone }

```
	// rollback aborts the transaction and optionally forces the pool to discard
	// the connection.
	func (tx *Tx) rollback(discardConn bool) error {
		if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
			return ErrTxDone
		}
		var err error
		withLock(tx.dc, func() {
			err = tx.txi.Rollback()
		})
		if err != driver.ErrBadConn {
			tx.closePrepared()
		}
		if discardConn {
			err = driver.ErrBadConn
		}
		tx.close(err)
		return err
	}

	// Rollback aborts the transaction.
	func (tx *Tx) Rollback() error {
		return tx.rollback(false)
	}
```

              3)grabConn方法(go/src/database/sql/sql.go文件2048行)

              关键代码:if tx.isDone() { tx.closemu.RUnlock() return nil, nil, ErrTxDone }

```
	func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
		select {
		default:
		case <-ctx.Done():
			return nil, nil, ctx.Err()
		}

		// closemu.RLock must come before the check for isDone to prevent the Tx from
		// closing while a query is executing.
		tx.closemu.RLock()
		if tx.isDone() {
			tx.closemu.RUnlock()
			return nil, nil, ErrTxDone
		}
		if hookTxGrabConn != nil { // test hook
			hookTxGrabConn()
		}
		return tx.dc, tx.closemuRUnlockRelease, nil
	}

	...
	// QueryContext executes a query that returns rows, typically a SELECT.
	func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
		dc, release, err := tx.grabConn(ctx)
		if err != nil {
			return nil, err
		}

		return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args)
	}

	// Query executes a query that returns rows, typically a SELECT.
	func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) {
		return tx.QueryContext(context.Background(), query, args...)
	}
```

 

         分析过程:

         1. 在database/sql目录下搜索关键词“transaction has already been committed or rolled back”,发现了结构体的三个方法(commit,rollback,Query方法都会返回该错误)。

         2. Commit方法:事务开始之后,重复提交两次以上commit或被回滚的事物被commit,第二次就会报错:“transaction has already been committed or rolled back”;

         3. Rollback方法:事务开始之后,被提交的事务被rollback或重复回滚两次以上rollback,第二次就会报错:“transaction has already been committed or rolled back”;

         4. Query方法:事务开始之后,被提交commit或被回滚rollback的事务连接上进行数据(增、删、改、查)都会报错:“transaction has already been committed or rolled back”;

 

         总结:

          1. 保存案例时发生错误:“transaction has already been committed or rolled back”,从源代码分析来看,同一个事务被提交两次或被回滚两次或提交一次回滚一次或回滚一次提交一次。

          2. 查询操作报错误:“transaction has already been committed or rolled back”,从源代分析来看,应该是使用了被提交或被回滚的事务连接进行查询操作导致。

 

2.3. 分析gorm的Begin/Commit/Rollback/Find源代码(Golang版本:1.14.3)

       1)Begin开启事务,vendor/github.com/jinzhu/gorm/main.go +554

       关键代码段:tx, err := db.BeginTx(ctx, opts)

```
	// Begin begins a transaction
	func (s *DB) Begin() *DB {
		return s.BeginTx(context.Background(), &sql.TxOptions{})
	}

	// BeginTx begins a transaction with options
	func (s *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) *DB {
		c := s.clone()
		if db, ok := c.db.(sqlDb); ok && db != nil {
			tx, err := db.BeginTx(ctx, opts)
			c.db = interface{}(tx).(SQLCommon)

			c.dialect.SetDB(c.db)
			c.AddError(err)
		} else {
			c.AddError(ErrCantStartTransaction)
		}
		return c
	}
```

       2)Commit提交事务,vendor/github.com/jinzhu/gorm/main.go +574

       关键代码段:s.AddError(db.Commit())

```
	// Commit commit a transaction
	func (s *DB) Commit() *DB {
		var emptySQLTx *sql.Tx
		if db, ok := s.db.(sqlTx); ok && db != nil && db != emptySQLTx {
			s.AddError(db.Commit())
		} else {
			s.AddError(ErrInvalidTransaction)
		}
		return s
	}
```

       3)Rollback回滚事务,vendor/github.com/jinzhu/gorm/main.go +585

       关键代码段:if err := db.Rollback()

```
	// Rollback rollback a transaction
	func (s *DB) Rollback() *DB {
		var emptySQLTx *sql.Tx
		if db, ok := s.db.(sqlTx); ok && db != nil && db != emptySQLTx {
			if err := db.Rollback(); err != nil && err != sql.ErrTxDone {
				s.AddError(err)
			}
		} else {
			s.AddError(ErrInvalidTransaction)
		}
		return s
	}
```

 

       4)Find查找操作,vendor/github.com/jinzhu/gorm/main.go +353

```
	// Find find records that match given conditions
	func (s *DB) Find(out interface{}, where ...interface{}) *DB {
		return s.NewScope(out).inlineCondition(where...).callCallbacks(s.parent.callbacks.queries).db
	}
```

 

       分析:

       1. Begin/Commit/Rollback都是直接调用了golang标准库的database/sql的接口完成相应的动作;

       2. Find使用Gorm的Scope来限制查询范围,使用callCallbacks调用执行提前注册的回调方法,在回调方法里会进行sql语句的拼装,最后调用database/sql的Query查询接口完成查询操作。

           备注:注册的回调查询方法参考代码:vendor/github.com/jinzhu/gorm/callback_query.go +11

2.4. 分析业务代码调用Gorm的逻辑关系

          a)业务逻辑代码分析,案例保存分析:

                1)供应商服务启动时,注册的Grpc方法依赖的server、service、repository层的struct以及db实例理论上都是单例(在整个程序运行过程中只会有一个实例运行)。

                2)在案例保存过程中,在server层重获取了一个db实例,调用通用的事务处理方法process()完成事务的开始/提交/回滚。 

```
	server层面调用process逻辑:
	saveParams := &bo.SupplierProjectSaveBo{
		SupplierProject: projectModel,
		Files:           req.Project.Files,
		DetailInfo:      req.DetailInfo,
		OperatorUid:     req.OperatorUid,
		OperatorName:    req.OperatorName,
		OperatorType:    int(req.OperatorType),
	}
	isSuccess, err := impl.Process(ctx, stark.MySQLConn.GetClient(), s.projectService, saveParams)
	if err != nil {
		return &response, ykerrcode.TogRPCError(errcode.COMMON_ERROR, err.Error())
	}

    ...
	process代码如下:
	func Process(ctx context.Context, db *gorm.DB, s contract.TransactionServiceIface, params interface{}) (bool, error) {
		db = db.Begin()
		//s.InitDb(db)   //TODO:修正bug前的逻辑。
		ctx = context.WithValue(ctx, "tx_db", db) //TODO:修正bug后的逻辑。
		result, err := s.ProcessBusiness(ctx, params)
		if err != nil {
			stark.Logger.Infof(ctx, "transaction-service-processbusiness-err-1: %s", err.Error())
			db.Rollback()
			return false, err
		}

		if !result {
			stark.Logger.Infof(ctx, "transaction-service-processbusiness-err: %s", err.Error())
			return false, db.Rollback().Error
		}

		err = db.Commit().Error
		if err != nil {
			stark.Logger.Infof(ctx, "transaction-service-err: %s", err.Error())
			return false, err
		}

		return true, nil
	}

	...
	案例的业务逻辑:
	//
	// 业务逻辑
	//
	func (s *projectService) ProcessBusiness(ctx context.Context, params interface{}) (bool, error) {
		defer func() {
			s.recoveryDb()
		}()

		p, ok := params.(*bo.SupplierProjectSaveBo)
		if !ok {
			return false, errors.New("type not ok")
		}
		return s.SupplierProjectSave(ctx, p)
	}

	...
	案例repositroy实现类:
	//
	// 案例repository实现
	//
	type mysqlUucSupplierProjectRepository struct {
		impl.CommonSetDB
	}

	//
	// 构造函数
	//
	func NewMysqlUucSupplierProjectRepository(db *gorm.DB) project_repo.UucSupplierProjectRepositoryIface {
		return &mysqlUucSupplierProjectRepository{
			CommonSetDB: impl.NewCommonSetDb(db),
		}
	}

	//
	// 新增案例
	//
	func (r *mysqlUucSupplierProjectRepository) InsertByModel(ctx context.Context, m *model.UucSupplierProject) (*model.UucSupplierProject, error) {
		err := r.GetDb(ctx).Save(&m).Error
		if err != nil {
			return nil, err
		}
		return m, nil
	}

	...
	每个repository实现类必须继承如下struct:
	type CommonSetDB struct {
		db       *gorm.DB
		//backupDb *gorm.DB
	}

	func NewCommonSetDb(db *gorm.DB) CommonSetDB {
		return CommonSetDB{db: db}
	}

	func (c *CommonSetDB) SetDb(db *gorm.DB) {
		//备份之前的db
		//this.backupDb = this.db

		//事务db
		c.db = db
	}

	func (c *CommonSetDB) GetDb(ctx context.Context) *gorm.DB {
        // return c.db
		db := c.db
		txDbValue := ctx.Value("tx_db")
		txDB, ok := txDbValue.(*gorm.DB)
		if ok {
			db = txDB
		}

		return db
	}

	func (c *CommonSetDB) RecoveryDb() {
		//if this.backupDb != nil {
		//	this.db = this.backupDb
		//}
	}
```

  

       分析:

      1)问题的偶发性:平时一个事务执行的时间很短时(一般50ms以下),并且无并发量的情况下,不容易出现两个事务在50ms以内同时执行的情况,所以该问题不容易暴露出来。

           两个事务执行的时序流程图如下图所示:

     

      2)根源分析:Repo层在程序启动时,默认注入了一个db实例,在有事务的需求场景,从外部传入了一个事务db替换了repo本身的db,在该场景,有两个事务同时在运行,由于repo层是有状态对象,会存在db这个对象资源竞争问题。

      3)解决方案:共享资源问题解决方案较多:

           1)可以使用锁来控制资源的并发读写问题,但是比较影响性能;

           2)区分事务repo与非事务repo,但是目前的项目改造成本较大;

           3)repo还是一个实例,在repo层面抽象一个通用repo struct,事务db通过context.Context参数传参解决db传入问题。如下代码所示:

```
	func (c *CommonSetDB) GetDb(ctx context.Context) *gorm.DB {
        // return c.db
		db := c.db
		txDbValue := ctx.Value("tx_db")
		txDB, ok := txDbValue.(*gorm.DB)
		if ok {
			db = txDB
		}

		return db
	}
```

 

3.Gorm/database/mysql架构分析

  3.1. Gorm/database/mysql分层架构

 3.2. Gorm/database/mysql分层架构与DDD分层架构思想对比,软件架构中处处存在分层+面向抽象编程思想

 


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077

592 次点击  
加入收藏 微博
4 回复  |  直到 2021-09-22 09:27:52
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传