package main
import (
"fmt"
"time"
)
func sig(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
func main() {
// start := time.Now()
// //or-Done模式
// <-or(
// sig(10*time.Second),
// sig(20*time.Second),
// sig(30*time.Second),
// sig(40*time.Second),
// sig(50*time.Second),
// sig(01*time.Minute),
// )
//fan in
// <-fanInRec(
// sig(10*time.Second),
// sig(20*time.Second),
// sig(30*time.Second),
// sig(40*time.Second),
// sig(50*time.Second),
// sig(01*time.Minute),
// )
// fmt.Printf("done after %v", time.Since(start))
//fan out
ch := make(chan interface{})
chLister := []chan interface{}{make(chan interface{}), make(chan interface{}), make(chan interface{})}
fanOut(ch, chLister, false)
ch <- 888
fmt.Println(<-chLister[0])
fmt.Println(<-chLister[1])
fmt.Println(<-chLister[2])
}
func or(channels ...<-chan interface{}) <-chan interface{} {
// 特殊情况,只有零个或者1个chan
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2: // 2个也是一种特殊情况
select {
case <-channels[0]:
case <-channels[1]:
}
default: //超过两个,二分法递归处理,也可以使用reflect
m := len(channels) / 2
select {
case <-or(channels[:m]...):
case <-or(channels[m:]...):
}
}
}()
return orDone
}
func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
m := len(chans) / 2
return mergeTwo(
fanInRec(chans[:m]...),
fanInRec(chans[m:]...))
}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { //只要还有可读的chan
select {
case v, ok := <-a:
if !ok { // a 已关闭,设置为nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已关闭,设置为nil
b = nil
continue
}
c <- v
}
}
}()
return c
}
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
go func() {
defer func() { //退出时关闭所有的输出chan
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
for v := range ch { // 从输入chan中读取数据
v := v
for i := 0; i < len(out); i++ {
i := i
if async { //异步
go func() {
out[i] <- v // 放入到输出chan中,异步方式
}()
} else {
out[i] <- v // 放入到输出chan中,同步方式
}
}
}
}()
}
有疑问加站长微信联系(非本文作者)