golang http2长链接

wenxuwan · · 1832 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前几天项目中遇到了一个长链接假死问题,服务端和client端采用的是h2c长连接。服务端作为sidecar部署在k8s的pod里面,当滚动升级pod的时候,client端和老的pod的连接一直存在,即使老的pod已经被删除了。(client和网关是一个东西) #发现问题 突然有一天前端同事说调用全部503(内部服务不可用),赶紧去环境上查看log,发现网关发送到后段的请求全部超时。用netstat查看连接状态也是没问题的。 ![网关端状态图](https://upload-images.jianshu.io/upload_images/11119312-da736bebf845e70e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/666) 去新建的pod上查看,发现并没有连接,因此网关端的连接并不是和新建的pod建立的。 ![服务端状态图](https://upload-images.jianshu.io/upload_images/11119312-c2b8e6047126db29.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/666) #解决问题 ##1.查看网关端代码 首先看一下网关测的代码: ``` func NewH2cClient() *http.Client { client := &http.Client{ Transport: &http2.Transport{ AllowHTTP: true, DialTLS: dialH2cTimeout, }, Timeout: 10 * time.Second, } return client } func dialH2cTimeout(network, addr string, cfg *tls.Config) (net.Conn, error) { tylog.Infof("connect to remote address: %+v:", addr) return net.Dial(network, addr) } ``` client端的实现很简单,复用http.Client对象。在这里设置了TimeOut为10s,这个是发送超时时间。通过log也可以看到10s超时后返回了error,但连接并没有终端,查阅了一些net/http的文档服务端对于超时的控制还是比较细腻的。 ![服务端超时控制](https://upload-images.jianshu.io/upload_images/11119312-3416aa114a04ec34.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 但client端的控制就比较简陋(其实应该是net/http封装的东西太多,导致用户可直接操作的东西太少) ![client端超时控制](https://upload-images.jianshu.io/upload_images/11119312-5d99cd25729e03a8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 从图中可以看出,client端不像服务端有读写超时设置,client端就一个Timeout,是从建立链接到接收到body的时间段。因此我们在client端可以做的东西貌似并不多。 ##2.回归问题 查阅了部分资料后,回到问题的所在点: 为什么服务端已经被删除,但长链接还存在?? 因此我们需要看的就是client端的conn什么时候应该断开。http包中的读和写是分开的,分别对应的是两个goroutine,我们主要应该看的是读操作。 ``` func (cc *ClientConn) readLoop() { rl := &clientConnReadLoop{cc: cc} defer rl.cleanup() //这个会清理链接 cc.readerErr = rl.run() if ce, ok := cc.readerErr.(ConnectionError); ok { cc.wmu.Lock() cc.fr.WriteGoAway(0, ErrCode(ce), nil) cc.wmu.Unlock() } } ``` 上述代码是read的入口,我们看一下run这个函数的实现 ``` func (rl *clientConnReadLoop) run() error { cc := rl.cc rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse gotReply := false // ever saw a HEADERS reply gotSettings := false for { f, err := cc.fr.ReadFrame() if err != nil { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(StreamError); ok { if cs := cc.streamByID(se.StreamID, false); cs != nil { cs.cc.writeStreamReset(cs.ID, se.Code, err) cs.cc.forgetStreamID(cs.ID) if se.Cause == nil { se.Cause = cc.fr.errDetail } rl.endStreamError(cs, se) } continue } else if err != nil { return err } if VerboseLogs { cc.vlogf("http2: Transport received %s", summarizeFrame(f)) } if !gotSettings { if _, ok := f.(*SettingsFrame); !ok { cc.logf("protocol error: received %T before a SETTINGS frame", f) return ConnectionError(ErrCodeProtocol) } gotSettings = true } maybeIdle := false // whether frame might transition us to idle switch f := f.(type) { case *MetaHeadersFrame: err = rl.processHeaders(f) maybeIdle = true gotReply = true case *DataFrame: err = rl.processData(f) maybeIdle = true case *GoAwayFrame: err = rl.processGoAway(f) maybeIdle = true case *RSTStreamFrame: err = rl.processResetStream(f) maybeIdle = true case *SettingsFrame: err = rl.processSettings(f) case *PushPromiseFrame: err = rl.processPushPromise(f) case *WindowUpdateFrame: err = rl.processWindowUpdate(f) case *PingFrame: err = rl.processPing(f) default: cc.logf("Transport: unhandled response frame type %T", f) } if err != nil { if VerboseLogs { cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err) } return err } if rl.closeWhenIdle && gotReply && maybeIdle { cc.closeIfIdle() } } } ``` 再看一下ReadFrame函数: ``` // ReadFrame reads a single frame. The returned Frame is only valid // until the next call to ReadFrame. // // If the frame is larger than previously set with SetMaxReadFrameSize, the // returned error is ErrFrameTooLarge. Other errors may be of type // ConnectionError, StreamError, or anything else from the underlying // reader. func (fr *Framer) ReadFrame() (Frame, error) { fr.errDetail = nil if fr.lastFrame != nil { fr.lastFrame.invalidate() } fh, err := readFrameHeader(fr.headerBuf[:], fr.r) if err != nil { return nil, err } if fh.Length > fr.maxReadSize { return nil, ErrFrameTooLarge } payload := fr.getReadBuf(fh.Length) if _, err := io.ReadFull(fr.r, payload); err != nil { return nil, err } f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload) if err != nil { if ce, ok := err.(connError); ok { return nil, fr.connError(ce.Code, ce.Reason) } return nil, err } if err := fr.checkFrameOrder(f); err != nil { return nil, err } if fr.logReads { fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f)) } if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil { return fr.readMetaFrame(f.(*HeadersFrame)) } return f, nil } ``` 其实就是一直阻塞读取数据,只有在收到reset或者fin包(EOF)的时候才会退出,然后会执行前面的清理函数: ``` defer rl.cleanup() ``` 该函数会调用MarkDead函数清理conn关联的map ``` func (p *clientConnPool) MarkDead(cc *ClientConn) { p.mu.Lock() defer p.mu.Unlock() for _, key := range p.keys[cc] { vv, ok := p.conns[key] if !ok { continue } newList := filterOutClientConn(vv, cc) if len(newList) > 0 { p.conns[key] = newList } else { delete(p.conns, key) } } delete(p.keys, cc) } ``` 看到这基本就知道问题的所在了: 1. 删除pod的时候服务端没有发送Fin包 2. 服务端发送了Fin包,但网关没收到 ##3.验证问题所在 使用了tcpdump抓包,发现client端的确没有收到Fin包 ![client端抓包](https://upload-images.jianshu.io/upload_images/11119312-73a7402ddfb60850.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1024) 从抓包信息看的确没有收到Fin包,服务端抓也没抓到,于是感觉像k8s升级的时候直接删掉了老的pod(类似直接delete),并没有发送kill命令。但集群权限是拿不到的,不太好认证这个猜想。 ##4.解决方案 因为服务端也是自己写的,因此可以设置deployment来设置pod更新时的动作,比如: ``` "lifecycle": { "preStop": { "httpGet": { "path": "/hc.do?a=offline", "scheme": "HTTP", "port": 8809 } } }, ``` 让服务监听8809端口,k8s在删除pod前会访问 http127.0.0.1:8809/hc.do?a=offline, 这时候在服务里面做优雅下线,关掉tcp链接,这个问题就不会发生了。 ##5.总结 net/http包虽然很容易实现http服务,但对于client端的确不够友好,至少我没找到什么方式可以直接去控制超时时间,或者管理conn连接池,当然我也可以在它基础上实现一个自己的连接池(ClientConnPool接口)但成本不低。所以很多人都会自己去实现。 对于pod删除时信号量的问题,因为没有权限,所以没法看pod滚动升级时,kublet是怎么删除老的节点的,按照我的理解应该是先给容器发kill -15, 如果没退出再kill -9, 但就目前情况来看服务端并没有捕捉到信号量(我加了捕捉信号量,但并未打印出来log),后面有机会再去深入研究一下k8s的那块代码。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1832 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传