Go语言并行之美 -- 超越 "Hello World"

haibinzhang · · 2514 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

        偶尔学习一种新的编程语言是件好事,但不能仅止步于 “Hello World"。


        时常学习一种新的编程语言对你有好处,即使这种语言不会流行起来或者已经过时。使用新的语言处理旧的问题会促使你重新思考你当前处理问题的视角、方法和习惯。


        我喜欢尝试新鲜的事物,特别是编程语言。但是,当你用新的语言实现了“你好,世界!”或者斐波那契序列之后,通常你会感到基本上再没什么可做的,没有任何新奇的地方。你可以试着实现埃拉托斯特尼筛法,借此探索一点数据结构和算法性能。但是我想要一些实际的东西,可能以后还会被复用。因此,不久前我自己创造了一个问题,这个问题可以帮助我仅用几百行代码去熟悉一种语言。


        问题涉及了一种语言几个非常重要的方面:字符串,文件和网络输入输出,当然还有并行。称这个问题为 TCP/IP 代理(或者你可以称它 网络调试器)。问题的想法:你有一个TCP/IP侦听器(单线程或多线程)在给定的端口接受连接,当它接受到连接(调用者)时,它必须连接另外一台主机(远程主机),并且在调用者和远程主机间全双工传输数据。另外,代理还可以使用各种格式的日志记录通信内容,用来帮助分析数据。


        我不再计算需要使用这种工具的场合。任何时候,只要涉及到网络编程,这种工具必不可少。我已经用不同的语言实现了这种工具很多次:C,C++,Perl,PHP。最近的两个实现用的是Python和Erlang。这种工具代表了我正在寻找的那种实际问题。


        我可以指定更具体的需求。应用必须能够同时服务多个连接。对于每个连接,它需要以三种方式记录数据:一个以十六进制格式导出的表示双向顺序数据的导出日志文件;两个二进制日志文件分别记录输入和输出数据流。


        这篇文章我将用Go语言来实现这个程序。Go语言的作者声称Go语言血脉中就支持并行多线程。我打算让它们名符其实。


        如果借助boost库使用C++开发这个程序的话,我很可能选择主侦听器线程加上用于服务每个连接的线程。由此,一个单独的连接将完全占用一个线程。


        下面是用Go语言实现的程序中用来服务每个连接的线程:

1. 一个双向十六进制导出器线程

2. 两个线程以二进制格式记录输入和输出数据流

3. 两个线程用来在本地和远端主机间双向传输数据

        总共5个线程。


       再强调一遍,五个线程用来服务每一个单独的连接。我实现所有这些线程不是因为多线程本身的缘故,而是因为Go语言鼓励多线程,C++恰恰相反。Go语言本身就支持多线程,使用起来非常简单。我用Go语言实现TCP/IP代理时没有使用互斥信号量和条件变量。使用Go语言的管道(channels)可以优雅地实现线程同步。


        好吧,下面是源代码,带有解释。如果你不熟悉Go编程语言,注释应该会有帮助。我的目的不只关注程序功能,同时还关注Go语言本身。


让我们出发!


        2-11行声明了程序将要用到的包。需要注意,如果一个包被包含了但是没有用到,Go将认为这是一种错误,并强制删除没有用到的声明(还记得上次你想放弃并不耐烦地清理C++项目中STL包含进来的列表吗?)

 	 1     package main
 	 2     import (
 	 3       "encoding/hex"
 	 4       "flag"
 	 5       "fmt"
 	 6       "net"
 	 7       "os"
 	 8       "runtime"
 	 9       "strings"
 	10       "time"
 	11     )


        12-16行声明了用来表示命令行标识的全局变量。下面将看到如何解析它们。

 	12     var (
 	13       host *string = flag.String("host", "",
 	         "target host or address")
 	14       port *string = flag.String("port", "0", "target port")
 	15       listen_port *string = flag.String("listen_port", "0",
 	         "listen port")
 	16     )

        17-20行我们看到了Go语言可变参数函数参数的语法。
 	17     func die(format string, v ...interface{}) {
 	18       os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...))
 	19       os.Exit(1)
 	20     }

        21-28行有两个函数用来启动十六进制导出和二进制日志器。区别仅在于日志名不同。

 	21     func connection_logger(data chan []byte, conn_n int,
 	           local_info, remote_info string) {
 	22       log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log",
 	           format_time(time.Now()), conn_n, local_info, remote_info)
 	23       logger_loop(data, log_name)
 	24     }
 	25     func binary_logger(data chan []byte, conn_n int, peer string) {
 	26       log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log",
 	           format_time(time.Now()), conn_n, peer)
 	27       logger_loop(data, log_name)
 	28     }

        29-43行是Go乐趣的开始,logger_loop函数创建一个日志文件,然后开始进入无限循环(35-42行)。36行代码等待来之管道data的消息。一个有意思的技巧在34行,defer操作符允许我们定义一个代码块,在函数域的末尾这个代码块一定会执行(类似于Java的 finally)。如果接收到的数据为空,函数退出。

 	29     func logger_loop(data chan []byte, log_name string) {
 	30       f, err := os.Create(log_name)
 	31       if err != nil {
 	32         die("Unable to create file %s, %v\n", log_name, err)
 	33       }
 	34       defer f.Close()
 	35       for {
 	36         b := <-data
 	37         if len(b) == 0 {
 	38           break
 	39         }
 	40         f.Write(b)
 	41         f.Sync()
 	42       }
 	43     }
 	44     func format_time(t time.Time) string {
 	45       return t.Format("2006.01.02-15.04.05")
 	46     }
 	47     func printable_addr(a net.Addr) string {
 	48       return strings.Replace(a.String(), ":", "-", -1)
 	49     }
 	50     type Channel struct {
 	51       from, to              net.Conn
 	52       logger, binary_logger chan []byte
 	53       ack                   chan bool
 	54     }

        55-58有一个函数从源套接字from读数据,写到日志文件,再将数据发送到目标套接字to。对于每一个连接有两个pass_through函数的实例在本地和远程套接字间向相反的方向拷贝数据。当I/O错误出现时,视为连接断开。最后,79行的函数向主线程发送确认,通知pass_through函数终止。

 	55     func pass_through(c *Channel) {
 	56       from_peer := printable_addr(c.from.LocalAddr())
 	57       to_peer := printable_addr(c.to.LocalAddr())
 	
 	58       b := make([]byte, 10240)
 	59       offset := 0
 	60       packet_n := 0
 	61       for {
 	62         n, err := c.from.Read(b)
 	63         if err != nil {
 	64           c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n",
 	               from_peer))
 	65           break
 	66         }
 	67         if n > 0 {
 	68           c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X)
 	               %d bytes from %s\n",
 	               packet_n, offset, n, from_peer))
 	69           c.logger <- []byte(hex.Dump(b[:n]))
 	70           c.binary_logger <- b[:n]
 	71           c.to.Write(b[:n])
 	72           c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n",
 	               packet_n, to_peer))
 	73           offset += n
 	74           packet_n += 1
 	75         }
 	76       }
 	77       c.from.Close()
 	78       c.to.Close()
 	79       c.ack <- true
 	80     }

        81-107行的函数处理整所有连接。它连接远程套接字(82行),测量连接持续时间(行88,101-103),启动日志器(行93-95),最后启动两个数据传输线程(行97-98)。函数pass_through运行直至通信两端都启动。行99-100等待来之数据传输线程的确认。行104-106终止日志器。

 	 81     func process_connection(local net.Conn, conn_n int, target string) {
 	 82       remote, err := net.Dial("tcp", target)
 	 83       if err != nil {
 	 84         fmt.Printf("Unable to connect to %s, %v\n", target, err)
 	 85       }
 	
 	 86       local_info := printable_addr(remote.LocalAddr())
 	 87       remote_info := printable_addr(remote.RemoteAddr())
 	
 	 88       started := time.Now()
 	
 	 89       logger := make(chan []byte)
 	 90       from_logger := make(chan []byte)
 	 91       to_logger := make(chan []byte)
 	 92       ack := make(chan bool)
 	
 	 93       go connection_logger(logger, conn_n, local_info, remote_info)
 	 94       go binary_logger(from_logger, conn_n, local_info)
 	 95       go binary_logger(to_logger, conn_n, remote_info)
 	
 	 96       logger <- []byte(fmt.Sprintf("Connected to %s at %s\n",
 	            target, format_time(started)))
 	
 	 97       go pass_through(&Channel{remote, local, logger, to_logger, ack})
 	 98       go pass_through(&Channel{local, remote, logger, from_logger, ack})
 	 99       <-ack // Make sure that the both copiers gracefully finish.
 	100       <-ack //
 	
 	101       finished := time.Now()
 	102       duration := finished.Sub(started)
 	103       logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n",
 	            format_time(started), duration.String()))
 	
 	104       logger <- []byte{}      // Stop logger
 	105       from_logger <- []byte{} // Stop "from" binary logger
 	106       to_logger <- []byte{}   // Stop "to" binary logger
 	107     }

        行108-132是运行TCP/IP侦听器的主函数。行109使Go运行环境使用所有物理可用的CPU。

 	108     func main() {
 	109       runtime.GOMAXPROCS(runtime.NumCPU())
 	110       flag.Parse()
 	111       if flag.NFlag() != 3 {
 	112         fmt.Printf("usage: gotcpspy -host target_host -port target_port
 	              -listen_post=local_port\n")
 	113         flag.PrintDefaults()
 	114         os.Exit(1)
 	115       }
 	116       target := net.JoinHostPort(*host, *port)
 	117       fmt.Printf("Start listening on port %s and
 	            forwarding data to %s\n",
 	            *listen_port, target)
 	118       ln, err := net.Listen("tcp", ":"+*listen_port)
 	119       if err != nil {
 	120         fmt.Printf("Unable to start listener, %v\n", err)
 	121         os.Exit(1)
 	122       }
 	123       conn_n := 1
 	124       for {
 	125         if conn, err := ln.Accept(); err == nil {
 	126           go process_connection(conn, conn_n, target)
 	127           conn_n += 1
 	128         } else {
 	129           fmt.Printf("Accept failed, %v\n", err)
 	130         }
 	131       }
 	132     }


        就这些,只有132行。请注意:程序仅使用了Go语言自身的标准库。


现在准备运行:

go run gotcpspy.go -host pop.yandex.ru -port 110 -local_port 8080

输出应为:

Start listening on port 8080 and forwarding data to pop.yandex.ru:110

然后在另一个窗口运行:

telnet localhost 8080


然后回车,比如:USER  test  [ENTER]  和 PASS  none  [ENTER]。三个日志文件将被创建(当你运行时间戳会不同)。


双向十六进制导出日志文件  log-2012.04.20-19.55.17-0001-192.168.1.41 -49544-213.180.204.37-110.log:

 	Connected to pop.yandex.ru:110 at 2012.04.20-19.55.17
 	Received (#0, 00000000) 38 bytes from 192.168.1.41-49544
 	00000000  2b 4f 4b 20 50 4f 50 20  59 61 21 20 76 31 2e 30
 	  |+OK POP Ya! v1.0|
 	00000010  2e 30 6e 61 40 32 36 20  48 74 6a 4a 69 74 63 50
 	  |.0na@26 HtjJitcP|
 	00000020  52 75 51 31 0d 0a
 	  |RuQ1..|
 	Sent (#0) to [--1]-8080
 	Received (#0, 00000000) 11 bytes from [--1]-8080
 	00000000  55 53 45 52 20 74 65 73  74 0d 0a
 	  |USER test..|
 	Sent (#0) to 192.168.1.41-49544
 	Received (#1, 00000026) 23 bytes from 192.168.1.41-49544
 	00000000  2b 4f 4b 20 70 61 73 73  77 6f 72 64 2c 20 70 6c
 	  |+OK password, pl|
 	00000010  65 61 73 65 2e 0d 0a
 	  |ease...|
 	Sent (#1) to [--1]-8080
 	Received (#1, 0000000B) 11 bytes from [--1]-8080
 	00000000  50 41 53 53 20 6e 6f 6e  65 0d 0a
 	  |PASS none..|
 	Sent (#1) to 192.168.1.41-49544
 	Received (#2, 0000003D) 72 bytes from 192.168.1.41-49544
 	00000000  2d 45 52 52 20 5b 41 55  54 48 5d 20 6c 6f 67 69
 	  |-ERR [AUTH] logi|
 	00000010  6e 20 66 61 69 6c 75 72  65 20 6f 72 20 50 4f 50
 	  |n failure or POP|
 	00000020  33 20 64 69 73 61 62 6c  65 64 2c 20 74 72 79 20
 	  |3 disabled, try |
 	00000030  6c 61 74 65 72 2e 20 73  63 3d 48 74 6a 4a 69 74
 	  |later. sc=HtjJit|
 	00000040  63 50 52 75 51 31 0d 0a
 	  |cPRuQ1..|
 	Sent (#2) to [--1]-8080
 	Disconnected from 192.168.1.41-49544
 	Disconnected from [--1]-8080
 	Finished at 2012.04.20-19.55.17, duration 5.253979s

二进制输出数据日志文件: log-binary-2012.04.20-19.55.17-0001 -192.168.1.41-49544.log:

 	USER test
 	PASS none

二进制输入数据日志文件: log-binary-2012.04.20-19.55.17 -0001-213.180.204.37-110.log:

 	+OK POP Ya! v1.0.0na@26 HtjJitcPRuQ1
 	+OK password, please.
 	-ERR [AUTH] login failure or POP3 disabled, try later. sc=HtjJitcPRuQ

貌似可以工作,让我们通过直接下载一个较大的二进制文件和通过代理下载测试一下程序性能。

直接下载(文件大小约72MB):

 	time wget http://www.erlang.org/download/otp_src_R15B01.tar.gz
 	...
 	Saving to: `otp_src_R15B01.tar.gz'
 	...
 	real        1m2.819s

现在启动代理,然后通过代理下载文件:

 	go run gotcpspy.go -host=www.erlang.org -port=80 -listen_port=8080

下载:

 	time wget http://localhost:8080/download/otp_src_R15B01.tar.gz
 	...
 	Saving to: `otp_src_R15B01.tar.gz.1'
 	...
 	real        0m56.209s

比较结果:

diff otp_src_R15B01.tar.gz otp_src_R15B01.tar.gz.1

两者匹配,说明程序正确工作。


        来看性能。我在我的Mac Air笔记本上重复这个实验许多次。令人惊讶的是,通过代理下载文件比直接下载还快一点。在上面的例子:1m2819s(直接)vs 0m.56209s (通过代理)。我能想到的唯一的解释是wget是单线程的,它在一个线程内多路复用输入输出数据流。相比,代理处理数据流在各自的线程中,这可能导致一点提速。但是这种差别很小,几乎可以忽略不计,在其他计算机或网络这种差异可能完全消失。主要的结论是:

通过代理下载没有降低程序运行速度,尽管有创建大量日志文件的额外开销。


       总体来看,我希望你从简单性和清晰度的角度来看这个程序。上面已经指出,但我再次强调:在这个程序中,我是渐进地使用了多线程。问题的实质促使我在处理一个连接时识别出并行活动,然后Go语言并行机制的易用性和安全性使并行得以实现。最后,使用并行我并没有考虑效率与复杂性(比较难调试)间的权衡。


      我同意,有时一个问题只需改变位和字节,这时代码的线性效率是你唯一需要关心的。但你遇到越来越多的问题,并行能力、多线程处理成为关键因素,对于这种应用,Go语言非常耀眼。


程序整体代码 gotcpspy.go :

package main

import (
	"encoding/hex"
	"flag"
	"fmt"
	"net"
	"os"
	"runtime"
	"strings"
	"time"
)

var (
	host        *string = flag.String("host", "", "target host or address")
	port        *string = flag.String("port", "0", "target port")
	listen_port *string = flag.String("listen_port", "0", "listen port")
)

func die(format string, v ...interface{}) {
	os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...))
	os.Exit(1)
}

func connection_logger(data chan []byte, conn_n int, local_info, remote_info string) {
	log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log", format_time(time.Now()), conn_n, local_info, remote_info)
	logger_loop(data, log_name)
}

func binary_logger(data chan []byte, conn_n int, peer string) {
	log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log", format_time(time.Now()), conn_n, peer)
	logger_loop(data, log_name)
}

func logger_loop(data chan []byte, log_name string) {
	f, err := os.Create(log_name)
	if err != nil {
		die("Unable to create file %s, %v\n", log_name, err)
	}
	defer f.Close()
	for {
		b := <-data
		if len(b) == 0 {
			break
		}
		f.Write(b)
		f.Sync()
	}
}

func format_time(t time.Time) string {
	return t.Format("2006.01.02-15.04.05")
}

func printable_addr(a net.Addr) string {
	return strings.Replace(a.String(), ":", "-", -1)
}

type Channel struct {
	from, to              net.Conn
	logger, binary_logger chan []byte
	ack                   chan bool
}

func pass_through(c *Channel) {
	from_peer := printable_addr(c.from.LocalAddr())
	to_peer := printable_addr(c.to.LocalAddr())

	b := make([]byte, 10240)
	offset := 0
	packet_n := 0
	for {
		n, err := c.from.Read(b)
		if err != nil {
			c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n", from_peer))
			break
		}

		if n > 0 {
			c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X) %d bytes from %s\n", packet_n, offset, n, from_peer))
			c.logger <- []byte(hex.Dump(b[:n]))
			c.binary_logger <- b[:n]
			c.to.Write(b[:n])
			c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n", packet_n, to_peer))
			offset += n
			packet_n += 1
		}
	}
	c.from.Close()
	c.to.Close()
	c.ack <- true
}

func process_connection(local net.Conn, conn_n int, target string) {
	remote, err := net.Dial("tcp", target)
	if err != nil {
		fmt.Printf("Unable to connect to %s, %v\n", target, err)
	}

	local_info := printable_addr(remote.LocalAddr())
	remote_info := printable_addr(remote.RemoteAddr())

	started := time.Now()

	logger := make(chan []byte)
	from_logger := make(chan []byte)
	to_logger := make(chan []byte)
	ack := make(chan bool)

	go connection_logger(logger, conn_n, local_info, remote_info)
	go binary_logger(from_logger, conn_n, local_info)
	go binary_logger(to_logger, conn_n, remote_info)

	logger <- []byte(fmt.Sprintf("Connected to %s at %s\n", target, format_time(started)))

	go pass_through(&Channel{remote, local, logger, to_logger, ack})
	go pass_through(&Channel{local, remote, logger, from_logger, ack})
	<-ack // Make sure that the both copiers gracefully finish.
	<-ack //

	finished := time.Now()
	duration := finished.Sub(started)
	logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n", format_time(started), duration.String()))

	logger <- []byte{}      // Stop logger
	from_logger <- []byte{} // Stop "from" binary logger
	to_logger <- []byte{}   // Stop "to" binary logger
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	flag.Parse()
	if flag.NFlag() != 3 {
		fmt.Printf("usage: gotcpspy -host target_host -port target_port -listen_port=local_port\n")
		flag.PrintDefaults()
		os.Exit(1)
	}
	target := net.JoinHostPort(*host, *port)
	fmt.Printf("Start listening on port %s and forwarding data to %s\n", *listen_port, target)
	ln, err := net.Listen("tcp", ":"+*listen_port)
	if err != nil {
		fmt.Printf("Unable to start listener, %v\n", err)
		os.Exit(1)
	}
	conn_n := 1
	for {
		if conn, err := ln.Accept(); err == nil {
			go process_connection(conn, conn_n, target)
			conn_n += 1
		} else {
			fmt.Printf("Accept failed, %v\n", err)
		}
	}
}


原文地址:http://pragprog.com/magazines/2012-06/the-beauty-of-concurrency-in-go


有疑问加站长微信联系(非本文作者)

本文来自:CSDN博客

感谢作者:haibinzhang

查看原文:Go语言并行之美 -- 超越 "Hello World"

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2514 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传