说明
前面的章节我们基本聊完了golang网络编程的关键API流程,但遗留了一个关键内容:当系统调用返回EAGAIN时,会调用WaitRead/WaitWrite来阻塞当前协程,现在我们接着聊。
WaitRead/WaitWrite
func (pd *pollDesc) Wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res)
}
func (pd *pollDesc) WaitRead() error {
return pd.Wait('r')
}
func (pd *pollDesc) WaitWrite() error {
return pd.Wait('w')
}
最终runtime_pollWait走到下面去了:
TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0
JMP runtime·netpollWait(SB)
我们仔细考虑应该明白:netpollWait的主要作用是:等待关心的socket是否有事件(其实后面我们知道只是等待一个标记位是否发生改变),如果没有事件,那么就将当前的协程挂起,直到有通知事件发生,我们接下来看看到底如何实现:
func netpollWait(pd *pollDesc, mode int) int {
// 先检查该socket是否有error发生(如关闭、超时等)
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
onM(func() {
netpollarm(pd, mode)
})
}
// 循环等待netpollblock返回值为true
// 如果返回值为false且该socket未出现任何错误
// 那该协程可能被意外唤醒,需要重新被挂起
// 还有一种可能:该socket由于超时而被唤醒
// 此时netpollcheckerr就是用来检测超时错误的
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 首先将轮询状态设置为pdWait
// 为什么要使用for呢?因为casuintptr使用了自旋锁
// 为什么使用自旋锁就要加for循环呢?
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
gothrow("netpollblock: double wait")
}
// 将socket轮询相关的状态设置为pdWait
if casuintptr(gpp, 0, pdWait) {
break
}
}
// 如果未出错将该协程挂起,解锁函数是netpollblockcommit
if waitio || netpollcheckerr(pd, mode) == 0 {
f := netpollblockcommit
gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
}
// 可能是被挂起的协程被唤醒
// 或者由于某些原因该协程压根未被挂起
// 获取其当前状态记录在old中
old := xchguintptr(gpp, 0)
if old > pdWait {
gothrow("netpollblock: corrupted state")
}
return old == pdReady
}
从上面的分析我们看到,如果无法读写,golang会将当前协程挂起,在协程被唤醒的时候,该标记位应该会被置位。 我们接下来看看这些挂起的协程何时会被唤醒。
事件通知
golang运行库在系统运行过程中存在socket事件检查点,目前,该检查点主要位于以下几个地方:
runtime·startTheWorldWithSema(void):在完成gc后;
findrunnable():这个暂时不知道何时会触发?
sysmon:golang中的监控协程,会周期性检查就绪socket
TODO: 为什么是在这些地方检查socket就绪事件呢?
接下来我们看看如何检查socket就绪事件,在socket就绪后又是如何唤醒被挂起的协程?主要调用函数runtime-netpoll()
我们只关注epoll的实现,对于epoll,上面的方法具体实现是netpoll_epoll.go中的netpoll
func netpoll(block bool) (gp *g) {
if epfd == -1 {
return
}
waitms := int32(-1)
if !block {
// 如果调用者不希望block
// 设置waitsm为0
waitms = 0
}
var events [128]epollevent
retry:
// 调用epoll_wait获取就绪事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
...
}
goto retry
}
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
// 对每个事件,调用了netpollready
// pd主要记录了与该socket关联的等待协程
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
}
}
// 如果调用者同步等待且本次未获取到就绪socket
// 继续重试
if block && gp == nil {
goto retry
}
return gp
}
这个函数主要调用epoll_wait(当然,golang封装了系统调用)来获取就绪socket fd,对每个就绪的fd,调用netpollready()作进一步处理。这个函数的最终返回值就是一个已经就绪的协程(g)链表。
netpollready主要是将该socket fd标记为IOReady,并唤醒等待在该fd上的协程g,将其添加到传入的g链表中。
// make pd ready, newly runnable goroutines (if any) are returned in rg/wg
func netpollready(gpp **g, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
// 将就绪协程添加至链表中
if rg != nil {
rg.schedlink = *gpp
*gpp = rg
}
if wg != nil {
wg.schedlink = *gpp
*gpp = wg
}
}
// 将pollDesc的状态置为pdReady并返回就绪协程
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}
疑问:一个fd会被多个协程同时进行IO么?比如一个协程读,另外一个协程写?或者多个协程同时读?此时返回的是哪个协程就绪呢?
一个socket fd可支持并发读写,因为对于tcp协议来说,是全双工。读写操作的是不同缓冲区,但是不支持并发读和并发写,因为这样会错乱的。所以上面的netFD.RWLock()就是干这个作用的。
有疑问加站长微信联系(非本文作者)