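The conclusion first:
From the caller's side, the only operation groupcache offers is Get. If the data is already in the local cache (mainCache or hotCache), it is returned directly. Otherwise, consistent hashing decides which peer owns the key. If the owner is another peer, the data is fetched from that peer over HTTP. The owning peer serves it from its own cache or, on a miss, loads it from the data source (a database or a file) itself and keeps it in its own mainCache. The requesting node then returns the value to the caller and, only some of the time, also keeps a copy in its hotCache. If the current node owns the key itself (or the request to the peer fails), it loads the data from the data source, stores it in its mainCache, and returns it to the caller.
The code behind this conclusion is in groupcache.go. We leave it for later and look at peers.go first.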
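The lookup order can be made concrete with a toy, in-memory model. This is a sketch, not groupcache's code: node, owner, loadFromSource and hotEvery are hypothetical names, the owner function stands in for consistent hashing, the "HTTP" call is a direct method call, the error fallback to a local load is omitted, and the hotCache decision is a deterministic counter instead of groupcache's rand.Intn(10) == 0.

```go
package main

import "fmt"

// node is a hypothetical stand-in for one groupcache process.
type node struct {
	name      string
	mainCache map[string]string       // keys this node owns and has loaded
	hotCache  map[string]string       // copies of keys owned by other nodes
	peers     map[string]*node        // all nodes, by name
	owner     func(key string) string // stands in for consistent hashing
	hotEvery  int                     // copy into hotCache on every hotEvery-th peer fetch
	fetches   int
}

// loadFromSource pretends to read a database or file.
func (n *node) loadFromSource(key string) string {
	return "value-of-" + key
}

func (n *node) Get(key string) string {
	if v, ok := n.mainCache[key]; ok {
		return v
	}
	if v, ok := n.hotCache[key]; ok {
		return v
	}
	if o := n.owner(key); o != n.name {
		// Remote owner: it loads from its own source on a miss.
		v := n.peers[o].Get(key)
		n.fetches++
		if n.fetches%n.hotEvery == 0 {
			n.hotCache[key] = v
		}
		return v
	}
	// This node owns the key: load it and keep it in mainCache.
	v := n.loadFromSource(key)
	n.mainCache[key] = v
	return v
}

// newCluster builds two nodes; keys sorting before "m" belong to A (hypothetical rule).
func newCluster(hotEvery int) (*node, *node) {
	owner := func(key string) string {
		if key < "m" {
			return "A"
		}
		return "B"
	}
	a := &node{name: "A", mainCache: map[string]string{}, hotCache: map[string]string{}, owner: owner, hotEvery: hotEvery}
	b := &node{name: "B", mainCache: map[string]string{}, hotCache: map[string]string{}, owner: owner, hotEvery: hotEvery}
	peers := map[string]*node{"A": a, "B": b}
	a.peers, b.peers = peers, peers
	return a, b
}

func main() {
	a, b := newCluster(1)
	fmt.Println(a.Get("zebra"))                                  // value-of-zebra
	fmt.Println(len(a.mainCache), len(a.hotCache), len(b.mainCache)) // 0 1 1
}
```

Asking A for "zebra" makes B load it into B's mainCache, while A only keeps a hotCache copy, which is the split between the two caches described above.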
1.NoPeers
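By default, peers.go provides a structure with no peers, which is the situation in the simple example from the earlier article "groupcache source code series 4: Sink, ByteView".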
// Context is an opaque value passed through calls to the
// ProtoGetter. It may be nil if your ProtoGetter implementation does
// not require a context.
type Context interface{}

// ProtoGetter is the interface that must be implemented by a peer.
type ProtoGetter interface {
	Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error
}

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
	// PickPeer returns the peer that owns the specific key
	// and true to indicate that a remote peer was nominated.
	// It returns nil, false if the key owner is the current peer.
	PickPeer(key string) (peer ProtoGetter, ok bool)
}

// NoPeers is an implementation of PeerPicker that never finds a peer.
type NoPeers struct{}

func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }
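Every peer type must implement the PeerPicker interface: given a key string, it returns a ProtoGetter and a bool ok. As we will see later, when there really are peers, HTTPPool is used instead of NoPeers.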
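To see what the PeerPicker contract means for a caller, here is a self-contained sketch. GetRequest and GetResponse are hypothetical stand-ins for the generated pb types, and staticGetter and firstLetterPicker are made-up implementations; only the interface shapes and NoPeers follow the code above.

```go
package main

import (
	"fmt"
	"strings"
)

// Stand-ins for the protobuf messages (hypothetical, not the real pb package).
type GetRequest struct{ Group, Key string }
type GetResponse struct{ Value []byte }

type Context interface{}

type ProtoGetter interface {
	Get(ctx Context, in *GetRequest, out *GetResponse) error
}

type PeerPicker interface {
	PickPeer(key string) (peer ProtoGetter, ok bool)
}

// NoPeers never nominates a peer, as in peers.go.
type NoPeers struct{}

func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }

// staticGetter is a hypothetical in-memory "peer" answering from a map.
type staticGetter map[string][]byte

func (s staticGetter) Get(_ Context, in *GetRequest, out *GetResponse) error {
	v, ok := s[in.Key]
	if !ok {
		return fmt.Errorf("key %q not found", in.Key)
	}
	out.Value = v
	return nil
}

// firstLetterPicker is hypothetical: keys starting with "r" live on a remote peer.
type firstLetterPicker struct{ remote ProtoGetter }

func (p firstLetterPicker) PickPeer(key string) (ProtoGetter, bool) {
	if strings.HasPrefix(key, "r") {
		return p.remote, true
	}
	return nil, false
}

// fetch shows how a caller uses the contract: ok == false means "load it yourself".
func fetch(p PeerPicker, key string) string {
	peer, ok := p.PickPeer(key)
	if !ok {
		return "local:" + key
	}
	var out GetResponse
	if err := peer.Get(nil, &GetRequest{Group: "demo", Key: key}, &out); err != nil {
		return "error:" + err.Error()
	}
	return "remote:" + string(out.Value)
}

func main() {
	remote := staticGetter{"red": []byte("#f00")}
	fmt.Println(fetch(NoPeers{}, "red"))                          // local:red
	fmt.Println(fetch(firstLetterPicker{remote: remote}, "red"))  // remote:#f00
	fmt.Println(fetch(firstLetterPicker{remote: remote}, "blue")) // local:blue
}
```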
2.func getPeers
Next comes getPeers, which returns the peer picker for a given groupName. Whatever the concrete type, it is returned as the PeerPicker interface:
func getPeers(groupName string) PeerPicker {
	if portPicker == nil {
		return NoPeers{}
	}
	pk := portPicker(groupName)
	if pk == nil {
		pk = NoPeers{}
	}
	return pk
}
Here portPicker is a variable of func type, so it can be customized:
var (
	portPicker func(groupName string) PeerPicker
)

// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPeerPicker(fn func() PeerPicker) {
	if portPicker != nil {
		panic("RegisterPeerPicker called more than once")
	}
	portPicker = func(_ string) PeerPicker { return fn() }
}

// RegisterPerGroupPeerPicker registers the peer initialization function,
// which takes the groupName, to be used in choosing a PeerPicker.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPerGroupPeerPicker(fn func(groupName string) PeerPicker) {
	if portPicker != nil {
		panic("RegisterPeerPicker called more than once")
	}
	portPicker = fn
}
In other words, any func that returns a PeerPicker can be plugged in. http.go uses it this way:
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
	...
	p := &HTTPPool{
		self:        self,
		httpGetters: make(map[string]*httpGetter),
	}
	...
	if p.opts.Replicas == 0 {
		p.opts.Replicas = defaultReplicas
	}
	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
	RegisterPeerPicker(func() PeerPicker { return p })
	return p
}
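The registration pattern and the NoPeers fallback in getPeers can be exercised on their own. In this sketch PeerPicker is simplified so that a peer is just its address string (an assumption made to avoid the pb types), and onlyRemote and tryRegister are hypothetical helpers; the bodies of RegisterPeerPicker, RegisterPerGroupPeerPicker and getPeers follow the code above.

```go
package main

import "fmt"

// Simplified PeerPicker: a peer is identified by its address string here.
type PeerPicker interface {
	PickPeer(key string) (peer string, ok bool)
}

type NoPeers struct{}

func (NoPeers) PickPeer(string) (string, bool) { return "", false }

var portPicker func(groupName string) PeerPicker

func RegisterPeerPicker(fn func() PeerPicker) {
	if portPicker != nil {
		panic("RegisterPeerPicker called more than once")
	}
	portPicker = func(_ string) PeerPicker { return fn() }
}

func RegisterPerGroupPeerPicker(fn func(groupName string) PeerPicker) {
	if portPicker != nil {
		panic("RegisterPeerPicker called more than once")
	}
	portPicker = fn
}

// getPeers falls back to NoPeers when nothing is registered or the factory returns nil.
func getPeers(groupName string) PeerPicker {
	if portPicker == nil {
		return NoPeers{}
	}
	if pk := portPicker(groupName); pk != nil {
		return pk
	}
	return NoPeers{}
}

// onlyRemote is a hypothetical picker that sends every key to one address.
type onlyRemote string

func (o onlyRemote) PickPeer(string) (string, bool) { return string(o), true }

// tryRegister reports whether RegisterPeerPicker panicked.
func tryRegister() (panicked bool) {
	defer func() { panicked = recover() != nil }()
	RegisterPeerPicker(func() PeerPicker { return onlyRemote("http://127.0.0.1:8003") })
	return false
}

func main() {
	_, ok := getPeers("scores").PickPeer("k")
	fmt.Println("before registration, remote?", ok) // before registration, remote? false

	// Per-group choice: only the "scores" group gets a real picker.
	RegisterPerGroupPeerPicker(func(groupName string) PeerPicker {
		if groupName == "scores" {
			return onlyRemote("http://127.0.0.1:8002")
		}
		return nil // getPeers turns this into NoPeers{}
	})
	p, ok := getPeers("scores").PickPeer("k")
	fmt.Println(p, ok) // http://127.0.0.1:8002 true
	_, ok = getPeers("other").PickPeer("k")
	fmt.Println(ok) // false
	fmt.Println("second registration panics:", tryRegister()) // second registration panics: true
}
```

Because portPicker is a single package-level variable, the two Register functions are mutually exclusive, which is why both panic on a second call.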
3.HTTPPool
In NewHTTPPoolOpts above, the function registered with RegisterPeerPicker returns p directly, and p is an *HTTPPool. So HTTPPool must implement the PeerPicker interface, i.e. the PickPeer method, which is easy to confirm:
func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.peers.IsEmpty() {
		return nil, false
	}
	if peer := p.peers.Get(key); peer != p.self {
		return p.httpGetters[peer], true
	}
	return nil, false
}
So what is p.peers, and what is p.httpGetters? Looking further:
peers *consistenthash.Map
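Simply put, this is a map that implements consistent hashing (see the earlier article "groupcache source code series 1: consistent hash") and provides Get and Add methods. This also explains the Replicas option seen above: it is the number of virtual nodes each real node gets on the hash ring.
func (m *Map) Get(key string) string
func (m *Map) Add(keys ...string)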
We have already seen where Get is used. What about Add?
// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
	}
}
Set also ties peers and httpGetters together. As in PickPeer, the link is a map keyed by the peer string itself. For example:
// peers is the *HTTPPool returned by NewHTTPPool
peerAddrs := []string{"http://127.0.0.1:8001",
	"http://127.0.0.1:8002", "http://127.0.0.1:8003"}
peers.Set(peerAddrs...)
Then, to find the peer for a key, the key is hashed in the consistenthash.Map, which returns the string of the owning node, e.g. "http://127.0.0.1:8001". That string is then used to look up the corresponding httpGetter.
4.httpGetter
Next, naturally, is httpGetter:
type httpGetter struct {
	transport func(Context) http.RoundTripper
	baseURL   string
}

var bufferPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}
For background, see the articles "Golang http.RoundTripper" and "Golang sync.Pool and false sharing".
httpGetter then implements the ProtoGetter interface from peers.go. In other words, an httpGetter is a ProtoGetter, and it lets peers exchange data encoded with protobuf.
func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(in.GetGroup()),
		url.QueryEscape(in.GetKey()),
	)
	req, err := http.NewRequest("GET", u, nil)
	if err != nil {
		return err
	}
	tr := http.DefaultTransport
	if h.transport != nil {
		tr = h.transport(context)
	}
	res, err := tr.RoundTrip(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("server returned: %v", res.Status)
	}
	b := bufferPool.Get().(*bytes.Buffer)
	b.Reset()
	defer bufferPool.Put(b)
	_, err = io.Copy(b, res.Body)
	if err != nil {
		return fmt.Errorf("reading response body: %v", err)
	}
	err = proto.Unmarshal(b.Bytes(), out)
	if err != nil {
		return fmt.Errorf("decoding response body: %v", err)
	}
	return nil
}
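The URL layout that httpGetter.Get builds, and that the serving peer later splits apart, can be checked in isolation. buildURL and splitPath are hypothetical helpers that mirror the Sprintf call above and the parsing at the start of HTTPPool.ServeHTTP; "/_groupcache/" is assumed as the base path.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildURL mirrors the fmt.Sprintf call in httpGetter.Get.
func buildURL(baseURL, group, key string) string {
	return fmt.Sprintf("%v%v/%v", baseURL, url.QueryEscape(group), url.QueryEscape(key))
}

// splitPath mirrors the path parsing in HTTPPool.ServeHTTP.
func splitPath(rawURL, basePath string) (group, key string, err error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", "", err
	}
	if !strings.HasPrefix(u.Path, basePath) { // u.Path is already unescaped
		return "", "", fmt.Errorf("unexpected path %q", u.Path)
	}
	parts := strings.SplitN(u.Path[len(basePath):], "/", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("bad request")
	}
	return parts[0], parts[1], nil
}

func main() {
	u := buildURL("http://127.0.0.1:8002/_groupcache/", "scores", "user/42")
	fmt.Println(u) // http://127.0.0.1:8002/_groupcache/scores/user%2F42
	g, k, err := splitPath(u, "/_groupcache/")
	fmt.Println(g, k, err) // scores user/42 <nil>
}
```

Because SplitN stops after the first "/", a key may itself contain slashes. One caveat worth checking: QueryEscape turns a space into "+", which path unescaping does not turn back into a space, so in this sketch the key "a b" arrives as "a+b".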
The fmt.Sprintf call joins the base URL, the escaped group name, and the escaped key with /. A GET request is then sent, and proto.Unmarshal decodes the response body into out *pb.GetResponse. Where is this method called? In groupcache.go:
func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
	req := &pb.GetRequest{
		Group: &g.name,
		Key:   &key,
	}
	res := &pb.GetResponse{}
	err := peer.Get(ctx, req, res)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: res.Value}
	// TODO(bradfitz): use res.MinuteQps or something smart to
	// conditionally populate hotCache. For now just do it some
	// percentage of the time.
	if rand.Intn(10) == 0 {
		g.populateCache(key, value, &g.hotCache)
	}
	return value, nil
}
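getFromPeer builds a GetRequest and a GetResponse, then runs the Get request described above through the ProtoGetter it was given. Note that the value is copied into hotCache only when rand.Intn(10) == 0, that is, for roughly one peer fetch in ten.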
5.Handling requests from peers
The request to a peer is now sent, but how does the peer, acting as an HTTP server, handle it? It is easy to find in http.go:
func NewHTTPPool(self string) *HTTPPool {
	p := NewHTTPPoolOpts(self, nil)
	http.Handle(p.opts.BasePath, p)
	return p
}
For background, see the article "Golang http.Handler interface". Since NewHTTPPool passes p to http.Handle, HTTPPool must also implement the http.Handler interface, i.e. ServeHTTP(w ResponseWriter, r *Request):
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Parse request.
	if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
		panic("HTTPPool serving unexpected path: " + r.URL.Path)
	}
	parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
	if len(parts) != 2 {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	groupName := parts[0]
	key := parts[1]

	// Fetch the value for this group/key.
	group := GetGroup(groupName)
	if group == nil {
		http.Error(w, "no such group: "+groupName, http.StatusNotFound)
		return
	}
	var ctx Context
	if p.Context != nil {
		ctx = p.Context(r)
	}

	group.Stats.ServerRequests.Add(1)
	var value []byte
	err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Write the value to the response body as a proto message.
	body, err := proto.Marshal(&pb.GetResponse{Value: value})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/x-protobuf")
	w.Write(body)
}
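When the Get request was built, the group name and key were joined with /, so here they are first split apart and parsed. What remains is to fetch the data from the corresponding group, wrap it in a proto message, and send it back.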
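The request-handling side can be tried without a network using net/http/httptest. This is a simplified sketch, not HTTPPool itself: the groups map is a hypothetical stand-in for GetGroup plus group.Get, the value is written as raw bytes instead of a marshalled pb.GetResponse, an unexpected path returns 400 instead of panicking, and a missing key returns 500 to mimic the group.Get error path.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

const basePath = "/_groupcache/"

// groups is a hypothetical stand-in for GetGroup + group.Get: group -> key -> value.
var groups = map[string]map[string]string{
	"scores": {"alice": "42"},
}

type handler struct{}

func (handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if !strings.HasPrefix(r.URL.Path, basePath) {
		http.Error(w, "unexpected path", http.StatusBadRequest)
		return
	}
	// Split "<group>/<key>"; the key may contain further slashes.
	parts := strings.SplitN(r.URL.Path[len(basePath):], "/", 2)
	if len(parts) != 2 {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	group, ok := groups[parts[0]]
	if !ok {
		http.Error(w, "no such group: "+parts[0], http.StatusNotFound)
		return
	}
	value, ok := group[parts[1]]
	if !ok {
		http.Error(w, "no such key", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/octet-stream")
	io.WriteString(w, value)
}

// do sends one in-memory request and returns the status code and body.
func do(path string) (int, string) {
	rec := httptest.NewRecorder()
	handler{}.ServeHTTP(rec, httptest.NewRequest("GET", path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	fmt.Println(do("/_groupcache/scores/alice")) // 200 42
	fmt.Println(do("/_groupcache/nosuch/alice"))
	fmt.Println(do("/_groupcache/scores"))
}
```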
6.group
Part of the code above comes from groupcache.go and already touches the Group struct. The next article focuses on Group.