一、应用场景
二、使用示例
RPC超时控制
Context传递LogID
三、使用原则
四、源码实现
4.1 思考
4.2 标准实现
4.2.1 cancelCtx
4.2.2 timerCtx
4.2.3 valueCtx
一、应用场景
- 链路跟踪,业务需要传递上下游元信息;
- 主协程和子协程同步信号,减少计算资源的浪费。例如rpc调用时的超时控制。
二、使用示例
RPC超时控制
// RPCTimeoutMW .
func RPCTimeoutMW(next endpoint.EndPoint) endpoint.EndPoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
rpcInfo := GetRPCInfo(ctx)
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, time.Duration(rpcInfo.RPCTimeout)*time.Millisecond) //1
defer cancel()
var resp interface{}
var err error
done := make(chan error, 1)
go func() {
defer func() {
if err := recover(); err != nil {
...
done <- fmt.Errorf("KITC: panic, %v\n%s", err, buf)
}
close(done)
}()
resp, err = next(ctx, request) //2
}()
select {
case panicErr := <-done:
if panicErr != nil {
panic(panicErr.Error()) // throws panic error
}
return resp, err
case <-ctx.Done(): //3
return nil, makeTimeoutErr(ctx, rpcInfo, start)
}
}
}
Context传递LogID
//每次rpc调用时,会将logID传递下去。
if logID, ok := kitutil.GetCtxLogID(ctx); !ok || logID == "" {
logID = logid.GenLogID()
ctx = kitutil.NewCtxWithLogID(ctx, logID)
}
三、使用原则
- Context 应该随 Request 消亡而消亡,不要把Context放在结构体中,要以参数的方式传递。
- 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
- 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
- Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递
- Context是线程安全的,可以放心的在多个goroutine中传递
四、源码实现
4.1 思考
首先考虑自己来实现context的上下文控制和信息传递时,可能如下:
type Context struct {
lock //并发安全
C chan int //信号控制
Values map[string]interface{} //数据
}
因为父节点需要控制子节点,所以
type Context struct {
lock //并发安全
C chan int //信号控制
Values map[string]interface{} //数据
child []*Context
}
然后新建ctx时,将子节点挂在父节点上,同时提供ctx.cancel,负责close(C)。
核心:子节点挂在父节点,父节点取消时传递到子节点。
4.2 标准实现
Context接口
type Context interface {
Deadline() (deadline time.Time, ok bool)//获取是否设置了到期时间以及所设置的截止时间。
Done() <-chan struct{} //返回一个通道,如果通道关闭则代表该Context已经被取消;如果返回的为nil,则代表该Context是一个永远不会被取消的Context。
Err() error //返回该Context被取消的原因
Value(key interface{}) interface{} //
}
emptyContext
- emptyCtx是一个空的context,可以作为context树的跟节点;
- emptyCtx不会被取消,没有值,也没有超时时间;
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
4.2.1 cancelCtx
cancelCtx是真正具有取消功能的Context类型:
- 包含了一个Context类型的值,存储了当前cancelCtx的父Context的指针。
- done作为取消信号的通道,子协程监听该通道了解到是否需要取消任务
- children存储了当前Context衍生的所有可取消类型的子Context
- err会被第一次取消的时候设置
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
新建cancelCtx
- newCancelCtx标记父节点;
- propagateCancel是传递父子关系的关键;
- 返回cancel函数,外界可以结束该ctx;
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)// 保证父节点被取消时,子节点可以被取消。
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled 不需要,因为父节点不会被取消,也就不用传递取消
}
//parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. parentCancelCtx会找到上一个可以被取消的父节点,然后挂上去;
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{} //挂上去!
}
p.mu.Unlock()
} else { //监听父节点的信号,因为没有找到一个可以挂上去的,不会被集联通知,只能自己监听。
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
- 大部分的子节点都不需要监听父节点,因为挂在了父节点中,等待集联通知即可;
- Done chan是懒加载;
取消cancelCtx
为啥父节点取消时,可以把子节点集联取消呢? 遍历,深度优先搜索
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled 可以重复取消
}
c.err = err
if c.done == nil {
c.done = closedchan //都复用一个closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
4.2.2 timerCtx
- timerCtx 内部仍然使用cancelCtx实现取消;
- 新增一个定时器Timer定时调用cancle函数实现该功能(WithTimeOut将当前时间+超时时间计算得到绝对时间后使用WithDeadLine实现)。
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
新建timerCtx
关键是新建了一个timer,定时cancel;
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent) //子节点比父节点还要迟,直接是一个cancelCtx
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c) //挂上
dur := time.Until(d)
if dur <= 0 { //
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
4.2.3 valueCtx
- valueCtx 在context的基础上,新增了一个k-v对;
- value获取值会递归父节点,在这条链表上实现了一个map;O(n)
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
有疑问加站长微信联系(非本文作者)