并发基础
多进程
多线程基于回调的非阻塞/异步IO协程
协程
与传统的系统级线程和进程相比,协程的最大优势在于其“轻量级”,可以轻松创建上百万个而不会导致系统资源衰竭,
而线程和进程通常最多也不能超过1万个。这也是协程也叫轻量级线程的原因。多数语言在语法层面并不直接支持协程,而是通过库的方式支持,但用库的方式支持的功能也并不完整,比如仅仅提供轻量级线程的创建、销毁与切换等能力。如果在这样的轻量级线程中调用一个同步 IO 操作,比如网络通信、本地文件读写,都会阻塞其他的并发执行轻量级线程, 从而无法真正达到轻量级线程本身期望达到的目标。
Go 语言在语言级别支持轻量级线程,叫goroutine。Go 语言标准库提供的所有系统调用操作(当然也包括所有同步 IO 操作),都会出让 CPU 给其他goroutine。这让事情变得非常简单,让轻量级线程的切换管理不依赖于系统的线程和进程,也不依赖于CPU的核心数量。
goroutine
在一个函数调用前加上go关键字,这次调用就会在一个新的goroutine中并发执行。当被调用的函数返回时,这个goroutine也自动结束了。需要注意的是,如果这个函数有返回值,那么这个返回值会被丢弃。
package main
import "fmt"
func Add(x, y int) {
z := x + y
fmt.Println(z)
}
func main() {
for i := 0; i < 10; i++ {
go Add(i, i)
}
}
Go程序从初始化main package并执行main()函数开始,当main()函数返回时,程序退出,且程序并不等待其他goroutine(非主goroutine)结束。 所以看不到输出结果
要让主函数等待所有goroutine退出后再返回,如何知道goroutine都退出了呢?这就引出了多个goroutine之间通信的问题。下一节我们将主要解决这个问题。
并发通信
在工程上,有两种最常见的并发通信模型:共享数据和消息。
共享数据是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享。被共享的数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。在实际工程应用中最常见的无疑是内存了,也就是常说的共享内存。
下面是共享内存的实现
var count int
func Count(lock *sync.Mutex) {
lock.Lock()
count++
fmt.Println(count)
lock.Unlock()
}
func main(){
lock := &sync.Mutex{}
for i := 0; i < 10; i++ {
go Count(lock)
}
for {
lock.Lock()
c := count
lock.Unlock()
runtime.Gosched()
if c > 10 {
break
}
}
}
Go语言提供的是另一种通信模型,即以消息机制而非共享内存作为通信方式。
Go语言提供的消息通信机制被称为channel,接下来我们将详细介绍channel。现在,让我们用Go语言社区的那句著名的口号来结束这一小节:“不要通过共享内存来通信,而应该通过通信来共享内存。”
channel
channel是Go语言在语言级别提供的goroutine间的通信方式。
//声明一个chan
var ch chan int
var mch map[string]chan bool
//声明并初始化一个int类型的chan
chan1 := make(chan int,1)
//将一个数据写入channel中
chan1 <- 1
getchan1 := <-chan1
从channel中取数据与写入数据
chan1 := make(chan int, 1)
//将1写入channel中
chan1 <- 1
//将一个数据从channel中读取到getchan1中
getchan1 := <-chan1
fmt.Println(getchan1) //输出1
select
通过调用select()函数来监控一系列的文件句柄。一旦其中一个文件句柄发生了IO动作,该select()调用就会被返回
select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。
与switch语句可以选择任何可使用相等比较的条件相比,select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
ch := make(chan int, 1)
for {
select {
case ch <- 0:
case ch <- 1:
}
i := <-ch
fmt.Println("Value received:", i)
}
缓冲机制
//创建一个带缓冲的channel
c := make(chan int, 1024)
超时机制
在并发编程的通信过程中,最需要处理的就是超时问题,即向channel写数据时发现channel已满,或者从channel试图读取数据时发现channel为空。
如果不正确处理这些情况,很可能会导致整个goroutine锁死。
因为select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况。 所以可以使用select来避免goroutine阻塞问题
// 首先,我们实现并执行一个匿名的超时等待函数
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // 等待1秒钟
timeout <- true
}()
// 然后我们把timeout这个channel利用起来
select {
case <-ch:
// 从ch中读取到数据
case <-timeout:
// 一直没有从ch中读取到数据,但从timeout中读取到了数据
}
这样使用select机制可以避免永久等待的问题
channel的传递
type PipeData struct {
value int
handler func(int) int
next chan int
}
func handle(queue chan *PipeData) {
for data := range queue {
data.next <- data.handler(data.value)
}
}
单向channel
单向channel只能用于发送或者接收数据。
channel本身必然是同时支持读写的,否则根本没法用。假如一个channel真的只能读,那么肯定只会是空的,因为你没机会往里面写数据。同理,如果一个channel只允许写,即使写进去了,也没有丝毫意义,因为没有机会读取里面的数据。所谓的单向channel概念,其实只是对channel的一种使用限制。
我们在将一个channel变量传递到一个函数时,可以通过将其指定为单向channel变量,从
而限制该函数中可以对此channel的操作,比如只能往这个channel写,或者只能从这个channel读。
var ch1 chan int // ch1是一个正常的channel,不是单向的
var ch2 chan<- float64// ch2是单向channel,只用于写float64数据
var ch3 <-chan int // ch3是单向channel,只用于读取int数据
//单项channel初始化,ch4被转换为一个单项读channel和一个单向写channel
ch4 := make(chan int)
ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel
ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel
关闭channel
close(ch)
如何判断一个channel是否已经被关闭
x, ok := <-ch //
返回值是false则表示ch已经被关闭。
多核并行化
type Vector []float64
// 分配给每个CPU的计算任务
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
c <- 1 // 发信号告诉任务管理者我已经计算完成了
}
const NCPU = 16 // 假设总共有16核
func (v Vector) DoAll(u Vector) {
c := make(chan int, NCPU) // 用于接收每个CPU的任务完成信号
for i := 0; i < NCPU; i++ {
go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c)
}
// 等待所有CPU的任务完成
for i := 0; i < NCPU; i++ {
<-c // 获取到一个数据,表示一个CPU计算完成了
}
// 到这里表示所有计算已经结束
}
在Go语言升级到默认支持多CPU的某个版本之前,我们可以先通过设置环境变量
GOMAXPROCS的值来控制使用多少个CPU核心。具体操作方法是通过直接设置环境变量
GOMAXPROCS的值,或者在代码中启动goroutine之前先调用以下这个语句以设置使用16个CPU
核心:
runtime.GOMAXPROCS(16)
到底应该设置多少个CPU核心呢,其实runtime包中还提供了另外一个函数NumCPU()来获
取核心数。可以看到,Go语言其实已经感知到所有的环境信息,下一版本中完全可以利用这些
信息将goroutine调度到所有CPU核心上,从而最大化地利用服务器的多核计算能力。抛弃
GOMAXPROCS只是个时间问题。 出让时间片
runtime.Gosched()
同步
同步锁
Go语言包中的sync包提供了两种锁类型:sync.Mutex和sync.RWMutex
Mutex是最简单的一种锁类型,同时也比较暴力,当一个goroutine获得了Mutex后,其他goroutine就只能乖乖等到这个goroutine释放该Mutex。RWMutex相对友好些,是经典的单写多读模型。在读锁占用的情况下,会阻止写,但不阻止读,也就是多个goroutine可同时获取读锁(调用RLock()方法;而写锁(调用Lock()方法)会阻止任何其他goroutine(无论读和写)进来,整个锁相当于由该goroutine独占。从RWMutex的实现看,RWMutex类型其实组合了Mutex:
对于这两种锁类型,任何一个Lock()或RLock()均需要保证对应有Unlock()或RUnlock()
调用与之对应,否则可能导致等待该锁的所有goroutine处于饥饿状态,甚至可能导致死锁。锁的
典型使用模式如下:
var l sync.Mutex
func foo() {
l.Lock()
defer l.Unlock()
//...
}
这里我们再一次见证了Go语言defer关键字带来的优雅。
全局唯一性操作
对于从全局的角度只需要运行一次的代码,比如全局初始化操作,Go语言提供了一个Once类型来保证全局的唯一性操作,具体代码如下:
var a string
var once sync.Once
func setup() {
a = "hello, world"
}
func doprint() {
once.Do(setup)
print(a)
}
func twoprint() {
go doprint()
go doprint()
}
once的Do()方法可以保证在全局范围内只调用指定的函数一次(这里指
setup()函数),而且所有其他goroutine在调用到此语句时,将会先被阻塞,直至全局唯一的
once.Do()调用结束后才继续。