Unix 中的 pipeline,用于程序间的统一接口,示例:
tail -f biz.log | grep "ERROR" | grep "error_no=1234" | wc -l
借鉴这个设计思路,我们想设计一个 pipeline 模块,需要有以下功能:
- 抽象数据处理器(Handler)
- 一组 Handler 可以串行
- 一组 Handler 可以并行
- Handler 可配置化
-
Handler 可被引用
下面我们就以 Golang 为例,尝试将 pipeline 的基本模型搭建出来:
接口 Handler
pipeline 中数据处理器的对外接口
// Args args for handler
type Args struct {
InValue interface{}
Params map[string]interface{}
}
// Args response for handler
type Resp struct {
OutValue interface{}
Params map[string]interface{}
}
// Handler defines some one who can handle something
type Handler interface {
Handle(ctx context.Context, args Args) (resp *Resp, err error)
}
Handler Builder
一个 Handler
需要一个 Builder
去实例化
// HandlerBuilder build a Handler with JSON conf
type HandlerBuilder interface {
BuildHandlerByJSON(id string, confJSON string) (Handler, error)
}
Handler Option
用 JSON 配置文件实例化一个 Handler:
- 可以引用一个已存在的 Handler
- 也可以用一个
HandlerBuilder
构建一个Handler
- 需要额外配置
Required, Timeout, DefaultValue
type Option struct {
// ID ref of a existing handler
ID string `json:"id"`
// create a handler from pipe
PipeName string `json:"pipe_name"`
PipeConf json.RawMessage `json:"pipe_conf"`
// handler conf
TimeOutMillisecond int64 `json:"time_out_millisecond"`
Required bool `json:"required"`
DefaultValue interface{} `json:"default_value"`
// Handler underlying handler
Handler Handler `json:"handler"`
}
并行 Handler
- 一组 Handler,为每个 Handler 起一个
goroutine
并发执行。 - 每个 Handler 都配置了
Required/Timeout/DefaultValue
,当一个必要的(Requried=true
) Handler 超时或报错了,整体处理报错,反之则以DefaultValue
作为响应。 - 这个 并发执行 本身也是一种处理器,即实现了 Handler 接口,可用被当做一个 Handler 用于其他处理流中。
type Handlers []pipeline.Option
func (handlers Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
// prepare params
var (
wg sync.WaitGroup
fatalErr error
hValChan = make(chan struct {
idx int
val interface{}
err error
})
respData = make([]interface{}, len(handlers))
)
// set wait number
wg.Add(len(handlers))
// start goroutines to handle
for i, h := range handlers {
go func(index int, handler pipeline.Option) {
// do handle ...
// push response
hValChan <- struct {
idx int
val interface{}
err error
}{idx: index, val: respResult.val, err: respResult.err}
return
}(i, h)
}
// receive responses of handlers
go func() {
for {
resp := <-hValChan
func() {
defer wg.Done()
// ...
}()
}
}()
// wait for response
wg.Wait()
// build response
return &pipeline.Resp{
OutValue: respData,
Params: args.Params,
}, fatalErr
}
串行 Handler
- 一组 Handler,一个接一个地执行。
- 每个 Handler 都配置了
Required/Timeout/DefaultValue
,即当一个必要的(Requried=true
) Handler 超时或报错了,整体处理报错,反之则已DefaultValue
作为响应。 - 这个 串行执行 本身也是一个处理器,即实现了 Handler 接口,可用作其他数据流的一个环节。
type Handlers struct {
ID string `json:"id"`
Handlers []pipeline.Option `json:"handlers"`
}
func (handlers *Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
for step, h := range handlers.Handlers {
inArgs := args
if h.TimeOutMillisecond <= 0 {
resp, err = h.Handler.Handle(ctx, args)
} else {
resp, err = handlers.stepWithTimeout(ctx, h, args)
}
if err != nil {
if h.Required {
return
}
log.Printf("line-handler failed: id=%v, step=%v, err=%v", handlers.ID, step, err)
args = pipeline.Args{
InValue: h.DefaultValue,
Params: inArgs.Params,
}
continue
}
args = pipeline.Args{
InValue: resp.OutValue,
Params: resp.Params,
}
}
return
}
// stepWithTimeout handles the args with timeout
func (handlers *Handlers) stepWithTimeout(ctx context.Context, h pipeline.Option, args pipeline.Args) (*pipeline.Resp, error) {
hValChan := make(chan struct {
resp *pipeline.Resp
err error
})
go func() {
resp, err := h.Handler.Handle(ctx, args)
hValChan <- struct {
resp *pipeline.Resp
err error
}{resp: resp, err: err}
}()
select {
case <-time.After(time.Millisecond * time.Duration(h.TimeOutMillisecond)):
return &pipeline.Resp{Params: args.Params}, errors.New("timeout: handler_id=" + h.ID)
case r := <-hValChan:
return r.resp, r.err
}
}
源码
有疑问加站长微信联系(非本文作者)