今天继续优化了bigpipe项目,核心目标就是解决重启程序损失流量的问题。
背景
bigpipe作为一个消息中间件,其出现是为了给php程序提供方便的异步Http调用功能。然而php语言并不是常驻进程模型,当它请求bigpipe失败后,顶多重试几次,就必须尽快的向用户返回应答。因此,bigpipe服务的可用性是非常重要的。
bigpipe使用golang编写,采用channel逐层缓冲流量和数据,采用协程并发处理数据。因为bigpipe承接了若干业务,经常会对配置文件做一些修改,那么就必须重启bigpipe。
思考
在最初的版本中,bigpipe提供了优雅退出功能,也就是在退出前首先停止对外的Http服务,然后将进程内剩余的数据处理干净,最后再退出,这样不至于损失已经接受到的数据请求。
优雅退出存在一个问题,就是先要停止对外http服务,这样才不会有新的流量涌入,才有可能把缓冲在内存里的剩余流量处理干净。因为这个设计,导致在http停止服务后的一段时间内,客户端是无法访问bigpipe的,服务完全不可用。
最初的想法是,部署多个bigpipe,前端采用lvs/haproxy等负载均衡,这样一旦http端口关闭,lvs会自动转发流量。但是,这样的缺点是要求bigpipe必须多点部署,而且lvs/haproxy并不能保证流量瞬时切换到正常节点,总要损失一些流量,而这就要求客户端支持重试逻辑,总之不是一个完美的方案。
另外一个想法是,仍旧部署多个等价bigpipe组成集群,在bigpipe之前部署一个自研发的轻量级的proxy服务,其支持多个bigpipe之间转发重试,然而这样不仅是带来了更大的运维成本,其实还是没有直面问题本质,在错误的路上越绕越远。
方案
必须让bigpipe支持配置热加载,这一点实现起来并不是很简单,下面我来说说难在哪里。
首先,在加载新的配置期间,不能停止http对外服务,因此我决定Http模块自身不支持热加载(http监听地址,读写超时等简单配置),它始终保持对外服务。
然而,请求的处理模块等是需要加载新的配置的,在重新加载这些模块期间,http接收的请求必须要缓冲起来,这样才能做到流量0损失,因此我重新设计了模块结构,在http接口层和业务处理层之间增加一个缓冲层,专门用来支撑热加载期间的流量缓冲作用。
为了简化设计,无论是否使用热加载特性,这个缓冲层总是存在。
另外一个重要的变更点是,之前配置文件我采用了全局单例的模式,并假设了一旦加载就不会再变化。然而在golang这样一个多线程并发的模型下,要支持热加载配置,就不能让配置自身成为单例了,否则各个模块正在访问单例的同时配置内容加载成新的,那模块就会崩溃。
因此,关于配置热加载的正常的设计思路是,旧模块使用旧配置,新模块使用新配置,配置文件不再保存单例,而是解析成功后将副本传入到各个模块之内保存。
一旦配置文件重新加载到内存,那么接下来要做的就是和优雅退出类似,先让http模块暂停向内部模块转发流量,但是它仍旧接收外部流量,并缓存起来。
接下来,各个旧模块开始消耗剩余的流量,最终销毁自身。
当所有旧模块退出后,将新的配置传递给各个模块,启动新的模块实例,并恢复http模块继续向内部模块转发流量,程序恢复运行。
不过,仅仅完成这些设计并不能解决整个问题,最棘手的是log和stats模块,前者负责日志,后者负责程序计数,它们一样需要热加载配置,比如:运维想把日志的输出目录或者日志级别变更一下。
这两个模块比较特殊,它们被其他各个模块调用,并且是并发的调用,相当于”给天上的飞机换发动机”,非常难。按照设计,应当在老模块全部销毁后,将log和stats销毁并重建。但是问题来了,http服务模块并没有销毁,它仍旧在实时的操作log和stats库,那么又怎么重启这2个模块呢?
这里我使用了atomic库,log和stats模块都是单例模式,保存的是对象的指针。在程序仍旧在持续访问2个模块的情况下,想要销毁这个单例并重建,必须对指针进行原子操作,好在golang提供了指针的atomic操作:
1 2 |
atomic.StorePointer atomic.LoadPointer |
有了这2个api,我就可以原子的操作单例指针,完成瞬时的转换。
当然,在销毁之后到重建之间的这段时间,http模块打印的log和stats统计都会无效,但是这个时间通常可以短到忽略。
以log库为例,相应的日志操作函数也首先通过atomic获取log指针,如果存在则进行实际的操作,否则什么也不做:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// 单例 var gLogger unsafe.Pointer = nil func getLogger() *logger { return (*logger)(atomic.LoadPointer(&gLogger)) } func FATAL(format string, v ...interface{}) { if logger := getLogger(); logger != nil { userLog := fmt.Sprintf(format, v...) logger.queueLog(LOG_LEVEL_FATAL, &userLog) } } func ERROR(format string, v ...interface{}) { if logger := getLogger(); logger != nil { userLog := fmt.Sprintf(format, v...) logger.queueLog(LOG_LEVEL_ERROR, &userLog) } } |
这就是我在实现bigpipe热加载期间遇到的一些问题,希望对大家设计热加载时有所帮助。