golang channel实现fan-in、fan-out、or-done

陈陈陈_6150 · · 690 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

package main

import (
    "fmt"
    "time"
)

func sig(after time.Duration) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}

func main() {

    // start := time.Now()
    
    // //or-Done模式
    // <-or(
    //  sig(10*time.Second),
    //  sig(20*time.Second),
    //  sig(30*time.Second),
    //  sig(40*time.Second),
    //  sig(50*time.Second),
    //  sig(01*time.Minute),
    // )

    //fan in
    // <-fanInRec(
    //  sig(10*time.Second),
    //  sig(20*time.Second),
    //  sig(30*time.Second),
    //  sig(40*time.Second),
    //  sig(50*time.Second),
    //  sig(01*time.Minute),
    // )
    // fmt.Printf("done after %v", time.Since(start))

    //fan out
    ch := make(chan interface{})
    chLister := []chan interface{}{make(chan interface{}), make(chan interface{}), make(chan interface{})}
    fanOut(ch, chLister, false)
    ch <- 888
    fmt.Println(<-chLister[0])
    fmt.Println(<-chLister[1])
    fmt.Println(<-chLister[2])

}

func or(channels ...<-chan interface{}) <-chan interface{} {
    // 特殊情况,只有零个或者1个chan
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)

        switch len(channels) {
        case 2: // 2个也是一种特殊情况
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default: //超过两个,二分法递归处理,也可以使用reflect
            m := len(channels) / 2
            select {
            case <-or(channels[:m]...):
            case <-or(channels[m:]...):
            }
        }
    }()

    return orDone
}

func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
    switch len(chans) {
    case 0:
        c := make(chan interface{})
        close(c)
        return c
    case 1:
        return chans[0]
    case 2:
        return mergeTwo(chans[0], chans[1])
    default:
        m := len(chans) / 2
        return mergeTwo(
            fanInRec(chans[:m]...),
            fanInRec(chans[m:]...))
    }
}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        for a != nil || b != nil { //只要还有可读的chan
            select {
            case v, ok := <-a:
                if !ok { // a 已关闭,设置为nil
                    a = nil
                    continue
                }
                c <- v
            case v, ok := <-b:
                if !ok { // b 已关闭,设置为nil
                    b = nil
                    continue
                }
                c <- v
            }
        }
    }()
    return c
}

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
    go func() {
        defer func() { //退出时关闭所有的输出chan
            for i := 0; i < len(out); i++ {
                close(out[i])
            }
        }()

        for v := range ch { // 从输入chan中读取数据
            v := v
            for i := 0; i < len(out); i++ {
                i := i
                if async { //异步
                    go func() {
                        out[i] <- v // 放入到输出chan中,异步方式
                    }()
                } else {
                    out[i] <- v // 放入到输出chan中,同步方式
                }
            }
        }
    }()
}


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:陈陈陈_6150

查看原文:golang channel实现fan-in、fan-out、or-done

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

690 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传