Golang网络:核心API实现剖析(一)

丁凯 · · 3636 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

这一章节我们将详细描述网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。

Listen

func Listen(net, laddr string) (Listener, error) {
   la, err := resolveAddr("listen", net, laddr, noDeadline)
   ......
   switch la := la.toAddr().(type) {
   case *TCPAddr:
       l, err = ListenTCP(net, la)
   case *UnixAddr:
       ......
   }
  ......
}

// 对于tcp协议,返回的的是TCPListener
func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {
   ......
   fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")
   ......
   return &TCPListener{fd}, nil
}

func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {
   ......
   return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
}

func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {
   // 创建底层socket,设置属性为O_NONBLOCK
   s, err := sysSocket(family, sotype, proto)
   ......
   setDefaultSockopts(s, family, sotype, ipv6only)
   // 创建新netFD结构
   fd, err = newFD(s, family, sotype, net)
   ......
   if laddr != nil && raddr == nil {
       switch sotype {
       case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
           // 调用底层listen监听创建的套接字
           fd.listenStream(laddr, listenerBacklog)
           return fd, nil
       case syscall.SOCK_DGRAM:
           ......
       }
   }   
}

// 最终调用该函数来创建一个socket
// 并且将socket属性设置为O_NONBLOCK
func sysSocket(family, sotype, proto int) (int, error) {
   syscall.ForkLock.RLock()
   s, err := syscall.Socket(family, sotype, proto)
   if err == nil {
       syscall.CloseOnExec(s)
   }
   syscall.ForkLock.RUnlock()
   if err != nil {
       return -1, err
   }
   if err = syscall.SetNonblock(s, true); err != nil {
       syscall.Close(s)
       return -1, err
   }
   return s, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
   if err := setDefaultListenerSockopts(fd.sysfd)
   if lsa, err := laddr.sockaddr(fd.family); err != nil {
       return err
   } else if lsa != nil {
       // Bind绑定至该socket
       if err := syscall.Bind(fd.sysfd, lsa); err != nil {
           return os.NewSyscallError("bind", err)
       }
   }
   // 监听该socket
   if err := syscall.Listen(fd.sysfd, backlog); 
   // 这里非常关键:初始化socket与异步IO相关的内容
   if err := fd.init(); err != nil {
       return err
   }
   lsa, _ := syscall.Getsockname(fd.sysfd)
   fd.setAddr(fd.addrFunc()(lsa), nil)
   return nil
}

我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该socket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。

对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?

func (fd *netFD) init() error {
   if err := fd.pd.Init(fd); err != nil {
       return err
   }
   return nil
}

func (pd *pollDesc) Init(fd *netFD) error {
   // 利用了Once机制,保证一个进程只会执行一次
   // runtime_pollServerInit: 
   // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0
   // JMP runtime·netpollServerInit(SB)
   serverInit.Do(runtime_pollServerInit)
   // runtime_pollOpen:
   // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0
   // JMP runtime·netpollOpen(SB)
   ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
   if errno != 0 {
       return syscall.Errno(errno)
   }
   pd.runtimeCtx = ctx
   return nil
}

这里就是socket异步编程的关键:

netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
        return
    }
    epfd = epollcreate(1024)
    if epfd >= 0 {
        closeonexec(epfd)
        return
    }
    ......
}

netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen

func netpollopen(fd uintptr, pd *pollDesc) int32 {
   var ev epollevent
   ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
   *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
   return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。

Accept

既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:

func (l *TCPListener) Accept() (Conn, error) {
    c, err := l.AcceptTCP()
    ......
}

func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
    ......
    fd, err := l.fd.accept()
    ......
    // 返回给调用者一个新的TCPConn
    return newTCPConn(fd), nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 为什么对该函数加读锁?
    if err := fd.readLock(); err != nil {
        return nil, err
    }
    defer fd.readUnlock()
    ......
    for {
        // 这个accept是golang包装的系统调用
        // 用来处理跨平台
        s, rsa, err = accept(fd.sysfd)
        if err != nil {
            if err == syscall.EAGAIN {
                // 如果没有可用连接,WaitRead()阻塞该协程
                // 后面会详细分析WaitRead.
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            } else if err == syscall.ECONNABORTED {
                // 如果连接在Listen queue时就已经被对端关闭
                continue
            }
        }
        break
    }

    netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
    ......
    // 这个前面已经分析,将该fd添加到epoll队列中
    err = netfd.init()
    ......
    lsa, _ := syscall.Getsockname(netfd.sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

OK,从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。

一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。

Read

func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    return c.fd.Read(b)
}

func (fd *netFD) Read(p []byte) (n int, err error) {
    // 为什么对函数调用加读锁
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    // 这个又是干嘛?
    if err := fd.pd.PrepareRead(); err != nil {
        return 0, &OpError{"read", fd.net, fd.raddr, err}
    }
    for {
        n, err = syscall.Read(int(fd.sysfd), p)
        if err != nil {
            n = 0
            // 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒
            if err == syscall.EAGAIN {
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            }
        }
        // 检查错误,封装io.EOF
        err = chkReadErr(n, err, fd)
        break
    }
    if err != nil && err != io.EOF {
        err = &OpError{"read", fd.net, fd.raddr, err}
    }
    return
}

func chkReadErr(n int, err error, fd *netFD) error {
    if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
        return io.EOF
    }
    return err
}

Read的流程与Accept流程极其一致,阅读起来也很简单。相信不用作过多解释,自己看吧。 需要注意的是每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程序需要能够处理这种情况。

Write

func (fd *netFD) Write(p []byte) (nn int, err error) {
    // 为什么这里加写锁
    if err := fd.writeLock(); err != nil {
        return 0, err
    }
    defer fd.writeUnlock()
    // 这个是干什么?
    if err := fd.pd.PrepareWrite(); err != nil {
        return 0, &OpError{"write", fd.net, fd.raddr, err}
    }
    // nn记录总共写入的数据量,每次Write可能只能写入部分数据
    for {
        var n int
        n, err = syscall.Write(int(fd.sysfd), p[nn:])
        if n > 0 {
            nn += n
        }
        // 如果数组数据已经全部写完,函数返回
        if nn == len(p) {
            break
        }
        // 如果写入数据时被block了,阻塞当前协程
        if err == syscall.EAGAIN {
            if err = fd.pd.WaitWrite(); err == nil {
                continue
            }
        }
        if err != nil {
            n = 0
            break
        }
        // 如果返回值为0,代表了什么?
        if n == 0 {
            err = io.ErrUnexpectedEOF
            break
        }
    }
    if err != nil {
        err = &OpError{"write", fd.net, fd.raddr, err}
    }
    return nn, err
}

注意Write语义与Read不一样的地方:

Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。

总结

上面我们基本说完了golang网络编程内的关键API流程,我们遗留了一个关键内容:当系统调用返回EAGAIN时,会 调用WaitRead/WaitWrite来阻塞当前协程,我会在接下来的章节中继续分析。


有疑问加站长微信联系(非本文作者)

本文来自:知乎专栏

感谢作者:丁凯

查看原文:Golang网络:核心API实现剖析(一)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3636 次点击  
加入收藏 微博
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传