golang 生产消费模式,如何保证不丢数据呢?

chazex · 2021-02-24 20:46:53 · 1724 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2021-02-24 20:46:53 的主题,其中的信息可能已经有所发展或是发生改变。

如下是一个简单的生产-消费的代码,当我们程序要退出,或者重启的时候,channel里面的数据会丢失掉。 大家在使用生产消费者的时候,是怎么处理的呢?

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

var oriTasks chan int

var taskStage21 chan int
var taskStage22 chan int

var endTasks chan int

func init() {
    oriTasks = make(chan int , 1000)

    taskStage21 = make(chan int , 1000)
    taskStage22 = make(chan int , 1000)

    endTasks = make(chan int , 1000)
}

func main() {
    //  模拟任务来源,一般从网络获取
    go func() {
        i := 0
        for {
            oriTasks <- i
            i++
            time.Sleep(10*time.Second)
        }
    }()

    go consumerStage1()

    go consumerStage21()
    go consumerStage22()

    go consumerStageEnd()

    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
    <-quit

}

func consumerStage1() {
    for {
        select {
        case tasks := <-oriTasks:
            if tasks %2 ==0 {
                // do something ....
                taskStage22 <- tasks
            } else {
                // do other something ...
                taskStage22 <- tasks
            }
        }
    }
}


func consumerStage21() {
    for {
        select {
        case tasks := <-oriTasks:
            // stage2-2 biz
            endTasks <- tasks * 2
        }
    }
}


func consumerStage22() {
    for {
        select {
        case tasks := <-oriTasks:
            // stage2-3 biz
            endTasks <- tasks * 4
        }
    }
}

func consumerStageEnd() {
    for {
        select {
        case tasks := <-endTasks:
            // end task biz
            r := tasks +1
            fmt.Println(r)
        }
    }
}

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1724 次点击  
加入收藏 微博
8 回复  |  直到 2021-03-10 01:13:51
jarlyyn
jarlyyn · #1 · 4年之前

你这个和消费生产者关系不大

你这个是一个最简化的消息队列

我们可以参考一下go实现的消息队列

https://nats.io/

nats是一个 go实现的消息队列

他最基本的是nats-server,也就是基于话题的订阅/发布功能。

然后,他还提供了一个 nats-streaming

https://docs.nats.io/nats-streaming-concepts/intro

通过这个功能实现了消息的持久化,消费模式等功能

jarlyyn
jarlyyn · #2 · 4年之前

然后是自己的最小化实现。

我自己做过一个通知队列,简单来说,需要实现4快

1.编号,给每个消息编一个号。 2.持久化,随便找一个key-value db就可以,比如leveldb 3.核销,操作成功后根据消息编号删除消息记录 4.重试,这个根据自己的策略来做就行

cfanbo
cfanbo · #3 · 4年之前

也有过这类的疑惑,暂时无解

jthmath
jthmath · #4 · 4年之前

你的数据在内存中,重启肯定丢,这些生产者消费者、或者channel是没有任何关系的。要想不丢就必须放在外面,无论是数据库还是消息队列或者其他什么的

lwldcr
lwldcr · #5 · 4年之前

程序退出前 close所有channel

读取channel的函数 等待消费完毕再退出 可以埋一个sync.WaitGroup到每个消费函数里,main里做wait即可

chazex
chazex · #6 · 4年之前
jarlyynjarlyyn #1 回复

你这个和消费生产者关系不大 你这个是一个最简化的消息队列 我们可以参考一下go实现的消息队列 https://nats.io/ nats是一个 go实现的消息队列 他最基本的是nats-server,也就是基于话题的订阅/发布功能。 然后,他还提供了一个 nats-streaming https://docs.nats.io/nats-streaming-concepts/intro 通过这个功能实现了消息的持久化,消费模式等功能

是想在一个进程内做这个,不想做成分布式的呀。

chazex
chazex · #7 · 4年之前
lwldcrlwldcr #5 回复

程序退出前 close所有channel 读取channel的函数 等待消费完毕再退出 可以埋一个sync.WaitGroup到每个消费函数里,main里做wait即可

这个方法是靠谱的。需要注意的是,关闭的时候,要注意顺序,保证所有写channel的协程全部退出,否则会panic

jarlyyn
jarlyyn · #8 · 4年之前
chazexchazex #6 回复

#1楼 @jarlyyn 是想在一个进程内做这个,不想做成分布式的呀。

我只是让你参考架构

这个和分布式没关系啊。

你需要的是消息的持久化,要持久化就是要落地,落地最简单的就是都存到一个key-value database里。

然后做了持久化还要考虑失败(重启服务)时的策略,保证一次/至少一次/精确一次。

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传