在《groupcache 源码系列四 Sink ByteView》那个简单的例子中(后文简称第四节的例子),调用了一个Get方法,这个方法的签名与Getter接口中定义的Get一致:
if err := stringGroup.Get(nil, k, groupcache.AllocatingByteSliceSink(&dest)); err != nil {
rw.WriteHeader(http.StatusNotFound)
rw.Write([]byte("this key doesn't exists"))
} else {
rw.Write([]byte(dest))
}
// A Getter loads data for a key.
//
// Any type with a matching Get method satisfies this interface.
type Getter interface {
// Get returns the value identified by key, populating dest.
// The value is delivered through dest; the error return only reports
// whether the load succeeded.
//
// The returned data must be unversioned. That is, key must
// uniquely describe the loaded data, without an implicit
// current time, and without relying on cache expiration
// mechanisms.
Get(ctx Context, key string, dest Sink) error
}
1.Getter:缓存中找不到数据时该怎么做
第四节例子中,出现这样的代码:
stringGroup := groupcache.NewGroup(name, 1<<20,
groupcache.GetterFunc(func(_ groupcache.Context, key string, dest groupcache.Sink) error {
在源码中:
// NewGroup creates a Group called name whose cache is limited to cacheBytes
// bytes and whose data is loaded through getter.
// It delegates to newGroup, passing a nil PeerPicker.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
return newGroup(name, cacheBytes, getter, nil)
}
// A GetterFunc implements Getter with a function.
// Converting a function that has this signature to GetterFunc yields a
// value that can be used wherever a Getter is required.
type GetterFunc func(ctx Context, key string, dest Sink) error
// Get calls f(ctx, key, dest) and returns its error unchanged.
func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
return f(ctx, key, dest)
}
NewGroup最后一个参数要求是Getter类型,只要实现了Get(ctx Context, key string, dest Sink) error方法即可。GetterFunc本身是一个函数类型,并且实现了Get方法(内部直接调用这个函数本身),所以只要把一个签名一致的普通函数转换成GetterFunc,就得到了一个Getter,参数也是一致的。是不是熟悉的味道?Go标准库net/http的Handler接口也有这种写法:
// main serves a two-item inventory over HTTP on localhost:8000 and exits
// through log.Fatal when the server stops.
func main() {
	inventory := database{"shoes": 50, "socks": 5}

	// inventory.list and inventory.price are plain method values; the
	// http.HandlerFunc conversion is what turns them into http.Handler
	// values that mux.Handle accepts.
	listHandler := http.HandlerFunc(inventory.list)
	priceHandler := http.HandlerFunc(inventory.price)

	mux := http.NewServeMux()
	mux.Handle("/list", listHandler)
	mux.Handle("/price", priceHandler)

	log.Fatal(http.ListenAndServe("localhost:8000", mux))
}
// database maps an item name to its price.
type database map[string]dollars

// list writes one "item: price" line to w for every entry in db.
// Map iteration order is unspecified, so the line order may vary.
func (db database) list(w http.ResponseWriter, req *http.Request) {
	for name := range db {
		fmt.Fprintf(w, "%s: %s\n", name, db[name])
	}
}
// price writes the price of the item named by the "item" query parameter.
// When db has no such item it responds with 404 and a "no such item" message.
func (db database) price(w http.ResponseWriter, req *http.Request) {
	name := req.URL.Query().Get("item")
	if cost, found := db[name]; found {
		fmt.Fprintf(w, "%s\n", cost)
		return
	}

	// Unknown item: the status must be written before the body.
	w.WriteHeader(http.StatusNotFound) // 404
	fmt.Fprintf(w, "no such item: %q\n", name)
}
mux.Handle要的是一个Handler类型:
func (mux *ServeMux) Handle(pattern string, handler Handler) {
不过传入的却是用http.HandlerFunc转换过的方法,HandlerFunc的定义如下:
package http
// HandlerFunc is a function type whose signature matches ServeHTTP, so an
// ordinary function with that signature can be converted to it.
type HandlerFunc func(w ResponseWriter, r *Request)
// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}
对比一下,和GetterFunc做的事情是一样的。
2.Group
var (
// mu is held by newGroup while it checks and updates groups.
mu sync.RWMutex
// groups maps a group name to the *Group registered under that name.
groups = make(map[string]*Group)
// initPeerServerOnce is used by newGroup to run callInitPeerServer at
// most once.
initPeerServerOnce sync.Once
// initPeerServer is an optional hook; presumably it is what
// callInitPeerServer invokes (not shown here) — TODO confirm.
initPeerServer func()
)
// A Group is a cache namespace with its own name, getter, size limit,
// caches and statistics.
type Group struct {
// name is the group's name.
name string
// getter is used to obtain the data when it is not in the cache; the
// loaded value is then cached.
getter Getter
// peersOnce guards the one-time g.initPeers call made by Get.
peersOnce sync.Once
// peers picks the peer responsible for a key.
// Per the original note, HTTPPool implements this interface through
// func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool)
// (HTTPPool is not shown here).
peers PeerPicker
// cacheBytes is the maximum cache size in bytes.
cacheBytes int64 // limit for sum of mainCache and hotCache size
// mainCache is a cache of the keys for which this process
// (amongst its peers) is authoritative. That is, this cache
// contains keys which consistent hash on to this process's
// peer number.
// Per the original note, the cache type uses an LRU policy (its
// implementation is not shown here).
mainCache cache
// hotCache contains keys/values for which this peer is not
// authoritative (otherwise they would be in mainCache), but
// are popular enough to warrant mirroring in this process to
// avoid going over the network to fetch from a peer. Having
// a hotCache avoids network hotspotting, where a peer's
// network card could become the bottleneck on a popular key.
// This cache is used sparingly to maximize the total number
// of key/value pairs that can be stored globally.
// Per the original note, this is also an LRU cache; it holds keys that
// do not hash to this process and serves as a hot-spot cache for load
// balancing.
hotCache cache
// loadGroup ensures that each key is only fetched once
// (either locally or remotely), regardless of the number of
// concurrent callers.
// When a key is missing from the cache, only one goroutine performs the
// load; the other concurrent callers wait for it and share its result.
loadGroup flightGroup
// Stats are statistics on the group.
Stats Stats // counters updated by Get and load
}
// NewGroup creates a Group called name whose cache is limited to cacheBytes
// bytes and whose data is loaded through getter.
// It delegates to newGroup with a nil PeerPicker, so it panics under the same
// conditions: a nil getter or a name that is already registered.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
return newGroup(name, cacheBytes, getter, nil)
}
// newGroup builds a Group and records it in the package-level groups map.
//
// It panics when getter is nil or when a group called name has already been
// registered. peers may be nil; NewGroup always passes nil.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
	if getter == nil {
		panic("nil Getter")
	}

	// Hold mu for the whole registration so that the duplicate check and the
	// insertion into groups happen under the same lock.
	mu.Lock()
	defer mu.Unlock()
	initPeerServerOnce.Do(callInitPeerServer)

	if _, exists := groups[name]; exists {
		panic("duplicate registration of group " + name)
	}

	// mainCache and hotCache are not set here and start as zero values.
	grp := &Group{
		name:       name,
		getter:     getter,
		peers:      peers,
		cacheBytes: cacheBytes,
		loadGroup:  &singleflight.Group{},
	}
	if hook := newGroupHook; hook != nil {
		hook(grp)
	}

	groups[name] = grp // register the new group under its name
	return grp
}
newGroupHook和initPeerServer在初始化时,可以自定义一下。其它一些简单方法也忽略不说了。关于cache,可以参考groupcache 源码系列三 LRU
由上面的结构体我们可以看出来,groupcache支持namespace(即group)的概念:不同的group有自己的配额以及cache,彼此之间的cache是独立的,也就是说不会出现某个group的行为影响到另外一个group的情况。groupcache每个节点的cache分为2层:由本节点直接访问后端(getter)取得的数据存放在mainCache,从其他节点取得的数据存放在hotCache。
3.查找数据
// Get looks up key in this group and writes the value into dest.
//
// The caches are consulted first; on a miss the value is obtained through
// g.load. It returns an error when dest is nil or when the load fails.
func (g *Group) Get(ctx Context, key string, dest Sink) error {
	g.peersOnce.Do(g.initPeers) // runs g.initPeers at most once per group
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}

	// Fast path: the value is already in mainCache or hotCache.
	if cached, hit := g.lookupCache(key); hit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, cached)
	}

	// Optimization to avoid double unmarshalling or copying: load reports
	// whether dest was already populated. One caller (if local) will have
	// it set; the losers will not. The common case will likely be one
	// caller.
	loaded, populated, err := g.load(ctx, key, dest)
	switch {
	case err != nil:
		return err
	case populated:
		return nil
	}
	return setSinkView(dest, loaded)
}
// lookupCache searches mainCache first and then hotCache for key.
// It reports a miss without touching either cache when cacheBytes is not
// positive.
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
	if g.cacheBytes <= 0 {
		return ByteView{}, false
	}
	if value, ok = g.mainCache.get(key); ok {
		return value, true
	}
	return g.hotCache.get(key)
}
// load obtains the value for key from a peer or, failing that, from the
// group's getter.
//
// It returns the loaded value, whether dest was already populated by the
// local getter during this call, and any error from the load.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
// The work runs through loadGroup so that concurrent loads of the same key
// are collapsed into a single execution of the callback.
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// Check the cache again because singleflight can only dedup calls
// that overlap concurrently. It's possible for 2 concurrent
// requests to miss the cache, resulting in 2 load() calls. An
// unfortunate goroutine scheduling would result in this callback
// being run twice, serially. If we don't check the cache again,
// cache.nbytes would be incremented below even though there will
// be only one entry for this key.
//
// In other words, two or more goroutines may enter load for the same
// key, which would fetch and account for the same data more than once;
// that is why g.lookupCache(key) is called a second time here.
//
// Consider the following serialized event ordering for two
// goroutines in which this callback gets called twice for the
// same key:
// 1: Get("key")
// 2: Get("key")
// 1: lookupCache("key")
// 2: lookupCache("key")
// 1: load("key")
// 2: load("key")
// 1: loadGroup.Do("key", fn)
// 1: fn()
// 2: loadGroup.Do("key", fn)
// 2: fn()
if value, cacheHit := g.lookupCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return value, nil
}
g.Stats.LoadsDeduped.Add(1)
var value ByteView
var err error
// Ask the PeerPicker which peer owns this key (per the original note the
// peer is chosen by consistent hashing, matching HTTPPool).
if peer, ok := g.peers.PickPeer(key); ok {
// getFromPeer is not shown here; per the original note it builds a
// protobuf request, queries the peer over HTTP and stores the result
// in hotCache.
value, err = g.getFromPeer(ctx, peer, key)
if err == nil {
g.Stats.PeerLoads.Add(1)
return value, nil
}
g.Stats.PeerErrors.Add(1)
// TODO(bradfitz): log the peer's error? keep
// log of the past few for /groupcachez? It's
// probably boring (normal task movement), so not
// worth logging I imagine.
}
// No peer was picked, or the peer failed: fall back to the group's own
// getter and store the result in mainCache.
value, err = g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.LocalLoadErrs.Add(1)
return nil, err
}
g.Stats.LocalLoads.Add(1)
destPopulated = true // only one caller of load gets this return value
g.populateCache(key, value, &g.mainCache)
return value, nil
})
// viewi is only asserted to ByteView when the load succeeded.
if err == nil {
value = viewi.(ByteView)
}
return
}
// getLocally asks the group's getter to fill dest for key and returns the
// resulting view of dest. A getter error is returned together with an empty
// ByteView.
func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
	if err := g.getter.Get(ctx, key, dest); err != nil {
		return ByteView{}, err
	}
	return dest.view()
}
这里通过loadGroup(singleflight)保证同一个key在同一时刻只会被加载一次,可以防止缓存击穿,详见《groupcache 源码系列二 singleflight》。
有疑问加站长微信联系(非本文作者)