> 欢迎大家到我的博客浏览 <a href="https://www.yinkai.cc/post/f07fe1e9850811ed7745690013acaace">YinKai 's Blog | 手把手教你用go语言实现生产者消费者模式</a>
本篇文章会从生产者消费者模式的定义、特点、流程等方面为大家展开介绍,并带大家手把手来实现一下生产者消费者模式。<!--more-->
### 一、 简介
生产者消费者模式是一种并发设计模式,用于解决多线程环境下生产者和消费者之间的协作和数据共享问题。在这个模式中,有两种不同的角色:生产者和消费者,它们共同操作一个共享的缓冲区,以实现线程间的安全通信。
它的应用场景和优点如下:
1. **异步任务处理**:在异步编程中,生产者可以生成异步任务,而消费者负责处理这些任务。生产者消费者模式可以有效地协调异步任务的生成和处理,提高系统的响应速度。
2. **缓冲区处理:**当生产者和消费者的处理速度不一致时,通过引入缓冲区,可以平衡两者之间的速度差异。生产者将数据放入缓冲区,而消费者从缓冲区中获取数据,使两者能够独立运行,提高系统的效率。
3. **任务调度:**在任务调度系统中,生产者可以生成需要执行的任务,而消费者负责执行这些任务。通过生产者消费者模式,可以灵活地管理和调度任务,实现任务的分发和执行的解耦。
4. **消息队列:** 生产者消费者模式常用于消息队列的实现。生产者向队列中发送消息,而消费者从队列中获取消息进行处理。这种模式使得消息的生成和处理能够异步进行,提高了系统的可伸缩性和可维护性。
5. **解耦生产者和消费者:** 生产者和消费者之间的解耦使得系统更加灵活和可维护。可以独立地修改和扩展生产者和消费者的实现,而不影响整个系统的稳定性。
### 二、角色
在生产者消费者模式中有三个角色,分别是生产者、消费者和缓冲区,下面为大家分别介绍这两个角色的职责和特点:
##### 生产者
生产者主要的职责是生成数据并将数据放入共享的缓冲区,并在缓冲区已满时进行等待。
它的特点是:
1. **独立运行**:可以以自己的速度生成数据,而不必等待消费者的处理。
2. **数据生成**:主要关注数据的生成和放置,不涉及具体的数据处理逻辑,不关心数据的最终用途。
3. **可能阻塞**:当缓冲区已满时,生产者可能会被阻塞,以此确保生产者和消费者之间的同步。
4. **任务分配:**生产者可以根据需求,将生成的数据进行分配和调度,将数据分发给不同的消费者进行处理。
##### 消费者
消费者主要的职责是从共享的缓冲区中获取数据并进行处理,并在缓冲区为空时进行等待。
它的特点是:
1. **独立运行:** 它可以以自己的速度从缓冲区中获取数据,而不必等待生产者的生成。
2. **数据处理:** 主要关注对获取的数据的处理,而不涉及数据的生成过程。
3. **可能阻塞:** 当缓冲区为空时,消费者可能会被阻塞,这确保了生产者和消费者之间的同步。
4. **任务执行:** 消费者可能负责实际执行任务的逻辑,如处理消息、执行计算等,取决于具体应用场景。
##### 缓冲区
缓冲区在生产者消费者模式中起到了关键的作用,其主要作用是作为生产者和消费者之间的中介,用于存储生产者生成的数据,以便消费者能够安全、有序地获取这些数据。
缓冲区相当于提供了一个同步点,使得生产者和消费者能够协调它们的操作。缓冲区提供一定量的数据,生产者生成数据放入缓冲区,消费者从缓冲区获取数据,两者之间通过缓冲区进行间接通信,不需要直接依赖对方的状态,这样可以使得两者之间可以异步操作,而不会导致数据丢失或不一致。
它的重要性如下:
1. **防止竞态条件:** 避免了生产者和消费者之间的竞态条件,确保了在多线程环境中的数据访问的正确性。
2. **提高系统吞吐量:** 缓冲区的使用可以提高系统的吞吐量,使得生产者和消费者能够以各自的速度进行操作,而不会互相阻塞。
3. **减少资源竞争:** 缓冲区作为共享的数据结构,通过合适的同步机制,减少了对共享资源的竞争,提高了系统的效率。
4. **增加系统灵活性:** 缓冲区的引入使得系统更加灵活,可以调整缓冲区的大小以满足不同场景的需求,同时提供了一个中介层,使得系统的不同部分能够独立演化而不影响整体结构。
### 三、基本流程
##### 生产者将数据放入缓冲区的步骤
生产者将数据放入缓冲区的过程包括获取互斥锁,检查缓冲区状态,将生成的数据放入缓冲区,然后释放互斥锁。
##### 消费者从缓冲区获取数据的步骤
消费者从缓冲区获取数据的过程包括获取互斥锁,检查缓冲区状态,获取数据进行处理,然后释放互斥锁。
可能有人会问,“ 为什么使用缓冲区,而不是直接生产者和消费者直接通信?”
使用缓冲区的主要原因在于解耦和同步。缓冲区作为一个中介,提供了一个独立的数据存储空间,使得生产者和消费者可以独立运行,不需要即时通信。这种解耦性增加了系统的灵活性,允许生产者和消费者以各自的速度操作数据,而不必互相等待。同时,缓冲区通过同步机制确保了线程安全,防止了数据竞争和不一致性问题,从而提高了系统的稳定性和可维护性。
### 四、手把手带大家实现
首先要明确,生产者消费者模式分为四种,分别是 一对一、一对多、多对一、多对多,对于不同的模式,都有细微的差距,下面一一展开讲解:
在正式开始写代码之前,我们先写一个输出包,以便于后续打印需要的信息。我们在项目根目录下创建一个 `out` 目录,然后创建一个 `out.go` 文件,代码如下:
```go
package out
import "fmt"
// Out 输出
type Out struct {
data chan interface{}
}
// 单例模式
var out *Out
// NewOut 初始化
func NewOut() *Out {
if out == nil {
out = &Out{
data: make(chan interface{}, 65535), // 这里必须设置缓冲区
}
}
return out
}
// Println out 的写入方法
func Println(i interface{}) {
out.data <- i
}
// OutPut 将 out 内所有数据全部输出
func (o *Out) OutPut() {
for {
select {
case i := <-o.data:
fmt.Println(i)
}
}
}
```
##### 一对一
首先我们定义一个任务结构体,生产者生成的任务就是一个带有 `ID` 的任务,然后消费任务的逻辑就是将该任务的 `ID` 打印出来:
```go
// Task 任务
type Task struct {
ID int64
}
// 消费任务
func (t *Task) run() {
out.Println(t.ID)
}
```
然后定义一个缓冲区,用于存放生产者生产的任务,这里采用 带缓存的`channel` 来做缓冲区,并且给生产者需要生产的任务数量赋值:
```go
// 缓冲池
var taskCh = make(chan Task, 10)
// 生产者需要生产的任务数量
const taskNum int64 = 10000
```
接着写我们的生产者逻辑,因为是一对一,所以只有一个生产者,那么在该生产者生产完任务之后就可以将生产者通道关闭。需要注意的是,如果这里没有关闭的话,可能会导致后续消费者误以为还有任务在生产一直等待,导致死锁:
```go
func producer(wo chan<- Task) {
var i int64
for i = 1; i <= taskNum; i++ {
t := Task{
ID: i,
}
wo <- t
}
// 单个生产者就可以直接关闭通道了,关闭后,消费者任然可以消费
close(wo)
}
```
再来看消费者逻辑,消费者的话,直接用 `for - range` 的方式阻塞等待生产者生产任务即可,待生产者生产完成之后,会主动关闭通断,消费者消费完成之后,就会结束 `for - range` 循环了:
```go
func consumer(ro <-chan Task) {
for t := range ro {
if t.ID != 0 {
t.run()
}
}
}
```
最后就是我们的执行函数,由于我们不知道什么时候生产者和消费者完成了自己负责的任务,于是我们通过 `sync.WaitGroup` 来作协程通知,以确保生产者任务生产完毕且消费者任务消费完毕:
```go
func Exec() {
wg := &sync.WaitGroup{}
wg.Add(2)
go func(wg *sync.WaitGroup) {
defer wg.Done()
producer(taskCh)
}(wg)
go func(wg *sync.WaitGroup) {
defer wg.Done()
consumer(taskCh)
}(wg)
wg.Wait()
out.Println("执行成功")
}
```
在使用 `sync.WaitGroup` 进行函数传参时需要注意,由于 Go 语言的函数传参是值传递,如果只传递的是值的话,在函数内部执行 `Done()` 操作是不会影响到函数外的计数器的数量的,所以如果要传参,就需要使用指针进行传递。
###### 完整代码
```go
package one_one
import (
"main/out"
"sync"
)
// Task 任务
type Task struct {
ID int64
}
// 消费任务
func (t *Task) run() {
out.Println(t.ID)
}
/* 可能存在的问题
1、生产者通道未关闭,消费者未结束,wg.Wait()没有等待,导致死锁
2、wg 传参时,如果是值传递,可能会导致wg.Wait()没有被 Done 为零
*/
// 缓冲池
var taskCh = make(chan Task, 10)
// 生产者需要生产的任务数量
const taskNum int64 = 10000
// 一个生产者
func producer(wo chan<- Task) {
var i int64
for i = 1; i <= taskNum; i++ {
t := Task{
ID: i,
}
wo <- t
}
// 单个生产者就可以直接关闭通道了,关闭后,消费者任然可以消费
close(wo)
}
// 一个消费者
func consumer(ro <-chan Task) {
for t := range ro {
if t.ID != 0 {
t.run()
}
}
}
func Exec() {
wg := &sync.WaitGroup{}
wg.Add(2)
go func(wg *sync.WaitGroup) {
defer wg.Done()
producer(taskCh)
}(wg)
go func(wg *sync.WaitGroup) {
defer wg.Done()
consumer(taskCh)
}(wg)
wg.Wait()
out.Println("执行成功")
}
```
##### 一对多
一对多的话,还是一个生产者去生产任务,但是多个消费者去消费任务,这里我们只需要在 “ 一对一 ” 的基础上修改 `Exec()` 函数,开多个消费者进行消费即可,消费逻辑也不需要进行修改。这里需要强调一点,由于 Go 语言中 `chennel` 是线程安全的,故这里多个消费者去竞争任务的时候,不会出现线程安全的问题,我们也不需要额外加锁去作兜底。
我们这里就通过任务的增量逐渐来开新的消费者去消费:
```go
func Exec() {
wg := &sync.WaitGroup{}
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
producer(taskCh)
}(wg)
var i int64
for i = 0; i < taskNum; i++ {
if i%100 == 0 { // 根据任务增量来逐渐开新的消费者去消费
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
consumer(taskCh)
}(wg)
}
}
wg.Wait()
out.Println("执行成功")
}
```
每 100 个任务,就开一个消费者去消费。这里并不是将 100 个任务分配给指定的消费者,而是多个消费者去缓冲区中竞争任务来处理执行。
###### 完整代码
```go
package one_many
import (
"main/out"
"sync"
)
// Task 任务
type Task struct {
ID int64
}
// 消费任务
func (t *Task) run() {
out.Println(t.ID)
}
/* 可能存在的问题
1、channel 是线程安全的,多个消费者同时去消费不存在 数据竞争问题
2、wg 传参时,如果是值传递,可能会导致wg.Wait()没有被 Done 为零
*/
// 缓冲池
var taskCh = make(chan Task, 10)
// 生产者需要生产的任务数量
const taskNum int64 = 10000
// 一个生产者
func producer(wo chan<- Task) {
var i int64
for i = 1; i <= taskNum; i++ {
t := Task{
ID: i,
}
wo <- t
}
// 单个生产者就可以直接关闭通道了,关闭后,消费者任然可以消费
close(wo)
}
// 一个消费者
func consumer(ro <-chan Task) {
for t := range ro {
if t.ID != 0 {
t.run()
}
}
}
func Exec() {
wg := &sync.WaitGroup{}
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
producer(taskCh)
}(wg)
var i int64
for i = 0; i < taskNum; i++ {
if i%100 == 0 { // 根据任务增量来逐渐开新的消费者去消费
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
consumer(taskCh)
}(wg)
}
}
wg.Wait()
out.Println("执行成功")
}
```
##### 多对一
多对一的话,就是需要多个生产者生产任务,而只有一个消费者去消费任务,所以这里的消费者逻辑不需要进行更改。而生产者逻辑,我们这里规定每个生产者需要生产的任务数量 `nums`,然后生产者逻辑就是:从当前任务编号开始生产,生产 `nums` 个,然后就可以停止生产了,如下:
```go
// 多个生产者
func producer(wo chan<- Task, startNum int64, nums int64) {
var i int64
for i = startNum; i < startNum+nums; i++ {
t := Task{
ID: i,
}
wo <- t
}
}
```
同样,`Exec()`执行逻辑中,我们去做一个多对一的生产者消费者逻辑。我们限制每个生产者生产 nums 个任务,即每 nums 个任务开一个新的生产者去生产。同时为了保证生产者任务生产完毕,我们使用 `pwg.Add(1)` 在生产任务前将计数器 ++,在生产任务结束后,用`pwg.Done()` 将计数器 --。
还是一样的,为了保证生产者和消费者都完成了任务,我们使用 `wg.Add(1)` 在生产者和消费者在工作前将计数器 ++,在生产者和消费者完成工作后将计数器 --。
```go
func Exec() {
// 保证生产者任务生产完毕
wg := &sync.WaitGroup{}
// 保证生产者任务生产完毕后,将 channel 关闭
pwg := &sync.WaitGroup{}
var i int64
wg.Add(1)
for i = 0; i < taskNum; i += nums {
if i >= taskNum {
break
}
// 每个生产者生产 100 个任务
wg.Add(1)
pwg.Add(1)
// 问题2:参数传递
go func(i int64) {
defer wg.Done()
defer pwg.Done()
producer(taskCh, i, nums)
}(i)
}
go func() {
defer wg.Done()
consumer(taskCh)
}()
pwg.Wait()
// 这里需要注意,问题 1
go close(taskCh)
wg.Wait()
out.Println("执行成功")
}
```
这里需要注意的两个问题:一个是,在 `for` 循环中的变量 `i` 可能会存在内存共享的问题,因为在可能在本次循环中 `i` 的值为 `199`,但是在协程开始执行后,传入 `producer()` 函数的 `i` 的值就变成了 `200`,所以这里需要用参数将 `i` 的值传到对应的协程中。另一个问题是,在关闭通道 `close(taskch)` 的时候,这里可能会存在一个极小的时间差,可能会存在还有协程在往通道里面写数据,所以这里用 `go(close)` 会保险一点。
###### 完整代码
```go
package many_one
import (
"main/out"
"sync"
)
// Task 任务
type Task struct {
ID int64
}
// 消费任务
func (t *Task) run() {
out.Println(t.ID)
}
/* 可能存在的问题
1、go close 去关闭channel,因为可能还有协程在向里面写数据,有极小的时间差
2、生产者在生产的时候,可能存在数据竞争问题
*/
// 缓冲池
var taskCh = make(chan Task, 10)
// 生产者需要生产的任务数量
const taskNum int64 = 10000
// 每个生产者生产的任务数量,100
const nums int64 = 100
// 多个生产者
func producer(wo chan<- Task, startNum int64, nums int64) {
var i int64
for i = startNum; i < startNum+nums; i++ {
t := Task{
ID: i,
}
wo <- t
}
}
// 一个消费者
func consumer(ro <-chan Task) {
for t := range ro {
if t.ID != 0 {
t.run()
}
}
}
func Exec() {
// 保证生产者任务生产完毕
wg := &sync.WaitGroup{}
// 保证生产者任务生产完毕后,将 channel 关闭
pwg := &sync.WaitGroup{}
var i int64
wg.Add(1)
for i = 0; i < taskNum; i += nums {
if i >= taskNum {
break
}
// 每个生产者生产 100 个任务
wg.Add(1)
pwg.Add(1)
// 问题2:参数传递
go func(i int64) {
defer wg.Done()
defer pwg.Done()
producer(taskCh, i, nums)
}(i)
}
go func() {
defer wg.Done()
consumer(taskCh)
}()
pwg.Wait()
// 这里需要注意,问题 1
go close(taskCh)
wg.Wait()
out.Println("执行成功")
}
```
##### 多对多
多对多的话,就比较接近现实中的场景了。会有源源不断的生产者生产任务,就会有消费者不断地去消费任务,它们都不会主动退出,靠人为信号退出 goroutine。因此,我们需要先定义一个全局停止运行的信号:
```go
// 停止运行的信号
var done = make(chan struct{})
```
生产者由于是无限生产,那毫无疑问生产者逻辑是写在一个 `for` 循环内的,这里为了避免缓冲区满了,生产者因为阻塞而导致无法接收到 `done` 信号,我们配合 `select` 来实现:
```go
func producer(wo chan<- Task, done chan struct{}) {
var i int64
for {
if i >= TaskNum { // 无限生产
i = 0
}
i++
t := Task{
ID: i,
}
// 可以防止因为生产者阻塞,而导致关闭信号无法关闭
select {
case wo <- t:
case <-done:
out.Println("生产者退出")
return
}
}
}
```
同样,我们的消费者逻辑肯定也是放在 `for` 循环中来写,并且也配合 `select` 来接收信号:
```go
func consumer(ro <-chan Task, done chan struct{}) {
for {
select {
case t := <-ro:
if t.ID != 0 {
t.run()
}
case <-done: // 这里如果直接退出的话,可能 channel 里面还有值没有被消费(有缓存区的情况)
for t := range ro { // 生产者那边已经停止,消息不会再生产。消费者这里将所有消息消费后,就可以 退出了
if t.ID != 0 {
t.run()
}
}
out.Println("消费者退出")
return
}
}
}
```
在接收到 `done` 信号后,这里有一个小坑:可能此时缓冲区中还存在任务没有被消费。故我们应该在退出运行前,再消费执行一次消费逻辑,保证缓冲区中没有任务剩余。
执行函数的逻辑就很简单了,直接异步开多个生产者和消费者同时运行即可。这里也需要注意先关闭信号,再关闭通道,如果反过来了,就可能会导致向已关闭的 `channel` 内写入数据,会报异常。
###### 完整代码
```go
package many_many
import (
"main/out"
"time"
)
// Task 任务
type Task struct {
ID int64
}
// 消费任务
func (t *Task) run() {
out.Println(t.ID)
}
/* 可能存在的问题
1、生产者 和 消费者 都不主动退出,靠信号退出 goroutine
2、源源不断地生产,消费者也不间断。
*/
// 缓冲池
var taskCh = make(chan Task, 10)
// 停止运行的信号
var done = make(chan struct{})
// TaskNum 生产者需要生产的任务数量
const TaskNum int64 = 10000
func producer(wo chan<- Task, done chan struct{}) {
var i int64
for {
if i >= TaskNum { // 无限生产
i = 0
}
i++
t := Task{
ID: i,
}
// 可以防止因为生产者阻塞,而导致关闭信号无法关闭
select {
case wo <- t:
case <-done:
out.Println("生产者退出")
return
}
}
}
func consumer(ro <-chan Task, done chan struct{}) {
for {
select {
case t := <-ro:
if t.ID != 0 {
t.run()
}
case <-done: // 这里如果直接退出的话,可能 channel 里面还有值没有被消费(有缓存区的情况)
for t := range ro { // 生产者那边已经停止,消息不会再生产。消费者这里将所有消息消费后,就可以 退出了
if t.ID != 0 {
t.run()
}
}
out.Println("消费者退出")
return
}
}
}
func Exec() {
// 多个生产者
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
// 多个消费者
go consumer(taskCh, done)
go consumer(taskCh, done)
go consumer(taskCh, done)
go consumer(taskCh, done)
go consumer(taskCh, done)
go consumer(taskCh, done)
go consumer(taskCh, done)
time.Sleep(time.Second * 5)
// 一定要先关闭 done,再关闭通道。防止向已关闭的 channel 写入数据,报异常
close(done)
close(taskCh)
out.Println("执行成功")
}
```
### 五、总结
生产者消费者模式的核心思想是通过**共享缓冲区实现生产者和消费者之间的解耦**,使得生产者生成数据并放入缓冲区,而消费者从缓冲区获取数据进行处理。
关键的实现要点包括同步机制,阻塞和唤醒机制,以及解耦生产者和消费者的直接依赖关系。这种模式通过平衡数据生成和处理的速度,提高了系统的灵活性和效率,适用于多线程环境下的异步数据交换。
有疑问加站长微信联系(非本文作者)