在讨论Go编程语言时,经常被提起的一个特点是使用goroutines;这是一种轻量级进程,可以并发运行成千上万的goroutines。许多其它编程语言使用操作系统提供的线程来支持并发任务。线程的缺点是它们是比较重的,因此只能运行数百个线程,然后才会遇到可扩展性问题。这些问题在实时更新与大量客户端场景下尤为明显。
通常,我们可以人云亦云地说:goroutine是线程的"轻量级"的版本。但如何才能在功能不变的情况下,做到轻量级呢。我最终深入到Go的runtime
源代码中去寻找答案。在这篇文章中,我将通过实现一个简单的Go程序来展示Go的调度机制是如何工作的。
任务调度
Goroutines是建立在事件驱动的架构上。当一个事件发生时,与该事件相关的任务会被放在一个队列中。事件循环通过队列,逐一执行任务。如果触发的任务需要很长时间才能执行,会怎么样呢?那么队列上的其他事件都会被阻塞。这不正是我们要使用多线程的原因,这样才能保证及时响应吗?如果某个任务占用处理器的时间过长,这个线程就会被调度器打断,进而让其他线程去做他们的任务。问题是,我们得到的吞吐量比较低,因为我们在切换任务的时候,必须花时间把半成品收起来。举例来说,保存半成品的工作可以是例如我们想要一起做乘法的变量,并且是占用几个CPU寄存器。在这种情况下,我们将不得不来回交换所有这些寄存器。
Goroutines试图通过让任务在适当的时候调用调度器本身来解决事件驱动方式的阻塞问题。这通常发生在任务必须等待一些输入或输出而又无事可做的时候。在Go 1.2中,函数调用也会触发调度器,因为无论如何都要把CPU寄存器交给调用方。Go还通过在不同的CPU核上运行并行事件循环来降低阻塞的风险,但我们在这里就不提了。
Echoserver示例
让我们从一个简单的服务器开始,为每个新的TCP连接启动一个goroutine。为了简洁起见,这里省略了错误处理,但你可以在我的Github repo上找到完整的代码。
func main() {
addr, _ := net.ResolveTCPAddr("tcp", ":7777")
listener, _ := net.ListenTCP("tcp", addr)
replyGoroutine(listener)
}
func replyGoroutine(listener net.Listener) {
for {
conn, _ := listener.Accept()
go func() {
buf := make([]byte, 16)
conn.Read(buf)
log.Printf("received: %s", buf)
conn.Write(bytes.ToUpper(buf))
conn.Close()
}()
}
}
所有的Accept()
、Read()
和Write()
调用都是在等待一些外部操作的完成,等待时恰好是切换到另一个任务的最佳时机。在等待点调用调度器runtime.Gosched()
,这样进程就可以切换到另一个有工作要做的goroutine。
我们可以在Bash中使用Netcat来测试这段代码。
$ echo "Hello World" | nc localhost 7777
HELLO WORLD
不用goroutine
上面,我们已经满足于 "等待某个外部操作完成 "时做其他事情的解释。但这究竟是如何工作的呢?为了充分理解这一点,我们必须深入研究UNIX的polling和文件描述符的工作原理。我们通过在上面的同一个例子中使用它们,并自己实现我们自己的伪goroutine的调度逻辑。
文件描述符是一种可以处理外部资源的输入、输出和其他相关操作的资源。它们在读/写文件时使用,也可以用于在TCP端口上监听新的客户端和处理一个开放的TCP连接。我们可以通过UNIX中的accept()
、read()
和write()
等函数来访问这些资源。问题是这些函数一次只能处理一个资源。幸运的是,我们可以使用UNIX的polling来同时观察多个资源上的事件。
在我们的例子中,我们使用的是epoll系统调用,它只支持Linux。Go运行时以类似的方式使用相同的系统调用。在Go中,你可以通过golang.org/x/sys/unix
包来访问系统调用。
type GoroutineState struct {
connFile *os.File
buffer []byte
}
与每个goroutine相关的变量是用我们的GoroutineState来模拟的,它以TCP连接的文件描述符为键存储在一个map中。
接下来,我们用EpollWait()
实现事件循环,在这里,它监视来自TCP监听器和TCP连接的文件描述符事件。EpollCtl()
用于改变监视事件的资源集。可以在Github repo查看完整的代码,了解完整的错误处理。
func replierPoll(listener *net.TCPListener) {
epollFd, _ := unix.EpollCreate(8)
// UNIX represents a TCP listener socket as a file
listenerFile, _ := listener.File()
// Add the TCP listener to the set of file descriptors being polled
listenerPoll := unix.EpollEvent{
Fd: int32(listenerFile.Fd()),
Events: unix.POLLIN, // POLLIN triggers on accept()
Pad: 0, // Arbitary data
}
unix.EpollCtl(epollFd, unix.EPOLL_CTL_ADD, int(listenerPoll.Fd), &listenerPoll)
// Map EpollEvent.Pad to the connection state
states := map[int]*GoroutineState{}
for {
// Wait infinitely until at least one new event is happening
var eventsBuf [10]unix.EpollEvent
unix.EpollWait(epollFd, eventsBuf[:], -1)
// Go though every event occured; most often len(eventsBuf) == 1
for _, event := range eventsBuf {
if event.Fd == listenerPoll.Fd {
// Handle new connection
// AcceptTCP() will now return immediately
conn, _ := listener.AcceptTCP()
// Equal to creating a new goroutine
newState := addNewClientPoll(epollFd, conn)
fd := int(newState.connFile.Fd())
states[fd] = newState
continue
}
// Handle existing connection
fd := int(event.Pad)
state := states[fd]
if event.Events == unix.POLLIN {
state.connFile.Read(state.buffer)
log.Printf("received: %s", state.buffer)
// Equal to switching away the goroutine
newPoll := event
newPoll.Events = unix.POLLOUT
unix.EpollCtl(epollFd, unix.EPOLL_CTL_MOD, fd, &newPoll)
} else if event.Events == unix.POLLOUT {
state.connFile.Write(bytes.ToUpper(state.buffer))
state.connFile.Close()
// Equal to stopping the goroutine
unix.EpollCtl(epollFd, unix.EPOLL_CTL_DEL, fd, nil)
delete(states, fd)
}
}
}
}
我们的事件循环正在等待三种类型的事件。
- 新的连接: TCP端口监听器触发POLLIN事件,
AcceptTCP()
则立即返回 - 从TCP客户端接收数据: 客户端套接字触发POLLIN事件,
Read()
立即返回。 - 缓冲区中的可用空间要发送给TCP客户端: 客户端套接字触发POLLOUT事件,
Write()
立即返回。
这里是将GoroutineState添加到新的连接中的代码:
func addNewClientPoll(epollFd int, conn *net.TCPConn) *GoroutineState {
connFile, _ := conn.File()
conn.Close() // Close this an use the connFile copy instead
newState := GoroutineState{
connFile: connFile,
buffer: make([]byte, 16),
}
fd := int(connFile.Fd())
connPoll := unix.EpollEvent{
Fd: int32(fd),
Events: unix.POLLIN, // POLLIN triggers on accept()
Pad: int32(fd), // So we can find states[fd] when triggered
}
unix.EpollCtl(epollFd, unix.EPOLL_CTL_ADD, fd, &connPoll)
return &newState
}
epoll只支持Linux。但在其他操作系统上也可以找到类似的系统调用,比如Mac/BSD上的kqueue()和POSIX系统上扩展性较差的poll()。
结论
我们可以看到,Go使用了事件驱动架构的技术,而程序员不必了解它。JavaScript/Node.js也遵循了类似的做法,但需要程序员为其编写代码,并思考潜在的阻塞问题。在Go中,你很少需要思考这个问题。从我们的例子中,我们还可以看到,访问UNIX系统调用非常容易,因为unix包提供了一个友好的封闭以供在Go编程中调用。
(旁注:线程切换是由OS完成,调度也是OS来做,上下文切换费时费力;go有自己的调度器,对go来说,goroutine是调度单元,goroutine切换也是在用户态完成,goroutine需要OS线程来最终运行,所以可以尽可能利用CPU,从编码角度讲,goroutine当然比线程轻量,毕竟同样功能代码量更少,go的runtime帮程序员完成了很多事情。作者的切入点很有意思,看起来像是在做一个代码级别的比较,其实引申出了很多OS底层内容,比如事件驱动)
参考
- How to stop Go’s scheduler loop from working: A pitfall of golang scheduler
- Deeper coverage of the scheduler concepts: Analysis of the Go runtime scheduler
有疑问加站长微信联系(非本文作者)