![image.png](https://static.studygolang.com/211015/65585096a2c4f934d1781223a3f22759.png)
```
package Pipeline
import (
"fmt"
"sync"
)
// Run is a small demonstration of the pipeline: it filters the integers 1..7
// down to the even ones with Evens, multiplies each by ten using five
// concurrent M10 stages, and prints every result on its own line.
// Because several stages feed the merged output, print order is not fixed.
func Run() {
	p := NewPipe()
	p.SetCmd(Evens)
	p.SetCmdPipe(M10, 5)

	results := p.Pipeline(1, 2, 3, 4, 5, 6, 7)
	for {
		value, open := <-results
		if !open {
			return
		}
		fmt.Println(value)
	}
}
// Channel and stage types from which a pipeline is assembled.
type (
	// InputChannel carries values from a Cmd into a CmdPipe stage.
	InputChannel chan interface{}

	// OutputChannel carries the values produced by a CmdPipe stage.
	OutputChannel chan interface{}

	// Cmd is a source: it receives arguments and returns the channel on
	// which its values are delivered.
	Cmd func(args ...interface{}) InputChannel

	// CmdPipe is a processing stage: it reads from an InputChannel and
	// returns the channel that carries its results.
	CmdPipe func(InputChannel) OutputChannel
)
// Pipe describes a two-step pipeline: a source command followed by a
// processing stage that Pipeline starts Count times over the same input.
type Pipe struct {
// Cmd turns the arguments passed to Pipeline into the input stream.
Cmd Cmd
// CmdPipe is the processing stage; Pipeline calls it once per worker,
// each time with the same input channel.
CmdPipe CmdPipe
// Count is the number of CmdPipe stages Pipeline starts. NewPipe sets it to 1.
Count int
}
// NewPipe returns an empty Pipe whose Count is 1. Cmd and CmdPipe are left
// unset and are expected to be supplied through SetCmd and SetCmdPipe.
func NewPipe() *Pipe {
	p := new(Pipe)
	p.Count = 1
	return p
}
// SetCmd stores cmd as the source command of the pipe and returns the same
// Pipe so that configuration calls can be chained.
func (p *Pipe) SetCmd(cmd Cmd) *Pipe {
	p.Cmd = cmd
	return p
}
// SetCmdPipe stores cmd as the processing stage and count as the number of
// stages Pipeline should start. It returns the same Pipe so that
// configuration calls can be chained.
func (p *Pipe) SetCmdPipe(cmd CmdPipe, count int) *Pipe {
	p.CmdPipe, p.Count = cmd, count
	return p
}
// Pipeline runs the configured pipeline over args and returns the merged
// output stream.
//
// The arguments are handed to Cmd, whose channel is shared by every
// processing stage. CmdPipe is started Count times on that channel (at least
// once, see below), and everything the stages emit is forwarded onto the
// returned channel. The returned channel is closed once every stage's output
// has been drained, so callers can simply range over it. Ordering across
// stages is not defined.
//
// Both Cmd and CmdPipe must have been set; calling a nil func panics.
func (p *Pipe) Pipeline(args ...interface{}) OutputChannel {
	in := p.Cmd(args...)
	out := make(OutputChannel)

	// With a non-positive Count no stage would be started: the result would
	// be closed immediately with no values, and nothing would ever read from
	// in, leaving any producer that sends on it blocked. Always run at least
	// one stage. p.Count itself is left untouched.
	workers := p.Count
	if workers < 1 {
		workers = 1
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		stage := p.CmdPipe(in)
		// Forward this stage's results onto the merged output.
		go func(src OutputChannel) {
			defer wg.Done()
			for item := range src {
				out <- item
			}
		}(stage)
	}

	// Close the merged output only after every forwarder has finished, so no
	// send can hit a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
// Evens is a Cmd that emits, in argument order, every element of args that
// is an int divisible by two. Values of any other type are skipped. The
// returned channel is unbuffered and is closed after the last argument has
// been examined.
func Evens(args ...interface{}) InputChannel {
	out := make(InputChannel)
	go func() {
		defer close(out)
		for _, val := range args {
			// Use the comma-ok form: a failed single-value assertion would
			// panic inside this goroutine, where no caller can recover it.
			n, ok := val.(int)
			if !ok || n%2 != 0 {
				continue
			}
			out <- val
		}
	}()
	return out
}
// M10 is a CmdPipe that multiplies every value read from in by ten and sends
// the product on the returned channel, which is closed once in is closed and
// drained. Each value must be an int; any other type makes the type
// assertion panic inside the worker goroutine.
func M10(in InputChannel) OutputChannel {
	results := make(OutputChannel)

	multiply := func() {
		defer close(results)
		for {
			val, open := <-in
			if !open {
				return
			}
			results <- val.(int) * 10
		}
	}
	go multiply()

	return results
}
```
有疑问加站长微信联系(非本文作者)