【2-4 Golang】Go并发编程—网络IO

tomato01 · · 726 次点击 · · 开始浏览    

&emsp;&emsp;我们都知道用户程序读写socket的时候,可能阻塞当前协程,那么是不是说明Go语言采用阻塞方式调用socket相关系统调用呢?你有没有想过,Go语言又是如何实现高性能网络IO呢?有没有使用传说中的IO多路复用,如epoll呢? ## 探索Go语言网络IO &emsp;&emsp;HTTP服务肯定涉及到socket的读写吧,而且Go语言启动一个HTTP服务还是非常简单的,几行代码就可以搞定,前面也不需要反向代理服务如Nginx,我们写一个简单的HTTP服务来测试: ``` package main import ( "fmt" "net/http" ) func main() { http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { writer.Write([]byte("hello world")) }) server := &http.Server{ Addr: "0.0.0.0:80", } err := server.ListenAndServe() fmt.Println(err) } //curl http://127.0.0.1:10086/ping //hello world ``` &emsp;&emsp;可以暂时不理解http.Server,只需要知道这是Go语言提供的HTTP服务;我们启动的HTTP服务监听80端口,所有请求都返回"hello world"。程序挺简单的,但是如何验证我们提出的疑问呢?Go语言层面的socket读写,最终肯定会转化为具体的系统调用吧,有一个工具strace,可以监听进程所有的系统调用,我们先通过strace简单看一下。 ``` # ps aux | grep test root 27435 0.0 0.0 219452 4636 pts/0 Sl+ 11:00 0:00 ./test # strace -p 27435 strace: Process 27435 attached epoll_pwait(5, [{EPOLLIN, {u32=1030856456, u64=140403511762696}}], 128, -1, NULL, 0) = 1 futex(0x9234d0, FUTEX_WAKE_PRIVATE, 1) = 1 accept4(3, {sa_family=AF_INET6, sin6_port=htons(56447), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28], SOCK_CLOEXEC|SOCK_NONBLOCK) = 4 epoll_ctl(5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=1030856248, u64=140403511762488}}) = 0 getsockname(4, {sa_family=AF_INET6, sin6_port=htons(10086), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 0 setsockopt(4, SOL_TCP, TCP_NODELAY, [1], 4) = 0 setsockopt(4, SOL_SOCKET, SO_KEEPALIVE, [1], 4) = 0 setsockopt(4, SOL_TCP, TCP_KEEPINTVL, [180], 4) = 0 setsockopt(4, SOL_TCP, TCP_KEEPIDLE, [180], 4) = 0 futex(0xc000036848, FUTEX_WAKE_PRIVATE, 1) = 1 accept4(3, 0xc0000abac0, 0xc0000abaa4, SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable) futex(0x923d48, FUTEX_WAIT_PRIVATE, 0, NULL) = 0 nanosleep({0, 3000}, NULL) ``` &emsp;&emsp;strace使用起来还是非常简单的,ps命令查出来进程的pid,然后strace -p pid就可以了,同时我们手动curl请求一下。可以很清楚的看到epoll_pwait,epoll_ctl,accept4等系统调用,很明显,Go使用了IO多路复用epoll(不同系统Linux,Windows,Mac不一样)。另外,注意第二个accept4系统调用,返回EAGAIN,并且第四个参数包含标识SOCK_NONBLOCK,看到这基本也能猜到,Go语言采取的是非阻塞方式调用socket相关系统调用。 &emsp;&emsp;Linux系统,高性能网络IO通常使用epoll,epoll可以同时监听多个socket fd是否可读或者可写(socket缓冲区有数据了就是可读,socket缓冲区有空间了就是可写)。epoll使用也比较简单,我们不做过多介绍,读者可以自己查阅相关资料,了解下epoll基于红黑树+双向列表实现,以及水平触发边缘触发等概念。epoll只有三个API: ``` //创建epoll对象 int epoll_create(int size) //添加/修改/删除监听的socket,包括可以设置监听socket的可读还是可写 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //阻塞等待,直到监听的多个socket可读或者可写;events就是返回的事件列表 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); ``` ## 网络IO与调度器schedule &emsp;&emsp;我们可以猜测下Go语言网络IO流程:socket读写采取的都是非阻塞式,如果不可读或者不可写,会立即返回EAGAIN,此时Go语言会将该socket添加到epoll对象监听,同时阻塞用户协程切换到调度器schedule。而等到合适的时机,再调用epoll_wait获取可读或者可写的socket,从而恢复这些由于socket读写阻塞的用户协程。 &emsp;&emsp;什么时候是合适的时机呢?还记得我们上一篇文章介绍的调度器schedule吗,调度器在获取可执行协程时,还会尝试检测一下,当前是否有协程已经解除阻塞了,其中就包括检测监听的socket是否可读或者可写。这些逻辑都在runtime.findrunnable函数内可以看到: ``` func findrunnable() (gp *g, inheritTime bool) { //本地队列 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } //全局队列 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } //检测是否有socket可读或者可写 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } } } ``` &emsp;&emsp;netpoll对应的就是我们提到的epoll_wait,注意这里输入参数是0(超时时间为0,不会阻塞),即不管是否存在socket可读或者可写,都立即返回,而且返回的就是gList,解除阻塞的协程列表。injectglist函数将协程添加到全局队列,或者是P的本地队列。 &emsp;&emsp;但是我们也可以看到,什么时候检测是否有socket可读或者可写呢?在查找当前P的本地队列,以及查找全局队列之后。那问题来了,如果这两个队列一直有协程怎么办?是不是就一直不会检测socket了状态了,也就是说这些协程会一直这么阻塞了。这肯定不行啊,那怎么办?别忘了我们还有一个辅助线程sysmon,这个函数也会以10ms周期检测的. ``` func sysmon() { delay = 10 * 1000 // up to 10ms for { usleep(delay) lastpoll := int64(atomic.Load64(&sched.lastpoll)) if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) list := netpoll(0) // non-blocking - returns list of goroutines if !list.empty() { incidlelocked(-1) injectglist(&list) incidlelocked(1) } } } } ``` &emsp;&emsp;与调度器schedule类似,同样超时时间为0,不会阻塞;同样的将解除阻塞的协程添加到全局队列,或者是P的本地队列。 &emsp;&emsp;接下来该研究socket读写操作的流程了,当然肯定与我们的猜测类似,非阻塞读写,如果不可读或者不可写,立即返回EAGAIN,于是将该socket添加到epoll对象监听,并且阻塞当前协程,并切换到调度器schedule。一方面,我们可以从上往下,如从server.ListenAndServe往底层逐层去探索,研究socket读写的实现;另一方面,我们已经知道底层一定会走到epoll_ctl,只是我们不知道Go语言统一封装的方法名称,简单浏览下runtime包下的文件,可以找到runtime/netpoll_epoll.go,根据名称基本就能判断这是对epoll的封装,这下简单了,打开调试模式(Goland、dlv都可以调试),打断点,再查看调用栈,socket读写操作的调用链瞬间就清楚了。 ``` 0 0x00000000010304ea in runtime.netpollopen at /Users/xxx/Documents/go1.18/src/runtime/netpoll_epoll.go:64 1 0x000000000105cdf4 in internal/poll.runtime_pollOpen at /Users/xxx/Documents/go1.18/src/runtime/netpoll.go:239 2 0x000000000109e32d in internal/poll.(*pollDesc).init at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:39 3 0x000000000109eca6 in internal/poll.(*FD).Init at /Users/xxx/Documents/go1.18/src/internal/poll/fd_unix.go:63 4 0x0000000001150078 in net.(*netFD).init at /Users/xxx/Documents/go1.18/src/net/fd_unix.go:41 5 0x0000000001150078 in net.(*netFD).accept at /Users/xxx/Documents/go1.18/src/net/fd_unix.go:184 6 0x000000000115f5a8 in net.(*TCPListener).accept at /Users/xxx/Documents/go1.18/src/net/tcpsock_posix.go:139 7 0x000000000115e91d in net.(*TCPListener).Accept at /Users/xxx/Documents/go1.18/src/net/tcpsock.go:288 8 0x00000000011ff56a in net/http.(*onceCloseListener).Accept at <autogenerated>:1 9 0x00000000011f3145 in net/http.(*Server).Serve at /Users/xxx/Documents/go1.18/src/net/http/server.go:3039 10 0x00000000011f2d7d in net/http.(*Server).ListenAndServe at /Users/xxx/Documents/go1.18/src/net/http/server.go:2968 ``` &emsp;&emsp;有了这个调用栈,socket读写操作的整个流程基本上没有太大问题了,这里就不再赘述了。我们可以简单看一下Accept的逻辑,是不是之前我们说的,非阻塞读写,如果不可读或者不可写,立即返回EAGAIN,同时将该socket添加到epoll对象监听,以及阻塞当前协程,并切换到调度器schedule。 ``` func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { if err := fd.readLock(); err != nil { return -1, nil, "", err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return -1, nil, "", err } for { s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EINTR: continue case syscall.EAGAIN: if fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: // This means that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. continue } return -1, nil, errcall, err } } ``` &emsp;&emsp;for循环一直尝试执行accept,如果返回EAGAIN;函数waitRead底层就是监听读socket事件,并且阻塞协程以及切换到调度器schedule。 ## 读写超时 &emsp;&emsp;上面我们简单介绍了socket读写操作基本流程,调度器Schedule以及辅助线程sysmon检测socket基本流程。还有两个问题,我们没有提到:1)socket可读或者可写时,如何关联到协程呢?怎么知道哪些协程因为这个socket阻塞了呢?2)高性能服务,socket读写操作肯定是需要设置合理的超时时间的,不然假如依赖服务变慢,用户协程也会跟着长时间阻塞。socket读写超时,怎么实现呢? &emsp;&emsp;我们先回答第一个问题,在回顾下epoll的三个API,其中涉及到一个结构体epoll_event,不仅包含了socket fd,还包含一个void类型指针,通常指向用户自定义数据。Go语言也是这么做的,自定义了结构runtime.pollDesc: ``` type pollDesc struct { fd uintptr // constant for pollDesc usage lifetime //指向读socket阻塞的协程 rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil //指向写socket阻塞的协程 wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil //读超时定时器 rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline (a nanotime in the future, -1 when expired) //写超时定时器 wt timer // write deadline timer wd int64 // write deadline (a nanotime in the future, -1 when expired) } ``` &emsp;&emsp;pollDesc结构包含了读写socket阻塞的协程指针,这样一来,在通过epoll_ctl监听socket时,使得epoll_event指向pollDesc结构就行了,epoll_wait返回事件列表之后,就能解析出来结构pollDesc,从而解除对应协程的阻塞。 &emsp;&emsp;另外,我们也能看到pollDesc结构还包含了设置的读写超时时间,以及超时定时器。通过这定义也基本能确定,socket超时是基于定时器实现的。如果你仔细研究过上一小节介绍的socket读写操作流程,应该就能在internal/poll/fd_poll_runtime.go发现还有其他一些函数声明,包括runtime_pollSetDeadline,设置超时时间。Go语言处理HTTP请求时,默认是有读写超时时间的,同样的,我们可以输出该流程的调用栈: ``` 0 0x000000000105d0ef in internal/poll.runtime_pollSetDeadline at /Users/xxx/Documents/go1.18/src/runtime/netpoll.go:323 1 0x000000000109e95e in internal/poll.setDeadlineImpl at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:160 2 0x000000000115a0c8 in internal/poll.(*FD).SetReadDeadline at /Users/xxx/Documents/go1.18/src/internal/poll/fd_poll_runtime.go:137 3 0x000000000115a0c8 in net.(*netFD).SetReadDeadline at /Users/xxx/Documents/go1.18/src/net/fd_posix.go:142 4 0x000000000115a0c8 in net.(*conn).SetReadDeadline at /Users/xxx/Documents/go1.18/src/net/net.go:250 5 0x00000000011ea591 in net/http.(*conn).readRequest at /Users/xxx/Documents/go1.18/src/net/http/server.go:975 6 0x00000000011ee9ab in net/http.(*conn).serve at /Users/xxx/Documents/go1.18/src/net/http/server.go:1891 7 0x00000000011f352e in net/http.(*Server).Serve.func3 at /Users/xxx/Documents/go1.18/src/net/http/server.go:3071 ``` &emsp;&emsp;我们已经知道,超时时间是通过定时器实现的,所以函数poll_runtime_pollSetDeadline最终其实也是添加了定时器而已(定时器将在下一篇文章介绍),而定时器的处理函数为netpollReadDeadline或netpollDeadline或netpollWriteDeadline(根据读写操作不同)。超时了怎么办?一来肯定是设置超时标识,二来如果当前有协程因为socket阻塞还需唤醒该协程。 ## 总结 &emsp;&emsp;Go语言高性能网络IO其实还是基于IO多路复用技术(如epoll)实现的,读写socket都是非阻塞操作,如果不可读或者不可写,则将该socket添加到epoll对象监听。而调度器schedule,以及辅助线程sysmon也会不定时检测是否有socket又改变状态可读或者可写了,从而恢复这些由于socket读写而阻塞的协程。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

726 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传