groupcache Source Code Series 6: Group


In the simple example from groupcache Source Code Series 4: Sink & ByteView (referred to below as "the Part 4 example"), there is a Get method, which is defined in the Getter interface:

if err := stringGroup.Get(nil, k, groupcache.AllocatingByteSliceSink(&dest)); err != nil {
    rw.WriteHeader(http.StatusNotFound)
    rw.Write([]byte("this key doesn't exists"))
} else {
    rw.Write([]byte(dest))
}

// A Getter loads data for a key.
type Getter interface {
    // Get returns the value identified by key, populating dest.
    //
    // The returned data must be unversioned. That is, key must
    // uniquely describe the loaded data, without an implicit
    // current time, and without relying on cache expiration
    // mechanisms.
    Get(ctx Context, key string, dest Sink) error
}
1. The Getter: what to do when the key is not in the cache

The Part 4 example contains code like this:

    stringGroup := groupcache.NewGroup(name, 1<<20,
        groupcache.GetterFunc(func(_ groupcache.Context, key string, dest groupcache.Sink) error {

And in the groupcache source:

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
    return newGroup(name, cacheBytes, getter, nil)
}

// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx Context, key string, dest Sink) error

func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
    return f(ctx, key, dest)
}

The last parameter of NewGroup must be a Getter, i.e. anything that implements Get(ctx Context, key string, dest Sink) error. GetterFunc lets us pass in a plain function with that same signature and have it automatically adapted into a Getter. Does this look familiar? Go's http.Handler interface uses exactly the same idiom:

package main

import (
    "fmt"
    "log"
    "net/http"
)

type dollars float32

func (d dollars) String() string { return fmt.Sprintf("$%g", d) }

type database map[string]dollars

func main() {
    db := database{"shoes": 50, "socks": 5}
    mux := http.NewServeMux()
    mux.Handle("/list", http.HandlerFunc(db.list))
    mux.Handle("/price", http.HandlerFunc(db.price))
    log.Fatal(http.ListenAndServe("localhost:8000", mux))
}

func (db database) list(w http.ResponseWriter, req *http.Request) {
    for item, price := range db {
        fmt.Fprintf(w, "%s: %s\n", item, price)
    }
}

func (db database) price(w http.ResponseWriter, req *http.Request) {
    item := req.URL.Query().Get("item")
    price, ok := db[item]
    if !ok {
        w.WriteHeader(http.StatusNotFound) // 404
        fmt.Fprintf(w, "no such item: %q\n", item)
        return
    }
    fmt.Fprintf(w, "%s\n", price)
}

mux.Handle expects a Handler:

func (mux *ServeMux) Handle(pattern string, handler Handler) {

Yet what gets passed in is a plain method:

package http

type HandlerFunc func(w ResponseWriter, r *Request)
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
    f(w, r)
}

Compare the two: http.HandlerFunc does exactly the same job as GetterFunc.
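
To make the adapter concrete, here is a minimal sketch in which an ordinary function becomes a Getter just by wrapping it in GetterFunc. The group name "colors", the in-memory data map and the lookupColor function are illustrative stand-ins for a real backend, and Context is assumed to be the plain interface{} type used in the snippets above:

package main

import (
    "fmt"

    "github.com/golang/groupcache"
)

// data is a stand-in for whatever slow backend (DB, RPC, file) the cache fronts.
var data = map[string]string{"red": "#FF0000", "green": "#00FF00"}

// lookupColor is a plain function with the Getter signature.
func lookupColor(_ groupcache.Context, key string, dest groupcache.Sink) error {
    v, ok := data[key]
    if !ok {
        return fmt.Errorf("color %q not found", key)
    }
    return dest.SetString(v)
}

func main() {
    // GetterFunc(lookupColor) satisfies Getter, just like
    // http.HandlerFunc(db.list) satisfies http.Handler.
    colors := groupcache.NewGroup("colors", 1<<20, groupcache.GetterFunc(lookupColor))

    var hex string
    if err := colors.Get(nil, "red", groupcache.StringSink(&hex)); err != nil {
        fmt.Println("get failed:", err)
        return
    }
    fmt.Println("red =", hex) // red = #FF0000
}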

2. Group
var (
    mu     sync.RWMutex
    groups = make(map[string]*Group)

    initPeerServerOnce sync.Once
    initPeerServer     func()
)

type Group struct {
    // group name
    name       string
    // getter fetches and caches the data when the cache has no entry
    // for the requested key
    getter     Getter
    peersOnce  sync.Once
    // HTTPPool implements this interface; a peer is chosen via
    // func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool)
    peers      PeerPicker
    // maximum cache size in bytes
    cacheBytes int64 // limit for sum of mainCache and hotCache size

    // mainCache is a cache of the keys for which this process
    // (amongst its peers) is authoritative. That is, this cache
    // contains keys which consistent hash on to this process's
    // peer number.
    // an LRU-based cache holding keys whose consistent hash maps to this node
    mainCache cache

    // hotCache contains keys/values for which this peer is not
    // authoritative (otherwise they would be in mainCache), but
    // are popular enough to warrant mirroring in this process to
    // avoid going over the network to fetch from a peer.  Having
    // a hotCache avoids network hotspotting, where a peer's
    // network card could become the bottleneck on a popular key.
    // This cache is used sparingly to maximize the total number
    // of key/value pairs that can be stored globally.
    // an LRU-based cache for keys whose hash does not map to this node; it acts
    // as a hot-key cache so that load is spread across peers
    hotCache cache

    // loadGroup ensures that each key is only fetched once
    // (either locally or remotely), regardless of the number of
    // concurrent callers.
    // guarantees that when the cache has no data for a key, only one goroutine
    // calls the getter; other concurrent goroutines wait for that first
    // goroutine and then all return the same result
    loadGroup flightGroup

    // Stats are statistics on the group.
    Stats Stats // statistics
}

// create a Group
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
    return newGroup(name, cacheBytes, getter, nil)
}

func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
    if getter == nil {
        panic("nil Getter")
    }
    mu.Lock()
    defer mu.Unlock()
    initPeerServerOnce.Do(callInitPeerServer)
    if _, dup := groups[name]; dup {
        panic("duplicate registration of group " + name)
    }
    g := &Group{
    //maincache、hotcache、peerPick都是在函数调用过程中赋值或初始化的
        name:       name,  
        getter:     getter,
        peers:      peers,
        cacheBytes: cacheBytes,
        loadGroup:  &singleflight.Group{},
    }
    if fn := newGroupHook; fn != nil {
        fn(g) // nil here: no hook has been registered
    }
    groups[name] = g // register the newly created group
    return g
}

newGroupHook and initPeerServer can be customized at initialization time, as sketched below. A few other trivial methods are skipped here. For the cache itself, see groupcache Source Code Series 3: LRU.
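
As a minimal sketch of that customization (the log messages are only illustrative, and the getter matches the signatures shown earlier), both hooks are set through exported registration functions and must be registered before the first group is created:

package main

import (
    "log"

    "github.com/golang/groupcache"
)

func init() {
    // Runs once, when the first group is created (via initPeerServerOnce.Do).
    groupcache.RegisterServerStart(func() {
        log.Println("first group created, starting peer server")
    })

    // Runs for every group that newGroup creates (the newGroupHook above).
    groupcache.RegisterNewGroupHook(func(g *groupcache.Group) {
        log.Printf("group %q registered", g.Name())
    })
}

func main() {
    // Creating a group triggers both hooks.
    groupcache.NewGroup("example", 1<<20, groupcache.GetterFunc(
        func(_ groupcache.Context, key string, dest groupcache.Sink) error {
            return dest.SetString(key)
        }))
}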

From the struct above we can see that groupcache supports a notion of namespaces: each group has its own quota and its own cache, and the caches of different groups are completely independent, so one group's behaviour can never affect another namespace. On every node the cache is split into two layers: keys for which this node fetches directly from the backend live in mainCache, while everything else lives in hotCache.
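
A minimal sketch of that independence (the group names, sizes and getters below are made up): two groups registered in the same process each get their own cacheBytes budget and their own mainCache/hotCache, so evictions in one never touch the other:

package main

import "github.com/golang/groupcache"

func main() {
    // Namespace 1: its own 64 MB budget and its own caches.
    users := groupcache.NewGroup("users", 64<<20, groupcache.GetterFunc(
        func(_ groupcache.Context, key string, dest groupcache.Sink) error {
            return dest.SetString("user:" + key) // stand-in for a real user lookup
        }))

    // Namespace 2: a separate 512 MB budget, completely independent of "users".
    thumbnails := groupcache.NewGroup("thumbnails", 512<<20, groupcache.GetterFunc(
        func(_ groupcache.Context, key string, dest groupcache.Sink) error {
            return dest.SetBytes([]byte("png-for-" + key)) // stand-in for image rendering
        }))

    _, _ = users, thumbnails
}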

3. Looking up data
// Get looks up a key in the group
func (g *Group) Get(ctx Context, key string, dest Sink) error {
    g.peersOnce.Do(g.initPeers) // wire the registered HTTPPool into g.peers (the PeerPicker)
    g.Stats.Gets.Add(1) // stats
    if dest == nil {
        return errors.New("groupcache: nil dest Sink")
    }
    value, cacheHit := g.lookupCache(key) // check mainCache and hotCache

    if cacheHit {
        g.Stats.CacheHits.Add(1)
        return setSinkView(dest, value)
    }

    // Optimization to avoid double unmarshalling or copying: keep
    // track of whether the dest was already populated. One caller
    // (if local) will set this; the losers will not. The common
    // case will likely be one caller.
    destPopulated := false
    // fetch from a peer, or via the user-supplied getter
    value, destPopulated, err := g.load(ctx, key, dest) 
    if err != nil {
        return err
    }
    if destPopulated {
        return nil
    }
    return setSinkView(dest, value) // copy the value into the sink
}

// check mainCache and hotCache; both are LRU caches backed by a linked list
// that is reordered on access
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
    if g.cacheBytes <= 0 {
        return
    }
    value, ok = g.mainCache.get(key)
    if ok {
        return
    }
    value, ok = g.hotCache.get(key)
    return
}

// load fetches the data from a peer, or via the user-supplied lookup logic (the getter)
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
    g.Stats.Loads.Add(1)
    // the flightGroup (singleflight) ensures that only one goroutine calls the getter for a given key
    viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { 
        // Check the cache again because singleflight can only dedup calls
        // that overlap concurrently.  It's possible for 2 concurrent
        // requests to miss the cache, resulting in 2 load() calls.  An
        // unfortunate goroutine scheduling would result in this callback
        // being run twice, serially.  If we don't check the cache again,
        // cache.nbytes would be incremented below even though there will
        // be only one entry for this key.
        //
        // Consider the following serialized event ordering for two
        // goroutines in which this callback gets called twice for the
        // same key:
        // 1: Get("key")
        // 2: Get("key")
        // 1: lookupCache("key")
        // 2: lookupCache("key")
        // 1: load("key")
        // 2: load("key")
        // 1: loadGroup.Do("key", fn)
        // 1: fn()
        // 2: loadGroup.Do("key", fn)
        // 2: fn()
        //
        // (The ordering above shows how two or more goroutines can all miss the
        // cache and enter load for the same key; the data would then be fetched
        // and accounted for twice, which is why lookupCache is called once more
        // below.)
        if value, cacheHit := g.lookupCache(key); cacheHit {
            g.Stats.CacheHits.Add(1)
            return value, nil
        }
        g.Stats.LoadsDeduped.Add(1)
        var value ByteView
        var err error
        // pick the authoritative peer via the consistent hash (HTTPPool implements this)
        if peer, ok := g.peers.PickPeer(key); ok {
            // build a protobuf request, fetch the data from that peer over HTTP,
            // and store the result in hotCache
            value, err = g.getFromPeer(ctx, peer, key)
            if err == nil {
                g.Stats.PeerLoads.Add(1)
                return value, nil
            }
            g.Stats.PeerErrors.Add(1)
            // TODO(bradfitz): log the peer's error? keep
            // log of the past few for /groupcachez?  It's
            // probably boring (normal task movement), so not
            // worth logging I imagine.
        }
        // call the getter to fetch the data locally; it is stored in mainCache below
        value, err = g.getLocally(ctx, key, dest)
        if err != nil {
            g.Stats.LocalLoadErrs.Add(1)
            return nil, err
        }
        g.Stats.LocalLoads.Add(1)
        destPopulated = true // only one caller of load gets this return value
        g.populateCache(key, value, &g.mainCache)
        return value, nil
    })
    if err == nil {
        value = viewi.(ByteView)
    }
    return
}


func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
    err := g.getter.Get(ctx, key, dest)
    if err != nil {
        return ByteView{}, err
    }
    return dest.view()
}

This is what prevents cache breakdown (a thundering herd of loads for the same missing key); see groupcache Source Code Series 2: singleflight.
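
A minimal sketch of that protection (the group name, key, counter and sleep are illustrative): many goroutines asking for the same missing key should result in roughly one getter call, not one per goroutine:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/golang/groupcache"
)

func main() {
    var getterCalls int64

    slow := groupcache.NewGroup("slow", 1<<20, groupcache.GetterFunc(
        func(_ groupcache.Context, key string, dest groupcache.Sink) error {
            atomic.AddInt64(&getterCalls, 1)
            time.Sleep(100 * time.Millisecond) // simulate a slow backend
            return dest.SetString("value-for-" + key)
        }))

    var wg sync.WaitGroup
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            var v string
            _ = slow.Get(nil, "hot-key", groupcache.StringSink(&v))
        }()
    }
    wg.Wait()

    // Thanks to loadGroup (singleflight), this prints a small number
    // (typically 1), not 50.
    fmt.Println("getter calls:", atomic.LoadInt64(&getterCalls))
}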


This article is from: 简书 (Jianshu)

Thanks to the author: 懒皮

Original article: groupcache 源码系列六 Group
