Golang-配置化 pipeline

王谙然 · · 1700 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

Unix 中的 pipeline,用于程序间的统一接口,示例:

tail -f biz.log | grep "ERROR" | grep "error_no=1234" | wc -l

借鉴这个设计思路,我们想设计一个 pipeline 模块,需要有以下功能:

  1. 抽象数据处理器(Handler)
  2. 一组 Handler 可以串行
  3. 一组 Handler 可以并行
  4. Handler 可配置化
  5. Handler 可被引用


    pipeline.png

下面我们就以 Golang 为例,尝试将 pipeline 的基本模型搭建出来:

接口 Handler

pipeline 中数据处理器的对外接口

// Args args for handler
type Args struct {
    InValue interface{}
    Params  map[string]interface{}
}

// Args response for handler
type Resp struct {
    OutValue interface{}
    Params   map[string]interface{}
}

// Handler defines some one who can handle something
type Handler interface {
    Handle(ctx context.Context, args Args) (resp *Resp, err error)
}

Handler Builder

一个 Handler 需要一个 Builder 去实例化

// HandlerBuilder build a Handler with JSON conf
type HandlerBuilder interface {
    BuildHandlerByJSON(id string, confJSON string) (Handler, error)
}

Handler Option

用 JSON 配置文件实例化一个 Handler:

  1. 可以引用一个已存在的 Handler
  2. 也可以用一个 HandlerBuilder 构建一个 Handler
  3. 需要额外配置 Required, Timeout, DefaultValue
type Option struct {
    // ID ref of a existing handler
    ID string `json:"id"`

    // create a handler from pipe
    PipeName string          `json:"pipe_name"`
    PipeConf json.RawMessage `json:"pipe_conf"`

    // handler conf
    TimeOutMillisecond int64       `json:"time_out_millisecond"`
    Required           bool        `json:"required"`
    DefaultValue       interface{} `json:"default_value"`

    // Handler underlying handler
    Handler Handler `json:"handler"`
}

并行 Handler

  1. 一组 Handler,为每个 Handler 起一个 goroutine 并发执行。
  2. 每个 Handler 都配置了 Required/Timeout/DefaultValue,当一个必要的(Requried=true) Handler 超时或报错了,整体处理报错,反之则以 DefaultValue 作为响应。
  3. 这个 并发执行 本身也是一种处理器,即实现了 Handler 接口,可用被当做一个 Handler 用于其他处理流中。
type Handlers []pipeline.Option

func (handlers Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
    // prepare params
    var (
        wg       sync.WaitGroup
        fatalErr error
        hValChan = make(chan struct {
            idx int
            val interface{}
            err error
        })
        respData = make([]interface{}, len(handlers))
    )
    // set wait number
    wg.Add(len(handlers))

    // start goroutines to handle
    for i, h := range handlers {
        go func(index int, handler pipeline.Option) {
            // do handle ...
            // push response
            hValChan <- struct {
                idx int
                val interface{}
                err error
            }{idx: index, val: respResult.val, err: respResult.err}
            return
        }(i, h)
    }

    // receive responses of handlers
    go func() {
        for {
            resp := <-hValChan
            func() {
                defer wg.Done()
                // ...
            }()
        }
    }()

    // wait for response
    wg.Wait()

    // build response
    return &pipeline.Resp{
        OutValue: respData,
        Params:   args.Params,
    }, fatalErr
}

串行 Handler

  1. 一组 Handler,一个接一个地执行。
  2. 每个 Handler 都配置了 Required/Timeout/DefaultValue,即当一个必要的(Requried=true) Handler 超时或报错了,整体处理报错,反之则已 DefaultValue 作为响应。
  3. 这个 串行执行 本身也是一个处理器,即实现了 Handler 接口,可用作其他数据流的一个环节。
type Handlers struct {
    ID               string            `json:"id"`
    Handlers         []pipeline.Option `json:"handlers"`
}

func (handlers *Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
    for step, h := range handlers.Handlers {
        inArgs := args

        if h.TimeOutMillisecond <= 0 {
            resp, err = h.Handler.Handle(ctx, args)
        } else {
            resp, err = handlers.stepWithTimeout(ctx, h, args)
        }

        if err != nil {
            if h.Required {
                return
            }

            log.Printf("line-handler failed: id=%v, step=%v, err=%v", handlers.ID, step, err)
            args = pipeline.Args{
                InValue: h.DefaultValue,
                Params:  inArgs.Params,
            }
            continue
        }

        args = pipeline.Args{
            InValue: resp.OutValue,
            Params:  resp.Params,
        }
    }

    return
}

// stepWithTimeout handles the args with timeout
func (handlers *Handlers) stepWithTimeout(ctx context.Context, h pipeline.Option, args pipeline.Args) (*pipeline.Resp, error) {
    hValChan := make(chan struct {
        resp *pipeline.Resp
        err  error
    })

    go func() {
        resp, err := h.Handler.Handle(ctx, args)
        hValChan <- struct {
            resp *pipeline.Resp
            err  error
        }{resp: resp, err: err}
    }()

    select {
    case <-time.After(time.Millisecond * time.Duration(h.TimeOutMillisecond)):
        return &pipeline.Resp{Params: args.Params}, errors.New("timeout: handler_id=" + h.ID)
    case r := <-hValChan:
        return r.resp, r.err
    }
}

源码

github.com/Focinfi/pipeline


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:王谙然

查看原文:Golang-配置化 pipeline

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1700 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传