Go并发

普朗tong · · 1164 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

并发和并行

Go是并发语言,而不是并行语言。(Go is a concurrent language and not a parallel one.)

并发(Concurrency)的关键在于你有处理多个任务的能力,不一定要同时
并行(Parallellism)的关键是你有同时处理多个任务的能力
最关键的点就是:是否是『同时』

下面举个例子来说明:
假设我们正在编写一个web浏览器。web浏览器有各种组件。其中两个是web页面呈现区域和下载文件从internet下载的下载器。假设我们以这样的方式构建了浏览器的代码,这样每个组件都可以独立地执行。
当这个浏览器运行在单个核处理器中时,处理器将在浏览器的两个组件之间进行上下文切换。它可能会下载一个文件一段时间,然后它可能会切换到呈现用户请求的网页的html。这就是所谓的并发性。并发进程从不同的时间点开始,它们的执行周期重叠。在这种情况下,下载和呈现从不同的时间点开始,它们的执行重叠。
假设同一浏览器运行在多核处理器上。在这种情况下,文件下载组件和HTML呈现组件可能同时在不同的内核中运行。这就是所谓的并行性

并发与并行

需要说明的是:并行性Parallelism不会总是导致更快的执行时间,这是因为并行运行的组件可能需要相互通信。

进程,线程和协程

  1. 进程(Process)
    进程是可并发执行的程序在某个数据集合上的一次计算活动,也是操作系统进行资源分配和调度的基本单位。进程一般由程序、数据集、进程控制块三部分组成。进程的局限是创建、撤销和切换的开销比较大。

  2. 线程(Thread)
    线程是操作系统进程中能够并发执行的实体,是处理器调度和分派的基本单位。每个进程内可包含多个可并发执行的线程。线程的优点是减小了程序并发执行时的开销,提高了操作系统的并发性能,缺点是线程自己基本不拥有系统资源,只拥有少量必不可少的资源:程序计数器、一组寄存器、栈。同属一个进程的线程共享进程所拥有的主存空间和资源。

  3. 协程(Coroutine)
    协程是一种用户态的轻量级线程,又称微线程,英文名Coroutine,协程的调度完全由用户控制。人们通常将协程和子程序(函数)比较着理解。 子程序调用总是一个入口,一次返回,一旦退出即完成了子程序的执行。

    Go语言对于并发的实现是靠协程:Goroutine。

Go的并发调度:G-P-M模型

在操作系统提供的内核线程之上,Go搭建了一个特有的两级线程模型。goroutine机制实现了M : N的线程模型,goroutine机制是协程(coroutine)的一种实现,golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。

1. 调度器如何工作

// 用go关键字加上一个函数(这里用了匿名函数)
// 调用就做到了在一个新的“线程”并发执行任务
go func() { 
// do something in one new goroutine
}()

调度器的实现主要包括4个结构:M,P,G,Sched,前三个定义在runtime.h中,Sched定义在proc.c中。

* Sched结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等。
* M结构是Machine,系统线程,它由操作系统管理的,goroutine就是跑在M之上的;M是一个很大的结构,里面维护小对象内存cache(mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。
* P结构是Processor,处理器,它的主要用途就是用来执行goroutine的,它维护了一个goroutine队列,即runqueue。Processor是让我们从N:1调度到M:N调度的重要部分。
* G是goroutine实现的核心结构,它包含了栈,指令指针,以及其他对调度goroutine很重要的信息,例如其阻塞的channel。

说明:Processor的数量是在启动时被设置为环境变量GOMAXPROCS的值,也可以通过运行时调用函数runtime.GOMAXPROCS()进行设置。Processor数量固定意味着任意时刻只有GOMAXPROCS个线程在运行go代码。

在单核处理器的场景下,所有goroutine运行在同一个M系统线程中,每一个M系统线程维护一个Processor,任何时刻,一个Processor中只有一个goroutine,其他goroutine在runqueue中等待。一个goroutine运行完自己的时间片后,让出上下文,回到runqueue中。 多核处理器的场景下,为了运行goroutines,每个M系统线程会持有一个Processor。


2. 线程阻塞
正常情况下,Go调度器会按照上面的流程进行调度,但当发生阻塞时,比如Goroutine进行系统调用,Go调度器会再创建一个线程(或者从线程池取),当前的M线程放弃了它的Processor,P转到新的线程中去运行。

3. runqueue执行完成
当其中一个Processor的runqueue为空,没有goroutine可以调度。它会从另外一个上下文偷取一半的goroutine。

Go原生支持并发:Go调度器负责将并发任务分配到不同的内核线程上运行,然后内核调度器接管内核线程在CPU上的执行与调度。


共享资源安全问题

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时 读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。如果这种竞争状态处理不当,可能会出现安全问题。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    count int32
    wg    sync.WaitGroup
)

func main() {
    wg.Add(2)
    go incCount()
    go incCount()
    wg.Wait()
    fmt.Println(count)
}

func incCount() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
        value := count
        runtime.Gosched()
        value++
        count = value
    }
}
//输出:2
竞争状态下程序行为的分析

锁住共享资源

  1. 原子函数
    原子函数能够以很底层的加锁机制来同步访问整型变量和指针。
    sync/atomic
  2. 互斥锁
    互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码。
package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    count int32
    wg    sync.WaitGroup//声明互斥锁
    mutex sync.Mutex
)

func main() {
    wg.Add(2)
    go incCount()
    go incCount()
    wg.Wait()
    fmt.Println(count)
}

func incCount() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
        mutex.Lock()//临界区上锁
        value := count
        runtime.Gosched()
        value++
        count = value
        mutex.Unlock()//解锁
    }
}

通道

使用通道,通过发送和接收需要共享的资源,在 goroutine 之间做同步。声明通道时,需要指定将要被共享的数据的类型。通道分为有缓冲通道和无缓冲通道。

// 无缓冲的整型通道
unbuffered := make(chan int)
// 有缓冲的字符串通道
buffered := make(chan string, 10)
//定义单向管道
var send chan<- int //只能发送
var receive <-chan int //只能接收
// 有缓冲的字符串通道
buffered := make(chan string, 10)
// 通过通道发送一个字符串 
buffered <- "Gopher"
// 从通道接收一个字符串
value := <-buffered

对于无缓冲通道来说,双方必须都处于就绪状态,否则会阻塞。

package main

import (
    "sync"
    "fmt"
    "time"
)

var wg sync.WaitGroup

func main() {

    baton := make(chan int)

    wg.Add(1)

    go Runner(baton)

    baton <- 1

    wg.Wait()

}

func Runner(baton chan int) {

    var newRunner int

    runner := <-baton

    fmt.Printf("Runner %d running with baton\n", runner)

    if runner != 4 {
        newRunner = runner + 1
        fmt.Printf("Runner %d to the line\n", runner)
        go Runner(baton)
    }

    time.Sleep(100 * time.Millisecond)

    if runner == 4 {
        fmt.Printf("Runner %d finished race over!", runner)
        wg.Done()
        return
    }

    fmt.Printf("Runner %d exchange with Runner %d\n", runner, newRunner)

    baton <- newRunner
}

对于有缓冲通道,当通道关闭后,仍然可以从通道中读取为读取完的数据,全部读取完毕后,最后读到的是通道上数据类型的零值。不可以写,会诱发panic。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

const (
    numberGoroutines = 4
    taskLoad         = 10
)

var wg sync.WaitGroup

func init() {

    rand.Seed(time.Now().Unix())
}

func main() {

    tasks := make(chan string, taskLoad)

    wg.Add(numberGoroutines)
    for gr := 1; gr <= numberGoroutines; gr++ {
        go worker(tasks, gr)
    }

    for post := 1; post <= taskLoad; post++ {

        tasks <- fmt.Sprintf("Task : %d", post)
    }

    close(tasks)

    wg.Wait()

}

func worker(tasks chan string, worker int) {

    for {

        task, ok := <-tasks
        if !ok {

            fmt.Printf("Worker: %d : Started %s\n", worker, task)

            sleep := rand.Int63n(100)
            time.Sleep(time.Duration(sleep) * time.Millisecond)

            fmt.Printf("Worker: %d : Completed %s\n", worker, task)
        }
    }
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:普朗tong

查看原文:Go并发

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1164 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传