golang 基于channel的生产者消费者实现

· · 1236 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

背景


公司开发新系统,使用go语言。

模式为来一个客户给部署一套,用户在万个以下。

为方便部署,要求单可执行文件。

分模块之后耦合严重,单独部署消息队列服务没有必要,且会增加部署成本。

所以自己手写了一个生产者消费者的库,整合到程序中。

核心代码


核心结构体的定义
// 结构定义
type Broker struct {
    event      chan interface{} // 接收事件的管道
    handlers   []func(interface{})  // 处理事件的方法
    handlersMu sync.RWMutex // 添加方法时的锁
    Name       string // 名称
    wait       *sync.WaitGroup // wait group 用于停止时等待所有事件处理完成
    onceStart  sync.Once // 确保只启动一次
    onceStop   sync.Once // 确保只关闭一次
}
启动代码 事件处理循环
func (b *Broker) Start() {
    b.onceStart.Do(func() {
        b.wait.Add(1)
        go func() {
            for {
                event, ok := <-b.event
                if ok {
                    // 事件分发
                    b.handlersMu.RLock()
                    for _, v := range b.handlers {
                        v(event) // 有recover
                    }
                    b.handlersMu.RUnlock()
                } else {
                    // 通道已经关闭
                    b.wait.Done()
                    return
                }
            }
        }()
    })
}
创建并启动
// NewStartedBroker 创建broker,并开始
func NewStartedBroker(name string, chanBuf int) *Broker {
    b := &Broker{
        event:    make(chan interface{}, chanBuf),
        handlers: make([]func(interface{}), 0),
        Name:     name,
        wait:     &sync.WaitGroup{},
    }
    b.Start()
    return b
}
注册处理方法
// Register 注册事件
func (b *Broker) Register(ctx context.Context, f func(interface{})) (err error) {
    b.handlersMu.Lock()
    defer b.handlersMu.Unlock()
    b.handlers = append(b.handlers, func(o interface{}) {
        defer func() {
            if err := recover(); err != nil {
                err = fmt.Errorf("panic on broker handler msg name:%v err:%v msg:%v", b.Name, err, o)
                log.Println(err)
            }
        }()
        f(o)
    })
    return nil
}
事件发送
// Send 注册事件
func (b *Broker) Send(ctx context.Context, o interface{}) (err error) {
    defer func() {
        if errs := recover(); errs != nil {
            err = fmt.Errorf("消息处理异常 name:%v msg:%v err:%v", b.Name, o, errs)
            log.Println(err)
        }
    }()
    b.event <- o
    return
}
关闭方法
// Stop 调用stop之前确保写入方都已经退出了,不然要panic
func (b *Broker) Stop() {
    b.onceStop.Do(func() {
        close(b.event)
        b.wait.Wait()
    })
}
删除所有处理方法
func (b *Broker) Clear() {
    b.handlersMu.Lock()
    defer b.handlersMu.Unlock()
    b.handlers = b.handlers[0:0]
}

链接

项目代码 https://github.com/krilie/go-smq


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:

查看原文:golang 基于channel的生产者消费者实现

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1236 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传