Go语言的并发编程主要通过goroutine和channel实现的。以下为学习笔记,仅供参考。欢迎指正。
一、goroutine
(一)goroutine的理解
对于初学者,goroutine直接理解成为线程就可以了。当使用go关键词调用函数时,启动一个goroutine的时候,就相当于启了一个线程,执行这个函数。
然而实际上一个goroutine并不是一个线程,是比线程还要小的调度单位。默认所有的goroutines在一个线程里跑,而且goroutines在线程中会是一个一个地执行。如果线程阻塞了,则被分配到空闲的线程。
(二)goroutine的使用
使用非常简单,在函数前增加一个go
例:go f(a,b)//开启后,不等待其结束,主线程继续执行。
PS:要注意的是一个goroutine开启后,若不等其执行,main(主goroutine)中将继续执行下一步,那么主线程一结束,goroutine中的程序就不会执行了。如何解决?代码如下:
func saySomething(str string) {
for i := 0; i<5; i++ {
time.Sleep(time.Millisecond * 1000)
fmt.Println(str)
}
}
func main() {
// 启动一个goroutine线程
go saySomething("Hello")
saySomething("World")
}
这里为什么要sleep? 是为了等go saySomething(“Hello”)处理完成。
好了,这里就出来了一个需求:如果要人为设定一个休眠的时间,非常地不方便,需要使一个goroutine结束后自动向主线程传输数据,告诉主线程这个goroutine已经结束了。这里就引进了channel的概念。
二、channel
(一)channel概念
简单来说就是,主线程告诉大家你开goroutine可以,但是我在我的主线程开了一个信道,你做完了你要做的事情之后,往信道里面塞个东西告诉我你已经完成了,我再结束主线程。
(二)channel使用
1、和map一样,信道是引用类型,用make 分配内存。如果调用make时提供一个可选的整数参数,则该信道就会被分配相应大小的缓冲区。缓冲区大小默认为0,对应于无缓冲信道或者同步信道。
ci := make(chan int) // 无缓冲整数信道
cs := make(chan *os.File, 100) // 缓冲的文件指针信道
2、信道可用来让正在运行的goroutine等待排序完成。确保(goroutine)相互都处于已知状态。
-往channel中插入数据的操作
c <- 1
-从channel中输出数据
<- c
代码示例:
c := make(chan int) // Allocate a channel.
// 在goroutine中启动排序,当排序完成时,信道上发出信号
go func() {
list.Sort()
c <- 1 // 发送一个信号,值是多少无所谓。
}()
doSomethingForAWhile()
<-c // 等待排序完成,丢弃被发送的值。
收信者(receivers)在收到数据前会一直被阻滞。如果信道是非缓冲的,则发信者(sender)在收信者接收到数据前也一直被阻滞。如果信道有缓冲区,发信者只有在数据被填入缓冲区前才被阻滞;如果缓冲区是满的,意味着发送者要等到某个收信者取走一个值。
(三)channel限制吞吐量
缓冲的信道可以象信号灯一样使用,比如用来限制吞吐量。在下面的例子中,进入的请求被传递给handle,handle发送一个值到信道,接着处理请求,最后从信道接收一个值。信道缓冲区的大小限制了并发调用process的数目。
var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
sem <- 1 // 等待队列缓冲区非满
process(r) // 处理请求,可能会耗费较长时间.
<-sem // 请求处理完成,准备处理下一个请求
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) //不等待handle完成
}
}
通过启动固定数目的handle goroutines也可以实现同样的功能,这些goroutines都从请求信道中读取请求。Goroutines的数目限制了并发调用process的数目。Serve函数也从一个信道中接收退出信号;在启动goroutines后,它处于阻滞状态,直到接收到退出信号:
func handle(queue chan *Request) {
for r := range queue {
process(r)
}
}
func Serve(clientRequests chan *clientRequests, quit chan bool) {
// 启动请求处理
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<-quit // 等待退出信号
}
(四)通过信道传输信道
Go最重要的特性之一就是: 信道, 信道可以像其它类型的数值一样被分配内存并传递。此特性常用于实现安全且并行的去复用(demultiplexing)。
前面的例子中,handle是一个理想化的处理请求的函数,但是我们没有定义它所能处理的请求的具体类型。如果该类型包括了一个信道,每个客户端就可以提供自己方式进行应答
type Request struct {
args []int
f func([]int) int
resultChan chan int
}
客户端提供一个函数、该函数的参数以及一个请求对象用来接收应答的信道
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}
request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 发送请求
clientRequests <- request
// 等待响应.
fmt.Printf("answer: %d\n", <-request.resultChan)
在服务器端,处理请求的函数是
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
显然要使这个例子更为实际还有很多工作要做,但这是针对速度限制、并行、非阻滞RPC系统的框架,而且其中也看不到互斥(mutex)的使用。
(五)并行
并行思想的一个应用是利用多核CPU进行并行计算。如果计算过程可以被分为多个片段,则它可以通过这样一种方式被并行化:在每个片段完成后通过信道发送信号。
此处上篇文章已经介绍过了,详见 Go语言并发编程(一)
(六)同步工具sync.WaitGroup
设置一个变量作为同步工具。这是防止主Goroutine过早的被运行结束的有效手段之一。对这个变量的声明和初始化的代码如下:
var waitGroup sync.WaitGroup // 用于等待一组操作执行完毕的同步工具。
waitGroup.Add(3) // 该组操作的数量是3。
numberChan1 := make(chan int64, 3) // 数字通道1。
numberChan2 := make(chan int64, 3) // 数字通道2。
numberChan3 := make(chan int64, 3) // 数字通道3
标识符sync.WaitGroup代表了一个类型。该类型的声明存在于代码包sync中,类型名为WaitGroup。另外,上面的第二条语句进行了一个“加3”的操作,意味着我们将要后面启用三个Goroutine,或者说要并发的执行三个go函数。
先来看第一个go函数:数字过滤函数,过滤掉不能被2整除的数字。
go func() { // 数字过滤函数1。
for n := range numberChan1 { // 不断的从数字通道1中接收数字,直到该通道关闭。
if n%2 == 0 { // 仅当数字可以被2整除,才将其发送到数字通道2.
numberChan2 <- n
} else {
fmt.Printf("Filter %d. [filter 1]\n", n)
}
}
close(numberChan2) // 关闭数字通道2。
waitGroup.Done() // 表示此操作完成。进行相应的“减1”
}()
数字过滤函数2代码与上述类似,过滤掉不能被5整除的数字。如下:
go func() { // 数字过滤函数2。
for n := range numberChan2 { // 不断的从数字通道2中接收数字,直到该通道关闭。
if n%5 == 0 { // 仅当数字可以被5整除,才将其发送到数字通道3.
numberChan3 <- n
} else {
fmt.Printf("Filter %d. [filter 1]\n", n)
}
}
close(numberChan3) // 关闭数字通道3。
waitGroup.Done() // 表示此操作完成。进行相应的“减1”
}()
如此一来,数字过滤函数1和2就经由数字通道2串联起来了。请注意,不要忘记在数字过滤函数2中的for语句后面添加对数字通道numberChan3的关闭操作,以及调用waitGroup变量的Done方法。
go func() { // 数字输出函数。
for n := range numberChan3 { // 不断的从数字通道3中接收数字,直到该通道关闭。
fmt.Println(n) // 打印数字。
}
waitGroup.Done() // 表示此操作完成。并“减1”。
}()
然后激活这一过滤数字的流程。具体的激活方法是,向数字通道numberChan1发送数字。在上述代码后加入代码如下:
for i := 0; i < 100; i++ { // 先后向数字通道1传送100个范围在[0,100)的随机数。
numberChan1 <- rand.Int63n(100)
}
close(numberChan1) // 数字发送完毕,关闭数字通道1。对通道的关闭并不会影响到对已存于其中的数字的接收操作。
为了能够让这个流程能够被完整的执行,我们还需要在最后加入这样一条语句:
waitGroup.Wait() // 等待前面那组操作(共3个)的完成。
对waitGroup的Wait方法的调用会一直被阻塞,直到前面三个go函数中的三个waitGroup.Done()语句(即那三个“减1操作”)都被执行完毕。也就是当waitGroup里的数量由3减到0时,才能让对waitGroup.Wait()语句的执行从阻塞中恢复并完成。
版权声明:本文为博主原创文章,未经博主允许不得转载。
有疑问加站长微信联系(非本文作者)