百万 Go TCP 连接的思考2: 百万连接的吞吐率和延迟

smallnest · · 784 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

上一篇epoll方式减少资源占用 介绍了测试环境以及epoll方式实现百万连接的TCP服务器。这篇文章介绍百万连接服务器的几种实现方式,以及它们的吞吐率和延迟。

这几种服务器的实现包括:epollmultiple epollerpreforkworkerpool

第一篇 百万 Go TCP 连接的思考: epoll方式减少资源占用
第二篇 百万 Go TCP 连接的思考2: 百万连接的吞吐率和延迟
第三篇 百万 Go TCP 连接的思考: 正常连接下的吞吐率和延迟

相关代码已发布到github上: 1m-go-tcp-server

三、 epoll服务器加上吞吐率指标

上一篇已经介绍了epoll方式的实现,为了测试吞吐率,我们需要通过传递特殊的数据来计算。

客户端将它发送数据时的时间戳传给服务器,这个时间戳只需要8个字节,服务器不需要任何改动,只需要原封不动的将数据回传给客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
......
var (
opsRate = metrics.NewRegisteredMeter("ops", nil)
)
func start() {
for {
connections, err := epoller.Wait()
if err != nil {
log.Printf("failed to epoll wait %v", err)
continue
}
for _, conn := range connections {
if conn == nil {
break
}
// 将消息(时间戳)原封不动的写回
_, err = io.CopyN(conn, conn, 8)
if err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
conn.Close()
}
opsRate.Mark(1)
}
}
}

这里epoll我们并没有注册为边缘触发的方式,默认是水平触发的方式。

每次读取8个字节(时间戳),然后返回给客户端。同时metric记录一次。

metric库使用的是rcrowley/go-metrics

四、客户端也修改为epoll方式

客户端不再发送hello world数据,而是当前的时间戳,收到服务器的返回后,就可以计算出一次请求的总共的花费(延迟,latency),然后发送下一个请求。

所以客户端的测试并不是pipeline的方式,以下所有的测试都不是pipeline的方式,而是收到返回再发下一个请求。

客户端也需要改成epoll的方式,原先一个goroutine轮训所有的连接的方式性能比较底下,所以改成epoll的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"encoding/binary"
"flag"
"fmt"
"log"
"net"
"os"
"syscall"
"time"
"github.com/rcrowley/go-metrics"
)
var (
ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")
startMetric = flag.String("sm", time.Now().Format("2006-01-02T15:04:05 -0700"), "start time point of all clients")
)
var (
opsRate = metrics.NewRegisteredTimer("ops", nil)
)
var epoller *epoll
// client改造成epoll方式, 处理epoll消息是单线程的
func main() {
flag.Parse()
go func() {
startPoint, err := time.Parse("2006-01-02T15:04:05 -0700", *startMetric)
if err != nil {
panic(err)
}
time.Sleep(startPoint.Sub(time.Now()))
metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))
}()
var err error
epoller, err = MkEpoll()
if err != nil {
panic(err)
}
addr := *ip + ":8972"
log.Printf("连接到 %s", addr)
var conns []net.Conn
for i := 0; i < *connections; i++ {
c, err := net.DialTimeout("tcp", addr, 10*time.Second)
if err != nil {
fmt.Println("failed to connect", i, err)
i--
continue
}
if err := epoller.Add(c); err != nil {
log.Printf("failed to add connection %v", err)
c.Close()
}
conns = append(conns, c)
}
log.Printf("完成初始化 %d 连接", len(conns))
tts := time.Second
if *connections > 100 {
tts = time.Millisecond * 5
}
go start()
for i := 0; i < len(conns); i++ {
time.Sleep(tts)
conn := conns[i]
err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano())
if err != nil {
log.Printf("failed to write timestamp %v", err)
if err := epoller.Remove(conn); err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
}
}
}
select {}
}
func start() {
var nano int64
for {
connections, err := epoller.Wait()
if err != nil {
log.Printf("failed to epoll wait %v", err)
continue
}
for _, conn := range connections {
if conn == nil {
break
}
if err := binary.Read(conn, binary.BigEndian, &nano); err != nil {
log.Printf("failed to read %v", err)
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
conn.Close()
continue
} else {
opsRate.Update(time.Duration(time.Now().UnixNano() - nano))
}
err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano())
if err != nil {
log.Printf("failed to write %v", err)
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
conn.Close()
}
}
}
}

使用的epoll实现代码和服务器端是一样的。

客户端的统计会遇到一个问题,因为我们会启动50个docker容器,计算客户端的吞吐率的时候我们需要统计同一个时间段内这50个容器所有的请求和延迟。这里我们用了一个小小的技巧,让metrics库再同一个时间打印出它们的统计数据,基本可以保证统计的是这50个容器的同一个时间段内的指标。

数据分析

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 42495, 延迟(latency)为 23秒

五、客户端改为多个epoller

在上面的实现中,我们的客户端使用一个epoller处理所有的请求, 在事件监听的处理中,使用一个goroutine处理接收的所有的事件,如果处理事件比较慢,这个单一的goroutine将会是严重的瓶颈。

所以我们要把它改成多goroutine的方式去处理。一种方式是启动一个线程池,采用多event loop的方式处理事件,另外一种方式是使用多个epoller, 每个epoller处理一批连接,每个epoller独自占用一个goroutine。 我们的客户端采用第二种方式,实现起来比较简单。

Linux的Accept和epoller都曾有惊群的现象,也就是一个一个事件到来后会唤醒所有的监听的线程,目前这个问题应该已经不存在了。

client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func main() {
flag.Parse()
setLimit()
go func() {
startPoint, _ := time.Parse("2006-01-02T15:04:05 -0700", *startMetric)
time.Sleep(startPoint.Sub(time.Now()))
metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))
}()
addr := *ip + ":8972"
log.Printf("连接到 %s", addr)
for i := 0; i < *c; i++ {
go mkClient(addr, *connections/(*c))
}
select {}
}
func mkClient(addr string, connections int) {
epoller, err := MkEpoll()
if err != nil {
panic(err)
}
var conns []net.Conn
for i := 0; i < connections; i++ {
c, err := net.DialTimeout("tcp", addr, 10*time.Second)
if err != nil {
fmt.Println("failed to connect", i, err)
i--
continue
}
if err := epoller.Add(c); err != nil {
log.Printf("failed to add connection %v", err)
c.Close()
}
conns = append(conns, c)
}
log.Printf("完成初始化 %d 连接", len(conns))
go start(epoller)
tts := time.Second
if *c > 100 {
tts = time.Millisecond * 5
}
for i := 0; i < len(conns); i++ {
time.Sleep(tts)
conn := conns[i]
err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano())
if err != nil {
log.Printf("failed to write timestamp %v", err)
if err := epoller.Remove(conn); err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
}
}
}
select {}
}
func start(epoller *epoll) {
...... //同上
}

测试脚本稍微一下,增加一个epoller数量的控制:

1
2
3
4
5
6
7
8
9
10
11
12
CONNECTIONS=$1
REPLICAS=$2
IP=$3
CONCURRENCY=$4
DATE=`date -d "+2 minutes" +"%FT%T %z"`
for (( c=0; c<${REPLICAS}; c++ ))
do
docker run -v $(pwd)/mclient:/client --name 1mclient_$c -d alpine /client \
-conn=${CONNECTIONS} -ip=${IP} -c=${CONCURRENCY} -sm "${DATE}"
done

数据分析

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 42402, 延迟(latency)为 0.8秒

吞吐率并没有增加,但是得益于我们客户端可以并发的处理消息,可以大大减小事务的延迟,将相关的延迟可以降低到一秒以下。

六、服务器改为多个epoller

基于我们上面客户端使用多个epoller的启发,我们可以修改服务器端也采用多个epoller的方式,看看是否能增加吞吐率或者降低延迟。

server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"flag"
"io"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"syscall"
"time"
"github.com/libp2p/go-reuseport"
"github.com/rcrowley/go-metrics"
)
var (
c = flag.Int("c", 10, "concurrency")
)
var (
opsRate = metrics.NewRegisteredMeter("ops", nil)
)
func main() {
flag.Parse()
go metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))
go func() {
if err := http.ListenAndServe(":6060", nil); err != nil {
log.Fatalf("pprof failed: %v", err)
}
}()
for i := 0; i < *c; i++ {
go startEpoll()
}
select {}
}
func startEpoll() {
ln, err := reuseport.Listen("tcp", ":8972")
if err != nil {
panic(err)
}
epoller, err := MkEpoll()
if err != nil {
panic(err)
}
go start(epoller)
for {
conn, e := ln.Accept()
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
log.Printf("accept temp err: %v", ne)
continue
}
log.Printf("accept err: %v", e)
return
}
if err := epoller.Add(conn); err != nil {
log.Printf("failed to add connection %v", err)
conn.Close()
}
}
}
func start(epoller *epoll) {
for {
connections, err := epoller.Wait()
if err != nil {
log.Printf("failed to epoll wait %v", err)
continue
}
for _, conn := range connections {
if conn == nil {
break
}
io.CopyN(conn, conn, 8)
if err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
conn.Close()
}
opsRate.Mark(1)
}
}
}

和客户端的类似,我们启动了多个epoller。这里我们使用reuseport库启动多个goroutine监听同一个端口,这个特性应该在较新的Linux内核上已经支持, 内核会负责负载均衡。

当然我们也可以启动一个goroutine进行监听,接收到客户端的请求后在交给某个epoller进行处理(随机或者轮询),我们就负责连接的负载均衡。

再或者,多个goroutine可以同时调用同一个listener.Accept方法,对Accept进行竞争。

后面的处理逻辑和单个的epoller的方式是一样的,只不过我们使用多个goroutine进行处理。

数据分析

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 197814, 延迟(latency)为 0.9秒

以下所有的测试都使用多epoller的客户端,下面的比较也是针对多epoller的客户端的测试:

和单poller的服务器实现相比较,多epoller的服务器客户端吞吐率大幅增加,而延迟略微增加。

七、 prefork实现服务器

Prefork 是Apache实现的一种服务方式。一个单一的控制进程启动的时候负责启动多个子进程,每个子进程都是独立的,使用单一的goroutine处理消息事件。

这是一个有趣的实现方式,子进程可以共享父进程打开的文件,这样我们就可以把net.Listener传给子进程,让所有的子进程共同监听这个端口。

传递给子进程的文件是通过exec.Cmd.ExtraFiles字段进行传递的:

1
2
3
4
5
6
7
8
9
10
type Cmd struct {
......
// ExtraFiles specifies additional open files to be inherited by the
// new process. It does not include standard input, standard output, or
// standard error. If non-nil, entry i becomes file descriptor 3+i.
//
// ExtraFiles is not supported on Windows.
ExtraFiles []*os.File
......
}

正如注释中所指出的,传递的第i个文件在子进程中的文件描述符为 3+i,所以如果父进程中启动子进程的命令如下的话:

1
2
3
4
5
6
a_file_descriptor, _ := tcplistener.File()
children[i] = exec.Command(os.Args[0], "-prefork", "-child")
children[i].Stdout = os.Stdout
children[i].Stderr = os.Stderr
children[i].ExtraFiles = []*os.File{a_file_descriptor}

子进程你可以这样得到这个父进程的文件:

1
listener, err = net.FileListener(os.NewFile(3, ""))

我们实现的是父进程和子进程共享同一个listener的方式, 如果你使用reuseport在每个子进程打开同一个端口应该也是可以的,这样就父子之间不需要共享同一个文件了。

完整的服务器实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"flag"
"io"
"log"
"net"
"os"
"os/exec"
"syscall"
)
var (
c = flag.Int("c", 10, "concurrency")
prefork = flag.Bool("prefork", false, "use prefork")
child = flag.Bool("child", false, "is child proc")
)
func main() {
flag.Parse()
var ln net.Listener
var err error
if *prefork {
ln = doPrefork(*c)
} else {
ln, err = net.Listen("tcp", ":8972")
if err != nil {
panic(err)
}
}
startEpoll(ln)
select {}
}
func startEpoll(ln net.Listener) {
epoller, err := MkEpoll()
if err != nil {
panic(err)
}
go start(epoller)
for {
conn, e := ln.Accept()
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
log.Printf("accept temp err: %v", ne)
continue
}
log.Printf("accept err: %v", e)
return
}
if err := epoller.Add(conn); err != nil {
log.Printf("failed to add connection %v", err)
conn.Close()
}
}
}
func doPrefork(c int) net.Listener {
var listener net.Listener
if !*child {
addr, err := net.ResolveTCPAddr("tcp", ":8972")
if err != nil {
log.Fatal(err)
}
tcplistener, err := net.ListenTCP("tcp", addr)
if err != nil {
log.Fatal(err)
}
fl, err := tcplistener.File()
if err != nil {
log.Fatal(err)
}
children := make([]*exec.Cmd, c)
for i := range children {
children[i] = exec.Command(os.Args[0], "-prefork", "-child")
children[i].Stdout = os.Stdout
children[i].Stderr = os.Stderr
children[i].ExtraFiles = []*os.File{fl}
err = children[i].Start()
if err != nil {
log.Fatalf("failed to start child: %v", err)
}
}
for _, ch := range children {
if err := ch.Wait(); err != nil {
log.Printf("failed to wait child's starting: %v", err)
}
}
os.Exit(0)
} else {
var err error
listener, err = net.FileListener(os.NewFile(3, ""))
if err != nil {
log.Fatal(err)
}
}
return listener
}
func start(epoller *epoll) {
for {
connections, err := epoller.Wait()
if err != nil {
log.Printf("failed to epoll wait %v", err)
continue
}
for _, conn := range connections {
if conn == nil {
break
}
io.CopyN(conn, conn, 8)
if err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
conn.Close()
}
}
}
}

数据分析

服务器启动50个子进程: ./server -c 50 -prefork

客户端还是一样: ./setupm.sh 20000 50 172.17.0.1 10

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 444415, 延迟(latency)为 1.5秒

和多poller的服务器实现相比较,prefork的服务器客户端吞吐率又大大幅增加,而延迟相对长一些了,比多poller的实现延迟翻倍。

八、 服务器实现workerpool

从单个poller的代码分析可知,单goroutine处理消息到来的事件可能会有瓶颈,尤其是并发量比较大的情况下,无法使用多核的优势,因为我们采用多poller、prefork的方式可以并发地处理到来的消息,这里还有一种Reactor的方式,将I/O goroutine和业务goroutine分离, I/O goroutine采用单goroutine的方式,监听的消息交给一个goroutine池 (workerpool)去处理,这样可以并行的处理业务消息,而不会阻塞I/O goroutine。

这里实现的消息读取也是在 workerpool 中实现的, 一般更通用的方式是I/O goroutine解析出消息, 将解析好的消息再交给workerpool去处理。我们这里的例子比较简单,所以读取消息也在workerpool中实现。

worker pool的实现如下:

workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package main
import (
"io"
"log"
"net"
"sync"
)
type pool struct {
workers int
maxTasks int
taskQueue chan net.Conn
mu sync.Mutex
closed bool
done chan struct{}
}
func newPool(w int, t int) *pool {
return &pool{
workers: w,
maxTasks: t,
taskQueue: make(chan net.Conn, t),
done: make(chan struct{}),
}
}
func (p *pool) Close() {
p.mu.Lock()
p.closed = true
close(p.done)
close(p.taskQueue)
p.mu.Unlock()
}
func (p *pool) addTask(conn net.Conn) {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return
}
p.mu.Unlock()
p.taskQueue <- conn
}
func (p *pool) start() {
for i := 0; i < p.workers; i++ {
go p.startWorker()
}
}
func (p *pool) startWorker() {
for {
select {
case <-p.done:
return
case conn := <-p.taskQueue:
if conn != nil {
handleConn(conn)
}
}
}
}
func handleConn(conn net.Conn) {
_, err := io.CopyN(conn, conn, 8)
if err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
conn.Close()
}
opsRate.Mark(1)
}

服务器端代码改造:

server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
var epoller *epoll
var workerPool *pool
func main() {
flag.Parse()
go metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))
ln, err := net.Listen("tcp", ":8972")
if err != nil {
panic(err)
}
go func() {
if err := http.ListenAndServe(":6060", nil); err != nil {
log.Fatalf("pprof failed: %v", err)
}
}()
workerPool = newPool(*c, 1000000)
workerPool.start()
epoller, err = MkEpoll()
if err != nil {
panic(err)
}
go start()
for {
conn, e := ln.Accept()
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
log.Printf("accept temp err: %v", ne)
continue
}
log.Printf("accept err: %v", e)
return
}
if err := epoller.Add(conn); err != nil {
log.Printf("failed to add connection %v", err)
conn.Close()
}
}
workerPool.Close()
}
func start() {
for {
connections, err := epoller.Wait()
if err != nil {
log.Printf("failed to epoll wait %v", err)
continue
}
for _, conn := range connections {
if conn == nil {
break
}
workerPool.addTask(conn)
}
}
}

数据分析

服务器启动50个子进程: ./server -c 50 -prefork

客户端还是一样: ./setupm.sh 20000 50 172.17.0.1 10

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 190022, 延迟(latency)为 0.3秒


总结

吞吐率 (tps) 延迟 (latency)
goroutine-per-conn 202830 4.9s
单epoller(单epoller client) 42495 23s
单epoller 42402 0.8s
多epoller 197814 0.9s
prefork 444415 1.5s
workerpool 190022 0.3s

从上表可以看出,客户端的实现对测试结果影响也是巨大的,不过实际我们的客户端分布在不同的节点上,而不像我们的测试不得不使用同一台机器启动百万个节点,所以下面的测试都是通过多epoller client进行测试的,尽量让客户端能并发的处理消息。

从测试结果来看, 在百万并发的情况下, workerpool的实现还是不错的, 既能达到很高的吞吐率(19万), 还能取得 0.3秒的延迟, 而且使用小量的goroutine的worker pool也不会占用太多的系统资源。prefork可以大幅提高吞吐率,但是延迟要稍微长一些。

以上是在巨量连接情况下的各种实现的吞吐率和延迟的测试,这是一类的应用场景, 还有一类很大的应用场景, 比如企业内的服务通讯, 连接数并不会很多,我们将介绍这类场景下几种实现方案的吞吐率和延迟。


有疑问加站长微信联系(非本文作者)

本文来自:鸟窝

感谢作者:smallnest

查看原文:百万 Go TCP 连接的思考2: 百万连接的吞吐率和延迟

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

784 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传