前几天项目中遇到了一个长链接假死问题,服务端和client端采用的是h2c长连接。服务端作为sidecar部署在k8s的pod里面,当滚动升级pod的时候,client端和老的pod的连接一直存在,即使老的pod已经被删除了。(client和网关是一个东西)
发现问题
突然有一天前端同事说调用全部503(内部服务不可用),赶紧去环境上查看log,发现网关发送到后段的请求全部超时。用netstat查看连接状态也是没问题的。
去新建的pod上查看,发现并没有连接,因此网关端的连接并不是和新建的pod建立的。
解决问题
1.查看网关端代码
首先看一下网关测的代码:
func NewH2cClient() *http.Client {
client := &http.Client{
Transport: &http2.Transport{
AllowHTTP: true,
DialTLS: dialH2cTimeout,
},
Timeout: 10 * time.Second,
}
return client
}
func dialH2cTimeout(network, addr string, cfg *tls.Config) (net.Conn, error) {
tylog.Infof("connect to remote address: %+v:", addr)
return net.Dial(network, addr)
}
client端的实现很简单,复用http.Client对象。在这里设置了TimeOut为10s,这个是发送超时时间。通过log也可以看到10s超时后返回了error,但连接并没有终端,查阅了一些net/http的文档服务端对于超时的控制还是比较细腻的。
但client端的控制就比较简陋(其实应该是net/http封装的东西太多,导致用户可直接操作的东西太少)
从图中可以看出,client端不像服务端有读写超时设置,client端就一个Timeout,是从建立链接到接收到body的时间段。因此我们在client端可以做的东西貌似并不多。
2.回归问题
查阅了部分资料后,回到问题的所在点:
为什么服务端已经被删除,但长链接还存在??
因此我们需要看的就是client端的conn什么时候应该断开。http包中的读和写是分开的,分别对应的是两个goroutine,我们主要应该看的是读操作。
func (cc *ClientConn) readLoop() {
rl := &clientConnReadLoop{cc: cc}
defer rl.cleanup() //这个会清理链接
cc.readerErr = rl.run()
if ce, ok := cc.readerErr.(ConnectionError); ok {
cc.wmu.Lock()
cc.fr.WriteGoAway(0, ErrCode(ce), nil)
cc.wmu.Unlock()
}
}
上述代码是read的入口,我们看一下run这个函数的实现
func (rl *clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
for {
f, err := cc.fr.ReadFrame()
if err != nil {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(StreamError); ok {
if cs := cc.streamByID(se.StreamID, false); cs != nil {
cs.cc.writeStreamReset(cs.ID, se.Code, err)
cs.cc.forgetStreamID(cs.ID)
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
rl.endStreamError(cs, se)
}
continue
} else if err != nil {
return err
}
if VerboseLogs {
cc.vlogf("http2: Transport received %s", summarizeFrame(f))
}
if !gotSettings {
if _, ok := f.(*SettingsFrame); !ok {
cc.logf("protocol error: received %T before a SETTINGS frame", f)
return ConnectionError(ErrCodeProtocol)
}
gotSettings = true
}
maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) {
case *MetaHeadersFrame:
err = rl.processHeaders(f)
maybeIdle = true
gotReply = true
case *DataFrame:
err = rl.processData(f)
maybeIdle = true
case *GoAwayFrame:
err = rl.processGoAway(f)
maybeIdle = true
case *RSTStreamFrame:
err = rl.processResetStream(f)
maybeIdle = true
case *SettingsFrame:
err = rl.processSettings(f)
case *PushPromiseFrame:
err = rl.processPushPromise(f)
case *WindowUpdateFrame:
err = rl.processWindowUpdate(f)
case *PingFrame:
err = rl.processPing(f)
default:
cc.logf("Transport: unhandled response frame type %T", f)
}
if err != nil {
if VerboseLogs {
cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
}
return err
}
if rl.closeWhenIdle && gotReply && maybeIdle {
cc.closeIfIdle()
}
}
}
再看一下ReadFrame函数:
// ReadFrame reads a single frame. The returned Frame is only valid
// until the next call to ReadFrame.
//
// If the frame is larger than previously set with SetMaxReadFrameSize, the
// returned error is ErrFrameTooLarge. Other errors may be of type
// ConnectionError, StreamError, or anything else from the underlying
// reader.
func (fr *Framer) ReadFrame() (Frame, error) {
fr.errDetail = nil
if fr.lastFrame != nil {
fr.lastFrame.invalidate()
}
fh, err := readFrameHeader(fr.headerBuf[:], fr.r)
if err != nil {
return nil, err
}
if fh.Length > fr.maxReadSize {
return nil, ErrFrameTooLarge
}
payload := fr.getReadBuf(fh.Length)
if _, err := io.ReadFull(fr.r, payload); err != nil {
return nil, err
}
f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
if err != nil {
if ce, ok := err.(connError); ok {
return nil, fr.connError(ce.Code, ce.Reason)
}
return nil, err
}
if err := fr.checkFrameOrder(f); err != nil {
return nil, err
}
if fr.logReads {
fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
}
if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
return fr.readMetaFrame(f.(*HeadersFrame))
}
return f, nil
}
其实就是一直阻塞读取数据,只有在收到reset或者fin包(EOF)的时候才会退出,然后会执行前面的清理函数:
defer rl.cleanup()
该函数会调用MarkDead函数清理conn关联的map
func (p *clientConnPool) MarkDead(cc *ClientConn) {
p.mu.Lock()
defer p.mu.Unlock()
for _, key := range p.keys[cc] {
vv, ok := p.conns[key]
if !ok {
continue
}
newList := filterOutClientConn(vv, cc)
if len(newList) > 0 {
p.conns[key] = newList
} else {
delete(p.conns, key)
}
}
delete(p.keys, cc)
}
看到这基本就知道问题的所在了:
- 删除pod的时候服务端没有发送Fin包
- 服务端发送了Fin包,但网关没收到
3.验证问题所在
使用了tcpdump抓包,发现client端的确没有收到Fin包
从抓包信息看的确没有收到Fin包,服务端抓也没抓到,于是感觉像k8s升级的时候直接删掉了老的pod(类似直接delete),并没有发送kill命令。但集群权限是拿不到的,不太好认证这个猜想。
4.解决方案
因为服务端也是自己写的,因此可以设置deployment来设置pod更新时的动作,比如:
"lifecycle": {
"preStop": {
"httpGet": {
"path": "/hc.do?a=offline",
"scheme": "HTTP",
"port": 8809
}
}
},
让服务监听8809端口,k8s在删除pod前会访问 http127.0.0.1:8809/hc.do?a=offline, 这时候在服务里面做优雅下线,关掉tcp链接,这个问题就不会发生了。
5.总结
net/http包虽然很容易实现http服务,但对于client端的确不够友好,至少我没找到什么方式可以直接去控制超时时间,或者管理conn连接池,当然我也可以在它基础上实现一个自己的连接池(ClientConnPool接口)但成本不低。所以很多人都会自己去实现。
对于pod删除时信号量的问题,因为没有权限,所以没法看pod滚动升级时,kublet是怎么删除老的节点的,按照我的理解应该是先给容器发kill -15, 如果没退出再kill -9, 但就目前情况来看服务端并没有捕捉到信号量(我加了捕捉信号量,但并未打印出来log),后面有机会再去深入研究一下k8s的那块代码。
有疑问加站长微信联系(非本文作者)