groupcache 源码系列五 peers.go http.go

懒皮 · · 626 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

先上结论:

当客户端连上groupcache时,能做的只有get获取数据,如果本地有所需要的数据,则直接返回,如果没有,则通过一致性哈希函数判断这个key所对应的peer,然后通过http从这个peer上获取数据;如果这个peer上有需要的数据,则通过http回复给之前的那个groupcache;groupcache收到之后,保存在本地hotCache中,并返回给客户端;如果peer上也没有所需要的数据,则groupcache从数据源(数据库或者文件)获取数据,并将数据保存在本地mainCache,并返回给客户端

以上结论对应的源码部分出现在groupcache.go中,暂时不作分析,先来看看peers.go中的内容。

1.NoPeers

peers.go默认实现了一个没有peer的结构,也就是在groupcache 源码系列四 Sink ByteView里那个简单的例子所运行的情况。

// Context is an opaque value passed through calls to the
// ProtoGetter. It may be nil if your ProtoGetter implementation does
// not require a context.
type Context interface{}

// ProtoGetter is the interface that must be implemented by a peer.
type ProtoGetter interface {
    Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error
}

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
    // PickPeer returns the peer that owns the specific key
    // and true to indicate that a remote peer was nominated.
    // It returns nil, false if the key owner is the current peer.
    PickPeer(key string) (peer ProtoGetter, ok bool)
}

// NoPeers is an implementation of PeerPicker that never finds a peer.
type NoPeers struct{}

func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }

所有的peer结构都要实现PeerPicker接口,即给定一个字符串,返回一个ProtoGetter,ok bool。在后面还会看到,如果是有peer的情况,用的是HTTPPool来代替NoPeers。

2.func getPeers

接下来提供了一个get方法,会根据groupName返回这种peer结构,当然它们的共同点是一样的,也就是返回值类型为PeerPicker接口

func getPeers(groupName string) PeerPicker {
    if portPicker == nil {
        return NoPeers{}
    }
    pk := portPicker(groupName)
    if pk == nil {
        pk = NoPeers{}
    }
    return pk
}

这里portPicker是一个func类型,允许自定义:

var (
    portPicker func(groupName string) PeerPicker
)

// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPeerPicker(fn func() PeerPicker) {
    if portPicker != nil {
        panic("RegisterPeerPicker called more than once")
    }
    portPicker = func(_ string) PeerPicker { return fn() }
}

// RegisterPerGroupPeerPicker registers the peer initialization function,
// which takes the groupName, to be used in choosing a PeerPicker.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPerGroupPeerPicker(fn func(groupName string) PeerPicker) {
    if portPicker != nil {
        panic("RegisterPeerPicker called more than once")
    }
    portPicker = fn
}

也就是说只要一个func返回PeerPicker,就能自定义了。在http.go里可以搜索到这种使用方式:

func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
    ...

    p := &HTTPPool{
        self:        self,
        httpGetters: make(map[string]*httpGetter),
    }
    ...
    if p.opts.Replicas == 0 {
        p.opts.Replicas = defaultReplicas
    }
    p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)

    RegisterPeerPicker(func() PeerPicker { return p })
    return p
}
3.HTTPPool

上面的NewHTTPPoolOpts中,RegisterPeerPicker注册的函数里,直接返回了p,是个HTTPPool类型,那么可以确定HTTPPool必然实现了PeerPicker接口,也就是PickPeer方法,很容易得到验证:

func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.peers.IsEmpty() {
        return nil, false
    }
    if peer := p.peers.Get(key); peer != p.self {
        return p.httpGetters[peer], true
    }
    return nil, false
}

所以p.peers是什么呢,p.httpGetters又是什么?接着找:

peers       *consistenthash.Map

简单来说这是个映射,实现了哈希一致,具体参考groupcache 源码系列一 consistent hash一致性哈希算法,提供Get,Add方法。顺便也能明白上面提到的Replicas,是哈希一致用到的虚拟节点数。

func (m *Map) Get(key string) string {
func (m *Map) Add(keys ...string) {

Get已经看到在哪里用了,那么Add呢?

// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
func (p *HTTPPool) Set(peers ...string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
    p.peers.Add(peers...)
    p.httpGetters = make(map[string]*httpGetter, len(peers))
    for _, peer := range peers {
        p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
    }
}

在这个Set方法中,还对peers和httpGetters做了关联,就和func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {里看到的一样,也是通过映射,用peer这个字符串作为key。比如

peers_addrs = []string{"http://127.0.0.1:8001", 
"http://127.0.0.1:8002", "http://127.0.0.1:8003"}
peers.Set(peers_addrs...)

这样当我们查一个值时,如果需要找相应的peer字符串,就把这个值扔到consistenthash.Map里哈希一下,找到相应的节点,返回那个节点对应的字符串,比如"http://127.0.0.1:8001",然后再拿这个串,找到相应的httpGetter。

4.httpGetter

下面肯定是要去看httpGetter是怎么回事了,

type httpGetter struct {
    transport func(Context) http.RoundTripper
    baseURL   string
}

var bufferPool = sync.Pool{
    New: func() interface{} { return new(bytes.Buffer) },
}

这里可以参考Golang http.RoundTripperGolang sync.Pool 和 伪共享false share

然后就是实现了peers.go里面提到的ProtoGetter接口,也就是说httpGetter其实就是一个ProtoGetter,它可以帮助我们在peer之间使用protobuf来处理数据。

func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
    u := fmt.Sprintf(
        "%v%v/%v",
        h.baseURL,
        url.QueryEscape(in.GetGroup()),
        url.QueryEscape(in.GetKey()),
    )
    req, err := http.NewRequest("GET", u, nil)
    if err != nil {
        return err
    }
    tr := http.DefaultTransport
    if h.transport != nil {
        tr = h.transport(context)
    }
    res, err := tr.RoundTrip(req)
    if err != nil {
        return err
    }
    defer res.Body.Close()
    if res.StatusCode != http.StatusOK {
        return fmt.Errorf("server returned: %v", res.Status)
    }
    b := bufferPool.Get().(*bytes.Buffer)
    b.Reset()
    defer bufferPool.Put(b)
    _, err = io.Copy(b, res.Body)
    if err != nil {
        return fmt.Errorf("reading response body: %v", err)
    }
    err = proto.Unmarshal(b.Bytes(), out)
    if err != nil {
        return fmt.Errorf("decoding response body: %v", err)
    }
    return nil
}

通过fmt.Sprintf参数,用/把group和key分割开,然后做了一个Get请求,然后再用proto.Unmarshal把数据放到out * pb.GetResponse里面。这个方法在哪里调用呢,在groupcache.go里面有:

func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
    req := &pb.GetRequest{
        Group: &g.name,
        Key:   &key,
    }
    res := &pb.GetResponse{}
    err := peer.Get(ctx, req, res)
    if err != nil {
        return ByteView{}, err
    }
    value := ByteView{b: res.Value}
    // TODO(bradfitz): use res.MinuteQps or something smart to
    // conditionally populate hotCache.  For now just do it some
    // percentage of the time.
    if rand.Intn(10) == 0 {
        g.populateCache(key, value, &g.hotCache)
    }
    return value, nil
}

构造了GetRequest,GetResponse结构,然后用传入的ProtoGetter执行了上面说的Get请求。

5.对peer请求的处理

对peer的请求确实是发出来了,但是peer作为一个http服务器,应该怎么处理这个请求呢?在http.go里很容易找到:

func NewHTTPPool(self string) *HTTPPool {
    p := NewHTTPPoolOpts(self, nil)
    http.Handle(p.opts.BasePath, p)
    return p
}

这里基础知识可以参考Golang http.Handler接口,根据func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {,很显然HTTPPool还要实现http.Handler接口,也就是ServeHTTP(w ResponseWriter, r *Request):

func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Parse request.
    if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
        panic("HTTPPool serving unexpected path: " + r.URL.Path)
    }
    parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
    if len(parts) != 2 {
        http.Error(w, "bad request", http.StatusBadRequest)
        return
    }
    fmt.Println("parts:",parts)
    groupName := parts[0]
    key := parts[1]

    // Fetch the value for this group/key.
    group := GetGroup(groupName)
    if group == nil {
        http.Error(w, "no such group: "+groupName, http.StatusNotFound)
        return
    }
    var ctx Context
    if p.Context != nil {
        ctx = p.Context(r)
    }

    group.Stats.ServerRequests.Add(1)
    var value []byte
    err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    // Write the value to the response body as a proto message.
    body, err := proto.Marshal(&pb.GetResponse{Value: value})
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/x-protobuf")
    w.Write(body)
}

前面说到包装Get请求时,用/把groupname和key组合到一起,这里当然是要先拆分开,再解析。剩下的事情就是从相应的group里找到数据,用proto包装好,扔出去。

6.group
上面部分代码是groupcache.go中的,也涉及到了Group结构。下一篇重点讲述Group。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:懒皮

查看原文:groupcache 源码系列五 peers.go http.go

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

626 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传