潘卫华 / 唯品会基础架构部架构师,唯品会Dragonfly日志系统负责人。
对 Elasticsearch 和大数据流式处理有丰富经验,对Golang及其程序的性能优化也有较多研究。
前言
大家下午好!我是来自唯品会基础架构部的潘卫华。今天我们来跟大家一起看看在大数据领域里面,Golang的应用。我们知道在大数据领域里,Java 和 Scala 语言基本是处于统治地位的,主要是因为像 Hadoop 以及基于 Hadoop 的一些工具栈,比如 HBase/Hive/Spark/Flink 等等,这些都是基于 Java 或者 Scala开发,他们提供 的api 也是主要给 Java 系的语言来使用的。不过实际在项目过程中,我认为还是在一些地方Golang是可以有用武之地的。
我将从以下几个方面作分享:
一、唯品会日志系统及本人简介
二、Access Log存储需求分析
三、Access Log Writer设计和实现
四、Golang应用在大数据领域的思考
一、唯品会日志系统及本人简介
唯品会Dragonfly日志系统简介
我在唯品会负责公司日志系统,叫做 Dragonfiy。大家可能听过最近阿里有一个开源工具也叫 Dragonfly,但两者并不相同,阿里开源的Dragonfly是做容器镜像分发的,我们这个 Dragonfiy 是日志平台。我们这个项目从2015年就开始搭建,到现在为止已经接入全公司几乎全部应用的日志,还包括一共2000多个应用的访问日志,Dragonfly提供一些日志的存储和查询、统计包括告警这些功能。
整个系统是最初基于 ELK 来打造的,之后也加入我们自研的一些模块,包括告警。我们可以让用户自定义设计告警规则,比如说有哪些关键字,需要达到多少条数,就可以触发告警,我们也可以智能的去分析日志数据,发现异常时可以自己告警,不需要用户去配置。现在这个日志系统是整个唯品会监控系统里面非常重要的组成部分,每天会帮助我们预防或及早发现很多问题。
Golang 在 Dragonfly 方面的应用
这个系统数据量非常大,平日每天接入应用日志数 600 亿条左右,压缩完之后,每天日志两大概有 40 TB。另外访问日志数每天超过 1400 亿条。下面介绍在我们日志系统里面Golang主要应用。
采集客户端程序
第一,采集客户端程序。Elastic公司开源的beats主要用来做日志的采集上传,我们的日志采集工具是基于 beats 做二次开发,在 beats 的基础上我们做的日志的解析、统计、配置下发,包括在客户端里面配置要采集什么样的文件,是否要做限流或者采样,还有解析等的一些规则,这些可以通过管理界面配置,动态下发,所以整个配置管理非常方便。 然后我们还支持容器日志采集。
另外在 beats 的基础上也做了很多性能的优化。我们原来用的Logstash,大家可能之前听说过,我们性能比Logstash高了4倍以上,而且内存比Logstash少很多,Logstash我们原来用1G的内存来采集,有的情况还不够,而我们开发的新日志采集客户端,内存使用才几十兆。
最初我们系统里面用Golang的项目就是这个,经过这个项目之后我们发现Go语言性能非常好,而且开发起来效率非常高,后面我们尽可能把Go语言尽可能用在场景语言去。
后来Golang在Dragonfly的第二大应用,就是我们今天介绍的,Access Log存储系统。
二、Access Log存储需求分析
Access log存储和查询
Access log 每个互联网公司都会有,通常数据量非常大。像我们公司原来是用Access log 采集上来做离线处理生成一些报表,但是对于用户的Ad-hoc查询来说,非常不灵活,通过Hive查询,每次查询非常慢,如果每次查某个应用 Access log,可能超过十几分钟才能完成一次查询,如果有些语句或者有些字段输入有问题,条件不太好,出来并不是客户想要的结果,他又反复来查,但这时这个效率就非常低。
我们公司除了 Nginx Access Log ,还有其它组件的Access log,这些也没有接入到Hive里面,开发也没有办法去用,所以我们想打造另外一个Access log存储的系统,主要对开发的同事能够快速提供Access log下载和查询的系统,这是一个背景。
Access log 的写入和存储有什么样的特征,我们先要确定,再看怎么设计这个系统。
首先,Access log 的数据非常多,比如说我们公司每天有1000多亿条数据,平均下来每秒钟会有各种组件加起来的访问量都有几百万QPS,意味着每秒钟会有几百万条 Access log,这些如果要做存储,就要思考要做这么大流量写入的问题。像我们公司有几个组件,一个是 Nginx,一个是 janus 服务器网关,还有Osp——微服务组件。
第二个特点就是它们的使用频率并不是特别高。大家可能也会知道,工信部要求访问日志需要保存半年以上,这种更多是一个保存是为了监管的需求。另外就是报表,还有就是其它除了问题开发者才需要访问。很多访问日志,几个月下来有可能完全没有人来访问,也是有可能的。
所以访问频率很低,针对这种特点,没有必要花很多硬件或者其它复杂设计上去做一个非常完美的系统,我们只需要提供低成本然后高性能,主要是解决存储问题,其次提供一个简单的查询方案。
第三个特点是实时性不需要特别高,通常几分钟的ok的。
所以针对这几点需求,我们设计了大概的方案是这样的,就是我们把数据压缩保存到HDFS里面,然后提供一个简单的查询结果。为什么用到HDFS?其实也不一定,像其它分布式存储系统,比如GlusterFS之类,只要你的容量够其实也可以的。但存储的HDFS还保留一个可能,以后数据还会用一些大数据工具去分析,所以我们用HDFS。其实后面要解决一些问题,就算用其它分布式系统也是要解决的。
总体解决方案
这么大量的数据,我们肯定要压缩才能保存进去的,所以我们要选择压缩的方式。如果要满足一个比较高的实质性需求,我们只能选择流式的压缩,我们不可能等一个文件一天写完之后才导进去一次,这样用户也是没有办法去查的。我们选的是 gzip 压缩方式,因为 gzip 是支持拼接的,如果写完一块压缩写到一个文件里面去,然后后面我们再压缩另外一段数据,拼接到后面,这种数据是可以完整把它解压出来的,所以它这个 gzip 非常适合流式写入。
gzip 还有一个特点是解压相对比较方便一点,用户下载回去以后,这些文件有些 gzip 这样的工具就可以解压开,如果在 zless/zcat 等方式更方便了。但它本身是十几年前的一种压缩方法,虽然它的压缩率很高,但是它的压缩性能比起最近几年一些新的算法,比如snappy,lz4等等,它的压缩性能和解压性能都是没有那么高的。但主要考虑到用户使用方便,我们还是选用 gzip 流式压缩的方案。
还有一个就是文件怎么样命名给用户提供一个下载,这个主要考虑到用户怎么样使用,我们在命名方式就是按照类型,前面说的方式是什么样的应用存过来的,主机名、日期、小时、滚动序号。这里主要考虑到用户如果要下载,如果你有一个文件非常大,超过100兆下载是有困难的,所以我们是按照100兆来做一个滚动,这是一个文件命名的规则。
技术难点
在这个方案里面这里会遇到什么样的技术难点?首先一个数据量比较大,按照我们刚才命名规则来说,我们会遇到这样一个问题,每小时会写入几万以上的文件——有很多 Access log 的类型,然后有很多应用,然后有很多主机,我们也有上万台主机,各种 Access log 加起来总共会有2万以上的文件,我们要支持他能够流式写入到文件系统里面,这个会对文件系统 IO 会造成很大的压力,写入次数会非常大。这里就是我前面说的选哪种文件系统其实没有很大的关系,你都需要解决高并发量写入的问题,这是主要的技术难点。
那我们怎么样去解决,大家可以思考一下。有这里有一个套路,这么大的流量,我们肯定不会让每个数据都写入一次,最常用的套路就是缓存,缓存下来过一段时间再刷一次盘。
所以解决这个性能的一个方案就是缓存,缓存又会碰到另外一个问题,缓存又占用内存,比如说我们是每小时收一个文件,如果要缓存一个小时数据再写入,这个要非常多内存才能支持,所以整个设计方案里面要考虑到内存的占用。到底缓存多少合适,怎么样缓存比较合适。最初我们通过Spark实现这个方案,这样的实现方案最直接,Spark 从Kafka捞数据,然后缓存一段时间,一分钟,批量读一次,批量压缩。但是这个我们发现对内存占用也是消耗太大了,我们去做其中一种 Access log 的写入,最开始用了十台服务器才能支持,如果支持那么多中 Access log,可能要几十台机器,这个成本有点高,所以后来我们是考虑一定要用更好的方案去实现。
三、Access Log Writer设计和实现
语言选择
看一下这个设计方案,这个是引用大概的设计思路,从 kafka 里消费数据,解析出来之后会生成文件路径,通过两级的缓存,最后再写入到 HDFS 里面。
这个方案最初我们也考虑到用什么语言去设计。用 Java 还是用 Go?这种比较其实需要把它拆分几个方面来考虑,考虑不同的权重对我们影响比较大。其中一个方向就是考虑开源框架和开源库的支持,大数据前面已经说了,Java 是得到非常多支持,有很多库和框架来做,所以其实Java在这一块完全是领先的,但是具体到我们刚才涉及到的方案来说,我们其实只需要用两种开源库,一个是读Kafka,一种是写入到HDFS,这两个在 Go 里面其实都是有的,所以对这个框架来说两个语言都是可以的。前提是不需要分布式协同处理,只需要每台机能够单独处理,可以水平拓展,我们其实用go就可以实现。
从考虑性能来说,本来 Java 性能也不错,但是 Java 的一个很大问题,它的 GC 调优非常困难,我们整个日志系统里面原来大部分都是 Java 和 scala 做的,在我们以前经验来看,处理很大吞吐量情况下,Java 的 GC 会是一个非常大的问题,我们之前会花很多时间来做 GC 的调优。虽然最终 Java 和 Golang 可能会差不多,但是考虑到调优的复杂度来说,对内存使用的复杂度来说,其实 Golang 还是更优一点。在jdk9之前,Java对每个文本字符,需要两个byte来保存。所以它内存用 Go 来说,其实有比较大的优势。
模块设计
刚才说过了主要是两级缓存、多组协程池,每个模块都是多组 Goroutine。
模块一、consumer
下面再仔细深入一下,第一个模块 consumer,这块主要是消费 Kafka 的数据。日志是在客户端采集完之后上传到 Kafka 里面去的,就是需要从 Kafka 把它消费出来。
这里我们用到一个 sarama 读Kafka的库,读完kafka数据之后,需要把数据塞到 parser 解析,这里会根据读出来消息的partition id 到不同的 parser 里面去,这里有个目的是保证最终写入到文件里面的 Access log 的顺序性。我们在客户端,它是以什么样的顺序落盘的,我们最终提供给用户 Access log 的文件也要保证顺序性。这里需要做两件事情:
第一、在采集的时候就需要用 hostname 作为 partition,如果大家有接触过Kafka就知道,它有个 key 的配置,就是我以什么 key 来做Hash,然后决定写入到哪个 partition 里面去。如果以 hostname 作为 partition,我们就能保证同一台主机文件是落入到里面的,它在同一个 partition 里面处理是顺序处理,所以我们就能保证他的顺序性。
第二、在 Kafka上面保证顺序性之后,在这个地方处理的时候也要保证顺序性,就让同一个 partition 过来的数据也在同一个 parser 里面去处理,然后一直留到后面去。
模块二、parser
parser 这部分比较简单一点,工作模式就类似于 Nginx 里面的 worker process,做无状态的处理,它把日志解析完之后,提取了前面说那几个字段:应用名、主机名、时间戳的解析等等,那些解析完之后就成成一个文件路径,丢到后面不同的 Compressor 里面去。
模块三、Compressor
Compressor 是我们两级缓存的第一级,这里是做一个文件的批量压缩,同一个文件每超过1000行的日志或者等到2分钟之后,就做一个压缩,生成压缩块往后丢。为什么要批量压缩?这个是为了提高压缩效率,如果每条日志过来压缩也是可以的,但是这个压缩率大家知道没有大批量数据一起压缩的压缩率高的,所以第一级缓存主要提高压缩率。
模块四、writer
然后是 writer 的模块,这里就是第二级缓存,每超过5MB或者每10分钟的时候,我们就往 HDFS 写入一次,通过这个缓存可以减少写 HDFS 的写入次数。另外它还要处理文件滚动,文件每达到100兆可能就要写入一次,如果写入失败我们叫做重试等等。
刚才介绍完整个设计的思路,整个代码量不是太多,如果去掉什么参数处理,监控API之类的,核心代码不超过1000行。
优化
后面讲一下优化的思路,怎么样达到非常高的性能。我们做Go语言开发,大家如果做过优化就知道,通常有两个方面:
第一,CPU的分析和优化。这个主要用火焰图的工具。二是内存使用的优化。
通过火焰图的分析我们发现有几个地方是使用CPU的热点,这个需要我们优化的,其中一个是压缩的部分,我们找了一下,其实刚才看到整个数据的流程里面压缩那块占用最大一块CPU的时间,所以压缩库这块我们查了一下,一个有github.com/klauspost/compress/gzip这么一个压缩库,比原来Go自带的模块来说性能提高了20%。其实Go语言不少自带库的性能都不怎么样,比如说json序列化就非常差,一般会尽量用easyjson等库来替代。这个压缩库也是。
第二,这里给大家介绍一下 Kafka 库的优化,这并不是说库本身有什么问题。这里给大家介绍一下, slice growup,这是 Go 语言里面经常碰到对性能影响比较大的地方。像 Sarama 库里面也是,它会申请一块内存做一个消息的解压。kafka 里面他会批量压缩几百条消息,然后丢到 Kafka 里面,去消费的时候也需要把这块东西解压出来再处理的,Sarama这个库是耗内存的,需要把压缩的数据拉过来之后解压放到 buffer 里面,刚开始它不知道 buffer 需要多大,所以一开始是很小的,所以一边解压一边丢过去,发现 slice 不够大的时候,就申请新的空间,把旧数据copy过去,这块是非常消耗性能的,如果调用数特别多。
这是大家经常看到的一个地方,看火焰图或者看 CPU 分析里面也可以看到,通常解决方案就是预分配一个大小给buffer对象预分配一个大小,最关键我们怎么预测到这个 buffer 大小多大,避免这个 grow 或者减少 grow 的情况,在这个例子里面,我们设计了滑动平均值的变量,这个滑动平均值我们是用来计算压缩率的平均,就是滑动平均的情况,根据历史压缩滑动平均值然后乘上现在进来压缩块的大小,我们可以预测出来解压后有多大,经过这么一个优化,我们对整个程序其实也提高10%的性能,也是非常不错的。
第三,内存管理,我们整个模块里面用到非常多内存,无论在第一级里面还是第二级,如果没有一个比较好的内存管理,这个内存块用完就丢,也会造成一些问题。在这里也用到一个 Free list 去缓存我们这个内存块。
前面也提到golang官网里这个 effective go的文章,Free list 其实是 effective go 里面例子,也是教大家怎么样有效管理内存。我觉得这个非常好用,所以我们用在这个模块里面,是一个简洁的方案。首先定义了一个固定大小的 channel,叫做 Free list,然后当我们需要新的 buffer 的时候,如果调 getBuffer 这个函数,它先看 Free list 里面有没有多余对象,已经放到这个 chan 里面,有直接在 chan 弹一个出来,我们就得到一个 Buffer,如果没有就创建一个新的 Buffer 对象,getBuffer 的过程,我们再结合 returnBuffer 的过程,returnBuffer 我们用完这个 Buffer 的时候,我们就把这个 Buffer 塞到 Free list 里面,塞进去就是里面如果 get 的时候,发现这个 chan 里面有,就可以直接拿出来,这个 Buffer 就可以反复被使用,如果这个 Free list 里面已经满的时候,我们可以丢掉一些,因为有时候正在使用已经占一大批,同时有很多返回 Free list,chan 可能已经满了,这时候就可以把 chan 丢弃就可以了,大家可以看到 Free list 管理内存池一个东西,是非常简练的实现。
各位有多少在学习golang的时候见过这个内存管理的方法的?我建议大家看一下官方文档,无论属于什么语言,最好的教材就是官方文档,除了语法外还有很多技巧, effective go 里面有非常好的用法。
最终效果,我们这个模块最后能够控制通过 HDFS 写入频率在 50QPS 左右,对每小时2万个文件,如果不停有消息写入,HDFS 会非常高,对这个情况已经优化了很多,50QPS 对于 HDFS 来说一点压力都没有,对于其它文件系统来说也一点压力都没有。在我们公司常用的服务器上面,我们处理能力能够达到每秒达到150万条日志,我们用128GB的内存,实际上内存使用在64G以下,我们会预留一些 Buffer 来说,如果有延迟,可以用更多数据缓存。在这次双十一大促的时候,我们日志峰值达到每秒钟1200万条日志,我们只用8台服务器就可以支撑下来,性能非常高,跑起来非常稳定。
除了写入之外,我们还提供了简单查询,也是用go来做的,这块主要考虑支持用户以前传统使用 Access log 的习惯,习惯做 AWK 等等命令这些拼接起来处理,可以算一下在某一段时间最大的响应时间或者90分位之类的统计,通过这种计算都可以做的。
总结
总结一下我们整个设计里面重点:
第一,gzip 流式压缩是整个设计的前提;
第二,两级缓存,第一级缓存里面减少缓存的大小,前面说Spark对内存占用很多,还有没有压缩,或者说压缩比例很小,通过一级缓存,我们缓存里面可以放更多的日志。通过第二级缓存我们可以降低HDFS写入次数;
第三,我们通过Golang保证整个系统性能非常高,对资源占用也有保障。
我们回头再看一下,通过这么一个项目,大家想象一下 Golang 在大数据里面到底有多大的使用前景?
Golang应用在大数据领域的思考
讨论这个问题,我们再对比一下 Golang 和Java,Java 开源框架和开源库的支持是非常好的,可以打5星,Golang 在这方面还是比较缺失。总体来说如果要用多很多开源方向库,Java 这块还是优势,另外还有性能前面对比过了,考虑对内存使用来说,Golang 还是稍微好一点。开发效率,Golang 比 Java 要强很多,还有部署复杂度大家非常清楚,Golang部署非常方便。
从这里引申开来,我认为Go语言在大数据领域确实有不适应的场景,特别是需要用在 MapReduce 等分布式计算模型。这里就包括如果在多台机器上面处理数据,最终需要聚合起来,用 reduce 计算的情况,用 go 是因为没有这种框架的,如果要做完全自己实现是非常复杂的逻辑。所以如果用 reduce 的处理这种 Golang 是不适合,意味着大部分离线计算,现在很多离线计算是用 MapReduce 或者用 Spark 上面类似于 MapReduce 这样的计算框架来做的,如果没有这个框架是做不了这个计算的。在哪些地方可以适用,最大场景就是ETL,这个ETL作用就是从拉取原数据做数据的转换,然后再存储到数据仓库里面过程,这个大数据计算里面非常重要的环节,其实刚才我们介绍了的Access Log存储就是这么一个过程。
中间的转化主要是包括这么目的,一个是数据的清洗,有没有无效的数据要把它清洗掉,还有一个数据的修复,比如说有些字段本来是正值,忽然传来负值真么办,这时候应该尝试修复一下,而不是把它丢掉,还有一些数据格式的转化,按照你数据的格式把它转化一下再存储到数据仓库里面,这是ETL的过程。
其实前面几年大家都追求大数据,希望数据越多约好,现在大家开始注重数据质量,存储到大数据里面的数据质量,最重要大数据处理的结果是不是有非常大的影响,所以在ETL的过程,我认为 Golang 由于前面性能的优势,在这块是可以施展拳脚的。引申开就是分布式协同计算,不需要reduce计算的流式处理。还有就是中小规模数据量,针对ETL,用 Go 没有分布式框架来做,可能麻烦一点,如果涉及到需要很大的集群,有几十台机器,上百台机器,这时候就需要加入到调度框架上面做这些事情,因为在 Java 已经有成熟的框架支持,所以会简单一些,在 GO 上,在几台服务器规模的实践上其实是可以做的,这些是我的思考,有些思路希望跟大家分享一下。
我的分享就这么多,大家有什么问题。
Q&A
问:有两个问题想问一下,第一, Access log 该怎么使用,Access log 对报表统计或者什么计算之类的,刚才听到使用API,这是你们做二次开发处理用户的命令吗?
潘卫华:第一个问题你说Access log该怎么使用,一个场景是做报表,我们公司是有Hive的系统来做的,当然以后我们这边也去考虑,也可以提供报表的功能。另外主要服务于开发者,关于他发现问题,需要查某个来源IP,他调用了什么URL,会查这个记录的时候,必须访问到Access log做快速查询,也就是所谓的Ad-hoc查询。
问:第二个问题,我看到你们四个阶段,从消费、解析、压缩到写入,这四个阶段,消息会丢失什么场景?
潘卫华:在写HDFS的时候是最有可能发生失败的地方,我们会进行重试,超过一定次数之后如果还是失败,我们还是会落盘到本地,通过定时任务把处理失败的数据会再重新写到HDFS里面去。
问:刚才只听到只是Buffer重试,你前面好几个步骤,比如说服务端重启这些场景。
潘卫华:这些就要考虑到用户到底对数据的可靠性要求有多高,或者说不同的应用场景有不同的设计方法。如果数据要求非常高,比如说交易数据,可能就要单独做一个 Kafka 消费模型,最终写入到 HDFS,才能确定这个数据写入成功了,这种非常高的可靠性要求,对性能也会损失很多。对于日志查询这种应用,我们认为它的场景并不需要那么高的可靠性要求,所以做得简单一点。当真的出现丢失问题的时候,我们有需要的话还是可以从kafka再重新拉取数据补回。
问:讲师您好,请教一下,目前 Access log 是运用在公司包括各个应用服务的产品吗?比如各个业务做应用,它的日志收集是会用到 Access log 去做收集吗?
潘卫华:主要是 Access log 或者有前面所说几个特征的日志场景,我们都可以帮他收集,我们公司都这几种,主要是外网的网端,内网我们有一些网关,可以作用防火墙或者流量的分发,还有微服务组件会有 Access log。
问:其实格式大家已经统一了?
潘卫华:对,这几种组件的日志都有各自统一格式的。
问:如果业务性开发应用,这些理论上没有办法,除非你们提供了一个结构格式给他们。
潘卫华:对于Nginx Access Log,主要需要运维去做推动。如果不统一,我们也可以在写过很多规则去适配它来做,但最好方法就是统一。
问:当时在选型存储的时候,你这边选可是 HDFS,其实是不是可以用时序数据库,你要保存半年或者三个月这个要做什么考虑?
潘卫华:首先时序数据库TSDB,适合用来放指标类的东西,主要用来看趋势线的,日志这种放进去是不合适的。
问:老师您好,我想问一下刚刚说收集那一块,它的性能有没有对比过像 Flume这种,他们两种对比你觉得优势在哪里?
潘卫华:不同采集端的性能我们是对比过,Flume性能还不错,但是前面说 Flume 是基于Java,对内存的消耗比较大,会占用预期的内存,另外是二次开发复杂度,我们用GO二次开发比他那边开发基于Flume开发还要快。
问:对于完备性有没有对比过,收集会不会丢失,你把它抓取到了,会不会存在丢失的情况?
潘卫华:首先它会有记录Offset,它读了多少,它会记录在文件里面,整个开发处理完之后会发给 Kafka,整个过程中间没有发现丢失。
Gopher China 2019 早鸟票开售中〜,点击下方“阅读原文”即可报名!
有疑问加站长微信联系(非本文作者)