go 编程的架构模式(pipe-filter)

第一种架构模式 pipe-filter

  • 其架构适用于 解析,过滤,处理,返回这样的架构,如数据分析。
  • filter 用于数据的过滤。而pipe 用于连接filter传递数据,或者异步处理缓冲数据流。
  • 松耦合:filter 只和数据耦合。


下面的例子是一个将 字符串“1,2,3” 按逗号切分后,再字符转数字相加的过程。


1、首先实现一个 filter 的接口。该接口定义了数据的来源接口,输出接口,该filter接口必须拥有的处理方法

package pipe_filter

// Request is the input of the filter
type Request interface{}

// Response is the output of the filter
type Response interface{}

// Filter interface is the definition of the data processing components
// Pipe-Filter structure
type Filter interface {
    Process(data Request) (Response, error)

2、定义一个pipe-line, 目的是为了将所有的filter串起来。

package pipe_filter

// NewStraightPipeline create a new StraightPipelineWithWallTime
func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline {
    return &StraightPipeline{
        Name:    name,
        Filters: &filters,

// StraightPipeline is composed of the filters, and the filters are piled as a straigt line.
type StraightPipeline struct {
    Name    string
    Filters *[]Filter

// Process is to process the coming data by the pipeline
func (f *StraightPipeline) Process(data Request) (Response, error) {
    var ret interface{}
    var err error
    for _, filter := range *f.Filters {
        ret, err = filter.Process(data)
        if err != nil {
            return ret, err
        data = ret
    return ret, err

3、定义需要的filter,在这里filter的工作顺序是串行的,首先是按“,”拆分,其次将字符型转换为数字形。最后加起来。每个filter都必须实现一个Process方法,因为只是在filter.go 里定义好的。
split_filter.go (拆分)

package pipe_filter

import (

var SplitFilterWrongFormatError = errors.New("input data should be string")

type SplitFilter struct {
    delimiter string

func NewSplitFilter(delimiter string) *SplitFilter {
    return &SplitFilter{delimiter}

func (sf *SplitFilter) Process(data Request) (Response, error) {
    str, ok := data.(string) //检查数据格式/类型,是否可以处理
    if !ok {
        return nil, SplitFilterWrongFormatError
    parts := strings.Split(str, sf.delimiter)
    return parts, nil

toint_filter.go (字符转整数)

package pipe_filter

import (

var ToIntFilterWrongFormatError = errors.New("input data should be []string")

type ToIntFilter struct {

func NewToIntFilter() *ToIntFilter {
    return &ToIntFilter{}

func (tif *ToIntFilter) Process(data Request) (Response, error) {
    parts, ok := data.([]string)
    if !ok {
        return nil, ToIntFilterWrongFormatError
    ret := []int{}
    for _, part := range parts {
        s, err := strconv.Atoi(part)
        if err != nil {
            return nil, err
        ret = append(ret, s)
    return ret, nil

sum_filter.go (累加)

package pipe_filter

import "errors"

var SumFilterWrongFormatError = errors.New("input data should be []int")

type SumFilter struct {

func NewSumFilter() *SumFilter {
    return &SumFilter{}

func (sf *SumFilter) Process(data Request) (Response, error) {
    elems, ok := data.([]int)
    if !ok {
        return nil, SumFilterWrongFormatError
    ret := 0
    for _, elem := range elems {
        ret += elem
    return ret, nil

这下一个完美的 pipe-filter 就完成了

package main

import (

func main() {
    spliter := pipe_filter.NewSplitFilter(",")
    converter := pipe_filter.NewToIntFilter()
    sum := pipe_filter.NewSumFilter()
    sp := pipe_filter.NewStraightPipeline("p1", spliter, converter, sum)
    ret, err := sp.Process("1,2,3")
    if err != nil {
    if ret != 6 {
        log.Fatalf("The expected is 6, but the actual is %d", ret)





