golang 碎片整理之 并发

霍帅兵 · · 568 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

并发与并行

并发:同一时间段内执行多个任务。
并行:同一时刻执行多个任务。

Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由go语言的运行调度完成的,而线程是由操作系统调度完成的。
Go语言还提供了channel在多个goroutine间进行通信。goroutine和channel是go语言秉承的CSP(Communicating Sequential Process)并发模式的重要实现基础。

goroutine

在Java/c++中我们要实现并发编程的时候,我们通常要自己维护一个线程池,并且需要自己去包装一个又一个的任务和然后自己去调度线程执行任务并维护上线文的切换,这一切通常会耗费程序员的大量心智。能不能有一种机制,程序员只需要定义很多个任务,让系统去帮忙我们把这些任务分配到CPU上实现并发执行呢? Go语言中的goroutine就是这样一种机制,Go语言之所以能被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

使用goroutine

Go程序中使用go关键字为一个函数创建一个goroutine。一个函数可以被创建多个goroutine,一个goroutine必定对应一个函数。

启动单个goroutine

启动goroutine的方式非常简单,只需要在调用的函数(普通函数和匿名函数)前面加一个go 关键字。
举个例子:

package main

import (
    "fmt"
)

func hello() {
    fmt.Println("hello goroutine")
}
func main() {
    hello()
    fmt.Println("main goroutine done!")
}

这个示例中hello函数和下面的语句是串行的,执行的结果是打印完hello goroutine 后打印main goroutine done!
接下来我们在调用hello 函数前面加上go关键字,也就是启动一个goroutine区执行hello这个函数。

func main() {
    go hello()
    fmt.Println("main goroutine done!")
}

这一次的执行结果只打印了 main goroutine done! , 并没有打印 hello goroutine,为什么呢?
在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine 会一同结束,main函数所在的goroutine就像是权利的游戏中的夜王,其他的goroutine就像是异鬼,夜王一死它转化的那些异鬼也就全部GG了。
所以我们要想办法让main函数等一等hello函数,最简单粗暴的方式就是sleep了。

func main(){
    go hello()
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}

执行上面的代码你会发现,这一次先打印main goroutine done! ,然后紧接着打印Hello Goroutine!
首先为什么会打印main goroutine done! 是因为我们在创建新的goroutine 的时候需要花费一些时间,而此时main函数所在的goroutine是继续执行的。

sync.WaitGroup

在代码中生硬的使用time.sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。sync.WaitGroup有一下几个方法:

方法名 功能
(wg WaitGroup)Add(delta int) 计数器+delta
(wg
WaitGroup)Done() 计算器-1
(wg *WaitGroup)Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N个并发任务时,就将计数器的值增加N, 每个任务完成时通过调用Done()方法将计数器减1,通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
我们利用sync.WaitGroup将上面的代码优化一下:

var wg sync.WaitGroup

func hello() {
    defer wg.Done()
    fmt.Println("hello 1")
}
func main() {
    wg.Add(1)
    go hello()
    fmt.Println("main 2")
    wg.Wait()
}

需要注意的是sync.WaitGroup是一个结构体,传递的时候需要传递指针。

启动多个goroutine

在go语言中实现并发就是这么简单,我们还可以启动多个goroutine。让我们再来一个例子:

var wg sync.WaitGroup

func hello(i int) {
    defer wg.Done()
    fmt.Println("hello ,", i)
}
func main() {
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go hello(i)
    }
    wg.Wait()
}

多次执行上面的代码,会发现每次打印的数字的顺序都不一样,这是因为10个goroutine是并发执行的,而goroutine的调度室随机的。

goroutine 与线程

可增长的栈

OS线程一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这么大。所以在go语言中一次创建十万左右的goroutine也是可以的。

goroutine的调度

os线程是由os内核来调度的,goroutine则是由go运行时自己的调度器来调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程上)。goroutine的调度不需要切换内核语境,所以调度一个goroutine比调度一个线程成本低很多。

GOMAXPROCS

go运行时的调度器使用GOMAXPROCS 参数来确定需要使用多少个OS线程来同时执行GO代码,默认值是机器上的CPU的核心数,例如在一个8核的机器上,调度器会把go代码同时调度到8个OS线程上。go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
go1.5版本之前,默认使用的是单核心执行,Go1.5版本之后,默认使用全部的CPU逻辑核心数。

GO语言中的操作系统线程和goroutine的关系:
1.一个操作系统线程对应用户态多个goroutine。
2.go程序可以同时多个操作系统线程。
3.goroutine 和OS线程是多对多的关系,即m:n

channel

单纯的将函数并发执行是没有意义的,函数与函数间需要交换数据才能体现并发执行函数的意义。
虽然可以使用共享内存进行数据交换,但是共享内存存在不同的goroutine中容易发生的竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法必然导致性能问题。
go语言的并发模型是CSP,提倡通过通信共享内存,而不是通过共享内存实现通信。
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接,channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
go语言中的通道channel是一种特殊的类型,通道像一个传送带或者队列,总是遵循先入先出的规则,保证收发数据的顺序,每个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

声明channel

声明通道类型的格式如下:

var 变量  chan 元素类型

举几个例子:

var ch1 chan int
var ch2 chan bool
var ch3 chan []int

创建channel

通道是引用类型,通道类型的空值是nil。

var ch chan int
fmt.Println(ch)    //<nil>

声明的通道后需要使用make函数初始化之后才能使用。创建channel的格式如下:

make(chan 元素类型, [缓冲大小])

缓冲大小是可选的。
举几个例子:

cha4 := make(chan int)
cha5 := make(chan bool)
cha6 := make(chan []int)

channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。发送和接收都使用<- 符号。
现在我们先使用以下语句定义一个通道:

ch := make(chan int)

发送

将一个值发送到通道中。

ch <- 10  //把10发送到ch中

接收

从一个通道中接收值。

x := <- ch     //从ch中接收值并赋值给变量x
<- ch       // 从ch中接收值,忽略结果

关闭

我们通过调用内置的close函数来关闭通道。

close(ch)

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,他和关闭文件不一样,在结束操作之后关闭文件是必须做的,但是关闭通道不是必须的。
关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值得通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。

    无缓冲的通道

    无缓冲的通道又称为阻塞的通道。我们来看一下下面的代码:

    func main(){
    ch := make(chan int)
    ch <- 10
    fmt.Println("发送成功了")
    }

    上面的代码能够通过编译,但是执行的时候会出现以下错误:
    fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/Users/huoshuaibing/gowork/src/github.com/studygolang/day12/main.go:9 +0x54
exit status 2
为什么会出现deadlock错误呢?
因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。就像你住的小区没有代收点和快递柜,快递员给你打电话必须把这个物品送到你的手中,简单来说就是无缓冲的通道必须有接收才能发送。
上面的代码会阻塞在 ch <- 10 这一行代码形成死锁,那么如何解决这个问题呢?
一种方法是启用一个goroutine去接收值,例如:

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch)
    ch <- 10
    fmt.Println("发送成功")
}

无缓冲的通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行;相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroiutine在该通道上发送一个值。
使用无缓冲的通道进行通信将导致发送和接收的goroutine同步化,因此,无缓冲的通道也被称为同步通道。

有缓冲的通道

解决上面的问题的方法还有一种就是使用有缓冲区的通道。我们可以使用make函数初始通道的时候为其指定通道的容量,例如:

func main() {
    ch := make(chan int, 1)
    ch <- 10
    fmt.Println("发送成功")
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么多个格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员才能往里面放一个。
我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量。

如何优雅的从通道中循环取值

当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。当通道关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?
我们来看下面的例子:

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func() {
        for i := 0; i < 100; i++ {
            ch1 <- i
        }
        close(ch1)
    }()
    go func() {
        for {
            i, ok := <-ch1
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()
    for i := range ch2 {
        fmt.Println(i)
    }
}

从上面的例子中我们看到两种方式在接收值得时候判断通道是否被关闭,我们通常使用的是for range 的方式。

单向通道

有时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如只能发送或接收。Go语言中提供了单向通道来处理这种情况。例如,我们把上面的例子改造如下:

func counter(out chan<- int) {
    for i := 0; i < 100; i++ {
        out <- i
    }
    close(out)
}
func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中,chan<- int 是一个只能发送的通道,可以发送但是不能接收; <-chan int 是一个只能接收的通道,可以接收但是不能发送。在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但是反过来不可以的。

select多路复用

在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。你也许会写出如下代码使用遍历的方式来实现:

for {
    data,ok := <- ch1
    data,ok := <- ch2
    ...
}

这种方式虽然可以实现从多个通道接收值的需求,但是运行性能会差很多。为了应对这种场景,GO内置了select关键字,可以同时响应多个通道的操作。select的使用类似于switch语句,它有一些列case分支和一个默认分支,每个case会对应一个通道的通信过程。select会一直等待,直到某个case的通信操作完成,就会执行case分支对应的语句。具体格式如下:

select {
case <- ch1:
      ...
case <-ch2
      ...
default:
      默认操作
}

举个小例子来演示一下select 的使用:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 1)
    for i := 0; i < 10; i++ {
        select {
        case x := <-ch:
            fmt.Println(x)
        case ch <- i:
        }
    }
}

使用select语句能提高代码的可读性。如果多个case同时满足,那么select会随机挑选一个。对于没有case的select{}会一直等待。

并发安全和锁

有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题。类比生活中的例子有十字路口被各个方向的汽车竞争;还有火车上的卫生间被车厢里面的人竞争。举个例子:

var x int64
var wg sync.WaitGroup

func add() {
    for i := 0; i < 100; i++ {
        x = x + 1
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

上面的代码我们开启了两个goroutine 去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符。

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,他们能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex来实现互斥锁。使用互斥锁来修复上面代码的问题:

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i < 100; i++ {
        lock.Lock()
        x = x + 1
        lock.Unlock()
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine 则在等待锁;当互斥锁释放后,等待goroutine才可以获得锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

读写互斥锁

互斥锁时完全互斥的,实际情况是很多情景下是读多写少的,当我们并发的去读一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包的RWMutex 类型。
读写锁分两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果获取的是写锁就会等待,当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

读写锁示例:

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    // lock.Lock()   // 加互斥锁
    rwlock.Lock() // 加写锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
    rwlock.Unlock()                   // 解写锁
    // lock.Unlock()                     // 解互斥锁
    wg.Done()
}

func read() {
    // lock.Lock()                  // 加互斥锁
    rwlock.RLock()               // 加读锁
    time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
    rwlock.RUnlock()             // 解读锁
    // lock.Unlock()                // 解互斥锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

sync.Once

说在前面:这是一个进阶知识点。延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序启动延时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要的。我们来看一个例子:
sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会执行多次。

sync.Map

Go语言中内置的map不是并发安全的。请看下面的示例:

var m = make(map[string]int)

func get(key string) int {
    return m[key]
}
func set(key string, value int) {
    m[key] = value
}
func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            set(key, n)
            fmt.Println("k=:%v, v:=%v\n", key, get(key))
            wg.Done()
        }(i)
    }
    wg.Wait()
}

上面的代码开启少量几个goroutine 的时候没有问题,当并发对了之后,执行就会报fatal error: concurrent map writes错误。
像这种场景下就需要为map加锁来保证并发的安全性,go语言的sync包中提供了一个开箱即用的并发安全版map-sync.Map。同时sync.Map 内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。


有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:霍帅兵

查看原文:golang 碎片整理之 并发

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

568 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传