Golang 中的回环栅栏

alfred-zhong · 2018-03-30 22:38:20 · 2242 次点击 · 预计阅读时间 6 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2018-03-30 22:38:20 的文章,其中的信息可能已经有所发展或是发生改变。

这篇文章中我们会研究一个基本的同步问题。并使用 Golang 中原生的 Buffered Channels 来为这个问题找到一个简洁的解决方案。

问题

现在假设我们我们有一堆 workers。为了充分发挥 CPU 多核的能力,我们让每个 worker 运行在单独的 goroutine 中:

for i := 0; i < workers; i++ {
    go worker()
}

worker 需要做一系列的工作 job:

func worker() {
    for i := 0; i < 3; i++ {
        job()
    }
}

每次 job 前都需要在所有的 worker 上同步地先进行一次准备 bootstrap 的过程。也就是说,每个 worker 在执行 job 前,需要等待所有其他 worker 都完成 bootstrap 的准备。

func worker() {
    for i := 0; i < 3; i++ {
        bootstrap()
        # wait for other workers to bootstrap
        job()
    }
}

还有件事。如果至少有一个 worker 仍在执行 job,则所有 worker 的下一次的 bootstrap 都不能开始。换句话说,每次的 bootstrap 都是为紧接着的 job 部分做准备的,所以不能在上一次的 job 尚未结束之前就开始下一次的 bootstrap:

func worker() {
    for i := 0; i < 3; i++ {
        # wait for all workers to finish previous loop
        bootstrap()
        # wait for other workers to bootstrap
        job()
    }
}

我们的 bootstrap 部分内容为增长一个共享的计数器。job 部分为等待一段时间并打印计数器的内容:

type counter struct {
    c int
    sync.Mutex
}

func (c *counter) Incr() {
    c.Lock()
    c.c += 1
    c.Unlock()
}

func (c *counter) Get() (res int) {
    c.Lock()
    res = c.c
    c.Unlock()
    return
}

func worker(c *counter) {
    for i := 0; i < 3; i++ {
        # wait for all workers to finish previous loop
        c.Incr()
        # wait for other workers to do bootstrap
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        fmt.Println(c.Get())
    }
}

我们的目标是编写一个程序并以下列情形打印数字:

  • 只有 n, 2*n 和 3*n 的数字被打印(因为每个 worker 循环 3 次)
  • 每次打印的数字不会比之前的数字小
  • 每个数字会被打印 n

如果有 3 个 worker,则期望输出如下:

3
3
3
6
6
6
9
9
9

2 个 worker 的期望输出:

2
2
4
4
6
6

2 个 worker 不合法的输出可能会是这样:

2
4
2
4
6
6

想一想可能的解决办法。下面几行我故意留空并不急着给大家答案。

.

.

.

.

.

.

.

.

.

.

.

.

.

.

workers 会用一个名为回环栅栏(reusable barrier,类似 Java 中的 CyclicBarrier)的数据结构来实现同步。每个栅栏包含 2 扇门。第 1 扇门放置于增长计数器之前,一开始是关闭的。关闭的门意味着到达这扇门的 worker 会被阻塞。一旦所有的 worker 到达了第 1 扇门:

  • 第 2 扇门(放置于增长计数器之后)会关闭
  • 第 1 扇门开启

所有的计数器会通过并增长计数器,接着成功到达第 2 扇门。一旦所有的 worker 都到达了第 2 扇门:

  • 第 1 扇门关闭
  • 第 2 扇门开启

worker 此时可以开始执行 job 并接着在下一次循环中再次抵达第 1 扇门。循环再次开始。整个过程如图所示:


     1st gate     2nd gate
        v             v
-w1-->  |             |
 --w2-->|
--w3--> |          
 --w4-->|             
-w5-->  |             |
 --w1-->|             |
 --w2-->|             
 --w3-->|
 --w4-->|             
 --w5-->|             |
        |      --w1-->|
               --w2-->|
            --w3-->   |
             --w4-->  |
        |      --w5-->|
        |      --w1-->|
               --w2-->|
               --w3-->|
               --w4-->|
        |      --w5-->|
--w1--> |             |
        |              --w2-->
 --w3-->|
--w4--> |             
        |             | --w5-->

下文有两种解决方案,实现略有不同。下面的代码可用来对两种方案进行测试:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
    // Set this import spec to point
    // to the copy of one of proposed
    // solutions.
    "path/to/package/barrier"
)

func init() {
    rand.Seed(time.Now().Unix())
}

type counter struct {
    c int
    sync.Mutex
}

func (c *counter) Incr() {
    c.Lock()
    c.c += 1
    c.Unlock()
}

func (c *counter) Get() (res int) {
    c.Lock()
    res = c.c
    c.Unlock()
    return
}

func worker(c *counter, br *barrier.Barrier, wg *sync.WaitGroup) {
    for i := 0; i < 3; i++ {
        br.Before()
        c.Incr()
        br.After()
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        fmt.Println(c.Get())
    }
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    workers := 3
    br := barrier.New(workers)
    c := counter{}
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go worker(&c, br, &wg)
    }
    wg.Wait()
}

栅栏必须实现 BeforeAfter 两个方法,各自对应第 1 和第 2 扇门。

解决方案 1

我们需要容量为 1 的 buffered channel:

ch := make(chan int, 1)

门的逻辑可以通过先从 channel 中接收数据,然后再次向其中发送数据来实现:

<-ch
ch <- 1

如果 channel 中包含元素数量为 1,则表示门是开的。它会让一个 worker 通过并往 channel 中放入新的元素以使另外一个 worker 通过,依此类推。

如果 channel 中没有元素了则表示门关闭了。接着 worker 从 channel 中接收元素就会被阻塞。

// github.com/mlowicki/barrier
package barrier 

import "sync"

type Barrier struct {
    c      int
    n      int
    m      sync.Mutex
    before chan int
    after  chan int
}

func New(n int) *Barrier {
    b := Barrier{
        n:      n,
        before: make(chan int, 1),
        after:  make(chan int, 1),
    }
    // close 1st gate
    b.after <- 1  
    return &b
}

func (b *Barrier) Before() {
    b.m.Lock()
    b.c += 1
    if b.c == b.n {
        // close 2nd gate
        <-b.after
        // open 1st gate
        b.before <- 1
    }
    b.m.Unlock()
    <-b.before
    b.before <- 1
}

func (b *Barrier) After() {
    b.m.Lock()
    b.c -= 1
    if b.c == 0 {
       // close 1st gate
       <-b.before
       // open 2st gate
       b.after <- 1
    }
    b.m.Unlock()
    <-b.after
    b.after <- 1
}

解决方案 2

这个方案使用了容量和 worker 数量 n 相等的 buffered channel。现在我们不再让 worker 一个接一个地依次通过,而是在 channel 中放入 n 个元素来使所有的 worker 一次性通过:

// github.com/mlowicki/barrier2
package barrier

import "sync"

type Barrier struct {
    c      int
    n      int
    m      sync.Mutex
    before chan int
    after  chan int
}

func New(n int) *Barrier {
    b := Barrier{
        n:      n,
        before: make(chan int, n),
        after:  make(chan int, n),
    }
    return &b
}

func (b *Barrier) Before() {
    b.m.Lock()
    b.c += 1
    if b.c == b.n {
        // open 2nd gate
        for i := 0; i < b.n; i++ {
            b.before <- 1
        }
    }
    b.m.Unlock()
    <-b.before
}

func (b *Barrier) After() {
    b.m.Lock()
    b.c -= 1
    if b.c == 0 {
        // open 1st gate
        for i := 0; i < b.n; i++ {
            b.after <- 1
        }
    }
    b.m.Unlock()
    <-b.after
}

参考

  • "The Little Book of Semaphores" --- Allen B. Downey

via: https://medium.com/golangspec/reusable-barriers-in-golang-156db1f75d0b

作者:Michał Łowicki  译者:alfred-zhong  校对:rxcai

本文由 GCTT 原创编译,Go语言中文网 荣誉推出


有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2242 次点击  
加入收藏 微博
被以下专栏收入,发现更多相似内容
2 回复  |  直到 2018-04-04 02:54:54
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传