需求
业务中,有很多处理流程,涉及到多个环节,其中很多环节是相同的。
通过使用 pipeline,好处:
- 分离了业务与流程处理,业务代码只是给一个输入,处理并产生一个输出。
- pipeline 中打印执行过程日志,方便定位问题点,发现瓶颈。
- 当前执行到哪一个步骤了?
- 程序卡在哪个环节了?
- 一次处理耗时多少?
- 哪个步骤的 channel 满了?
- 通过并发执行某些环节来提高效率
- 批量处理数据库操作
- 每个步骤的代码易于测试
思路
感谢 (myntra)[github.com/myntra/pipeline] 的项目,思路类似,区别:
- 每个
Step
执行时,会首先创建输出的out channel
,从输入的in channel
里面读取数据。
func (s *Step) Exec(in chan interface{}) chan interface{} (
out := make(chan interface{}, 100)
go func() {
// call 业务处理函数
}()
return out
)
- 业务的执行函数输入和输出的参数都是
interface{}
func (s *BusinessProcessor) Process(in interface{}) interface{} {
// process
return result
}
流程
pipeline 简单时序图:
代码
遗留问题
- 某个 Step 处理失败,停止整个 pipeline,这个逻辑在很多业务中都存在,可以添加到 pipeline 中。
- 支持停止。
- 支持 Step 并发执行。
- 支持批处理,有些环节需要向数据库插入大量数据,批量处理效率会高很多。
reference
有疑问加站长微信联系(非本文作者)