手把手教你用go语言实现生产者消费者模式

oYto · · 212601 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

> 欢迎大家到我的博客浏览 <a href="https://www.yinkai.cc/post/f07fe1e9850811ed7745690013acaace">YinKai 's Blog | 手把手教你用go语言实现生产者消费者模式</a> ​ 本篇文章会从生产者消费者模式的定义、特点、流程等方面为大家展开介绍,并带大家手把手来实现一下生产者消费者模式。<!--more--> ### 一、 简介 ​ 生产者消费者模式是一种并发设计模式,用于解决多线程环境下生产者和消费者之间的协作和数据共享问题。在这个模式中,有两种不同的角色:生产者和消费者,它们共同操作一个共享的缓冲区,以实现线程间的安全通信。 ​ 它的应用场景和优点如下: 1. **异步任务处理**:在异步编程中,生产者可以生成异步任务,而消费者负责处理这些任务。生产者消费者模式可以有效地协调异步任务的生成和处理,提高系统的响应速度。 2. **缓冲区处理:**当生产者和消费者的处理速度不一致时,通过引入缓冲区,可以平衡两者之间的速度差异。生产者将数据放入缓冲区,而消费者从缓冲区中获取数据,使两者能够独立运行,提高系统的效率。 3. **任务调度:**在任务调度系统中,生产者可以生成需要执行的任务,而消费者负责执行这些任务。通过生产者消费者模式,可以灵活地管理和调度任务,实现任务的分发和执行的解耦。 4. **消息队列:** 生产者消费者模式常用于消息队列的实现。生产者向队列中发送消息,而消费者从队列中获取消息进行处理。这种模式使得消息的生成和处理能够异步进行,提高了系统的可伸缩性和可维护性。 5. **解耦生产者和消费者:** 生产者和消费者之间的解耦使得系统更加灵活和可维护。可以独立地修改和扩展生产者和消费者的实现,而不影响整个系统的稳定性。 ### 二、角色 ​ 在生产者消费者模式中有三个角色,分别是生产者、消费者和缓冲区,下面为大家分别介绍这两个角色的职责和特点: ##### 生产者 ​ 生产者主要的职责是生成数据并将数据放入共享的缓冲区,并在缓冲区已满时进行等待。 ​ 它的特点是: 1. **独立运行**:可以以自己的速度生成数据,而不必等待消费者的处理。 2. **数据生成**:主要关注数据的生成和放置,不涉及具体的数据处理逻辑,不关心数据的最终用途。 3. **可能阻塞**:当缓冲区已满时,生产者可能会被阻塞,以此确保生产者和消费者之间的同步。 4. **任务分配:**生产者可以根据需求,将生成的数据进行分配和调度,将数据分发给不同的消费者进行处理。 ##### 消费者 ​ 消费者主要的职责是从共享的缓冲区中获取数据并进行处理,并在缓冲区为空时进行等待。 ​ 它的特点是: 1. **独立运行:** 它可以以自己的速度从缓冲区中获取数据,而不必等待生产者的生成。 2. **数据处理:** 主要关注对获取的数据的处理,而不涉及数据的生成过程。 3. **可能阻塞:** 当缓冲区为空时,消费者可能会被阻塞,这确保了生产者和消费者之间的同步。 4. **任务执行:** 消费者可能负责实际执行任务的逻辑,如处理消息、执行计算等,取决于具体应用场景。 ##### 缓冲区 ​ 缓冲区在生产者消费者模式中起到了关键的作用,其主要作用是作为生产者和消费者之间的中介,用于存储生产者生成的数据,以便消费者能够安全、有序地获取这些数据。 ​ 缓冲区相当于提供了一个同步点,使得生产者和消费者能够协调它们的操作。缓冲区提供一定量的数据,生产者生成数据放入缓冲区,消费者从缓冲区获取数据,两者之间通过缓冲区进行间接通信,不需要直接依赖对方的状态,这样可以使得两者之间可以异步操作,而不会导致数据丢失或不一致。 ​ 它的重要性如下: 1. **防止竞态条件:** 避免了生产者和消费者之间的竞态条件,确保了在多线程环境中的数据访问的正确性。 2. **提高系统吞吐量:** 缓冲区的使用可以提高系统的吞吐量,使得生产者和消费者能够以各自的速度进行操作,而不会互相阻塞。 3. **减少资源竞争:** 缓冲区作为共享的数据结构,通过合适的同步机制,减少了对共享资源的竞争,提高了系统的效率。 4. **增加系统灵活性:** 缓冲区的引入使得系统更加灵活,可以调整缓冲区的大小以满足不同场景的需求,同时提供了一个中介层,使得系统的不同部分能够独立演化而不影响整体结构。 ### 三、基本流程 ##### 生产者将数据放入缓冲区的步骤 ​ 生产者将数据放入缓冲区的过程包括获取互斥锁,检查缓冲区状态,将生成的数据放入缓冲区,然后释放互斥锁。 ##### 消费者从缓冲区获取数据的步骤 ​ 消费者从缓冲区获取数据的过程包括获取互斥锁,检查缓冲区状态,获取数据进行处理,然后释放互斥锁。 ​ 可能有人会问,“ 为什么使用缓冲区,而不是直接生产者和消费者直接通信?” ​ 使用缓冲区的主要原因在于解耦和同步。缓冲区作为一个中介,提供了一个独立的数据存储空间,使得生产者和消费者可以独立运行,不需要即时通信。这种解耦性增加了系统的灵活性,允许生产者和消费者以各自的速度操作数据,而不必互相等待。同时,缓冲区通过同步机制确保了线程安全,防止了数据竞争和不一致性问题,从而提高了系统的稳定性和可维护性。 ### 四、手把手带大家实现 ​ 首先要明确,生产者消费者模式分为四种,分别是 一对一、一对多、多对一、多对多,对于不同的模式,都有细微的差距,下面一一展开讲解: ​ 在正式开始写代码之前,我们先写一个输出包,以便于后续打印需要的信息。我们在项目根目录下创建一个 `out` 目录,然后创建一个 `out.go` 文件,代码如下: ```go package out import "fmt" // Out 输出 type Out struct { data chan interface{} } // 单例模式 var out *Out // NewOut 初始化 func NewOut() *Out { if out == nil { out = &Out{ data: make(chan interface{}, 65535), // 这里必须设置缓冲区 } } return out } // Println out 的写入方法 func Println(i interface{}) { out.data <- i } // OutPut 将 out 内所有数据全部输出 func (o *Out) OutPut() { for { select { case i := <-o.data: fmt.Println(i) } } } ``` ##### 一对一 ​ 首先我们定义一个任务结构体,生产者生成的任务就是一个带有 `ID` 的任务,然后消费任务的逻辑就是将该任务的 `ID` 打印出来: ```go // Task 任务 type Task struct { ID int64 } // 消费任务 func (t *Task) run() { out.Println(t.ID) } ``` ​ 然后定义一个缓冲区,用于存放生产者生产的任务,这里采用 带缓存的`channel` 来做缓冲区,并且给生产者需要生产的任务数量赋值: ```go // 缓冲池 var taskCh = make(chan Task, 10) // 生产者需要生产的任务数量 const taskNum int64 = 10000 ``` ​ 接着写我们的生产者逻辑,因为是一对一,所以只有一个生产者,那么在该生产者生产完任务之后就可以将生产者通道关闭。需要注意的是,如果这里没有关闭的话,可能会导致后续消费者误以为还有任务在生产一直等待,导致死锁: ```go func producer(wo chan<- Task) { var i int64 for i = 1; i <= taskNum; i++ { t := Task{ ID: i, } wo <- t } // 单个生产者就可以直接关闭通道了,关闭后,消费者任然可以消费 close(wo) } ``` ​ 再来看消费者逻辑,消费者的话,直接用 `for - range` 的方式阻塞等待生产者生产任务即可,待生产者生产完成之后,会主动关闭通断,消费者消费完成之后,就会结束 `for - range` 循环了: ```go func consumer(ro <-chan Task) { for t := range ro { if t.ID != 0 { t.run() } } } ``` ​ 最后就是我们的执行函数,由于我们不知道什么时候生产者和消费者完成了自己负责的任务,于是我们通过 `sync.WaitGroup` 来作协程通知,以确保生产者任务生产完毕且消费者任务消费完毕: ```go func Exec() { wg := &sync.WaitGroup{} wg.Add(2) go func(wg *sync.WaitGroup) { defer wg.Done() producer(taskCh) }(wg) go func(wg *sync.WaitGroup) { defer wg.Done() consumer(taskCh) }(wg) wg.Wait() out.Println("执行成功") } ``` ​ 在使用 `sync.WaitGroup` 进行函数传参时需要注意,由于 Go 语言的函数传参是值传递,如果只传递的是值的话,在函数内部执行 `Done()` 操作是不会影响到函数外的计数器的数量的,所以如果要传参,就需要使用指针进行传递。 ###### 完整代码 ```go package one_one import ( "main/out" "sync" ) // Task 任务 type Task struct { ID int64 } // 消费任务 func (t *Task) run() { out.Println(t.ID) } /* 可能存在的问题 1、生产者通道未关闭,消费者未结束,wg.Wait()没有等待,导致死锁 2、wg 传参时,如果是值传递,可能会导致wg.Wait()没有被 Done 为零 */ // 缓冲池 var taskCh = make(chan Task, 10) // 生产者需要生产的任务数量 const taskNum int64 = 10000 // 一个生产者 func producer(wo chan<- Task) { var i int64 for i = 1; i <= taskNum; i++ { t := Task{ ID: i, } wo <- t } // 单个生产者就可以直接关闭通道了,关闭后,消费者任然可以消费 close(wo) } // 一个消费者 func consumer(ro <-chan Task) { for t := range ro { if t.ID != 0 { t.run() } } } func Exec() { wg := &sync.WaitGroup{} wg.Add(2) go func(wg *sync.WaitGroup) { defer wg.Done() producer(taskCh) }(wg) go func(wg *sync.WaitGroup) { defer wg.Done() consumer(taskCh) }(wg) wg.Wait() out.Println("执行成功") } ``` ##### 一对多 ​ 一对多的话,还是一个生产者去生产任务,但是多个消费者去消费任务,这里我们只需要在 “ 一对一 ” 的基础上修改 `Exec()` 函数,开多个消费者进行消费即可,消费逻辑也不需要进行修改。这里需要强调一点,由于 Go 语言中 `chennel` 是线程安全的,故这里多个消费者去竞争任务的时候,不会出现线程安全的问题,我们也不需要额外加锁去作兜底。 ​ 我们这里就通过任务的增量逐渐来开新的消费者去消费: ```go func Exec() { wg := &sync.WaitGroup{} wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() producer(taskCh) }(wg) var i int64 for i = 0; i < taskNum; i++ { if i%100 == 0 { // 根据任务增量来逐渐开新的消费者去消费 wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() consumer(taskCh) }(wg) } } wg.Wait() out.Println("执行成功") } ``` ​ 每 100 个任务,就开一个消费者去消费。这里并不是将 100 个任务分配给指定的消费者,而是多个消费者去缓冲区中竞争任务来处理执行。 ###### 完整代码 ```go package one_many import ( "main/out" "sync" ) // Task 任务 type Task struct { ID int64 } // 消费任务 func (t *Task) run() { out.Println(t.ID) } /* 可能存在的问题 1、channel 是线程安全的,多个消费者同时去消费不存在 数据竞争问题 2、wg 传参时,如果是值传递,可能会导致wg.Wait()没有被 Done 为零 */ // 缓冲池 var taskCh = make(chan Task, 10) // 生产者需要生产的任务数量 const taskNum int64 = 10000 // 一个生产者 func producer(wo chan<- Task) { var i int64 for i = 1; i <= taskNum; i++ { t := Task{ ID: i, } wo <- t } // 单个生产者就可以直接关闭通道了,关闭后,消费者任然可以消费 close(wo) } // 一个消费者 func consumer(ro <-chan Task) { for t := range ro { if t.ID != 0 { t.run() } } } func Exec() { wg := &sync.WaitGroup{} wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() producer(taskCh) }(wg) var i int64 for i = 0; i < taskNum; i++ { if i%100 == 0 { // 根据任务增量来逐渐开新的消费者去消费 wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() consumer(taskCh) }(wg) } } wg.Wait() out.Println("执行成功") } ``` ##### 多对一 ​ 多对一的话,就是需要多个生产者生产任务,而只有一个消费者去消费任务,所以这里的消费者逻辑不需要进行更改。而生产者逻辑,我们这里规定每个生产者需要生产的任务数量 `nums`,然后生产者逻辑就是:从当前任务编号开始生产,生产 `nums` 个,然后就可以停止生产了,如下: ```go // 多个生产者 func producer(wo chan<- Task, startNum int64, nums int64) { var i int64 for i = startNum; i < startNum+nums; i++ { t := Task{ ID: i, } wo <- t } } ``` ​ 同样,`Exec()`执行逻辑中,我们去做一个多对一的生产者消费者逻辑。我们限制每个生产者生产 nums 个任务,即每 nums 个任务开一个新的生产者去生产。同时为了保证生产者任务生产完毕,我们使用 `pwg.Add(1)` 在生产任务前将计数器 ++,在生产任务结束后,用`pwg.Done()` 将计数器 --。 ​ 还是一样的,为了保证生产者和消费者都完成了任务,我们使用 `wg.Add(1)` 在生产者和消费者在工作前将计数器 ++,在生产者和消费者完成工作后将计数器 --。 ```go func Exec() { // 保证生产者任务生产完毕 wg := &sync.WaitGroup{} // 保证生产者任务生产完毕后,将 channel 关闭 pwg := &sync.WaitGroup{} var i int64 wg.Add(1) for i = 0; i < taskNum; i += nums { if i >= taskNum { break } // 每个生产者生产 100 个任务 wg.Add(1) pwg.Add(1) // 问题2:参数传递 go func(i int64) { defer wg.Done() defer pwg.Done() producer(taskCh, i, nums) }(i) } go func() { defer wg.Done() consumer(taskCh) }() pwg.Wait() // 这里需要注意,问题 1 go close(taskCh) wg.Wait() out.Println("执行成功") } ``` ​ 这里需要注意的两个问题:一个是,在 `for` 循环中的变量 `i` 可能会存在内存共享的问题,因为在可能在本次循环中 `i` 的值为 `199`,但是在协程开始执行后,传入 `producer()` 函数的 `i` 的值就变成了 `200`,所以这里需要用参数将 `i` 的值传到对应的协程中。另一个问题是,在关闭通道 `close(taskch)` 的时候,这里可能会存在一个极小的时间差,可能会存在还有协程在往通道里面写数据,所以这里用 `go(close)` 会保险一点。 ###### 完整代码 ```go package many_one import ( "main/out" "sync" ) // Task 任务 type Task struct { ID int64 } // 消费任务 func (t *Task) run() { out.Println(t.ID) } /* 可能存在的问题 1、go close 去关闭channel,因为可能还有协程在向里面写数据,有极小的时间差 2、生产者在生产的时候,可能存在数据竞争问题 */ // 缓冲池 var taskCh = make(chan Task, 10) // 生产者需要生产的任务数量 const taskNum int64 = 10000 // 每个生产者生产的任务数量,100 const nums int64 = 100 // 多个生产者 func producer(wo chan<- Task, startNum int64, nums int64) { var i int64 for i = startNum; i < startNum+nums; i++ { t := Task{ ID: i, } wo <- t } } // 一个消费者 func consumer(ro <-chan Task) { for t := range ro { if t.ID != 0 { t.run() } } } func Exec() { // 保证生产者任务生产完毕 wg := &sync.WaitGroup{} // 保证生产者任务生产完毕后,将 channel 关闭 pwg := &sync.WaitGroup{} var i int64 wg.Add(1) for i = 0; i < taskNum; i += nums { if i >= taskNum { break } // 每个生产者生产 100 个任务 wg.Add(1) pwg.Add(1) // 问题2:参数传递 go func(i int64) { defer wg.Done() defer pwg.Done() producer(taskCh, i, nums) }(i) } go func() { defer wg.Done() consumer(taskCh) }() pwg.Wait() // 这里需要注意,问题 1 go close(taskCh) wg.Wait() out.Println("执行成功") } ``` ##### 多对多 ​ 多对多的话,就比较接近现实中的场景了。会有源源不断的生产者生产任务,就会有消费者不断地去消费任务,它们都不会主动退出,靠人为信号退出 goroutine。因此,我们需要先定义一个全局停止运行的信号: ```go // 停止运行的信号 var done = make(chan struct{}) ``` ​ 生产者由于是无限生产,那毫无疑问生产者逻辑是写在一个 `for` 循环内的,这里为了避免缓冲区满了,生产者因为阻塞而导致无法接收到 `done` 信号,我们配合 `select` 来实现: ```go func producer(wo chan<- Task, done chan struct{}) { var i int64 for { if i >= TaskNum { // 无限生产 i = 0 } i++ t := Task{ ID: i, } // 可以防止因为生产者阻塞,而导致关闭信号无法关闭 select { case wo <- t: case <-done: out.Println("生产者退出") return } } } ``` ​ 同样,我们的消费者逻辑肯定也是放在 `for` 循环中来写,并且也配合 `select` 来接收信号: ```go func consumer(ro <-chan Task, done chan struct{}) { for { select { case t := <-ro: if t.ID != 0 { t.run() } case <-done: // 这里如果直接退出的话,可能 channel 里面还有值没有被消费(有缓存区的情况) for t := range ro { // 生产者那边已经停止,消息不会再生产。消费者这里将所有消息消费后,就可以 退出了 if t.ID != 0 { t.run() } } out.Println("消费者退出") return } } } ``` ​ 在接收到 `done` 信号后,这里有一个小坑:可能此时缓冲区中还存在任务没有被消费。故我们应该在退出运行前,再消费执行一次消费逻辑,保证缓冲区中没有任务剩余。 ​ 执行函数的逻辑就很简单了,直接异步开多个生产者和消费者同时运行即可。这里也需要注意先关闭信号,再关闭通道,如果反过来了,就可能会导致向已关闭的 `channel` 内写入数据,会报异常。 ###### 完整代码 ```go package many_many import ( "main/out" "time" ) // Task 任务 type Task struct { ID int64 } // 消费任务 func (t *Task) run() { out.Println(t.ID) } /* 可能存在的问题 1、生产者 和 消费者 都不主动退出,靠信号退出 goroutine 2、源源不断地生产,消费者也不间断。 */ // 缓冲池 var taskCh = make(chan Task, 10) // 停止运行的信号 var done = make(chan struct{}) // TaskNum 生产者需要生产的任务数量 const TaskNum int64 = 10000 func producer(wo chan<- Task, done chan struct{}) { var i int64 for { if i >= TaskNum { // 无限生产 i = 0 } i++ t := Task{ ID: i, } // 可以防止因为生产者阻塞,而导致关闭信号无法关闭 select { case wo <- t: case <-done: out.Println("生产者退出") return } } } func consumer(ro <-chan Task, done chan struct{}) { for { select { case t := <-ro: if t.ID != 0 { t.run() } case <-done: // 这里如果直接退出的话,可能 channel 里面还有值没有被消费(有缓存区的情况) for t := range ro { // 生产者那边已经停止,消息不会再生产。消费者这里将所有消息消费后,就可以 退出了 if t.ID != 0 { t.run() } } out.Println("消费者退出") return } } } func Exec() { // 多个生产者 go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) // 多个消费者 go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) time.Sleep(time.Second * 5) // 一定要先关闭 done,再关闭通道。防止向已关闭的 channel 写入数据,报异常 close(done) close(taskCh) out.Println("执行成功") } ``` ### 五、总结 ​ 生产者消费者模式的核心思想是通过**共享缓冲区实现生产者和消费者之间的解耦**,使得生产者生成数据并放入缓冲区,而消费者从缓冲区获取数据进行处理。 ​ 关键的实现要点包括同步机制,阻塞和唤醒机制,以及解耦生产者和消费者的直接依赖关系。这种模式通过平衡数据生成和处理的速度,提高了系统的灵活性和效率,适用于多线程环境下的异步数据交换。

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

212601 次点击  ∙  2 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传