原文在此。遗憾的是文章只提出了问题,并没明确提供如何解决这些问题。但无论如何,对于这种可以引起反思的文章,是不能放过的。另外,我得承认,似乎高层次的分布式系统的抽象,用函数式语言的范式来表述更容易一些(实现上其实未必)。
————翻译分隔线————
channel 独木难支
或者说为什么流水线作业没那么容易
@kachayev 撰写
概述
Go 被设计用于更容易的构建并发系统,因此它有运行独立的计算的 goroutine 和用于它们之间通讯的 channel。我们之前都听过这个故事。所有的例子和指南看起来都挺好的:我们可以创建一个新的 channel,可以向这个 channel 发送数据,可以从 channel 读取,甚至还有漂亮和优雅的 select 语句(顺便提一下,为什么 21 世纪了我们还在用语句?),阻塞读和缓存……
主旨:99% 的情况下,我其实并不关心响应是由 channel 传递的,还是一只魔法独角兽从它的角上带来的。
在为初学者撰写指南的时候这确实挺酷的!但是当你尝试实现大型的复杂系统的时候这就很痛苦了。channel 太原始了。它们是低级的构件,我相当怀疑你愿意在日常工作中天天和它们打交道。
看看“高级模式”和“流水作业”。不是那么简单吧?有太多的东西要考虑,并且永远记得:什么时候、如何关闭 channel;如何传递错误;如何释放资源。我抱怨这些是因为我曾尝试实现一些东西,然后失败了。而我每天都在面对这些东西。
你可能会说,对于初学者没必要理解所有细节。不过……描述一个模式真得很“高级”?不幸的是,答案是否定的。它们是基础和常识。
更仔细的了解一下流水作业问题。这真得是流水作业?不,“…对于每个来自目录的路径计算 MD5
校验码,并将结果存入一个 map[string][string]
…”。这只是一个 pmap
(并行 map)。或具有池化执行器
的、有限的并行化的 pmap
。而 pmap
不应当需要我输入如此多行代码。想了解真正的流水作业——我将在文章的最后介绍一个(参阅“构建 Twitter 分析器”的段落)。
那么模式如何呢?
为了快速开发真实的应用,我们应当能够提炼出比原始的 channel 层面更高的抽象。它们只是传输层。我们需要应用层的抽象来编写程序(对比 OSI),否则你会发现你总是在低级的 channel 网络的细节上纠结,试图在生产环境中、偶发的、没有任何有效的方法重现的找到它不工作的原因。参阅 Erlang OTP 是如何针对性的解决类似的问题的:将你保护在低级的消息传递代码之外。
低级的代码有什么问题?这里有一篇超棒的文章“爱德华 C++ 手(译注:借‘爱德华剪刀手’)”:
手里有把剪刀并不一定总是那么糟糕。爱德华有许多天赋:例如,他能创造劲爆的狗狗的发型。别误会——它展示了许多劲爆的狗狗的发型(我是说优雅且简单的 C++ 代码),但是主要内容还是关于如何避免剪坏,以及在发生剪坏的情况下进行急救。
在 Kyiv Go 聚会的时候,我经历了相同的情况:在一页幻灯上那 20 行干净可读的代码。一个不一般的竞态条件和一个可能出现的运行时错误。这对于所有听众来说很明显吗?不。至少一半人不明白。
痛苦的缘由?
好,让我们试着收集一些类似的模式。从工作的经验中、从书中、从其他语言中(是,伙计们,我知道这有点令人难以相信,不过还有许多其他语言同样也有并发的设计)。
Rob Pike 讨论了 Fan-in、Fan-out。在许多情况下,这很有用,不过还是关于网络的 channel。而不是你的应用。在任何情况下,看看(无耻的从这里偷的)。
Rob Pike talks about Fan-in, Fan-out. It’s useful in many ways, but still about the network of channels. Not about your application. In any case, let’s check (shamelessly stolen from here).
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 为 cs 中每个输入的 channel 启动一个输出用的 goroutine。 // 从 c 中复制值出来直到 c 被关闭,然后又调用 wg.Done。 output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // 一旦所有输出的 goroutine 完成的,就启动一个 goroutine 来关闭 out。 // 这必须在 wg.Add 调用后启动。 go func() { wg.Wait() close(out) }() return out }
呃…… <-chan int
。在我的应用中重用起来没那么抽象(例如,迁移到库中)……并且在每次我需要的时候都重新实现也不是那么清晰。那么如何让其可以重用?<-chan interface{}
?欢迎来到类型转换和运行时错误的领地。如果,希望实现一个高级的 fan-in(合并)就必须牺牲类型安全。同样的(不幸的)是其他模式也是一样。
我真正想要的是:
func merge[T](cs ...<-chan T) <-chan T
是,我知道 Go 没有泛型,因为谁需要它们呢?
现在天气如何?
回到模式。让我们分析一个假设的项目,服务器端开发会与实际经验非常接近。我们需要一个服务器接收请求,输入一个美国的州,返回从 OpenWeatherMap 收集到的信息。例如这样:
$ http localhost:4912/weather?q=CA HTTP/1.1 200 OK Access-Control-Allow-Credentials: true Access-Control-Allow-Methods: GET, POST Access-Control-Allow-Origin: * Connection: keep-alive Content-Type: application/json; charset=utf-8
[{ "clouds": { "all": 40 }, "id": 5391959, "main": { "temp": 288.89, "temp_max": 291.48, "temp_min": 286.15 }, "name": "San Francisco", "weather": [ { "description": "mist", "icon": "50d", "id": 701, "main": "Mist" } ] }, { "clouds": { "all": 90 }, "id": 5368361, "main": { "temp": 292.83, "temp_max": 296.15, "temp_min": 289.15 }, "name": "Los Angeles", "weather": [ { "description": "mist", "icon": "50d", "id": 701, "main": "Mist" } ] }]
pmap
让我们从一些我们已经知道的东西开始。那么,我们收到了请求 ?q=CA
。我不想对从哪里得到相关城市的列表进行解释。我们可以用这个数据库,在内存中缓存以及其他什么合理的东西。假设我们有一个神奇的 findCities(state)
函数,返回 chan City
(像通常 go
程序表现的延迟序列那样)。然后呢?每个城市我们都必须调用 OpenWeatherMap API 并解析结果到一个 map[City]Weather
中。我们已经讨论过这个模式了。这是个 pmap
。我希望我的代码像这样:
chanCities := findCities(state) resolver := func(name City) Weather { return openWeatherMap.AskFor(name) } weather := chanCities.Par.Map(resolver)
或限制并发数:
chanCities := findCities(state) pool := NewWorkers(20) resolver := func(w Worker, name City) Weather { return w.AskFor(name) } weather := chanCities.Par.BoundedMap(pool, resolver)
我希望所有这些 <-done
同步和神圣的 select
完全被隐藏起来。
Futures & Promises
获取当前天气可能需要很长的时间,例如,你有一个很长的城市列表。当然,你不希望重复的 API 调用,因此应当可以用某种方法管理并行的请求:
func collect(state string) Weather { calc, ok := calculations.get(state) // check if it's in progress if !ok { calc.calculations.run(state) // run otherwise } return calc.Wait() // wait until done }
这也被叫做 future/promise 。Wiki 的解释:
它们描述了一个对象对于结果扮演了代理的角色,而这在一开始是不可预知的,通常是由于它的值尚未完成计算造成的。
我已经听过太多人说 go 的 future
很简单:
f := make(chan int, 1)
这是错误的,因为所有的等待者都应当得到结果(译注:实现 channel 的订阅与变化值的广播确实另人头大)。而这个版本也是错的:
[C]
f := make(chan int, 1)
v <- f
f <- v
// 在这里使用 v
[/C]
由于不可能用这个方法管理资源。所以,当某个家伙在他的代码里丢了 f <- v
那部分,我希望你能幸运的发现这个 bug。
将数据直接发给所有的等待者来实现 promise 没那么复杂(我不确定这段代码是不是有 bug):
type PromiseDelivery chan interface{} type Promise struct { sync.RWMutex value interface{} waiters []PromiseDelivery } func (p *Promise) Deliver(value interface{}) { p.Lock() defer p.Unlock() p.value = value for _, w := range p.waiters { locW := w go func(){ locW <- value }() } } func (p *Promise) Value() interface{} { if p.value != nil { return p.value } delivery := make(PromiseDelivery) p.waiters = append(p.waiters, delivery) return <-delivery } func NewPromise() *Promise { return &Promise{ value: nil, waiters: []PromiseDelivery{}, } }
如何使用他呢?
p := NewPromise() go func(){ p.Deliver(42) }() p.Value().(int) // 阻塞,当有值的时候返回 interface{}
不过这里有 interface{} 和类型转换。我实际上想要什么呢?
// 在那些经过良好测试的库,甚至 stdlib 中 type PromiseDelivery[T] chan T type Promise[T] struct { sync.RWMutex value T waiters []PromiseDelivery[T] } func (p *Promise[T]) Deliver(value T) func (p *Promise[T]) Value() T func NewPromise[T]() *Promise[T] // 我的代码: v := NewPromise[int]() go func(){ v.Deliver("woooow!") // 错误 v.Deliver(42) }() v.Value() // 阻塞并返回 42,而不是 interface{}
是的,当然,没有人需要泛型。我在讨论什么鬼玩意啊?
也可以通过使用 select 来避免 p.Lock()
来监听 deliver
,并在一个 goroutine
中 wait
操作。还可以引入对最终用户极为有用的 .ValueWithTimeout
方法。还有许多许多其他“你可以……”。尽管我们实际上是在讨论一个 20 行的代码(它的长度可能在每次你发现 future/promise 交互更多细节的时候就开始增长了)。我真得需要知道(或想到) channel 为我传递值吗?不!
pub/sub
假设我们想要构建一个实时服务。那么我们的客户端现在可以开启一个 websocket 连接,传递 q=CA 请求,并即刻获得加利福尼亚的天气变化情况。它看起来应该像:
// deliverer calculation.WhenDone(func(state string, w Weather) { broker.Publish("CA", w) }) // client ch := broker.Subscribe("CA") for update := range ch { w.Write(update.Serialize()) }
这是一个典型的 pub/sub(译注:公告/订阅)。你可以从高级 Go 模式的演讲中学习它,甚至可以找到即刻可用的实现。问题是,它们全都基于接口的。
有没有可能实现:
broker := NewBroker[String, Weather]() // so that broker.Subs(42) // compilation failure // and broker.Subs("CA") // returns (chan Weather) not (chan interface{})
当然!如果你能勇敢的在项目之间复制粘贴代码,并到处进行修改。
map/filter
假设希望给与我们的用户更多的弹性,从而引入了新的查询参数:show
,它的值可以是 all|temp|wind|icon
。
可能你可以从基础开始:
ch := broker.Subscribe("CA") for update := range ch { temps := []Temp for _, t := update.Temp { temps = append(temps, t) } w.Write(temps) }
不过,在写了 10 个这样的方法之后,你会意识到它没那么模块化,并且也很无聊。可能你需要:
ch := broker.Subscribe("CA").Map(func(w Weather) Temp { return w.Temp }) for update := range ch { w.Write(update) }
等等,我有提过 channel 是一个 functor(译注:函子)吗?跟 future/promise 一样。
p := NewPromise().Map(func(w Weather) Temp { return w.Temp }) go func(){ p.Deliver(Weather{Temp{42}}) }() p.Value().(Temp) // Temp, not Weather
这意味着我重用了 future 的 channel 的相同代码。你也可以用想 transducers 这样的东西来完成它。我经常在 ClojureScript 的代码中使用的技巧:
(->> (send url) ;; returns chan, put single value to it {:status 200 :result 42} when ready (async/filter< #(= 200 (:status %))) ;; check that :status is 200 (async/map< :result)) ;; expose only 42 to end user ;; note, that it will close all channels (including implicit intermediate one) properly
当我可以简单的进行 x.Map(transformation)
并得到相同类型的值的时候,我真得需要关心 x
是个 channel 还是个 future 吗?在这个例子里,为什么允许我创建 make(chan int)
而不能创建 make(Future int)
呢?
Request/Reply
假设我们的用户喜欢这个服务,并且频繁的使用它。那么就需要引入一些简单的 API 限制:每天、每个 IP 请求的数量。收集这个数量保存在一个 map[string]int
中很简单。Go 的文档说“不要通过共享内存来通讯,用通讯来共享内存”。好吧,听起来是个好主意。
req := make(chan string) go func() { // wow, look here - it's an actor! m := map[string]int{} for r := range req { if v, ok := m[r]; !ok { m[r] = 1 } else { m[r] = v + 1 } } }() go func() { req <- "127.0.0.2" }() go func() { req <- "127.0.0.1" }()
这很容易。现在可以计算每个 IP 请求的数量了。不但如此……同时也可以要求执行请求需要权限。
type Req struct { ip string resp chan int } func NewRequest(ip string) *Req { return &Req{ip, make(chan int)} } requests := make(chan *Req) go func() { m := map[string]int{} for r := range requests { if v, ok := m[r.ip]; !ok { m[r.ip] = 1 } else { m[r.ip] = v + 1 } r.resp <- m[r.ip] } }() go func() { r := NewRequest("127.0.0.2") requests <- r fmt.Println(<- r.resp) }() go func() { r := NewRequest("127.0.0.1") requests <- r fmt.Println(<- r.resp) }()
我不会再问你要泛型的解决方案(没有写死的 string 和 int)。换而言之,我希望你检查一下这段代码中是不是都正确?真得这么简单吗?
你确定 r.resp <- m[r.ip]
是个好办法?不,肯定不是!我希望有任何人等待那些很慢的客户端。是吗?而如果我有许多很慢的客户端的时候会怎么样呢?可能我需要对此进行一些处理。
而 requests <- r
这部分简单吗?如果我的 actor(服务器)过载无法响应的时候呢?可能我需要在这里处理超时……
时不时的我就需要特定的初始化和清理过程……都需要超时机制。并且需要能保持请求,直到初始化完成。
那么调用的优先权呢?例如,当我需要为了分析系统实现 Dump 方法,但是又不想让所有的用户暂停来收集需要的数据。
还有……看看 Erlang 中的 gen_server。为了保险起见,我希望它一被实现就可以是具有良好的文档的,经过高度覆盖测试的库。98% 的情况下,我不希望看到这样的介绍:make(chan int, ?) 而我不想思考到底我应该将 ? 替换成多少。
99% 的情况下,我其实并不关心响应是由 channel 传递的,还是一只魔法独角兽从它的角上带来的。
数不胜数
还有许多其他常见的并发的情况。我想你已经明白了。
苦难
你可以说,这些模式都不常见。不过……我在我的项目中不得不实现它们中的大多数。每!一!次!可能我不怎么走运,而你的项目会跟写给初学者的指南一样简单。
我知道,你们中的大多数会说“世界是艰辛的,编程是苦难的”。我会继续打击你:至少有一些语言展示了部分解决这些问题的示例。至少,在尝试解决它。Haskell 和 Scala 的类型系统提供了构建强大的高级抽象的能力,甚至自定义控制流来处理并发。而另一阵营的 Clojure 利用动态类型鼓励和共享高级的抽象。Rust 有 channel 和泛型。
让它工作 -> 让它优雅 -> 让它可重用。
现在,第一步已经完成。接下来呢?不要误会,go 是一个有远见的语言:channel 和 goroutine 比起例如 pthread 来说更好,不过是不是真得就停留在此?
补充:构建 Twitter 分析器
关于真实的流水作业。
你可能已经看过 Twitter 的分析了,它真得很棒。假设它尚未出现,而我们需要自己的分析工具:提供一个用户名,来统计有多少用户看过(至少是理论上)他的 tweet。应该如何做呢?其实不难:读取用户的时间轴,过滤掉所有的 retweet 和回复,然后请求其他 tweet 的 retweeter,为每个 retweeter 请求 follower 的列表,合并所有 retweeter 的 follower 在一起,然后加上这个用户的 follower。对于这个步骤我想要的结果是:
map[TweetId][]Username
(retweeter)和 map[Username][]Username
。这些用于构造一个向请求者展示的魔幻的表格是足够了。
有一些技术细节你应当留意:
-
Twitter API 需要每个调用都使用 OAuth,并且设定了很强的限制(每个用户每 15 分钟 450 次调用)。为了对付这个限制,我们将用预定义的一个 OAuth token 列表(例如 50 个)组织在一个池中供 worker 使用,每个 worker 在达到限制之前都可以让自己休息一会。
-
大多数 Twitter API 调用通过 since_id 或 max_id 使用了结果分页。因此你不能依赖一个请求就可以获取完整的结果。
一个粗糙的实现的例子。注意,你没必要理解这个文件中所有的内容。相反,如果你无法理解的话,这恰恰说明我们做对了。
那么我们现在有什么?
- 一些步骤的计算:TimelineReader -> RetweetersReader -> FollowersReader -> FinalReducer。
- 自供消息。由于分页所有阶段都是递归的。这意味着每个步骤都会向下一个阶段和其本身发出消息。在这个情况下,很难处理取消的情况。甚至无法发现某个步骤的工作全部完成。
- 尽早传播。至少有两种情况:首先为了通过 TweetId 来收集 []Username,我们需要将收集到的信息直接从 RetweetersReader 发送到 FinalReducer。然后,一开始我们就知道,需要获得初始用户的 follower,因此他的用户名应当从 TimelineReader 传递到
RetweetersReader 步骤。 - 中间收缩。FollowersReader 不只是一个管道。它会过滤我们已经见过的用户名(因为总不想重复工作吧)。
- 持续工作的 worker。在许多情况下,你无法等待 worker 退出。例如,当你实现了一个服务,它会同时响应许多客户端的时候。
有疑问加站长微信联系(非本文作者)