导语
迅猛发展的互联网将我们带入了大数据时代,大数据已经成为发展中不可或缺的力量支撑,大数据挑战和机遇并存,如何更好合理、灵活应用大数据是企业的关注所在。七牛大数据团队研发工程师孙健波为大家带来题为Go 在大数据开发中的实战经验的技术分享。以下是此次演讲内容整理。
作者介绍:
大数据
如图 1 可以看到,现在大数据的生态相对来说比较成熟了,有很多很多大数据相关的组件。比如储存 Kafka,HDFS,集群调度可以用 Mesos,检索用 Spark 等。数据可视化还有Zeppelin 等等工具,监控可以用 Grafan 等等。但是这么多复杂的组件放在一起,如果是玩技术可能玩的很嗨,有这么多组件可以玩,每个东西我们可以组合起来,可以完成自己的事情。但是对于一个只想要做业务,只关心业务,需要挖掘他们价值的人来说,这个其实非常痛苦的,因为他们需要知道怎样才能把这些组件联合起来,去把一个个组件的坑填上去。
七牛大数据平台是一个让大家能够更容易挖掘数据价值、让大家更容易去使用大数据服务的平台。
所以我们潘多拉七牛大数据团队做了以下的事情:
将多样的大数据工具整合
将复杂的大数据管理简化
构建完整的大数据生命周期闭环(收集-加工-分析-管理-消费-冷冻)
所以简单的说 Pandora 大数据平台的理念是什么呢?就是把各种各样的大数据的工具整合起来,让大家这个操作简化,可以关注数据本身的价值,完成这个数据信息的挖掘。
千上万上亿的数据,这样的话客户不直接关心这些原始的数据,客户会做一些筛选,做一些聚合。然后根据需要,导入到 Pandora 保存到日志检索服务,或者查询,快速检索。
我们不仅有开放 API 用户可以直接调用,在我们这个平台上,为你的用户提供你自己数据的价值,同时也可以基于开源的工具来查看你的这些数据。当然最终如果你针对这些比较久的数据,你还想做离线的分析,那么你云储存上面的数据,或者日志
们定制化的一个 Kafka,下一步进入数据转换(过滤、清洗、计算等等),然后经过定制的 Spark Streaming。最后提供一个导出服务,导出服务可以导出到你其他想要的服务,比如说时序数据库,这个是我们自研的分布式时序数据库,用于实时的数据监控、聚合等需求。
也可以导出到日志检索服务,用于数据查询分析,然后还可以导出到七牛的云储存,最后经过 Grafana、Kibana 等工具进行数据可视化。无论是离线还是实时,均可以这样处理。
目前七牛的大数据团队有比较高的数据规模,每天上百TB,2000多亿条实时的增量数据。我们提供的下游的的落地工具也是比较丰富的。基本满足了目前我们看到的一些大数据方面的使用的需求。
那么这种量级的数据导出,到底会有哪些问题?
大家可以看到,我们有很多服务,像实时数据库、日志检索、云储存等等,我们要把这些海量的数据经过一些计算,然后再导出。这里面就是海量数据会在我们的系统里面经过数次的变化,然后流动的效率怎么办呢?会不会有什么问题呢?大家马上就想到,最大的问题就是延迟。我们号称实时,如果有很大的延迟的话,用户肯定没有办法接受的,这样就没有意义了。所以我们做了很大的工作,就是怎么样把这个延迟降下来。
但是接受用户打过来的数据有什么问题呢?就是说你这个数据的效率,其实取决于用户客户打过来的姿势。如果对应不同的下游服务的话,可能用户使用的姿势不同,如果写程序的话,那么你只是普通的连一下,然后导一下,你就会遇到很多的问题。所以姿势非常的重要。
所以我们可以看一下,数据传输有几种,一般构想的,可以保证的东西,常见的东西,你会觉得数据的导出,流量是不会有太多变化的,比如说一个用户今天是 10 MB/s,那么他明天会不会变成 100 MB/s呢?最后大家想的是 20 MB/s等等,不会想到 100 MB/s。这个变化一定吗?我觉得未必。尤其像我们作为一个 PaaS 的厂商,就没办法去说,一定要想用户今天是 10 MB/s,明天有可能是还 15,20 MB/s,但是我们要时刻准备着他是 100 MB/s打过来。
然后我们还可能会想数据的下游服务是稳定可靠的话,我们提供的相当于一个数据的变化,大数据的分析。我们提供了非常多的下游的服务,那么很多人觉得下游是非常稳定的。但是下游的这个可靠性其实是不太确定的。像之前也爆出很多的厂商,知名国外的厂商也会有这种问题。所以你很难保证数据的下游一定是稳定的。
数据传输常见的情况:
1. 导出的上游数据产量是稳定不变(变化缓慢)的
2. 导出的下游服务永远是稳定可用(链路损耗严重)
3. 导出的速度仅受限于上下游中的一方影响
单向吞吐量= 请求大小*并发数
整体吞吐量= f(拉取吞吐量,链路承载能力,推送吞吐量)
举例 :流量 20k/s = 上游 10k/s*2+下游 5k/s*2 ?网络抽风?下游响应慢? 网卡打满?内存超限?
接着大家可能最容易想到的就是数据的导出或者传输的速度,就是上游、下游的速度下线,再取最小,实际上真的是这样的吗?实际上是一般认为并发数乘以每一个请求的大小,就是实际的总量。那我们的整体请求是怎样的呢?就是上游你拉数据,然后去下游打数据的量,最后你忽略了一个过程,就是这个传输链路的承载能力的推送吞吐量。举例来说,如果我们的流量是是 20 K/s 的话,那我们上游的请求是 10K×2,两个并发,下游是 5K×4。这样真的可以吗?未必可以。因为我们会遇到像网络不稳定,下游相应慢,内存超限等等这些问题,所以其实这都是我们必须要考虑到的。
那么怎样去解决这个问题呢?我们可以想到一些比较常见的思路。
上下游解耦:拉取与推送解耦,数据预取、队列暂存、拉取与发送并行
任务分割:大任务分解成小任务,小任务水平扩展
任务标准化:每个任务承载固定的流量,流量增加则增加任务数量
提升资源利用率:调度、平衡、压榨机器性能
提供任务管理能力:运维、运营、监控
更懂下游
上下游的解耦是怎样的呢?就是拉取数据与推送解耦分开来,中间提供一个队列,这样可以暂存数据,这样就被认为这个数据的速度,其实是相对来说快的。你只要保证解欧的队列不出问题就可以了。
还会想到什么呢?如果一个用户今天 10 MB/s,明天变成 100 MB/s了,这样你原来的服务肯定扛不住的。你要把这个 100 MB/s变成十个 10MB/s,那么这个问题就可以轻松的搞定了。再者就是任务标准化,我们经常会提的服务混部、这里用一个 5 核的机器、那里用一个 10 核的机器,实际上这样对你的服务影响非常大。如果你能把你的数据,标准化起来,同时我们所有的集群都是用同样规模的机器,那么在你做这个策略的时候就可以简化很多思考,同时你可以保证,你的这个任务分割,就是把大任务化解成小任务这个事情是可靠的。然后我们还想到怎么样提升这个利用率,管理能力调度,监控运营等等,最主要我们要更懂下游服务,比如实时数据库,日志检索等等,我们最终的目标是把这个延迟降下来。
刚刚已经我们看到有导出服务,导出服务就是我们的 Xspark,Xspark 已经做了很多很多任务,最重要的是它很轻。它做了哪些呢?就是刚刚看到的数据导出,从 Xspark 里面导出,还有数据的过滤转换、精细化的调度等等。精细化是什么概念呢?你不光考虑 CPU,考虑内存,同时还要看出网卡,机器的规模等等一系列的考虑。所以它做的事情非常多,最主要是这个精细化的调度。然后我们构建了一个轻量级的分布式的 goroutine 来做这个事情的。它可以提供一个非常强的保证,保证我们的导出服务,如果下游出了问题,完全不会影响其他的服务。但是我们今天服务的重点不是在讲这个导出,而是讲我们要讲,我们怎样构建一套更好的加速器,来加速导出服务。
-加速系统的选型
logstash?
beat?
flume?
自研?
那么在构建加速系统的时候你就会想选型的问题。一开始我们最先遇到的是我们的日志检索服务,对应其中的一个插件怎么处理,常规的借用社区的解决方法,像 logstash、beat、flume 等等的工具。那我们调研下来什么概念呢?就是对应我们刚才说的这些思考,比如上下游解耦等等。我们发现像 logstash 它更多注重做客户端搜集的事情,它作为一个中间端,或者服务端它接触数据然后再打向各个服务,其实它做的并不好。而 beat 就是提供一个轻量级的搜集系统的工具。
flume 提供这样一个缓存(图 9),我们觉得 flume 比较可靠,就去尝试了一下,最大的问题就是它在不同的位置的情况下,如果你配置这么多用户就可以了。但是如果你是一个 PaaS 厂商,你提供的用户是十万个,你难道就配置十万个让它自动生成吗?这个实在太不优雅了,也不符合我们 Gopher 的体会,所以我们就去自研了。
我发现几乎所有来大会上分享的老师都要回答一个话题,那就是为什么要选 Go ?其实用 Go 来做这个事情是很自然的选择,不止再这一个模板,在大数据里面做了很多很多的组件也都是使用 Go 来写的。那么我们来对比下我们的需求,从需求出发看语言的选型。
首先上下游解耦怎么做呢?这个就是有一个 buffer 的概念,是不是可以把数据有一个接收,然后传到 buffer channel 里面,然后另外一端从 buffer channel 里面拿数据。之后任何一个水平扩展怎么办呢?肯定会想到水平扩展就是分布式了。那分布式怎么处理呢?一般都是进程级的,那协程级的呢?协程级的会不会更舒服呢?因为已经有语言帮你做这个调度。然后你要提升这个资源利用率,提升任务管理能力,你是不是就可以把这个注意力专注到任务资源分配的调度管理等等方面。
然后最重要的更懂下游怎么办呢?因为是自研,可以让下游写组件的小伙伴自写对应的服务就可以了,我们就可以把这个过程通过插件写进去。例如你是做日志检索的,那么你写一个加速你日志检索的传输插件。当然还有很多很多理由,比如 go 所有人都会说简单,易学易用。社区经过这么几年的发展,已经非常活跃了,还有它的部署迭代更简便。大家都知道 golang 编译出来就是个二进制的包,你怎么玩都行。然后它效率非常高,它很稳定性能也高,并发编程,还有我们七牛的技术栈,基本就是 golang,所以我们坚定不移的选择了 golang。
-核心模型
然后我们看一下我们要做这个事情,如果我们要自己开发这个东西它的核心模型是怎样的?首先你会想到你面对的是一个数据源。然后你要用事务的形式把数据接收进来,为什么用事务的形式呢?我们后面再讲。我们经过一个队列,很多人讲这个队列怎么做比较好。如果你真的是加速考虑的话,只有一个选择,那就是内存,否则的话,其他的性能都会遇到很大的瓶颈。然后下游 sink 可以自己写各种各样的插件,你想导出到什么服务就导出到什么服务。
关于 sink ,用插件形式的下游适配器的形式,因为没有人比下游更懂下游。就像我们老大陈超经常说,情人节给你女朋友或者老婆送个红包就行了,让她自己买是最好的。就是这个道理,你把一个球给他扔过去,别人能不能承受得住这个重量,这个不好说,还是让他自己来吧。
提到我们刚才说的,用事务的的形式做。就是如果你这个不行了,你跟你的导出服务说慢点导或者导出到别的服务。然后如果你行就直接放进去。然后同时事务也是解决分布式的问题,我们本身在调度的过程中可以开多任务,那么怎么样保证这个数据只流向一处,其实也是事务。事务可能大家可能考虑到一个问题,如果有一个锁怎么办?如果累计的数据在内存里面传递的话,它只是把数据放到这个 channel 里面,实际上这个数据传输非常快的,这个锁是非常小的。同时争抢这个锁的这些并发,如果你控制的好的话,实际上只有十几个并发,或者几十个并发在抢这个锁的话,实际上这个锁的性能非常低的。所以这个事务我们实践过来以后用起来非常舒服的。
此外还有一个问题,万一需要重启或者挂了怎么办呢?对于重启,需要提供一个策略,怎么样让这个内存的队列进入到本地磁盘?我们用一个 sink 把内存里面的数据统统的进入一个本地的磁盘队列。然后根据你恢复的时候再把这个数据恢复过来,所以就解决的数据重启的问题。
如果挂了怎么办?因为挂了我们还有上游的导出服务,在这上我们可以认为它是做了专注于自己的事情,可以给我们提供数据重播的能力。我们怎么办呢?就是数据来了以后我们只要记录最基本的元数据,如这套数据的 Offset 是从多少到多少,哪个 patition,如果它成功发送到下游就 OK,这条数据就过去了。那么有一些数据它的 offset 从开始到结束放在这里一直没有导出,那怎么办呢?它故障了,我们就调用导出服务的重播能力,进行数据的重播,这样数据也不会丢失。
在众多模块组合之后,整个框架基本上搭起来了,保证了任务的流动。但是我们还要构建一rest-api,让别人数据能导过来,让导出的数据能过来。 而且是需要构建一个任务级的 rest-api,因为面临的是 PaaS 上面百万的用户,肯定要把这个事情做成一个单一的某一个用户的级别,那我们可以用 agent 来调度这个事情,封装成 task 的概念来针对不同的用户之后再导出到 sink 或者下游不同的服务。那么这样一个单机版,看似可以分布式化的已经完成了,而且相对比较简单干净。
一个 repo 一个单独的 task,不同 repo 间 task 不共享
一个 task 包含一个 MQ,多个 sink、一个磁盘队列
根据 mongo 的配置决定 sink 的数量(与 capacity 相同)MQ 中包含 transaction 池,每次 sink,通过事务控制数据进出 MQ 保证原子性
重启的过程也以 sink 的形式,只不过发送端变为磁盘,保证了服务升级时内存数据不丢失
我们总结一下,最简单的就是对一个用户的数据对应一个 task,用户单位是 repo,task 是不共享的,分别独占了资源。一个 task 包含一个 MQ,多个sink,有一个磁盘队列可以重启等等。还可以从 mongo 里面拿到相应的配置,使用 mongo。事务来控制原子性,重启的过程也是以 sink 的形式,通过数据把下游打到磁盘,这样整个事情就完成了。
分布式的困难
维护困难,如果我的集群要添加或者减少一个 producer 节点,该怎么办?
数据分散,所有的数据都有可能经过任意 producer 节点,对应每个 producer 发往下游的数据分散,导致每个请求的可能聚集的数据量小,batchsize 小。
资源浪费,每个 producer 都要维护大量的 task,对应大量的goroutine,浪费 CPU 和内存。
负载不均,若是纯粹的随机,或者轮询,一旦遇到机器配置不同或者服务混部等情况,负载无法均衡。
不易管理,新的业务需要启动对应的 producer task,需要改变配置等,都无法操作。
那么这样单机版仿佛已经解决问题了,还有什么问题呢?如果你简单的把这个组件,我们的加速服务导出,放到很多机器上面是不是就解决问题了呢?
其实并不是,还有哪些问题呢?就是维护困难,如果我们的数据分散,怎么样控制它在不同的机器上整合,或者平衡等等,资源浪费怎么办?刚刚也说了,我们有 task,如果有用户过来创建一下或者试玩一下就再也不用了,怎么办?怎么清理?还有负载不均衡怎么办?还有管理起来怎么办?
zookeeper/etcd
自研分布式算法
最终一致性 => pull系统+版本戳
所以我们面临着分布式服务里面一个常见的问题,就是怎么把数据传递到每一个节点,也就是分布式一致性的问题。简单来说就是这个时候你怎么让数据通知到每一个节点,让一个节点都知道你做什么,你要解决什么样的问题。其实现在社区发展这么多年,其实一致性问题解决起来也相对来说有比较成熟的方案。可能会选择 etcd/zookeeper 去解决这个强一致的问题,还有一些自研的算法。
那么要不要自研呢?我们想了一下,如果我们做这样一个加速服务,真的需要强一致吗?如果一个数据过来我们导出,我们跟他说你导出到这里不太平衡,机器的负载不太好,你应该用另外一个机器上使用另外一台的机器的加速服务,这个消息的同步真的需要那么实时吗?其实我们权衡下来并不是的,我们只要最终这个消息发送过来三五分钟以后,能够把这个事情达到一个非常协调的状态,那么事情就解决了。他只要把这个数据让我最终感知,达到最终的一致,这个事情就可以。所以我们就去拉这个源数据。加上版本戳保证数据的最终一致性。
说到这个最终一致性,七牛自己有一套很好的二级缓存框架两保证这个一致性。这个是怎么样的呢?首先你的源数据肯定要一个数据库做存储。想要用的时候,如果源数据直接拉的话肯定要把它击穿了。每一个请求过来都去访问,几乎所有的数据库都扛不住这种压力。这时候肯定会想到就是缓存。缓存是怎样的呢?首先一个数据过来,同步到 mongo 的数据库里面,然后做两层缓存,一层就是本地的,去本地拿,发现本地没有。那么再去二级缓存的服务器拿。然后拿了发现也没有,这时候再去 mongo 里面拿。之后再把数据存在缓存、本地各一份,然后根据你的需要设置过期时间,这样你的数据就缓冲的很好,相当于你对这个数据库本身的请求每两分钟才几百次,上千次这样,因为大部分的请求数据已经被缓存起来了。
这样做还有什么好处?比如说数据库挂了,我们还有二级缓存,这样二级缓存挂了,我们还有本地缓存,这样就保证了如果主服务也挂了,那么我其他的服务还可以继续的工作。相当我其实和 master 这个东西是解耦的,我不会受到 matser 挂掉的影响,如果数据有改变的话,已经通知到其他的组件。
所以我们看一下我们最终要维护缓存里的指标是什么?
1、保证状态。总不能说启动了以后不能关,首先要保证这个启动能停止
2、要有分配的能力,自动分配也好,手动分配也好,要有一个分配的过程
3、要保证批量发送,有能力去调整发送发小
4、并发数,你要开多少加速的服务,每一个发到下游的请求有多少的并发数
5、队列缓存容量有多大,超过多少后会反压
6、消息的接受有多少的并发
7、如果要手动指定机器的话,就可以指定一下
然后我们考虑到如果这个任务一会在这个机器,一会在那个机器的话,其实是对链路是浪费的,首先网卡就非常浪费,然后启停等等,调度的过程是非常浪费的。所以我们刚刚已经说了,我们是基于任务的标准化,每个任务其实是固定大小的规模。所以如果每个任务都已经是固定大小的规模,我们可以稳定把它分配到某些机器上。
基于一个最简单的,首先多少任务已经知道了,排一个序,然后根据它需要的数量我们给它足够的机器的分配。可能还会遇到一些其他的问题,比如机器的配置不太均衡,当然最好还是均衡一点,但是总有一些难免的情况。那么你可以通过手动指定的情况把某一个任务绑定到指定的机器上面,然后大概的配置,调度算法就完成了。非常简单,稳定,我们用平衡的任务标准化的机制解决这个问题。
手动与自动相结合
应对突发流量
防止大任务的抖动
弥补机器配置带来的差异化
admin 能力
此外我们还会把这个机器手动绑定的能力加上,这个绑定的能力大概是一个怎样的概念呢?首先你可以手动和自动相结合,我们刚开始写代码的时候,盲目的相信自动化的过程,觉得我写一个厉害的算法什么都是自动的,只要这个算法够厉害就没问题。但是真正线上的服务总归有出人意料的事情发生,你肯定要加上手动的能力。那手动能力有什么好处呢?就是应对突发流量。万一真的来不及扩容怎么办?可以临时调整,非常灵活,防止大任务的抖动。
比如我们就很多的大客户,他们的数据量非常巨大,我们可以给他指定一些集群和机器,把他们绑定在上面。这样相对来说这个大客户的数据是比较稳定的,这个任务就不会抖动,不会影响别的资源,不会侵蚀小客户。所以小客户和大客户的体验都是非常好的。然后还有一个弥补机器配饰带来的差异化,你可以有一些手动化的机制。
我们还会提供一些 API,来做什么呢?就是获取监控信息-任务数量、成功失败率、lag等等指标,还有提供一些管理接口,看一下历史的问题,然后我们看一下问题就可以通过这些来解决。
数据到达时才创建,按需创建。
producer 实例中某个 task 如果长时间没有数据过来,则销毁掉对应的 task,释放资源。
上述提到如果有用户来创建,创建了一会觉得不太好用怎么办?就不用了,那不用这个资源肯定要浪费了,那资源怎么回收呢?其实有很多的小客户他们是试用的性质,这时候我们提供一些免费的额度,这样他很快就用完了,很快用完了他们不想付费了,而且他们也没有真正的的有需要。那么他这个数据资源就会占在那边,很多 task 应该会碰到这样的问题,就是资源回收的问题。那资源回收怎么解决呢?最简单是基于对过去的统计进行一个预测。比如现在一直是在打数据,他突然有一会没有打数据了,可能是一天,也可能是多少小时,那么你界定一下,这个数据资源可以释放掉,然后快速的起停。
通过 protobuf 协议与上游通信
不重复解析数据,去除 json 等解析的 cpu 消耗
我们使用了序列化协议,在这块会用到 protobuf,这个效果非常好。对比一下,如果你用 json 序列化协议, cpu 的消耗和 protobuf 大概有十倍差距。如果你能用 protobuf 的话,尽量用这个,这个带来的体验是非常好的。
向下游写入失败,则休眠 1s 再重试,依然失败则休眠时间增加,一直到 10s 为止
若写入成功,则失败的休眠时间重置为 1s
有效减少下游压力
最后很重要的一点,变长失败等待的时间。比如你这一次访问出问题,那么下一次你再去访问,下下游还是挂了。那我给你一秒,如果你还是挂了,那我就等再一秒,因为我给你一个等待的机会。因为我们经常发现像数据打服务打挂了,很多时候一直在打,会有数据堆积,成堆的数据打过来的话,这样对下游会造成一个崩败式的过程。所以我们给他等待的机会,给他休息一秒,再休息三秒,等到一个预值十秒,如果它恢复了,我们再回到正常的过程。这样可以有效减少下游的压力,让下游快速的恢复,然后我们把这个数据快速的传输过去。
无数据重复写入、无数据丢失问题
写入更平滑,去除毛刺
更高的机器资源利用率
更懂下游
没有Lag!
所以最终这个加速服务构建了哪些成果呢?首先最重要就是没有数据重复的问题,也没有数据丢失的问题。可以让数据的写入更加平滑,就是去除毛刺。这个毛刺是什么概念呢?就是玩过大数据的,或者有一定数据量的朋友都会感受到,有时候你不同的机器,或者不同的组件,不同的实例去打的时候,他们这个时间是不一样的。因为根据你的请求,这个可能十秒返回,那个可能一秒就返回了,这个就是毛刺,你会认为所有的实例都返回的,你才认为这个请求OK。那我们做的这个导出的加速服务器,就是把这个毛刺解决了,这样整个上下游之间的数据传输的效率非常平滑,一直在一个非常高的水平。同时我们提高的机器资源的利用率。也更懂下游,因为我们以插件的模式编写整个下游的服务,最终达到的效果是没有延迟。
有疑问加站长微信联系(非本文作者)