Go语言并发编程(二)

swallowing_ · · 3651 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

Go语言的并发编程主要通过goroutine和channel实现的。以下为学习笔记,仅供参考。欢迎指正。

一、goroutine

(一)goroutine的理解

  对于初学者,goroutine直接理解成为线程就可以了。当使用go关键词调用函数时,启动一个goroutine的时候,就相当于启了一个线程,执行这个函数。
  然而实际上一个goroutine并不是一个线程,是比线程还要小的调度单位。默认所有的goroutines在一个线程里跑,而且goroutines在线程中会是一个一个地执行。如果线程阻塞了,则被分配到空闲的线程。

(二)goroutine的使用

使用非常简单,在函数前增加一个go
例:go f(a,b)//开启后,不等待其结束,主线程继续执行。

PS:要注意的是一个goroutine开启后,若不等其执行,main(主goroutine)中将继续执行下一步,那么主线程一结束,goroutine中的程序就不会执行了。如何解决?代码如下:

func saySomething(str string) {
    for i := 0; i<5; i++ {
        time.Sleep(time.Millisecond * 1000)
        fmt.Println(str)
    }
}

func main() {
    // 启动一个goroutine线程
    go saySomething("Hello")
    saySomething("World")
}

这里为什么要sleep? 是为了等go saySomething(“Hello”)处理完成。

  好了,这里就出来了一个需求:如果要人为设定一个休眠的时间,非常地不方便,需要使一个goroutine结束后自动向主线程传输数据,告诉主线程这个goroutine已经结束了。这里就引进了channel的概念。

二、channel

(一)channel概念

  简单来说就是,主线程告诉大家你开goroutine可以,但是我在我的主线程开了一个信道,你做完了你要做的事情之后,往信道里面塞个东西告诉我你已经完成了,我再结束主线程。

(二)channel使用

1、和map一样,信道是引用类型,用make 分配内存。如果调用make时提供一个可选的整数参数,则该信道就会被分配相应大小的缓冲区。缓冲区大小默认为0,对应于无缓冲信道或者同步信道。

ci := make(chan int) // 无缓冲整数信道
cs := make(chan *os.File, 100) // 缓冲的文件指针信道

2、信道可用来让正在运行的goroutine等待排序完成。确保(goroutine)相互都处于已知状态。
-往channel中插入数据的操作

c <- 1

-从channel中输出数据

<- c

代码示例:

c := make(chan int) // Allocate a channel.
// 在goroutine中启动排序,当排序完成时,信道上发出信号
go func() {
  list.Sort()
  c <- 1 // 发送一个信号,值是多少无所谓。
}()
doSomethingForAWhile()
<-c // 等待排序完成,丢弃被发送的值。

  收信者(receivers)在收到数据前会一直被阻滞。如果信道是非缓冲的,则发信者(sender)在收信者接收到数据前也一直被阻滞。如果信道有缓冲区,发信者只有在数据被填入缓冲区前才被阻滞;如果缓冲区是满的,意味着发送者要等到某个收信者取走一个值。

(三)channel限制吞吐量

  缓冲的信道可以象信号灯一样使用,比如用来限制吞吐量。在下面的例子中,进入的请求被传递给handle,handle发送一个值到信道,接着处理请求,最后从信道接收一个值。信道缓冲区的大小限制了并发调用process的数目。

var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
  sem <- 1 // 等待队列缓冲区非满
  process(r) // 处理请求,可能会耗费较长时间.
  <-sem // 请求处理完成,准备处理下一个请求
}
func Serve(queue chan *Request) {
  for {
    req := <-queue
    go handle(req) //不等待handle完成
  }
}

  通过启动固定数目的handle goroutines也可以实现同样的功能,这些goroutines都从请求信道中读取请求。Goroutines的数目限制了并发调用process的数目。Serve函数也从一个信道中接收退出信号;在启动goroutines后,它处于阻滞状态,直到接收到退出信号:

func handle(queue chan *Request) {
  for r := range queue {
   process(r)
  }
}

func Serve(clientRequests chan *clientRequests, quit chan bool) {
  // 启动请求处理
  for i := 0; i < MaxOutstanding; i++ {
    go handle(clientRequests)
  }
  <-quit // 等待退出信号
}

(四)通过信道传输信道

  Go最重要的特性之一就是: 信道, 信道可以像其它类型的数值一样被分配内存并传递。此特性常用于实现安全且并行的去复用(demultiplexing)。
  前面的例子中,handle是一个理想化的处理请求的函数,但是我们没有定义它所能处理的请求的具体类型。如果该类型包括了一个信道,每个客户端就可以提供自己方式进行应答

type Request struct {
  args []int
  f func([]int) int
  resultChan chan int
}

客户端提供一个函数、该函数的参数以及一个请求对象用来接收应答的信道

func sum(a []int) (s int) {
for _, v := range a {
  s += v
}
return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 发送请求
clientRequests <- request
// 等待响应.
fmt.Printf("answer: %d\n", <-request.resultChan)

在服务器端,处理请求的函数是

func handle(queue chan *Request) {
  for req := range queue {
    req.resultChan <- req.f(req.args)
  }
}

  显然要使这个例子更为实际还有很多工作要做,但这是针对速度限制、并行、非阻滞RPC系统的框架,而且其中也看不到互斥(mutex)的使用。

(五)并行

  并行思想的一个应用是利用多核CPU进行并行计算。如果计算过程可以被分为多个片段,则它可以通过这样一种方式被并行化:在每个片段完成后通过信道发送信号。
  此处上篇文章已经介绍过了,详见 Go语言并发编程(一)

(六)同步工具sync.WaitGroup

  设置一个变量作为同步工具。这是防止主Goroutine过早的被运行结束的有效手段之一。对这个变量的声明和初始化的代码如下:

var waitGroup sync.WaitGroup // 用于等待一组操作执行完毕的同步工具。
waitGroup.Add(3)              // 该组操作的数量是3。
numberChan1 := make(chan int64, 3) // 数字通道1。
numberChan2 := make(chan int64, 3) // 数字通道2。
numberChan3 := make(chan int64, 3) // 数字通道3

  标识符sync.WaitGroup代表了一个类型。该类型的声明存在于代码包sync中,类型名为WaitGroup。另外,上面的第二条语句进行了一个“加3”的操作,意味着我们将要后面启用三个Goroutine,或者说要并发的执行三个go函数。

先来看第一个go函数:数字过滤函数,过滤掉不能被2整除的数字。

go func() { // 数字过滤函数1。
  for n := range numberChan1 { // 不断的从数字通道1中接收数字,直到该通道关闭。
    if n%2 == 0 { // 仅当数字可以被2整除,才将其发送到数字通道2.
      numberChan2 <- n
    } else {
      fmt.Printf("Filter %d. [filter 1]\n", n)
    }
  } 
  close(numberChan2) // 关闭数字通道2。
  waitGroup.Done()   // 表示此操作完成。进行相应的“减1”
}()

数字过滤函数2代码与上述类似,过滤掉不能被5整除的数字。如下:

go func() { // 数字过滤函数2。
  for n := range numberChan2 { // 不断的从数字通道2中接收数字,直到该通道关闭。
    if n%5 == 0 { // 仅当数字可以被5整除,才将其发送到数字通道3.
      numberChan3 <- n
    } else {
      fmt.Printf("Filter %d. [filter 1]\n", n)
    }
  } 
  close(numberChan3) // 关闭数字通道3。
  waitGroup.Done()   // 表示此操作完成。进行相应的“减1”
}()

  如此一来,数字过滤函数1和2就经由数字通道2串联起来了。请注意,不要忘记在数字过滤函数2中的for语句后面添加对数字通道numberChan3的关闭操作,以及调用waitGroup变量的Done方法。

go func() { // 数字输出函数。
  for n := range numberChan3 { // 不断的从数字通道3中接收数字,直到该通道关闭。
    fmt.Println(n) // 打印数字。
  }
  waitGroup.Done() // 表示此操作完成。并“减1”。
}()

  然后激活这一过滤数字的流程。具体的激活方法是,向数字通道numberChan1发送数字。在上述代码后加入代码如下:

for i := 0; i < 100; i++ { // 先后向数字通道1传送100个范围在[0,100)的随机数。
  numberChan1 <- rand.Int63n(100)
}
close(numberChan1) // 数字发送完毕,关闭数字通道1。对通道的关闭并不会影响到对已存于其中的数字的接收操作。

为了能够让这个流程能够被完整的执行,我们还需要在最后加入这样一条语句:

waitGroup.Wait() // 等待前面那组操作(共3个)的完成。

  对waitGroup的Wait方法的调用会一直被阻塞,直到前面三个go函数中的三个waitGroup.Done()语句(即那三个“减1操作”)都被执行完毕。也就是当waitGroup里的数量由3减到0时,才能让对waitGroup.Wait()语句的执行从阻塞中恢复并完成。

参考文章:go语言学习笔记之并发编程
     Go并发编程之Go语言概述

版权声明:本文为博主原创文章,未经博主允许不得转载。


有疑问加站长微信联系(非本文作者)

本文来自:CSDN博客

感谢作者:swallowing_

查看原文:Go语言并发编程(二)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3651 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传