基于 Golang channel 的 pipeline(一)

哈哈笑321 · · 1649 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

需求

业务中,有很多处理流程,涉及到多个环节,其中很多环节是相同的。

通过使用 pipeline,好处:

  • 分离了业务与流程处理,业务代码只是给一个输入,处理并产生一个输出。
  • pipeline 中打印执行过程日志,方便定位问题点,发现瓶颈。
    • 当前执行到哪一个步骤了?
    • 程序卡在哪个环节了?
    • 一次处理耗时多少?
    • 哪个步骤的 channel 满了?
  • 通过并发执行某些环节来提高效率
  • 批量处理数据库操作
  • 每个步骤的代码易于测试

思路

感谢 (myntra)[github.com/myntra/pipeline] 的项目,思路类似,区别:

  • 每个 Step 执行时,会首先创建输出的 out channel,从输入的 in channel 里面读取数据。
func (s *Step) Exec(in chan interface{}) chan interface{} (
    out := make(chan interface{}, 100)
    
    go func() {
        // call 业务处理函数
    }()
    
    return out
)
  • 业务的执行函数输入和输出的参数都是 interface{}
func (s *BusinessProcessor) Process(in interface{}) interface{} {
    // process
    
    return result
}

流程

pipeline 简单时序图:


pipeline framework

代码

pipeline

遗留问题

  • 某个 Step 处理失败,停止整个 pipeline,这个逻辑在很多业务中都存在,可以添加到 pipeline 中。
  • 支持停止。
  • 支持 Step 并发执行。
  • 支持批处理,有些环节需要向数据库插入大量数据,批量处理效率会高很多。

reference


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:哈哈笑321

查看原文:基于 Golang channel 的 pipeline(一)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1649 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传