【Go】使用压缩文件优化io (二)

thinkeridea · · 576 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

原文链接: https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html [上一篇文章《使用压缩文件优化io (一)》](<https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html>)中记录了日志备份 io 优化方案,使用文件流数据压缩方案优化 io 性能,效果十分显著。这篇文章记录数据分析前置清洗、格式化数据的 io 优化方案,我们有一台专用的日志前置处理服务器,所有业务日志通过这台机器从 OSS 拉取回来清洗、格式化,最后进入到数据仓储中便于后续的分析。 随着业务扩展这台服务器压力越来越大,高峰时数据延迟越来越厉害,早期也是使用 Python 脚本 + awk 以及一些 shell 命令完成相关工作,在数据集不是很大的时候这种方案很好,效率也很高,随着数据集变大,发现服务器负载很高,经过分析是还是 io 阻塞,依旧采用对数据流进行处理的方案优化io,以下记录优化的过程。 ## 背景介绍 > 服务器配置:4 核 8G; 磁盘:1T 分析前置服务会根据业务不同分为十分钟、一小时两个阶段拉取分析日志,每隔一个阶段会去 OSS 拉取日志回到服务器进行处理,处理过程因 io 阻塞,导致 CPU 和 load 异常高,且处理效率严重下降,这次优化主要就是降低 io 阻塞,提升 CPU 利用率 (处理业务逻辑而不是等待 io) 和处理效率。 后文中会详细描述优化前后的方案,并用 go 编写测试,使用一台 2 核4G的服务器进行测试,测试数据集大小为: - 文件数量:432个 - 压缩文件:17G - 解压后文件:63G - 压缩方案:lzo - Goroutine 数量:20 ## 优化前 优化前日志处理流程: - 获取待处理文件列表 - 拉取 OSS 日志到本地磁盘 (压缩文件) - 解压缩日志文件 - 读取日志数据 - 业务处理…… - 导入到数据仓储中 导致 io 阻塞的部分主要是: 拉取 OSS 日志、解压缩日志文件及读取日志数据,优化也主要从这三块着手。 这里有一段公共的日志读取方法,该方法接收一个 `io.Reader`, 并按行读取日志,并简单切分日志字段,并没有实质的处理日志数据,后面的优化方案也将使用这个方法读取日志。 ```go package main import ( "bufio" "bytes" "io" "github.com/thinkeridea/go-extend/exbytes" ) func Read(r io.Reader) { rawBuffer := make([]byte, 512) buf := bufio.NewReader(r) for { line, ok, err := readLine(buf, rawBuffer) if err == io.EOF { return } if err != nil { panic(nil) } if ok { rawBuffer = line } c := bytes.Count(line, []byte{'\x01'}) if c != 65 { panic("无效的行") } } } func readLine(r *bufio.Reader, rawBuffer []byte) ([]byte, bool, error) { var ok bool line, err := r.ReadSlice('\n') if (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF { rawBuffer = append(rawBuffer[:0], line...) for (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF { line, err = r.ReadSlice('\n') rawBuffer = append(rawBuffer, line...) } line = rawBuffer ok = true } if len(line) > 0 && err == io.EOF { err = nil } return line, ok, err } ``` 日志按 `\r\r\n` 分隔行,使用 `\x01` 切分字段,读取方法使用 `bufio.ReadSlice` 方法,避免内存分配,且当 `bufio` 缓冲区满之后使用 `rwaBuffer` 作为本地可扩展缓冲,每次扩展之后会保留最大的扩展空间,因为业务日志每行大小差不多,这样可以极大的减少内存分配,效率是 `bufio.ReadLine` 方法的好几倍。 ```go package main import ( "fmt" "os" "os/exec" "path/filepath" "strings" "sync" "time" ".../pkg/aliyun_oss" // 虚假的包 ) func main() { var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装 files := list() // 这是一个虚构的方法,用来获取待处理的文件列表 fmt.Printf("待处理文件数量:%d\n", len(files)) start := time.Now() defer func(t time.Time) { fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds()) }(start) // 下载日志文件 n := 20 var wg sync.WaitGroup c := make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { f, ok := <-c if !ok { return } if _, err := os.Stat(f); err == nil { return } else if !os.IsNotExist(err) { panic(err) } dir := filepath.Dir(f) err := os.MkdirAll(dir, 0755) if err != nil { panic(err) } err = oss.GetObjectToFile(f, f) if err != nil { panic(err) } } }() } for _, f := range files { c <- f } close(c) wg.Wait() fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) // 解压日志文件 start = time.Now() shell := exec.Command("/bin/bash", "-c", "lzop -df logs/*/*/*/*/*/*.lzo") err := shell.Run() if err != nil { panic(err) } fmt.Printf("解压文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) // 读取日志文件 start = time.Now() c = make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { file, ok := <-c if !ok { return } f, err := os.Open(file) if err != nil { panic(err) } Read(f) f.Close() } }() } for _, f := range files { c <- strings.TrimRight(f, ".lzo") } close(c) wg.Wait() fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) } ``` 运行程序输出如下: ```go 待处理文件数量:432 下载文件耗时:303.562865 解压文件耗时:611.236232 读取文件耗时:460.371245 共耗时:1375.187261 ``` 通过 `iostat -m -x 5 10000` 分析各个阶段结果如下: - 下载时: ```shell avg-cpu: %user %nice %system %iowait %steal %idle 7.85 0.00 16.44 11.24 0.00 64.48 Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 80.40 7.80 8.98 0.04 209.36 0.40 4.57 4.64 3.77 0.50 4.44 vdb 1.40 761.20 247.60 264.00 14.70 60.92 302.72 9.17 17.92 10.36 25.00 0.52 26.52 ``` - 解压时: ```shell avg-cpu: %user %nice %system %iowait %steal %idle 8.54 0.00 8.33 68.39 0.00 14.74 Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 1.20 3.40 11.80 0.01 0.05 8.95 0.30 20.03 0.41 25.68 0.55 0.84 vdb 0.00 22037.80 107.80 243.20 26.45 107.01 778.71 83.52 236.68 74.31 308.65 2.52 88.54 ``` - 读取时: ```shell avg-cpu: %user %nice %system %iowait %steal %idle 2.74 0.00 5.07 92.19 0.00 0.00 Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 2.40 3.80 23.60 0.01 0.14 11.85 0.12 4.48 1.95 4.89 0.33 0.90 vdb 1.80 4.60 347.20 6.20 139.97 0.08 811.60 126.62 358.04 360.79 203.48 2.83 100.00 ``` 通过 `iostat` 结果可以看出,在解压和读取日志时 `io` 阻塞比较严重,且运行时间较长,下载时 `io` 阻塞也存在,但还可以接受,通过下面两个方案逐渐消除掉 `io`。 ## 优化方案一 优化前的方案反应出在解压和读取日志时 `io` 阻塞比较严重,那么是否可以通过读取 `lzo` 压缩文件,以此来消除解压缩日志耗时太大、`io` 太高的问题呢?并且读取 `lzo` 压缩文件远比解压后文件小,来降低读取日志耗时太大、`io` 太高的问题呢? 优化后日志处理流程: - 获取待处理文件列表 - 拉取 OSS 日志到本地磁盘 (压缩文件) - 读取压缩日志数据 - 业务处理…… - 导入到数据仓储中 ```go package main import ( "fmt" "os" "path/filepath" "sync" "time" ".../pkg/aliyun_oss" // 虚假的包 "github.com/cyberdelia/lzo" ) func main() { var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装 files := list() // 这是一个虚构的方法,用来获取待处理的文件列表 fmt.Printf("待处理文件数量:%d\n", len(files)) start := time.Now() defer func(t time.Time) { fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds()) }(start) // 下载日志文件 n := 20 var wg sync.WaitGroup c := make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { f, ok := <-c if !ok { return } if _, err := os.Stat(f); err == nil { return } else if !os.IsNotExist(err) { panic(err) } dir := filepath.Dir(f) err := os.MkdirAll(dir, 0755) if err != nil { panic(err) } err = oss.GetObjectToFile(f, f) if err != nil { panic(err) } } }() } for _, f := range files { c <- f } close(c) wg.Wait() fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) start = time.Now() c = make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { file, ok := <-c if !ok { return } f, err := os.Open(file) if err != nil { panic(err) } r, err := lzo.NewReader(f) if err != nil { panic(err) } Read(r) r.Close() f.Close() } }() } for _, f := range files { c <- f } close(c) wg.Wait() fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) } ``` 这个方案消除了解压缩日志,并且直接读取压缩日志,使用 `github.com/cyberdelia/lzo` 包对压缩文件数据流进行边读取边解压,这次不用单独封装新的方法了,直接使用 `lzo` 包中的接口即可。 程序运行结果如下: ```shell 待处理文件数量:432 下载文件耗时:286.146603 读取文件耗时:132.787345 共耗时:418.942862 ``` 这个方案效果非常明显,总耗时从 `1375.187261` 降低到 `418.942862` 提升了 3 倍的效率,不仅消除了压缩的时间,还大大缩短了读取文件耗时,成果显著。 通过 `iostat -m -x 5 10000` 分析各个阶段结果如下: 下载时: ```shell avg-cpu: %user %nice %system %iowait %steal %idle 5.08 0.00 13.24 29.34 0.00 52.33 Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 2.80 1.40 11.80 0.01 0.07 12.00 0.02 1.85 1.14 1.93 0.18 0.24 vdb 0.00 17207.60 0.60 212.40 0.00 75.06 721.74 55.81 236.34 84.33 236.77 2.49 53.14 ``` 读取时: ```shell avg-cpu: %user %nice %system %iowait %steal %idle 80.66 0.00 4.83 14.50 0.00 0.00 Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 0.00 6.20 0.20 0.06 0.00 20.00 0.01 1.69 1.71 1.00 0.62 0.40 vdb 0.00 6.80 390.40 19.20 118.78 0.23 595.04 74.87 190.55 197.95 40.08 1.85 75.90 ``` 通过 `iostat` 结果分析,下载时 `io` 阻塞和优化前波动不是很大,读取时的 `io` 优化已经非常好了,`iowait` 从 `92.19%` 降低到 `14.5%` ,CPU 更多的任务用来处理解压缩日志,而不是处理 `io` 阻塞。 ## 优化方案二 本来优化到上面的效果已经非常满意了,不过既然开始做优化就不能草草结束了,仔细思考业务场景,需要 本地 `lzo` 文件?重新处理日志的频率高吗?本地 `lzo` 日志清理方便吗? 通过上面的几个问题发现,除非程序出现问题或者数据存储出现故障,否者极少需要重新处理日志,一年里面这种情况也是极少的,甚至不会发生。 那么思考一下,不下载日志,直接读取网络数据流,实现边下边解压边读取,这样岂不是没有 `io` 了吗? 优化后日志处理流程: - 获取待处理文件列表 - 拉取 OSS 日志,在内存中解压并读取分析日志 - 业务处理…… - 导入到数据仓储中 具体实现如下: ```go package main import ( "fmt" "sync" "time" ".../pkg/aliyun_oss" // 虚假的包 "github.com/cyberdelia/lzo" ) func main() { var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装 files := list() // 这是一个虚构的方法,用来获取待处理的文件列表 fmt.Printf("待处理文件数量:%d\n", len(files)) start := time.Now() defer func(t time.Time) { fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds()) }(start) n := 20 var wg sync.WaitGroup c := make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { f, ok := <-c if !ok { return } r1, err := oss.GetObject(f) if err != nil { panic(err) } r, err := lzo.NewReader(r1) if err != nil { panic(err) } Read(r) r.Close() r1.Close() } }() } for _, f := range files { c <- f } close(c) wg.Wait() fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) } ``` 优化后只有一个流程了,代码简洁了不少,看看效率如何吧! 程序运行结果如下: ```shell 待处理文件数量:432 读取文件耗时:285.993661 共耗时:285.993717 ``` 天啊发生了什么,我使劲擦了擦眼睛,太不可思议了,居然只消耗了下载日志的耗时,较上一个方案总耗时从 `418.942862` 降低到 `285.993717`,提升了近 2 倍的效率,让我们看看上个方案下载文件耗时 `286.146603` ,而新方案总耗时是 `285.993717` 居然只用了上个优化版本的下载时间,究竟发生了什么? 通过 `iostat -m -x 5 10000` 分析结果如下: ```shell avg-cpu: %user %nice %system %iowait %steal %idle 43.73 0.00 9.64 0.31 0.00 46.32 Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util vda 0.00 1.20 4.40 3.80 0.02 0.03 10.93 0.01 1.49 0.59 2.53 0.71 0.58 vdb 0.00 6.80 0.00 24.60 0.00 0.27 22.83 0.02 0.84 0.00 0.84 0.28 0.68 ``` 通过 `iostat` 结果分析,在程序运行期间没有任何 `io` 开销,CPU 居然还有一半的空闲,前面两个版本 CPU 是没有空闲的啊,由此看来之前 CPU 更多的消耗在 `io` 阻塞上了,并没有用来处理业务逻辑。 由此来看也就不足为奇了,为啥优化后只需要下载日志的时间就能处理完所有日志了,没有了 `io` 阻塞,CPU 更多了用来处理业务,把之前下载时写文件 `io` 的耗时,用来解压缩数据,读取数据,且还有更多的空闲,跑出这样的结果也就很正常了。 ## 总结 从优化前耗时 `1375.187261` 秒到 `285.993717` 秒,性能提升 80%, 从 `iowait` `92.19%` 到 `0.31%` 提升近 `100%`,从没有任何 CPU 空闲到有一半空闲,这个过程中有很多值得总结的事情。 `io` 对性能的影响非常大,对 CPU 占用非常严重,导致 CPU 处理业务逻辑的时间片非常少。从 `io` 转移到 CPU 对性能提升非常明显。CPU 计算效率十分的高,从 `io` 密集到密集计算,只要符合业务场景,往往能给我们带来意想不到的效果。 往往优化业务并不需要十分高大上的技术,只是转变一下思路,不仅代码更少,且程序更简短、好维护、逻辑更清晰。 一定要结合实际业务场景进行思考,减少理所当然和业务无关的中间环节,往往就可以极大的提升程序效率。 **转载:** **本文作者: 戚银([thinkeridea](https://blog.thinkeridea.com/))** **本文链接: [https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html](https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html)** **版权声明: 本博客所有文章除特别声明外,均采用 [CC BY 4.0 CN协议](http://creativecommons.org/licenses/by/4.0/deed.zh) 许可协议。转载请注明出处!**

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

576 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传