大家下午好!我是来自七牛云大数据团队的党合萱。今天向大家介绍一下我们是如何基于Go搭建大数据平台的。
七牛的大数据的产品——Pandora
数据打进系统之后先落地到消息队列,落地之后有一个计算任务,数据经过计算之后还可以再次落地到消息队列。这个过程很灵活,比如可以选择最左边的一条线把数据直接倒出到下游的系统,图1上的示例是一个HTTP导出,除此之外还可以导出到实时数据库、日志检索服务,或者到七牛云存储上。日志检索服务上可以生成图3的柱状图,时序数据库可以生成上面的仪表盘分析(图 2)。
系统设计分析和架构
一个环节——从消息队列导出数据到下游系统,即图 4 中的 Export Service 模块。这是一个连接上下游,有承上启下功能的系统,在实现过程中我们遇到了很多挑战。接下来,我们将从最初的设计开始,来看一下中间有哪些具体的挑战。
一下可能会遇到的一些问题,如图 5 所示。一个好的系统不仅要解决当下的问题,更要考虑到未来可预期时间里业务上或者数据规模上可能面临的一些挑战。这六点当中左边三点是我们在业务上考虑的,右边三点是从具体的实现或者架构上考虑的问题。可以看到最核心的要求就是系统要有高吞吐、低延时的能力。
这个系统最核心的是图中的红色和蓝色框部分,这是一个master/server结构的系统,经过master的分配和调度,把数据拉到下游的各个业务模块里。最右边的绿色框图是我们的监控系统,这个监控系统从最上层的业务指标、链路性能,以及机器的健康状况都可以收集和监控。黄色框图部分可以对业务和监控做可视化的展示。在图 1 的可视化的界面中,用户可以通过鼠标式的拖拽创建工作流,这个工作流的导出部分体现在导出系统当中就是做数据的拉取、处理和推送的工作。
多种上下游数据适配
我们对整个系统的要求,除了了高吞吐低延时之外,还必须能将数据传送到多种下游,比如说七牛云存储、数据库服务、实时数据库、日志检索当中去。这要求要考虑多种导出业务逻辑上的相同和不同的点。我们整理出两种模型,第一种是通用导出模型,一个任务开始执行之后,要从上游拉取数据,经过一些处理和过滤之后,将数据推送到下游系统,比如说图中的日志检索、时序数据和远端的http服务等。除此之外有一个特例,就是七牛云存储,系统数据导出到云存储之后不要有太多的小文件,也不要占用太多的空间,否则从存储费用和下游系统二次处理这些数据的角度看都有不少弊端。所以,我们增加了中间的两步,就是把数据存储在本地,进行一次压缩之后再存到云存储,这样的话可以有效的减少文件的个数,也能减小云存储空间的使用,降低了用户的成本。
高吞吐/低延迟问题探究
这个问题是比较困难的,尤其是在数据量比较大的时候,高吞吐和低延迟各自有一些困难的点。根据我们的实践过程来看,吞吐量方面遇到最大的问题就是资源的利用率不够高,或者上下游系统的吞吐量能力不一致,存在短板效应,我们需要做好几个系统之间的衔接。低延迟问题在很多情况下其实是对服务稳定性的一个要求,或者说要规避热点,就是如果真的有一个任务比较热、比较重,怎么做才能合理切分和规避热点。最后就是一些环节上可能过度消耗资源,这在很大的程度上就是代码写的好不好的问题。在所有问题当中,如果有那么一个两个没有得到很好的解决,就会出现lag,比如说用户打到消息队列里面的数据,半个小时都没能导出,对用户来讲这就是灾难性的结果,尤其是那些对我们业务有强依赖的产品。
为了逐步减轻高吞吐和低延迟方面的问题,我们做了一些优化。第一个优化就是在消费数据的过程中做预取。从下游取数据的时候,需要拉取数据过来。一开始看到是简单的模型,先拉取再推送,但数据量打了之后我们发现机器的资源利用率并不高,lag以肉眼可见的速度在上涨。观察后发现任务中数据的拉和推应该同时进行,在推数据的同时预取下一批数据,这样从单位时间处理数据量上来看效率有一倍的提升。
第二个就是我们做了数据推送协议的优化。我们从export service推送数据的时候用了Json格式,但是它的序列化和反序列化的性能比较差,影响服务的整体性能,经过调研我们将Json格式换成了Protobuf格式,经观察带宽消耗减少了近一倍,吞吐量提升了,CPU消耗也降低了一倍以上。
我们还优化了资源的使用方式,在数据导出到云存储时,为了节省空间,而且考虑到下游系统使用数据的便利,我们使用了parquet压缩。parquet的压缩效率是随着文件的大小而改变的,经过观察,大约1GB左右的文件压缩效果是最优,可以达到8:1左右。后来业务量逐渐增大以后,我们发现parquet压缩的CPU使用情况超过了我们的预期,如果同时有十到二十个文件一起做压缩,CPU就会打满,这时候网络拉取也会受到比较大的影响,速度会降低到总带宽的三分之一或者四分之一,总体来看,这种情况会影响导出的性能,对外表现出来就是数据可能来不及导出从而出现lag。观察到这个情况之后我们就对parquet压缩的并发度进行了限制,比如说最多只允许八个或者十个parquet压缩,这样做可以相对有效的降低CPU的峰值,但我们还没有更精确、更平滑的使用CPU资源,虽然限制了并发度,但偶尔也会出现CPU飙高的情况,我们后续会继续在CPU的平滑使用上做一些工作,彻底的规避掉这个情况。
提升了导出的性能之后,上下游系统的处理能力之间不匹配就有可能出现木桶效应,一方面数据吞吐量难以再进行提升,另一方面可能会将连接的上游或者下游打垮。那么该如何感知上下游系统的压力,压力到一定程度之后,如何做一些退避,我们采用了一些实践比较好的指标。第一个是退避时间,我们从kafka拉数据的时候,如果拉取过慢(1秒或者是2秒以上)我们就将两个批次之间的休眠时间扩大一倍,但这种策略对我们的效率影响很大。还有特定的错误码,这可以视作下游系统的一个负载过大的信号,需要少发一些数据。再就是超时,我们拉数据的时候并不是一个数据点,而是一个批次,有些数据点比较大,有些数据点就有1M、2M,如果同时拉2万个点,就没办法等到这个数据,30秒可能就超时了,针对这个就认为对上游的数据影响比较大。
刚才说我们是一个批次一个批次来做,批次的数量有个上限和下限,怎么设置上限和下限呢?有快启动和慢启动两种方式。我们先尝试了慢启动,发现这个资源的使用率变得很低。因为一个批次是从比较小的数量开始,增加到一个合理的大小是需要时间的,但是线上的实际情况当中,大部分的用户数据点比较小,没必要使用慢启动慢慢寻找这个合理批次的大小,时间上浪费比较大,所以我们采用了快启动方式。只有极个别的任务数据点比较大,可能遇到批次需要减小的问题。
还有退避策略,如果上游或者是下游,遇到繁忙、没有数据或者说数据比较稀疏的时候,怎么做一些退避,避免不停的造成压力。一旦出现这种情况,前5次拉取和到处的过程中会休眠1秒钟,如果休眠结束后还是需要退避,我们会对休眠时间做指数极的增大,直到它增加到一个上限值,比如说32秒为止。
高可用和水平拓展
我们的系统是一个master/server的结构,要达到低延迟和高吞吐的目标,节点的高可用和系统的可扩展行是一个必须考虑的问题。
master负责的任务、切分和server的感知,以及任务的调度和分发。比如说T1、T2、T3,如何进行任务的切分,切分之后分配在不同的机器上,才能将热点进行拆分。
我们看一下master如何做到高可用,在设计的时候master尽可能做成一个无状态的节点,它的所有身份信息包括一些中间的调度状态,我们把它定期放在zookeeper上。系统启动的时候,多个master实例通过抢锁的方式决定谁成为主master,其他没抢到锁的master就会成为备选的节点。如果这时网络断掉,或者主master宕机,master就会发生主备切换。由于master的身份信息都注册在zookeeper上,所有的sever通过监听zookeeper就能感知到master切换的这个事件,这就是master高可用的实现原理。
那么server是如何实现高可用的呢?server要通过上报心跳的方式表明自己的存在,以及自己正在执行哪些任务。比如说server1正在执行三个任务,现在网络断联,master在两个心跳周期内一直都没有发现server1的存在,那么server1上面的T1、T2、T3就被master调度到其他的机器上,server1就从集群中摘掉了。这里的server也是无状态的设计,某一台机器的宕机不会对服务造成影响。
除了高可用之外,水平扩展也是影响系统可用性的重要因素,水平扩展的需求经常出现在系统资源水位比较高的时候,比如说现在有3台机器,由于业务量的增大,CPU、网络等资源接近满载,实在不堪重负,这时候可以增加1台机器。增加机器之后,master通过心跳发现了新的server,对各个服务器之间的压力做出评估之后,发现新加的server4处于空闲的状态,就会尝试从比较繁忙的server1和server2上调度一些任务过来,分配到server4上。因为我们是通过心跳的方式来感觉新的server,所以水平扩展的复杂程度很低,只要把新的server按照同样的配置启动就可以,它会自动加入到集群当中去。
看完了刚才的高可用和水平扩展之后,可以看一下整个Export Service。export master有个任务扫描器、执行计划发送器和心跳收集器。心跳收集器会收到每台机器的心跳,形成server列表,server列表包括了所有资源的使用,包括CPU、跑的任务等。经过一次评话之后,信息被传送到执行器,执行器里有好几个调度组。每一个调度组中有各自的export server,以及各自不同的任务,各个调度组之间是完全隔离的。各个调度组调度的结果会统一传送给发送器,发送器会周期性的把计划下发给每个server。
这里一定有人会问为什么还有M个调度组,调度组的出现是刚才已经看到了下游系统有好几种,比如说实时数据库、云存储。我们对每个任务组使用的资源方式不同,所以在每个调度组里可以看到一些调度的参数,包括对一些不同的资源权重可以单独进行设置,包括调度的间隔以及平滑机的平滑方式都可以单独设置。其实平滑器是嵌在调度组里的。还有一些大任务,这些大任务过来之后独占一个机器,就和小任务做了物理上的隔离,避免相互干扰导致的数据延迟。我们看到图的最上方有个RESTful的管理接口,下面有一个调度执行器可以看到任务的执行情况和调度组的情况,作为工程师可以通过调度接口了解集群的情况。还有监控报警,包括下游系统,也可以通过接口查看系统的状况,对系统资源做一些调整。
在了解了调度的框架之后,接下来我们看一下是系统是如何调度任务的。我们的目标是提高资源的利用率,减少任务之间的影响,包括说尽量把降低任务的延迟。最后一点比较特殊,我们要尝试用各种异构的机器。最开始设置的时候,发现我们的服务器有好几种不同的规格,比如说CPU、网络,甚至硬盘的规格以及数量差异都很大。我们怎么才能合理的去调度,利用这些机器呢?我们对整个调度做了一些抽象,就是怎么去评估资源的需求和余量。比如说一台四核16G的机器和一台两核的机器差别是什么,应该怎么用它呢?就涉及到第一个问题,就是它的需求和余量评估,还有任务的轻和重。并不是每个任务都一样,总是有一些任务数据来得又快又猛,有一些相对比较空闲,量化它也是一个调量。
第三个是任务间的影响,大任务和小任务,大任务对小任务冲击,怎么才能不影响小任务,或者他们之间不要互相影响,我们在这个过程中做了一些抽象抽象之后定量,比如说CPU的余量。假设现在这个机器控制了一个核,CPU的资源就有1分,网络也是一样,10M就是一个资源的力度。
还有就是一些资源和打分,比如说任务有多重,过去的一段时间里,到底使用了多少个CPU,有多少数据,使用了多少网卡都考虑进去。还有就是分布度的打分,我们发现任务分布的不均匀会对导出效率产生影响,所以我们要平均分配在不同的机器上,所以会有分布度的打分。
最后是指标之间的权重,比如说CPU的使用和网卡到底哪个比较重要,哪个对我们的影响比较大。我们在每个调度组里有不同的权重进行控制。最后通过任务的打分、分布度评估机器,在任务的分布时候做一些任务的调整,轻松一点的就多分配一点任务,重一点的机器就会让它空闲一点,通过分担让任务尽量不要有延迟,不要有资源的过度消耗。
自动化运维
做完上面的事情之后总有运维的过程,写代码突然被打断了,会感觉很不爽,所以我们希望尽量自动化一点。我们是如何做自动化运维的,首先从监控来看,我们使用了一个比较有力的工具就是logkit,这是我们大数据团队用纯Golang开发的工具,这个工具可以采集机器上的一些信息,比如说网卡、CPU都可以采集,同时也支持多种下游系统的推送,推送到Pandora本身以及时序数据库。通过grafana进行实时的推送。还有slack,我们工作经常会用这个沟通,如果有出现问题,第一时间会有反馈。还有就是email。我们还有自己写的监控脚本,对数据进行一些聚合或者说分析处理之后,会给下游的系统,或者说通过邮件的方式,让每个人都可以看到。
系统热点自动感知与调整。在logkit收集数据之后,全部转到Pandora或者七牛云存储之上,基于xspark服务进行分析,看一下历史的流量,比如从昨天1点、2点、3点每个时间段资源的使用以及任务的分配,究竟是怎样的。从历史数据做一个基本的预测,比如说今天究竟流量可能会达到什么样的程度。然后,我们的master和server之间会上报心跳,心跳之中会包含很多的实时数据。通过实时数据和历史预测数据的结合,我们会考虑是不是暂时,或者说数据真的会热起来,提供参考对系统进行调整。
经过了所有的工作之后,现在的系统现状就是每天处理超过千亿的数据点,超过百TB的数据量。正常的业务导出延迟都在1分之内,用户导出数据之后可以在日志里检索数据,可以在grafana里面看到。就是刚才说的不希望在写代码的时候被打断,极少的人工介入。我们秒级的扩容,我们直接把服务器进去就登载master的调度,把热度切过来就比较容易。还有就是实时了可视化监控系统和报警,其实主要是通过logkit。还会生成自动生成线上日报,看哪些的延迟是比较大的,包括有些错误都会发出来。其实除此之外,还有一些小时级的邮件,通过它来看是不是真的有问题,看线上的状况如何。
Go的应用
在整个开发和设计过程中,我们是怎么用Go的呢?Pandora的流式计算、离线计算、日志检索、时序数据库等一整套服务的核心代码都是用Go开发的,还有刚才提到的logkit、还有一整套的脚本语言工具都是用Go来开发的。
为什么选择Go?真的是比较容易上手,入门也简单,我之前一直是写C++,到了七牛之后,原本我以为会有一些C++,看到全部是Go的时候,刚开始也略有担心,实际上一天就可以写出可以用的代码。就是因为这个语言的特性,它的语法相对比较简单,而且由于提供了更容易的并发模型,降低了整个程序员的心智负担,可以让大家把精力集中在业务逻辑上,不用像过去一样纠结这个地方要怎么写,或者总是编译不过的事情已经很少了。还有就是它有比较丰富的库,一般遇到的问题,在官方的库中都可以找到解决方案,除此之外github上也有很丰富的资源可以使用。七牛是国内第一批在Go语言方面进行实践的公司。七牛内部的RPC、缓存系统等都已经打磨的很成熟,基于这些已有的东西进行系统开发确实比较容易,所以我们选择了Go语言。事实证明我们的开发节奏比较快,对于需求的反应比较灵敏,所以Go语言是很好的选择。
了解更多大数据玩法,点击“阅读原文”
与会者:我有两个问题,第一个是刚才看您的架构是双股的玩法,这个双股的策略是怎么样的呢?就是里面有双master的。
党合萱:现在我们是通过抢锁的方式,同一时间只有一个master,如果出现问题,就立马自杀,进入抢锁的逻辑,所以不会出现双master的情况。
与会者:这样会带来一个问题,一台永远是忙着,另一台备着永远是空闲。
党合萱:是有这个可能。
与会者:另外我们会做多个数据源的抽取,抽取了之后,我看有一个实时数据过来的,你们的任务策略是怎么安排的?
党合萱:我们到云存储是作为单独的分组,对于其他的实时数据库或者是http都是绑在一起的,因为这样的数据量和导出模型类似,基本上没有相互的影响,所以都放在一起。
与会者:你好!我想问一下,你们的产品是你们自己内部用还是外部使用?
党合萱:现在我们内部在使用,外部也在逐渐使用。
与会者:刚才讲的分组量和lag的问题,刚才你说数据预取怎么操作?因为Kafka的并发速度超过了效用,和你刚才说的预取怎么处理?
党合萱:这个预取是基于单个的流做,这涉及到outsinde进行处理。如果这个数据没有处理,其他数据还在往上走,会出现数据空洞,是这难以处理。所以我们是一个每一个contion(音),有一个线程的数据,拿到一批数据就从这里取一个数据过来,一对一的处理。
与会者:我想问一下这个产品整个系统QPS是多少?Kafka需要多少个集群?
党合萱:我们现在QPS是每分钟有接近200万个。
与会者:200万个QPS?
党合萱:就是请求个数,就是每个请求会有多少处理。
与会者:刚才你的架构图里有一个report studio
党合萱:这和garafa不一样。
与会者:所以是你们自己开发的报表系统?
党合萱:对。