Most idiomatic way to "tee" a channel?

polaris · 1481 views
<p>For a subscriber model, such that every call to <code>Notify() &lt;-chan Alert</code> returns a new channel to listen for Alerts on, what would be a good way to internally tee the source <code>chan Alert</code> into multiple listener <code>&lt;-chan Alert</code>s?</p>
<p>I.e., I&#39;ve got a single source of data, a channel, and three listeners that all want to receive each event on a channel.</p>
<p>Now, I could keep a simple slice of channels, appending to it with every new subscription and, every time an event arrives, sending it out to each channel in the slice, but this seems sloppy. You have to lock it, to ensure subscribers are closed and that new subscribers don&#39;t subscribe after the fact.</p>
<p>I thought about teeing the channel, such that with every new <code>Notify()</code> call the channel is tee&#39;d with something like this:</p>
<pre><code>func TeeCh(source chan string) (tee1, tee2 chan string) {
	tee1 = make(chan string)
	tee2 = make(chan string)
	go func() {
		// Copy every value to both tees; each send blocks until it is received.
		for s := range source {
			tee1 &lt;- s
			tee2 &lt;- s
		}
		// The source is closed and drained: close both tees.
		close(tee1)
		close(tee2)
	}()
	return tee1, tee2
}
</code></pre>
<p>But this seems prone to accidental hanging channels, as you end up with a sort of dependency graph of channels. One tee could block the other, delaying sending, and worse potentially not close the channel properly.</p>
<p>So, I&#39;m a bit out of ideas. What&#39;s the &#34;good&#34; method for having one channel send to many channels, while also cleanly closing them when the source channel closes?</p>
<p><strong>edit</strong>: I appreciate all the discussion everyone, thank you! In regards to what I&#39;m going to choose, I&#39;m still not sure. This discussion at least assures me I&#39;m not missing something blatantly obvious, though.</p>
<hr/>**Comments:**<br/><br/>
peterbourgon: <pre><blockquote> <p>But this seems prone to accidental hanging channels, as you end up with a sort of dependency graph of channels. 
One tee could block the other, delaying sending, and worse potentially not close the channel properly.</p> </blockquote> <p>But this is the inevitable consequence of teeing, isn&#39;t it?</p></pre>
throwlikepollock: <pre><p>Not sure, honestly. I never suspected such downsides with Unix tee or Go&#39;s TeeReader. E.g., with TeeReader, the source reader is just buffered and, barring any limitation on buffer size, I see no reason why one of the tees would block the other. </p> <p>Ultimately, I feel like I&#39;m committing an anti-pattern. If it feels like I&#39;m fighting Go, I&#39;m probably doing it wrong haha.</p> <p>Currently I&#39;m just removing this usage of allowing multiple receivers for a single channel. </p></pre>
Redundancy_: <pre><p>I don&#39;t think that the feeling of discomfort is that you&#39;re fighting Go, it&#39;s that you have to think <em>very carefully</em> about the behaviour that you want / need. I even remember this sort of example being mentioned in something about advanced usage.</p> <p>How much can you buffer before you should block, etc.?</p></pre>
peterbourgon: <pre><p>Very often it&#39;s a mistake to make your API (certainly public, sometimes private) deal in channels directly. Can you model it as io.Readers and Writers instead? If you need to tokenize (i.e. if you need discrete strings), you can use bufio.Scanners on the read side, and write with newlines. Then you can use io.TeeReader or io.MultiWriter directly. 
But observe that MultiWriter has the same blocking semantics as your channel solution above, and it&#39;s harder to do buffering...</p></pre>
allhatenocattle: <pre><p>You could use select to not block when sending to the tee&#39;d channels, although you&#39;ll need to figure out what you want the behavior to be.</p> <pre><code>func TeeCh(source chan string) (tee1, tee2 chan string) {
	tee1 = make(chan string)
	tee2 = make(chan string)
	go func() {
		for s := range source {
			// Non-blocking sends: a value is dropped for any tee whose
			// receiver is not ready at this moment.
			select {
			case tee1 &lt;- s:
			default:
				// log.Println(&#34;blocked sending to tee1&#34;)
			}
			select {
			case tee2 &lt;- s:
			default:
				// log.Println(&#34;blocked sending to tee2&#34;)
			}
		}
		close(tee1)
		close(tee2)
	}()
	return tee1, tee2
}
</code></pre></pre>
Morgahl: <pre><p>How about this?</p> <pre><code>// Blocking waits until dst accepts s.
func Blocking(dst chan&lt;- string, s string) {
	dst &lt;- s
}

// NonBlocking drops s if dst is not ready to receive it.
func NonBlocking(dst chan&lt;- string, s string) {
	select {
	case dst &lt;- s:
	default:
	}
}

// TeeChan returns a function that starts copying src to the given
// destinations using fn, closing them all once src is closed.
func TeeChan(src &lt;-chan string, fn func(chan&lt;- string, string)) func(...chan&lt;- string) {
	return func(dsts ...chan&lt;- string) {
		go func() {
			for s := range src {
				for _, d := range dsts {
					fn(d, s)
				}
			}
			for _, d := range dsts {
				close(d)
			}
		}()
	}
}

// TeeChan2 creates the two output channels itself and returns them.
func TeeChan2(src &lt;-chan string, fn func(chan&lt;- string, string)) (&lt;-chan string, &lt;-chan string) {
	tee1 := make(chan string)
	tee2 := make(chan string)
	go func() {
		for s := range src {
			fn(tee1, s)
			fn(tee2, s)
		}
		close(tee1)
		close(tee2)
	}()
	return tee1, tee2
}
</code></pre> <p>This provides the ability to take a variadic number of out channels and lets you configure blocking or non-blocking options.</p> <p>edit: TeeChan2 provides maybe a cleaner way to do this?</p></pre>
Veonik: <pre><p>IMO, building a generic solution for this is problematic. It would probably be better to tailor the teeing to each use-case.</p> <p>That said: To prevent the first write from blocking the second, you could perform each write inside its own goroutine. 
To facilitate buffering, you would probably want another pair of channels (not exposed) that have a large enough buffer to perform the initial write to (and never block), then a separate goroutine for each that reads from the internal channel and writes to the exposed output channel (and blocks as necessary).</p> <p>The output channels should be unbuffered so they can be closed safely. If they are buffered, closing them before the reader is done reading will destroy any values in the channel.</p> <p>Of course, this still leaves several problems such as channels that will never be read from (and so never finish) and duplicating all of the data several times and keeping it in memory in a buffer channel.</p></pre>
dchapes: <pre><blockquote> <p>If they are buffered, closing them before the reader is done reading will destroy any values in the channel.</p> </blockquote> <p>Closing a buffered channel that has values in it yet to be read does not destroy them. The reader will read out the buffered values before seeing the channel close. Or do you mean something else?</p></pre>
Veonik: <pre><p>Huh, you&#39;re right. I was mistaken, taking the <a href="https://dave.cheney.net/2014/03/19/channel-axioms" rel="nofollow">Channel Axiom</a> &#34;A receive from a closed channel returns the zero value immediately&#34; too literally. Dave even has <a href="https://play.golang.org/p/KcmnLYNlzL" rel="nofollow">an example that shows I was wrong</a> on that page.</p> <p>Thanks for the correction!</p></pre>
ctcherry: <pre><p>I think the slice of channels sounds much simpler than a tree of channels. 
Here are a couple of actions that illustrate the complexity I&#39;m thinking of:</p> <p>Add Subscriber:</p> <p>TeeTree: you need to find where you want to attach this new channel; as the tree grows there could be more and more options for placement.</p> <p>Slice: append a new channel onto the end of the slice.</p> <p>Remove Subscriber: </p> <p>TeeTree: you will need to remove the channel and re-wire the parent/children to keep the tree intact.</p> <p>Slice: remove the channel element from the slice.</p></pre>
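<p>For reference, the slice-of-channels approach from the question and from ctcherry&#39;s comment could be sketched roughly as below. This is a minimal illustration and not code from the thread: <code>Broadcaster</code>, <code>NewBroadcaster</code> and the per-subscriber buffer size are hypothetical names and choices, and <code>Alert</code> is assumed to be a string. Sends block once a subscriber&#39;s buffer is full, so one slow listener still stalls the others; swapping in a non-blocking send would trade that for dropped alerts.</p>

```go
package main

import (
	"fmt"
	"sync"
)

// Alert stands in for the question's Alert type (assumed to be a string).
type Alert string

// Broadcaster (hypothetical name) fans one source channel out to every
// subscriber channel stored in a mutex-protected slice.
type Broadcaster struct {
	mu     sync.Mutex
	subs   []chan Alert
	buf    int
	closed bool
}

// NewBroadcaster starts a goroutine that copies each value from source to
// all current subscribers and closes them once source is closed.
// buf is the buffer size given to each subscriber channel (an assumption).
func NewBroadcaster(source <-chan Alert, buf int) *Broadcaster {
	b := &Broadcaster{buf: buf}
	go func() {
		for a := range source {
			b.mu.Lock()
			for _, ch := range b.subs {
				ch <- a // blocks while this subscriber's buffer is full
			}
			b.mu.Unlock()
		}
		// Source closed: mark the broadcaster closed, then close subscribers.
		b.mu.Lock()
		b.closed = true
		for _, ch := range b.subs {
			close(ch)
		}
		b.mu.Unlock()
	}()
	return b
}

// Notify returns a new channel that receives every later Alert.
// Subscribing after the source has closed yields an already-closed channel.
func (b *Broadcaster) Notify() <-chan Alert {
	ch := make(chan Alert, b.buf)
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		close(ch)
		return ch
	}
	b.subs = append(b.subs, ch)
	return ch
}

func main() {
	source := make(chan Alert)
	b := NewBroadcaster(source, 4)
	first, second := b.Notify(), b.Notify()

	go func() {
		source <- "disk full"
		source <- "cpu hot"
		close(source)
	}()

	for a := range first {
		fmt.Println("first:", a)
	}
	for a := range second {
		fmt.Println("second:", a)
	}
}
```

<p>Holding the lock while sending keeps the sketch short, but it also means <code>Notify()</code> waits whenever a send is blocked. As several commenters point out, how much to buffer and whether to block or drop has to be decided per use case.</p>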
