Go提供了功能完备的标准网络库:net
包,net
包的实现相当之全面,httptcpudp均有实现且对用户提供了简单友好的使用接口。在Linux系统上Go使用了epoll来实现net包的核心部分,本文从用户接口层入手,分析Go在Linux平台上的epoll使用,文中若有不当之处请指出。
对于服务端程序而言,主要流程是Listen->Accept->Send/Write,客户端主要流程Connect->Send/Write,本文以这两个流程深入分析net
包在Go中是如何实现的。
Listen
监听方法是在ListenConfig结构中的Listen方法实现的(net/dial.go
):
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
// ...
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
}
return l, nil
}
在Listen
函数实现中,两个关键流程是DefaultResolver.resolveAddrList
和listenTCP
。
-
DefaultResolver.resolveAddrList
是根据协议名称和地址取得Internet协议族地址列表,由于resolveAddrList
的代码比较固定,在此不做详细解释,感兴趣的童鞋可以去翻阅。 -
listenTCP
和listenUnix
从地址列表中取得满足条件的地址进行实际监听操作, 具体根据传入的协议族来确定。
接下来看看listenTCP
的代码(net/tcpsock_posix.go
):
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
创建监听socket fd是在internetSocket中进行的,而socket fs是通过socket函数创建的(net/sock_posix.go
):
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 调用各平台对应的socket api创建socket
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
// 设置socket选项
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
// 创建fd
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
// 监听
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// TCP
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
// UDP
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
// 发起连接,非listen socket会走到这里来
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
socket
函数主要流程:新建socket-->设置socket option-->创建fd-->进入监听逻辑。sysSocket
根据平台有不同实现,windows实现在socket_windows.go
中,linux实现则在sock_cloexec.go
中,本文重点分析在linux平台上的实现(net/sock_cloexec.go
):
func sysSocket(family, sotype, proto int) (int, error) {
// 系统socket函数
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
switch err {
case nil:
return s, nil
default:
return -1, os.NewSyscallError("socket", err)
case syscall.EPROTONOSUPPORT, syscall.EINVAL:
}
// linux内核版本低于2.6.27时,代码会走到这里,下面的内容主要是防止在fork时候导致描述符泄露
// 实际上手动实现简易版SOCK_CLOEXEC
syscall.ForkLock.RLock()
// do other things...
}
socketFunc
创建了socket,通知将socket设置非阻塞(SOCK_NONBLOCK
)以及fork时关闭(SOCK_CLOEXEC
),这两个标志是在linux内核版本2.6.27之后添加,在此之前的版本代码将会走到syscall.ForkLock.RLock()
,主要是为了防止在fork时导致文件描述符泄露。
当socket创建之后进入新建fd流程,在Go的包装层面,fd均以netFD
结构表示,该接口描述原始socket的地址信息、协议类型、协议族以及option,netFD
在整个包装结构中居于用户接口的下一层。最后进入监听逻辑,逻辑走向区分TCP和UDP,而监听逻辑比较简单,即调用系统bind和listen接口(net/sock_posix.go
):
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
listenStream
主要做了以下几件事:
- 检查未完成连接和已完成连接两个队列是否超出系统预设。
- 调用socket bind接口。
- 调用socket listen接口。
- 初始化fd。
- 调用socket getsockname接口。
以上流程和日常写socket代码流程并无太大差异,唯有第4流程不同,第4流程是与底层的netpoll
交互。
Linux平台上,系统提供了五种IO模型:阻塞IO、非阻塞IO、IO多路复用、信号驱动IO和异步IO,对应到内核层面提供的用户接口即select、poll和epoll。Go net包是基于epoll进行封装的,基本模型结合了epoll和Go语言的优势:epoll+goroutine,这样达到异步且高并发。
回到源代码上,fd.init()
完成网络轮询器初始化操作,开始与更底层的封装打交道,最底层的封装是epoll调用(runtime/netpoll_epoll.go
):
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 {
return
}
epfd = epollcreate(1024)
if epfd >= 0 {
closeonexec(epfd)
return
}
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
epollcreate
创建了epoll handle并设置为CLOEXEC
属性,此处是epoll handle的创建,netpollinit
之后调用(runtime/netpoll_epoll.go
):
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
// 可读,可写,对端断开,边缘触发
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
// 存放user data,后面读写均会用到pollDesc
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
// 注册epoll事件
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
由上可见,调用epoll_ctl
完成epoll监听事件注册,_EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
为所关心事件,具体含义可以查看epoll手册。
Accept
Listen
成功之后返回TCPListener
对象,手动调用Accept
进入监听状态,最终会走到与epoll交互流程: TCPListener.Accept
-->TCPListener.accept
-->netFD.accept
-->FD.Accept
从这里开始,进入到与pollDesc
交互的地方(internal/poll/fd_unix.go
):
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
continue
}
return -1, nil, errcall, err
}
}
fd.pd.prepareRead
检查当前fd是否允许accept,实际上是检查更底层的pollDesc
是否可读。检查完毕之后,尝试调用accept
获取已连接的socket,注意此代码在for循环内,说明Accept
是阻塞的,直到有连接进来;当遇到EAGIN
和ECONNABORTED
错误会重试,其他错误都抛给更上层。
fd.pd.waitRead
阻塞等待fd
是否可读,即是否有新连接进来,最终进入到runtime.poll_runtime_pollWait
里(runtime/netpoll.go
),在解释poll_runtime_pollWait
代码之前,先来看看最重要的结构:
type pollDesc struct {
// ...
rg uintptr
wg uintptr
// ...
}
const (
pdReady uintptr = 1
pdWait uintptr = 2
)
pollDesc
是与epoll交互最重要的结构之一,可以理解为与epoll之间的桥梁,其中rg
和wg
为状态信号量,可能的值为pdReady
、pdWait
、等待文件描述符可读或者可写的goroutine地址以及nil
(0)。
可能出现的情况:
- 当值为pdRead时,代表网络IO就绪,处理完之后应该设置为nil。
-
当值为pdWait时,即等待被挂起(现在并未被挂起)。后面可能出现的情况是:
- goroutine被挂起并设置为goroutine的地址;
- 收到了IO通知就绪;
- 超时或者被关闭设置为nil。
接下来看看poll_runtime_pollWait
代码:
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
// (1)
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// (2)
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// (3)
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// (4)
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
poll_runtime_pollWait
等待fd可读,这里最重要的逻辑在netpollblock
函数里完成(根据代码中注释序号依次解释):
(1) 根据mode
获取对应的信号量地址gpp,判断当前是否pdReady
。
(2) 这段代码的逻辑是当gpp
的值如果等于0时,将gpp
的值更替为pdWait
,该操作属于原子操作且内部实现了自旋锁。
(3) 当值为pdWait
之后,防止此时可能会有其他的并发操作修改pd里的内容,所以需要再次检查错误状态。gopark
将当前goroutine置于等待状态并等待下一次的调度,但gopark
仍有可能因为超时或者关闭会立即返回,由于gopark
涉及到goroutine调度,在此不做赘述。
(4) 通过原子操作将gpp
的值设置为0,返回修改前的值并判断是否pdReady
。
至此,FD.Accept
结束返回,之后的操作与前面Listen
类似,创建netFD、初始化netFD、创建TCPConn对象。
小结
通过上面流程化的跟踪,发现netFD
、FD
、pollDesc
在这个过程中占据非常重要的位置,小结内容将会着重分析这几个结构,目的在于解构封装层次。
netFD
netFD
包含在conn
结构中,而conn
又包含在TCPConn
结构中,由此可见netFD
处于用户接口层下面。
type netFD struct {
pfd poll.FD
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
}
netFD
比较简单,只有一些基本的socket信息,pfd
是其下一层,用户层接口的调用会进入到pfd
中。
FD
type FD struct {
// 读写锁
fdmu fdMutex
// 系统文件描述符
Sysfd int
// I/O poller.
pd pollDesc
// 用于在一次函数调用中读、写多个非连续缓冲区,这里主要是写
iovecs *[]syscall.Iovec
// 关闭文件时的信号量
csema uint32
// 如果此文件已设置为阻止模式,则为非零值
isBlocking uint32
// TCP或UDP
IsStream bool
// 读取到0字节时是否为错误,对于基于消息的基础socket而言为false
ZeroReadIsEOF bool
// 是否系统中真实文件还是socket连接
isFile bool
}
FD
是Go中通用的文件描述符类型,net包和os包用FD
来表示网络连接或者文件,FD
提供了用户接口层到runtime之间逻辑处理。此处的pollDesc是poll.pollDesc
而非runtime.pollDesc
, poll.pollDesc
在internal/poll/fd_poll_runtime.go
中实现了与runtime交互的接口。
runtime.pollDesc
type pollDesc struct {
// 存放pollDesc,全局
link *pollDesc
// lock保护pollOpen, pollSetDeadline, pollUnblock and deadlineimpl等并发操作
// 以上操作包括了seq、rt和wt变量,fd在pollDesc生命周期内恒定,其他变量均以无锁方式
lock mutex
fd uintptr
// 关闭标记,一般主动关闭或者超时
closing bool
// 是否标记事件扫描错误
everr bool
user uint32
// fd被重用或者读计时器被重置
rseq uintptr
// 读信号量,值可能为pdRead、pdWait、goroutine地址或nil(0)
rg uintptr
// 读的等待过期时间
rt timer
rd int64
// fd被重用或者写计时器被重置
wseq uintptr
/// 写信号量,值可能为pdRead、pdWait、goroutine地址或nil(0)
wg uintptr
// 写的等待过期时间
wt timer
wd int64
}
pollDesc
是抽象实现,它将epoll、kqueue、iocp等方式抽象统一,规定了各个平台实现的接口规范,即netpoll
。
现在我们大致清楚了epoll在Go中的封装结构:netFD
将接口逻辑转发到FD
,FD
提供了用户接口层到runtime之间的逻辑处理,且FD
是通用抽象逻辑,适用于文件和网络连接;poll.pollDesc
抽象了与runtime交互的接口和逻辑,而epoll的逻辑则被拆分到runtime/netpoll.go
和runtime/netpoll_epoll.go
中,整个结构渐渐式,分工责任明确,大致层次结构如下:
有了上面的层次结构图基础,Read
和Write
的流程就比较简单了,Read
的调用链:conn.Read
-->netFD.Read
-->FD.Read
,Write
调用链类似,Read
和Write
的底层实现在internal/poll/fd_unix.go
文件中,有兴趣可以翻阅。
有疑问加站长微信联系(非本文作者)