Golang 并发 与 context标准库

Ovenvan · · 1342 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

这篇文章将:介绍context工作机制;简单说明接口和结构体功能;通过简单Demo介绍外部API创建并使用context标准库;从源码角度分析context工作流程(不包括mutex的使用分析以及timerCtx计时源码)。

context是一个很好的解决多goroutine下通知传递和元数据的Go标准库。由于Go中的goroutine之间没有父子关系,因此也不存在子进程退出后的通知机制。多个goroutine协调工作涉及 通信同步通知退出 四个方面:

通信:chan通道是各goroutine之间通信的基础。注意这里的通信主要指程序的数据通道。
同步:可以使用不带缓冲的chan;sync.WaitGroup为多个gorouting提供同步等待机制;mutex锁与读写锁机制。
通知:通知与上文通信的区别是,通知的作用为管理,控制流数据。一般的解决方法是在输入端绑定两个chan,通过select收敛处理。这个方案可以解决简单的问题,但不是一个通用的解决方案。
退出:简单的解决方案与通知类似,即增加一个单独的通道,借助chan和select的广播机制(close chan to broadcast)实现退出。

但由于Go之间的goroutine都是平等的,因此当遇到复杂的并发结构时处理退出机制则会显得力不从心。因此Go1.7版本开始提供了context标准库来解决这个问题。他提供两个功能:退出通知元数据传递。他们都可以传递给整个goroutine调用树的每一个goroutine。同时这也是一个不太复杂的,适合初学Gopher学习的一段源码。


工作机制

第一个创建Context的goroutine被称为root节点:root节点负责创建一个实现Context接口的具体对象,并将该对象作为参数传递至新拉起的goroutine作为其上下文。下游goroutine继续封装该对象并以此类推向下传递。

interface

Context接口:作为一个基本接口,所有的Context对象都要实现该接口,并将其作为使用者调度时的参数类型:

type Context interface{
    Deadline()(deadline time.Time, ok bool)  
//如果Context实现了超时控制,该方法返回 超时时间,true。否则ok为false
    Done() <-chan struct{}
//依旧使用<-chan struct{}来通知退出,供被调用的goroutine监听。
    Err() error
//当Done()返回的chan收到通知后,防卫Err()获知被取消的原因
    Value(key interface{}) interface
}

canceler接口:拓展接口,规定了取消通知的Context具体类型需要实现的接口:

type canceler interface {
    cancel(removeFromParent bool, err error)
//通知后续创建的goroutine退出
    Done() <-chan struct{}
//作者对这个Done()方法的理解是多余的
}

struct

emptyCtx:实现了一个不具备任何功能的Context接口,其存在的目的就是作为Context对象树的root节点:

type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}
func (*emptyCtx) Done() <-chan struct{} {
    return nil
}
func (*emptyCtx) Err() error {
    return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}
//......
var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)
func Background() Context {
    return background
}
func TODO() Context {
    return todo
}
//这两者返回值是一样的,文档上建议main函数可以使用Background()创建root context

cancelCtx:可以认为他与emptyCtx最大的区别在于,具体实现了cancel函数。即他可以向子goroutine传递cancel消息。
timerCtx:另一个实现Context接口的具体类型,内部封装了cancelCtx类型实例,同时拥有deadline变量,用于实现定时退出通知。
valueCtx:实现了Context接口的具体类型,内部分装cancelCtx类型实例,同时封装了一个kv存储变量,用于传递通知消息。

API

除了root context可以使用Background()创建以外,其余的context都应该从cancelCtxtimerCtxvalueCtx中选取一个来构建具体对象:
func WithCancel(parent Context) (Context, CancelFunc):创建cancelCtx实例。
func WithDeadline(parent Context, deadline time.Time)(Context, CancelFunc)func WithTimeout(parent Context, timeout time.Duration)(Context, CancelFunc):两种方法都可以创建一个带有超时通知的Context具体对象timerCtx,具体差别在于传递绝对或相对时间。
func WithValue(parent Context, key, val interface{}) Context:创建valueCtx实例。


  1. 创建root context并构建一个WithCancel类型的上下文,使用该上下文注册一个goroutine模拟运行:
func main(){
    ctxa, cancel := context.WithCancel(context.Background())
    go work(ctxa, "work1")
}
func work(ctx context.Context, name string){
    for{
        select{
        case <-ctx.Done():
            println(name," get message to quit")
            return
        default:
            println(name," is running")
            time.Sleep(time.Second)
        }
    }
}
  1. 使用WithDeadline包装ctxa,并使用新的上下文注册另一个goroutine:
func main(){
    ctxb, _ := context.WithTimeout(ctxa, time.Second * 3)
    go work(ctxb, "work2")
}
  1. 使用WithValue包装ctxb,并注册新的goroutine:
func main(){
    ctxc := context.WithValue(ctxb, "key", "custom value")
    go workWithValue(ctxc, "work3")
}
func workWithValue(ctx context.Context, name string){
    for{
        select {
        case <-ctx.Done():
            println(name," get message to quit")
            return
        default:
            value:=ctx.Value("key").(string)
            println(name, " is running with value", value)
            time.Sleep(time.Second)
        }
    }
}
  1. 最后在main函数中手动关闭ctxa,并等待输出结果:
func main(){
    time.Sleep(5*time.Second)
    cancel()
    time.Sleep(time.Second)
}

至此我们运行程序并查看输出结果:

work1  is running
work3  is running with value custom value
work2  is running
work1  is running
work2  is running
work3  is running with value custom value
work2  is running
work3  is running with value custom value
work1  is running
//work2超时并通知work3退出
work2  get message to quit
work3  get message to quit
work1  is running
work1  is running
work1  get message to quit

可以看到,当ctxb因超时而退出之后,会通知由他包装的所有子goroutine(ctxc),并通知退出。各context的关系结构如下:

Background() -> ctxa -> ctxb -> ctxc


源码分析

我们主要研究两个问题,即各Context如何保存父类和子类上下文;以及cancel方法如何实现通知子类context实现退出功能。

context的数据结构
  1. emptyCtx只是一个uint类型的变量,其目的只是为了作为第一个goroutine ctx的parent,因此他不需要,也没法保存子类上下文结构。

  2. cancelCtx的数据结构:

type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     chan struct{}         // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

Context接口保存的就是父类的context。children map[canceler]struct{}保存的是所有直属与这个context的子类context。done chan struct{}用于发送退出信号。
我们查看创建cancelCtx的APIfunc WithCancel(...)...

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}

propagateCancel函数的作用是将自己注册至parent context。我们稍后会讲解这个函数。

  1. timerCtx的数据结构:
type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

timerCtx继承于cancelCtx,并为定时退出功能新增自己的数据结构。

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
    }
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
//以下内容与定时退出机制有关,在本文不作过多分析和解释
    dur := time.Until(d)
    if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(true, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}

timerCtx查看parent context的方法是timerCtx.cancelCtx.Context

  1. valueCtx的数据结构:
type valueCtx struct {
    Context
    key, val interface{}
}

相较于timerCtx而言非常简单,没有继承于cancelCtx struct,而是直接继承于Context接口。

func WithValue(parent Context, key, val interface{}) Context {
    if key == nil {
        panic("nil key")
    }
    if !reflect.TypeOf(key).Comparable() {
        panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

辅助函数

这里我们会有两个疑问,第一,valueCtx为什么没有propagateCancel函数向parent context注册自己。既然没有注册,为何ctxb超时后能通知ctxc一起退出。第二,valueCtx是如何存储children和parent context结构的。相较于同样绑定Context接口的cancelCtx,valueCtx并没有children数据。
第二个问题能解决一半第一个问题,即为何不向parent context注册。先说结论:valueCtx的children context注册在valueCtx的parent context上。函数func propagateCancel(...)负责注册信息,我们先看一下他的构造:

func propagateCancel

func propagateCancel(parent Context, child canceler) {
    if parent.Done() == nil {
        return // parent is never canceled
    }
    if p, ok := parentCancelCtx(parent); ok {
        p.mu.Lock()
        if p.err != nil {
            // parent has already been canceled
            child.cancel(false, p.err)
        } else {
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            p.children[child] = struct{}{}
        }
        p.mu.Unlock()
    } else {
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

这个函数的主要逻辑如下:接收parent context 和 child canceler方法,若parent为emptyCtx,则不注册;否则通过funcparentCancelCtx寻找最近的一个*cancelCtx;若该cancelCtx已经结束,则调用child的cancel方法,否则向该cancelCtx注册child。

func parentCancelCtx

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    for {
        switch c := parent.(type) {
        case *cancelCtx:
            return c, true
        case *timerCtx:
            return &c.cancelCtx, true
        case *valueCtx:
            parent = c.Context
        default:
            return nil, false
        }
    }
}

func parentCancelCtx从parentCtx中向上迭代寻找第一个*cancelCtx并返回。从函数逻辑中可以看到,只有当parent.(type)为*valueCtx的时候,parent才会向上迭代而不是立即返回。否则该函数都是直接返回或返回经过包装的*cancelCtx。因此我们可以发现,valueCtx是依赖于parentCtx的*cancelCtx结构的。

至于第二个问题,事实上,parentCtx根本无需,也没有办法通过Done()方法通知valueCtx,valueCtx也没有额外实现Done()方法。可以理解为:valueCtx与parentCtx公用一个done channel,当parentCtx调用了cancel方法并关闭了done channel时,监听valueCtx的done channel的goroutine同样会收到退出信号。另外,当parentCtx没有实现cancel方法(如emptyCtx)时,可以认为valueCtx也是无法cancel的。

func (c *cancelCtx) cancel

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return // already canceled
    }
    c.err = err
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done)
    }
    for child := range c.children {
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
        removeChild(c.Context, c)
    }
}

该方法的主要逻辑如下:若外部err为空,则代表这是一个非法的cancel操作,抛出panic;若cancelCtx内部err不为空,说明该Ctx已经执行过cancel操作,直接返回;关闭done channel,关联该Ctx的goroutine收到退出通知;遍历children,若有的话,执行child.cancel操作;调用removeChild将自己从parent context中移除。

func (c *timerCtx) cancel

与cancelCtx十分类似,不作过多阐述。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:Ovenvan

查看原文:Golang 并发 与 context标准库

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1342 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传