tees/channels.go
package tees
// TeeOp is the receiver for the channel pipeline helpers declared in this
// package (OrDone, Tee, Repeat and Take). It has no fields, so every value of
// the type is interchangeable.
type TeeOp struct{}
// NewTeeOp allocates an empty TeeOp and returns a pointer to it.
func NewTeeOp() *TeeOp {
	return &TeeOp{}
}
// OrDone wraps c so that a consumer can range over the result without having
// to watch done itself.
//
// Every value received from c is forwarded, in order, on the returned
// unbuffered channel. The returned channel is closed when c is closed or as
// soon as done becomes ready (closed or sent to), whichever is observed first.
// Once done has been observed no further value is received from c.
func (teeOp *TeeOp) OrDone(
	done, c <-chan interface{},
) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case v, ok := <-c:
				if !ok {
					// c was closed: nothing more to forward.
					return
				}
				select {
				case valStream <- v:
				case <-done:
					// Stop right here. Looping back would give the outer
					// select another chance to pull (and then drop) a value
					// from c even though cancellation was already seen.
					return
				}
			}
		}
	}()
	return valStream
}
// Tee copies the stream in onto two unbuffered output channels.
//
// Input is read through OrDone, so reading stops when in is closed or done
// becomes ready. For each value the goroutine makes exactly two select
// attempts; normally one attempt delivers the value to each output, and the
// next value is not read until both attempts are finished, so both outputs
// have to be consumed for the stream to advance. An attempt can instead be
// used up by the done case, in which case that value may not reach both
// outputs.
//
// Both outputs are closed when the input loop ends.
func (teeOp *TeeOp) Tee(
	done <-chan interface{},
	in <-chan interface{},
) (_, _ <-chan interface{}) {
	first := make(chan interface{})
	second := make(chan interface{})

	go func() {
		// Deferred calls run last-in-first-out: second is closed before first.
		defer close(first)
		defer close(second)

		for item := range teeOp.OrDone(done, in) {
			// Per-item copies of the outputs. A copy is set to nil once it
			// has accepted the item; sending on a nil channel blocks forever,
			// which takes that case out of the next select.
			toFirst, toSecond := first, second
			for attempts := 2; attempts > 0; attempts-- {
				select {
				case <-done:
				case toFirst <- item:
					toFirst = nil
				case toSecond <- item:
					toSecond = nil
				}
			}
		}
	}()

	return first, second
}
// Repeat returns an unbuffered channel that yields the given args over and
// over, in order, until done becomes ready; the channel is then closed.
//
// If no args are given there is nothing to repeat, so the returned channel is
// closed immediately instead of leaving a goroutine spinning forever.
func (teeOp *TeeOp) Repeat(
	done <-chan interface{},
	args ...interface{},
) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		if len(args) == 0 {
			// Without this guard the outer loop below would busy-loop: the
			// inner range would have no iterations, so done would never be
			// checked and the stream would never be closed.
			return
		}
		for {
			for _, v := range args {
				select {
				case <-done:
					return
				case valueStream <- v:
				}
			}
		}
	}()
	return valueStream
}
// Take returns an unbuffered channel that forwards at most num values from
// valueStream and is then closed.
//
// The returned channel is closed early if done becomes ready or if
// valueStream is closed before num values have been seen. A num of zero or
// less yields an already-finished (closed, empty) stream.
func (teeOp *TeeOp) Take(
	done <-chan interface{},
	valueStream <-chan interface{},
	num int,
) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			// Receive and send are two separate selects. Writing
			// `takeStream <- <-valueStream` as a single case would evaluate
			// the receive before the select starts, blocking on a silent
			// upstream regardless of done and forwarding nil once upstream
			// is closed.
			var v interface{}
			select {
			case <-done:
				return
			case received, ok := <-valueStream:
				if !ok {
					// Upstream is exhausted: stop instead of emitting nils.
					return
				}
				v = received
			}
			select {
			case <-done:
				return
			case takeStream <- v:
			}
		}
	}()
	return takeStream
}
teedemo.go
package main
import (
"fmt"
"teedemo/tees"
)
// main wires up a small pipeline: the values 1, 2, 3 are repeated, the first
// six are taken, and that stream is split in two with Tee. Each value read
// from the first output is printed next to the value read from the second.
func main() {
	done := make(chan interface{})
	// Closing done on exit signals every pipeline stage to stop.
	defer close(done)

	op := tees.NewTeeOp()
	repeated := op.Repeat(done, 1, 2, 3)
	firstSix := op.Take(done, repeated, 6)
	left, right := op.Tee(done, firstSix)

	for {
		fromLeft, ok := <-left
		if !ok {
			break
		}
		fmt.Printf("out1: %v, out2: %v \n", fromLeft, <-right)
	}
}
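程序输出如下：
out1: 1, out2: 1
out1: 2, out2: 2
out1: 3, out2: 3
out1: 1, out2: 1
out1: 2, out2: 2
out1: 3, out2: 3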
有疑问加站长微信联系(非本文作者)