如下是一个简单的生产-消费的代码,当我们程序要退出,或者重启的时候,channel里面的数据会丢失掉。 大家在使用生产消费者的时候,是怎么处理的呢?
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
var oriTasks chan int
var taskStage21 chan int
var taskStage22 chan int
var endTasks chan int
func init() {
oriTasks = make(chan int , 1000)
taskStage21 = make(chan int , 1000)
taskStage22 = make(chan int , 1000)
endTasks = make(chan int , 1000)
}
func main() {
// 模拟任务来源,一般从网络获取
go func() {
i := 0
for {
oriTasks <- i
i++
time.Sleep(10*time.Second)
}
}()
go consumerStage1()
go consumerStage21()
go consumerStage22()
go consumerStageEnd()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
<-quit
}
func consumerStage1() {
for {
select {
case tasks := <-oriTasks:
if tasks %2 ==0 {
// do something ....
taskStage22 <- tasks
} else {
// do other something ...
taskStage22 <- tasks
}
}
}
}
func consumerStage21() {
for {
select {
case tasks := <-oriTasks:
// stage2-2 biz
endTasks <- tasks * 2
}
}
}
func consumerStage22() {
for {
select {
case tasks := <-oriTasks:
// stage2-3 biz
endTasks <- tasks * 4
}
}
}
func consumerStageEnd() {
for {
select {
case tasks := <-endTasks:
// end task biz
r := tasks +1
fmt.Println(r)
}
}
}
有疑问加站长微信联系(非本文作者)

你这个和消费生产者关系不大
你这个是一个最简化的消息队列
我们可以参考一下go实现的消息队列
https://nats.io/
nats是一个 go实现的消息队列
他最基本的是nats-server,也就是基于话题的订阅/发布功能。
然后,他还提供了一个 nats-streaming
https://docs.nats.io/nats-streaming-concepts/intro
通过这个功能实现了消息的持久化,消费模式等功能
然后是自己的最小化实现。
我自己做过一个通知队列,简单来说,需要实现4快
1.编号,给每个消息编一个号。 2.持久化,随便找一个key-value db就可以,比如leveldb 3.核销,操作成功后根据消息编号删除消息记录 4.重试,这个根据自己的策略来做就行
也有过这类的疑惑,暂时无解
你的数据在内存中,重启肯定丢,这些生产者消费者、或者channel是没有任何关系的。要想不丢就必须放在外面,无论是数据库还是消息队列或者其他什么的
程序退出前 close所有channel
读取channel的函数 等待消费完毕再退出 可以埋一个sync.WaitGroup到每个消费函数里,main里做wait即可
是想在一个进程内做这个,不想做成分布式的呀。
这个方法是靠谱的。需要注意的是,关闭的时候,要注意顺序,保证所有写channel的协程全部退出,否则会panic
我只是让你参考架构
这个和分布式没关系啊。
你需要的是消息的持久化,要持久化就是要落地,落地最简单的就是都存到一个key-value database里。
然后做了持久化还要考虑失败(重启服务)时的策略,保证一次/至少一次/精确一次。