19.Go语言基础之并发

DevOperater · · 1611 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

1.1并发与并行

并发:同一时间段执行多个任务(使用微信和多个朋友聊天)
并行:同一时刻执行多个任务(windows中360在杀毒,同时你也在写代码)
Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。
goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。
Go语言还提供channel在多个goroutine间进行通信。goroutine和channel是Go语言秉承的CSP(Communication Sequential Process)并发模式的重要实现基础。

1.2goroutine

在java/Python中,我们实现并发编程的时候,通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换,这一切需要耗费很多。
Go语言中的goroutine,类似于线程,但goroutine是由Go的运行时(runtime)调度和管理的。Go程序能够只能的将goroutine中的任务合理的分配到每个CPU。Go语言被称为现代化语言的原因,就是因为Go在语言层面就已经内置了调度和上下文切换的机制。
在Go语言编程中,不需要自己写进程、线程、协程,你的技能只有一个,就是goroutine。

1.2.1使用goroutine

Go语言中使用goroutine非常简单,只需要在调用函数前面加上"go"关键字,就可以为一个函数创建一个goroutine。
一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

1.2.2启动单个goroutine

没有使用goroutine时,程序是顺序运行的。

//

package main

import (
    "fmt"
)

func hello()  {
    fmt.Println("Hello Goroutine!")
}
func main() {
    hello()
    fmt.Println("main goroutine done!")
}

结果:
Hello Goroutine!
main goroutine done!

Process finished with exit code 0

使用go关键字

Mac系统上实验
package main

import (
    "fmt"
)

func hello()  {
    fmt.Println("Hello Goroutine!")
}
func main() {
    go hello()
    fmt.Println("main goroutine done!")
}

结果1:
main goroutine done!

Process finished with exit code 0

结果2:
main goroutine done!
Hello Goroutine!

Process finished with exit code 0

结果3:
Hello Goroutine!
main goroutine done!

Process finished with exit code 0
会发现,出现了只打印了main goroutine done的现象,是因为main函数也是一个goroutine,main函数执行完了,整个程序就结束了。

1.2.3启动多个goroutine

Go语言中实现并发就是这么简单,可以启动多个goroutine。
这里使用sync.WaitGroup来实现goroutine的同步。
package main

import (
    "fmt"
    "sync"
)
var wg sync.WaitGroup

func hello(i interface{})  {
    defer wg.Done() //goroutine结束就登记-1
    fmt.Println("Hello Goroutine! i:",i)
}
func main() {
    for i:=0;i<10;i++{
        wg.Add(1) //启动一个goroutine就登记+1
        go hello(i)
    }
    wg.Wait()//等待所有等级的goroutine都结束
}

结果:
Hello Goroutine! i: 9
Hello Goroutine! i: 7
Hello Goroutine! i: 2
Hello Goroutine! i: 0
Hello Goroutine! i: 3
Hello Goroutine! i: 5
Hello Goroutine! i: 1
Hello Goroutine! i: 6
Hello Goroutine! i: 4
Hello Goroutine! i: 8

Process finished with exit code 0
多次执行上面的代码,会发现每次打印的数字顺序都不一样。这是因为10个goroutine是并发执行的,而goroutine的调度室随机的。

1.3goroutine与线程

1.3.1可增长的栈

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然很少会用到这么大。

1.3.2goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;
P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。
其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 
另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

1.3.3GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。
例如在一个8个CPU的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n中的n)
Go语言通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
我们可以通过将任务分配到不同的CPU逻辑核心上实现并行的效果,这里举个例子:
设置GOMAXPROCS=1,goroutine启动两个任务,此时是一个任务执行完了才能执行另一个任务

package main

import (
    "fmt"
    "runtime"
    "time"
)
func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(time.Second)
}

结果:
A: 1
A: 2
A: 3
A: 4
A: 5
A: 6
A: 7
A: 8
A: 9
B: 1
B: 2
B: 3
B: 4
B: 5
B: 6
B: 7
B: 8
B: 9

Process finished with exit code 0
设置GOMAXPROCS=2,goroutine启动两个任务,两个任务同时执行,出现两个任务交互打印现象,要多试几次,需要笔记本是多个CPU哦!我在mac上测试成功的。
package main

import (
    "fmt"
    "runtime"
    "time"
)
func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(2)
    go a()
    go b()
    time.Sleep(time.Second)
}

结果:
A: 1
B: 1
B: 2
B: 3
B: 4
B: 5
B: 6
B: 7
B: 8
B: 9
A: 2
A: 3
A: 4
A: 5
A: 6
A: 7
A: 8
A: 9

Process finished with exit code 0
//Go语言中的操作系统线程和goroutine的关系:
1.一个操作系统线程对应用户态多个goroutine。
2.go程序可以同时使用多个操作系统线程。
3.goroutine和OS线程是多对多的关系,即m:n。

1.4channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

1.4.1channel类型

channel是一种类型,一种引用类型。生命通道类型的格式如下:
var 变量 chan 元素类型
package main

import (
    "fmt"
)

func main() {
    var ch1 chan int   // 声明一个传递整型的通道
    var ch2 chan bool  // 声明一个传递布尔型的通道
    var ch3 chan []int // 声明一个传递int切片的通道

    fmt.Printf("v:%v type:%T\n",ch1,ch1)
    fmt.Printf("v:%v type:%T\n",ch2,ch2)
    fmt.Printf("v:%v type:%T\n",ch3,ch3)
}

结果:
v:<nil> type:chan int
v:<nil> type:chan bool
v:<nil> type:chan []int

Process finished with exit code 0

1.4.2创建channel

通道是引用类型,通道类型的空值是nil。
var ch chan int
fmt.Println(ch) // <nil>

声明的通道后需要使用make函数初始化后才能使用。
创建channel的格式如下:
make(chan 元素类型, [缓冲大小])
channel的缓冲大小是可选的。
package main

import (
    "fmt"
)

func main() {
    ch4 := make(chan int)
    ch5 := make(chan bool)
    ch6 := make(chan []int)

    fmt.Printf("v:%v type:%T\n",ch4,ch4)
    fmt.Printf("v:%v type:%T\n",ch5,ch5)
    fmt.Printf("v:%v type:%T\n",ch6,ch6)
}

结果:
v:0xc000012060 type:chan int
v:0xc0000120c0 type:chan bool
v:0xc000012120 type:chan []int

Process finished with exit code 0

1.4.3channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用<-
定义通道:ch := make(chan int)

发送:将一个值发送到通道中。
           ch <- 10 //把10发送到通道中
接收:从一个通道中接收值。
           a := <- ch //从ch中接收值,并赋值给a
                  <- ch        //从ch中接收值,忽略结果
关闭:关闭通道。
           close(ch)
                     注意:
                         1.只有在通知接收方goroutine所有的数据都发送完毕的时候,才需要关闭通道。
                         2.通道是可以被垃圾回收机制回收的,与关闭文件不一样,文件操作结束后文件是必须关闭的,但通道不是必须关闭的。
                     关闭后的通道有以下特点:
                         1.对一个关闭的通道再发送值会导致panic。
                         2.对一个关闭的通道进行接收值,会一直获取值直到通道为空。
                         3.对一个关闭的并且没有值得通道执行接收操作,会得到对应类型的零值。
                         4.关闭一个已经关闭的通道会导致panic。

1.4.4无缓冲的通道

无缓冲通道称为阻塞通道。无缓冲通道必须在发送数据的同时有人接收值,否则会阻塞在那里,直到报错。
//无缓冲通道,只发送值不接收值的时候会出现deadlock错误。

package main

import "fmt"

func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Printf("发送成功!")
}

结果:
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /Users/tongchao/Desktop/gopath/src/test/test.go:7 +0x54

Process finished with exit code 2

因为我们使用ch := make(chan int)创建的是无缓冲通道,无缓冲通道只有在有人接收值的时候才能发送值。
上买呢代码会阻塞在ch <- 10,这一行代码会形成死锁。
解决方法:使用goroutine去接收值
package main

import (
    "fmt"
    "sync"
)
var wg sync.WaitGroup
func recv(ch chan int)  {
    defer wg.Done()
    i := <- ch
    fmt.Println("接收的值是:",i)
}
func main() {
    ch := make(chan int)
    wg.Add(1)
    go recv(ch)

    ch <- 10
    wg.Wait()

    fmt.Printf("发送成功!\n")
}

结果:
接收的值是: 10
发送成功!

Process finished with exit code 0

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时才能发送成功,两个goroutine将继续执行。
如果接收操作限制性,接收方的goroutine将会阻塞,直到另一个goroutine在该通道上发送一个值。
//使用无缓冲通道进行通信,将会导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

1.4.5有缓冲的通道

解决上面问题的方法还有一种就是使用有缓冲的通道。我们可以在使用make函数初始化通道的时候为其制定通道的容量,只要通道的容量大于零,就是有缓冲的通道,通道的容量表示通道中能存放的元素的数量。
可以使用len()获取通道内元素的数量,使用cap函数获取通道的容量。
package main

import (
    "fmt"
)

func main() {
    ch := make(chan int,1) //创建一个容量为1的有缓冲区通道

    ch <- 10

    fmt.Printf("发送成功!\n")
    fmt.Println("len(ch):",len(ch))
    fmt.Println("cap(ch)",cap(ch))
}

结果:
发送成功!
len(ch): 1
cap(ch) 1

Process finished with exit code 0

1.4.6for range从通道循环取值

当向通道中发送完数据时,我们可以通过close函数关闭通道。
当通道被关闭时,再往该通道发送值会引发panic,从该通道里接收值一直都是类型0值。那么如何判断一个通道是否被关闭了呢?
方法一:
    i, ok := <-ch1 // 通道关闭后再取值ok=false
方法二:
    for range遍历通道,通道被关闭时就会退出for range。
package main

import "fmt"

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    //开启goroutine将0-100的数发送到ch1中
    go func() {
        for i:=0;i<101;i++{
            ch1 <- i
        }
        close(ch1)
    }()

    //开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
    go func() {
        for{
            i,ok := <- ch1 //通道关闭后再取值ok=false
            if !ok{
                break
            }
            ch2 <- i*i
        }
        close(ch2)
    }()

    //在主goroutine中从ch2中接收值打印
    for i:= range ch2{//通道关闭后退出for range循环
        fmt.Println(i)

    }
}

结果:
0
1
4
9
16
25
...
9604
9801
10000

Process finished with exit code 0

1.4.7单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。
chan <- int是一个只写单向通道(只能对其写入int类型值),可以对其进行发送操作但不能执行接收操作;
<- chan int是一个只读单向通道(只能从通道读取int类型值),可以对其执行接收操作但不能执行发送操作。
在函数传参及任何赋值操作中,可以将双向通道转换为单向通道,但反过来是不可以的。
package main

import "fmt"

func counter(out chan <- int)  {
    for i:=0;i<101;i++{
        out <- i
    }
    close(out)
}
func squarer(out chan <- int,in <- chan int)  {
    for i:= range in{
        out <- i*i
    }
    close(out)
}
func printer(in <- chan int)  {
    for i:= range in{
        fmt.Println(i)
    }
}
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

结果:
0
1
4
9
16
25
...
9604
9801
10000

Process finished with exit code 0

1.4.8通道总结

19.Go语言基础之并发

1.5worker pool(go routine池)

在工作中,我们通常会使用可以指定启动的goroutine数量-worker pool 模式,控制go routine的数量,防止go routine泄露和暴涨。
一个简单的work pool 示例代码如下:
package main

import (
    "fmt"
    "time"
)

func worker(id int,jobs <- chan int,results chan <- int )  {
    for j:= range jobs{
        fmt.Printf("worker:%d start job:%d\n", id, j)
        time.Sleep(time.Second)
        fmt.Printf("worker:%d end job:%d\n", id, j)
        results <- j * 2
    }
}
func main() {
    jobs := make(chan int,100)
    results := make(chan int,100)

    //开启3个goroutine
    for w:=1;w<=3;w++{
        go worker(w,jobs,results)
    }
    //5个任务
    for j:=1;j<=5;j++{
        jobs <- j
    }
    close(jobs)
    // 输出结果
    for a := 1; a <= 5; a++ {
        <-results
    }
}

结果:
worker:1 start job:2
worker:3 start job:1
worker:2 start job:3
worker:1 end job:2
worker:3 end job:1
worker:3 start job:4
worker:2 end job:3
worker:1 start job:5
worker:1 end job:5
worker:3 end job:4

Process finished with exit code 0

1.6select多路复用

在某些场景下,我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。
//可以使用遍历方式,实现同时从多个通道中获取数据
package main

import (
    "fmt"
)

var ch1 chan int
var ch2 chan int

func main() {
    ch1 = make(chan int, 100)
    ch2 = make(chan int, 100)

    go func() {
        ch1 <- 10
        close(ch1)
    }()
    go func() {
        ch2 <- 11
        close(ch2)
    }()

    for{
        //从ch1接收值
        c1,ok := <- ch1
        if !ok{
            fmt.Println("ch1数据取完了")

        }
        if c1!=0{
            fmt.Println(c1)
        }

        //从ch2接收值
        c2,ok := <- ch2
        if !ok{
            fmt.Println("ch2数据取完了")
            break
        }
        fmt.Println(c2)
    }
    fmt.Println("操作完成!")
}

结果:
10
11
ch1数据取完了
ch2数据取完了
操作完成!

Process finished with exit code 0
//可以使用goroutine实现同时从多个通道中接收数据
package main

import (
    "fmt"
    "sync"
)
var wg sync.WaitGroup
var ch1 chan int
var ch2 chan int

func getFromCh1()  {
    defer wg.Done()
    c1 := <- ch1
    fmt.Println(c1)
}
func getFromCh2()  {
    defer wg.Done()
    c2 := <- ch2
    fmt.Println(c2)
}
func main() {
    ch1 = make(chan int, 100)
    ch2 = make(chan int, 100)
    wg.Add(2)
    go getFromCh1()
    go getFromCh2()
    go func() {
        ch1 <- 10
    }()
    go func() {
        ch2 <- 11
    }()

    wg.Wait()
    fmt.Println("操作完成!")
}

结果:
11
10
操作完成!

Process finished with exit code 0
使用select关键字实现多个通道接收值的需求。
select的使用类似于switch语句,他有一系列case分支和一个默认分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case对应的语句。
格式如下:
select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默认操作
}

select语句能提高代码的可读性。
1.可处理一个或多个channel的发送/接收操作。
2.如果多个case同时满足,select会随机选择一个。
3.对于没有case的select,会一直等待,可用于阻塞main函数。
package main

import "fmt"

func main() {
    ch1 := make(chan int,1)
    ch2 := make(chan int,1)
    for i:=0;i<10;i++{
        select {
        case x1 := <- ch1:
            fmt.Printf("循环第%d次,ch1取出%d:\n",i,x1)
        case ch1 <- i:
            fmt.Printf("循环第%d次,ch1存入:%d\n",i,i)
        case x2 := <- ch2:
            fmt.Printf("循环第%d次,ch2取出%d:\n",i,x2)
        case ch2 <- i:
            fmt.Printf("循环第%d次,ch2存入:%d\n",i,i)
        }
    }

}

结果:
循环第0次,ch1存入:0
循环第1次,ch1取出0:
循环第2次,ch1存入:2
循环第3次,ch1取出2:
循环第4次,ch1存入:4
循环第5次,ch1取出4:
循环第6次,ch1存入:6
循环第7次,ch2存入:7
循环第8次,ch2取出7:
循环第9次,ch1取出6:

Process finished with exit code 0

//上面的结果完美的体现出了 多个case同时满足时,select会随机选择一个执行。

1.7并发安全和锁

有时候在Go代码中会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竟态问题(数据竟态)。
package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup
var x int64
func add()  {
    for i:=0;i<5000;i++{
        x=x+1
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)

}

结果1:
7281

Process finished with exit code 0

结果2:
10000

Process finished with exit code 0

//上面的代码中,我们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后结果与期待的不符。

1.8互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。
Go语言中使用sync包的Mutex类型来实现互斥锁。使用互斥锁来修复上面代码的问题:
package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup
var lock sync.Mutex
var x int64
func add()  {
    for i:=0;i<5000;i++{
        lock.Lock()//加锁
        x=x+1
        lock.Unlock()//解锁
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)

}

结果:
10000

Process finished with exit code 0

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;
当互斥锁释放后,等待的goroutine才能获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒策略是随机的。

1.9读写互斥锁

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的。
这种场景下使用读写锁时更好的一种选择。

读写锁分为两种:读锁和写锁。
当一个goroutine获取读锁后,其他的goroutine可以继续获取读锁,获取写锁会等待;
当一个goroutine获取写锁后,其他的goroutine获取读锁,写锁都会等待。
读写锁适合读多写少的场景,如果读写操作量差别不大,读写锁的优势就发挥不出来了。
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    lock.Lock()   // 加互斥锁
    //rwlock.Lock() // 加写锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
    //rwlock.Unlock()                   // 解写锁
    lock.Unlock()                     // 解互斥锁
    wg.Done()
}

func read() {
    lock.Lock()                  // 加互斥锁
    //rwlock.RLock()               // 加读锁
    time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
    //rwlock.RUnlock()             // 解读锁
    lock.Unlock()                // 解互斥锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

互斥锁的时间:
1.404974744s

Process finished with exit code 0

读写互斥锁的时间:
109.371376ms

Process finished with exit code 0

1.10sync.WaitGroup

在代码中生硬的使用time.Sleep是不合适的,Go语言中可以使用sync.WaitGoup来实现并发任务的同步。
sync.WaitGroup有以下几个方法:

19.Go语言基础之并发

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。
例如
我们启动了N个并发任务时,就使用Add(N)将计数器值增加N。
每个任务完成时,调用Done(),会将计数器减1。
调用Wait()来等待并发任务执行完。
当计数器值为0时,表示所有并发任务已经完成。

sync.WaitGroup是一个结构体,传递的时候要传递指针。
var wg sync.WaitGroup

func hello() {
    defer wg.Done()
    fmt.Println("Hello Goroutine!")
}
func main() {
    wg.Add(1)
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    wg.Wait()
}

1.11sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如:只加载一次配置文件、只关闭一次通道等。
Go语言中的sync包提供了一个针对只执行一次场景的解决方案-sync.Once。
sync.Onece只有一个Do方法,
func (o *Once) Do(f func()) {}

如果要执行的函数f需要传递参数,需要搭配闭包来使用。

1.11.1加载配置文件示例

延迟一个开销很大的初始化操作到真正用到它的时候在执行是一个很好地实践。
因为预先初始化一个变量(比如在Init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。
看下面的例子:
var icons map[string]image.Image

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    }
}

// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons()
    }
    return icons[name]
}

多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU在保证每个goroutine都满足串行一致的基础上,自由的重排访问内存的顺序。
loadIcons函数可能被重排为以下结果:
func loadIcons() {
    icons = make(map[string]image.Image)
    icons["left"] = loadIcon("left.png")
    icons["up"] = loadIcon("up.png")
    icons["right"] = loadIcon("right.png")
    icons["down"] = loadIcon("down.png")
}
在这种情况下就会出现,即使判断了icons不是nil,也不意味着变量初始化完成了。
考虑到这种情况,我们能想到的办法一:可以添加互斥锁;方法二:使用sync.Once。
var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    }
}

// Icon 是并发安全的
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

1.11.2并发安全的单例模式


package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}
sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。
这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会被执行多次。

1.12sync.Map

Go语言中内置的map不是并发安全的。

package main

import (
    "fmt"
    "strconv"
    "sync"
)

var m = make(map[string]int)

func get(key string)int  {
    return m[key]
}
func set(key string,value int)  {
    m[key] = value
}
func main() {
    wg := sync.WaitGroup{}
    for i:=0;i<20;i++{
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(i)
            set(key,i)
            fmt.Printf("k=:%v,v:=%v\n", key, get(key))
            wg.Done()
        }(i)
    }
    wg.Wait()
}

结果:
fatal error: concurrent map writes

goroutine 6 [running]:

        /usr/local/go/src/runtime/panic.go:617 +0x72 fp=0xc0000326b8 sp=0xc000032688 pc=0x1028282
runtime.mapassign_faststr(0x10aca40, 0xc000060180, 0x10cd3a2, 0x1, 0x0)
        /usr/local/go/src/runtime/map_faststr.go:211 +0x42a fp=0xc000032720 sp=0xc0000326b8 pc=0x101031a
main.set(...)
        /Users/tongchao/Desktop/gopath/src/test/test.go:15
main.main.func1(0xc000014080, 0xc000014070, 0x2)
        /Users/tongchao/Desktop/gopath/src/test/test.go:23 +0x8e fp=0xc0000327c8 sp=0xc000032720 pc=0x1094fee
runtime.goexit()
        /usr/local/go/src/runtime/asm_amd64.s:1337 +0x1 fp=0xc0000327d0 sp=0xc0000327c8 pc=0x1051451
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 1 [runnable]:
sync.(*WaitGroup).Add(0xc000014070, 0x1)
        /usr/local/go/src/sync/waitgroup.go:53 +0x13c
main.main()
        /Users/tongchao/Desktop/gopath/src/test/test.go:20 +0x6e

goroutine 4 [runnable]:
main.get(...)
        /Users/tongchao/Desktop/gopath/src/test/test.go:12
main.main.func1(0xc000014080, 0xc000014070, 0x0)
        /Users/tongchao/Desktop/gopath/src/test/test.go:24 +0xcc
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 5 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x1)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 7 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x3)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 8 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x4)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 9 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x5)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 10 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x6)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 11 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x7)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 12 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x8)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 13 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0x9)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 14 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0xa)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 15 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0xb)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

goroutine 16 [runnable]:
main.main.func1(0xc000014080, 0xc000014070, 0xc)
        /Users/tongchao/Desktop/gopath/src/test/test.go:21
created by main.main
        /Users/tongchao/Desktop/gopath/src/test/test.go:21 +0xa2

Process finished with exit code 2
Go语言的sync包中提供了一开箱即用的并发安全的map-sync.Map。
开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。
同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。
package main

import (
    "fmt"
    "strconv"
    "sync"
)
var m = sync.Map{}

func main() {
    wg := sync.WaitGroup{}
    for i:=0;i<20;i++{
        wg.Add(1)
        go func() {
            key := strconv.Itoa(i)
            m.Store(key,i)
            value,_ := m.Load(key)
            fmt.Printf("k=:%v,v:=%v\n", key, value)
            wg.Done()
        }()
    }
    wg.Wait()
}

结果:
k=:8,v:=8
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:8,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:8,v:=8
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20
k=:20,v:=20

Process finished with exit code 0

1.13原子操作

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。
针对"基本数据类型",我们可以使用原子操作来保证并发安全,因为原子操作是Go 语言提供的方法,在用户态就可以完成,因此性能比加锁操作更好。
Go语言中原子操作由内置的标准库sync/atomic提供。
atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

19.Go语言基础之并发
19.Go语言基础之并发

一个示例来比较下互斥锁和原子操作的性能。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type Counter interface {
    Inc()
    Load() int64
}

// 普通版
type CommonCounter struct {
    counter int64
}

func (c CommonCounter) Inc() {
    c.counter++
}

func (c CommonCounter) Load() int64 {
    return c.counter
}

// 互斥锁版
type MutexCounter struct {
    counter int64
    lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
    m.lock.Lock()
    defer m.lock.Unlock()
    m.counter++
}

func (m *MutexCounter) Load() int64 {
    m.lock.Lock()
    defer m.lock.Unlock()
    return m.counter
}

// 原子操作版
type AtomicCounter struct {
    counter int64
}

func (a *AtomicCounter) Inc() {
    atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
    return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
    var wg sync.WaitGroup
    start := time.Now()
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            c.Inc()
            wg.Done()
        }()
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(c.Load(), end.Sub(start))
}

func main() {
    c1 := CommonCounter{} // 非并发安全
    test(c1)
    c2 := MutexCounter{} // 使用互斥锁实现并发安全
    test(&c2)
    c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
    test(&c3)
}

结果:
0 1.099595ms
1000 907.118µs
1000 456.326µs

Process finished with exit code 0

有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:DevOperater

查看原文:19.Go语言基础之并发

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1611 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传