如下是一个简单的生产-消费的代码,当我们程序要退出,或者重启的时候,channel里面的数据会丢失掉。
大家在使用生产消费者的时候,是怎么处理的呢?
```
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
var oriTasks chan int
var taskStage21 chan int
var taskStage22 chan int
var endTasks chan int
func init() {
oriTasks = make(chan int , 1000)
taskStage21 = make(chan int , 1000)
taskStage22 = make(chan int , 1000)
endTasks = make(chan int , 1000)
}
func main() {
// 模拟任务来源,一般从网络获取
go func() {
i := 0
for {
oriTasks <- i
i++
time.Sleep(10*time.Second)
}
}()
go consumerStage1()
go consumerStage21()
go consumerStage22()
go consumerStageEnd()
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
<-quit
}
func consumerStage1() {
for {
select {
case tasks := <-oriTasks:
if tasks %2 ==0 {
// do something ....
taskStage22 <- tasks
} else {
// do other something ...
taskStage22 <- tasks
}
}
}
}
func consumerStage21() {
for {
select {
case tasks := <-oriTasks:
// stage2-2 biz
endTasks <- tasks * 2
}
}
}
func consumerStage22() {
for {
select {
case tasks := <-oriTasks:
// stage2-3 biz
endTasks <- tasks * 4
}
}
}
func consumerStageEnd() {
for {
select {
case tasks := <-endTasks:
// end task biz
r := tasks +1
fmt.Println(r)
}
}
}
```
更多评论
你这个和消费生产者关系不大
你这个是一个最简化的消息队列
我们可以参考一下go实现的消息队列
https://nats.io/
nats是一个 go实现的消息队列
他最基本的是nats-server,也就是基于话题的订阅/发布功能。
然后,他还提供了一个 nats-streaming
https://docs.nats.io/nats-streaming-concepts/intro
通过这个功能实现了消息的持久化,消费模式等功能
#1
然后是自己的最小化实现。
我自己做过一个通知队列,简单来说,需要实现4快
1.编号,给每个消息编一个号。
2.持久化,随便找一个key-value db就可以,比如leveldb
3.核销,操作成功后根据消息编号删除消息记录
4.重试,这个根据自己的策略来做就行
#2