原文链接: https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html
[上一篇文章《使用压缩文件优化io (一)》](<https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html>)中记录了日志备份 io 优化方案,使用文件流数据压缩方案优化 io 性能,效果十分显著。这篇文章记录数据分析前置清洗、格式化数据的 io 优化方案,我们有一台专用的日志前置处理服务器,所有业务日志通过这台机器从 OSS 拉取回来清洗、格式化,最后进入到数据仓储中便于后续的分析。
随着业务扩展这台服务器压力越来越大,高峰时数据延迟越来越厉害,早期也是使用 Python 脚本 + awk 以及一些 shell 命令完成相关工作,在数据集不是很大的时候这种方案很好,效率也很高,随着数据集变大,发现服务器负载很高,经过分析是还是 io 阻塞,依旧采用对数据流进行处理的方案优化io,以下记录优化的过程。
## 背景介绍
> 服务器配置:4 核 8G; 磁盘:1T
分析前置服务会根据业务不同分为十分钟、一小时两个阶段拉取分析日志,每隔一个阶段会去 OSS 拉取日志回到服务器进行处理,处理过程因 io 阻塞,导致 CPU 和 load 异常高,且处理效率严重下降,这次优化主要就是降低 io 阻塞,提升 CPU 利用率 (处理业务逻辑而不是等待 io) 和处理效率。
后文中会详细描述优化前后的方案,并用 go 编写测试,使用一台 2 核4G的服务器进行测试,测试数据集大小为:
- 文件数量:432个
- 压缩文件:17G
- 解压后文件:63G
- 压缩方案:lzo
- Goroutine 数量:20
## 优化前
优化前日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志到本地磁盘 (压缩文件)
- 解压缩日志文件
- 读取日志数据
- 业务处理……
- 导入到数据仓储中
导致 io 阻塞的部分主要是: 拉取 OSS 日志、解压缩日志文件及读取日志数据,优化也主要从这三块着手。
这里有一段公共的日志读取方法,该方法接收一个 `io.Reader`, 并按行读取日志,并简单切分日志字段,并没有实质的处理日志数据,后面的优化方案也将使用这个方法读取日志。
```go
package main
import (
"bufio"
"bytes"
"io"
"github.com/thinkeridea/go-extend/exbytes"
)
func Read(r io.Reader) {
rawBuffer := make([]byte, 512)
buf := bufio.NewReader(r)
for {
line, ok, err := readLine(buf, rawBuffer)
if err == io.EOF {
return
}
if err != nil {
panic(nil)
}
if ok {
rawBuffer = line
}
c := bytes.Count(line, []byte{'\x01'})
if c != 65 {
panic("无效的行")
}
}
}
func readLine(r *bufio.Reader, rawBuffer []byte) ([]byte, bool, error) {
var ok bool
line, err := r.ReadSlice('\n')
if (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF {
rawBuffer = append(rawBuffer[:0], line...)
for (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF {
line, err = r.ReadSlice('\n')
rawBuffer = append(rawBuffer, line...)
}
line = rawBuffer
ok = true
}
if len(line) > 0 && err == io.EOF {
err = nil
}
return line, ok, err
}
```
日志按 `\r\r\n` 分隔行,使用 `\x01` 切分字段,读取方法使用 `bufio.ReadSlice` 方法,避免内存分配,且当 `bufio` 缓冲区满之后使用 `rwaBuffer` 作为本地可扩展缓冲,每次扩展之后会保留最大的扩展空间,因为业务日志每行大小差不多,这样可以极大的减少内存分配,效率是 `bufio.ReadLine` 方法的好几倍。
```go
package main
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
".../pkg/aliyun_oss" // 虚假的包
)
func main() {
var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装
files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files))
start := time.Now()
defer func(t time.Time) {
fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds())
}(start)
// 下载日志文件
n := 20
var wg sync.WaitGroup
c := make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
f, ok := <-c
if !ok {
return
}
if _, err := os.Stat(f); err == nil {
return
} else if !os.IsNotExist(err) {
panic(err)
}
dir := filepath.Dir(f)
err := os.MkdirAll(dir, 0755)
if err != nil {
panic(err)
}
err = oss.GetObjectToFile(f, f)
if err != nil {
panic(err)
}
}
}()
}
for _, f := range files {
c <- f
}
close(c)
wg.Wait()
fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
// 解压日志文件
start = time.Now()
shell := exec.Command("/bin/bash", "-c", "lzop -df logs/*/*/*/*/*/*.lzo")
err := shell.Run()
if err != nil {
panic(err)
}
fmt.Printf("解压文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
// 读取日志文件
start = time.Now()
c = make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
file, ok := <-c
if !ok {
return
}
f, err := os.Open(file)
if err != nil {
panic(err)
}
Read(f)
f.Close()
}
}()
}
for _, f := range files {
c <- strings.TrimRight(f, ".lzo")
}
close(c)
wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
}
```
运行程序输出如下:
```go
待处理文件数量:432
下载文件耗时:303.562865
解压文件耗时:611.236232
读取文件耗时:460.371245
共耗时:1375.187261
```
通过 `iostat -m -x 5 10000` 分析各个阶段结果如下:
- 下载时:
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
7.85 0.00 16.44 11.24 0.00 64.48
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 80.40 7.80 8.98 0.04 209.36 0.40 4.57 4.64 3.77 0.50 4.44
vdb 1.40 761.20 247.60 264.00 14.70 60.92 302.72 9.17 17.92 10.36 25.00 0.52 26.52
```
- 解压时:
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
8.54 0.00 8.33 68.39 0.00 14.74
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 1.20 3.40 11.80 0.01 0.05 8.95 0.30 20.03 0.41 25.68 0.55 0.84
vdb 0.00 22037.80 107.80 243.20 26.45 107.01 778.71 83.52 236.68 74.31 308.65 2.52 88.54
```
- 读取时:
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
2.74 0.00 5.07 92.19 0.00 0.00
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 2.40 3.80 23.60 0.01 0.14 11.85 0.12 4.48 1.95 4.89 0.33 0.90
vdb 1.80 4.60 347.20 6.20 139.97 0.08 811.60 126.62 358.04 360.79 203.48 2.83 100.00
```
通过 `iostat` 结果可以看出,在解压和读取日志时 `io` 阻塞比较严重,且运行时间较长,下载时 `io` 阻塞也存在,但还可以接受,通过下面两个方案逐渐消除掉 `io`。
## 优化方案一
优化前的方案反应出在解压和读取日志时 `io` 阻塞比较严重,那么是否可以通过读取 `lzo` 压缩文件,以此来消除解压缩日志耗时太大、`io` 太高的问题呢?并且读取 `lzo` 压缩文件远比解压后文件小,来降低读取日志耗时太大、`io` 太高的问题呢?
优化后日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志到本地磁盘 (压缩文件)
- 读取压缩日志数据
- 业务处理……
- 导入到数据仓储中
```go
package main
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
".../pkg/aliyun_oss" // 虚假的包
"github.com/cyberdelia/lzo"
)
func main() {
var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装
files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files))
start := time.Now()
defer func(t time.Time) {
fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds())
}(start)
// 下载日志文件
n := 20
var wg sync.WaitGroup
c := make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
f, ok := <-c
if !ok {
return
}
if _, err := os.Stat(f); err == nil {
return
} else if !os.IsNotExist(err) {
panic(err)
}
dir := filepath.Dir(f)
err := os.MkdirAll(dir, 0755)
if err != nil {
panic(err)
}
err = oss.GetObjectToFile(f, f)
if err != nil {
panic(err)
}
}
}()
}
for _, f := range files {
c <- f
}
close(c)
wg.Wait()
fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
start = time.Now()
c = make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
file, ok := <-c
if !ok {
return
}
f, err := os.Open(file)
if err != nil {
panic(err)
}
r, err := lzo.NewReader(f)
if err != nil {
panic(err)
}
Read(r)
r.Close()
f.Close()
}
}()
}
for _, f := range files {
c <- f
}
close(c)
wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
}
```
这个方案消除了解压缩日志,并且直接读取压缩日志,使用 `github.com/cyberdelia/lzo` 包对压缩文件数据流进行边读取边解压,这次不用单独封装新的方法了,直接使用 `lzo` 包中的接口即可。
程序运行结果如下:
```shell
待处理文件数量:432
下载文件耗时:286.146603
读取文件耗时:132.787345
共耗时:418.942862
```
这个方案效果非常明显,总耗时从 `1375.187261` 降低到 `418.942862` 提升了 3 倍的效率,不仅消除了压缩的时间,还大大缩短了读取文件耗时,成果显著。
通过 `iostat -m -x 5 10000` 分析各个阶段结果如下:
下载时:
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
5.08 0.00 13.24 29.34 0.00 52.33
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 2.80 1.40 11.80 0.01 0.07 12.00 0.02 1.85 1.14 1.93 0.18 0.24
vdb 0.00 17207.60 0.60 212.40 0.00 75.06 721.74 55.81 236.34 84.33 236.77 2.49 53.14
```
读取时:
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
80.66 0.00 4.83 14.50 0.00 0.00
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 6.20 0.20 0.06 0.00 20.00 0.01 1.69 1.71 1.00 0.62 0.40
vdb 0.00 6.80 390.40 19.20 118.78 0.23 595.04 74.87 190.55 197.95 40.08 1.85 75.90
```
通过 `iostat` 结果分析,下载时 `io` 阻塞和优化前波动不是很大,读取时的 `io` 优化已经非常好了,`iowait` 从 `92.19%` 降低到 `14.5%` ,CPU 更多的任务用来处理解压缩日志,而不是处理 `io` 阻塞。
## 优化方案二
本来优化到上面的效果已经非常满意了,不过既然开始做优化就不能草草结束了,仔细思考业务场景,需要 本地 `lzo` 文件?重新处理日志的频率高吗?本地 `lzo` 日志清理方便吗?
通过上面的几个问题发现,除非程序出现问题或者数据存储出现故障,否者极少需要重新处理日志,一年里面这种情况也是极少的,甚至不会发生。
那么思考一下,不下载日志,直接读取网络数据流,实现边下边解压边读取,这样岂不是没有 `io` 了吗?
优化后日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志,在内存中解压并读取分析日志
- 业务处理……
- 导入到数据仓储中
具体实现如下:
```go
package main
import (
"fmt"
"sync"
"time"
".../pkg/aliyun_oss" // 虚假的包
"github.com/cyberdelia/lzo"
)
func main() {
var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装
files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files))
start := time.Now()
defer func(t time.Time) {
fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds())
}(start)
n := 20
var wg sync.WaitGroup
c := make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
f, ok := <-c
if !ok {
return
}
r1, err := oss.GetObject(f)
if err != nil {
panic(err)
}
r, err := lzo.NewReader(r1)
if err != nil {
panic(err)
}
Read(r)
r.Close()
r1.Close()
}
}()
}
for _, f := range files {
c <- f
}
close(c)
wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
}
```
优化后只有一个流程了,代码简洁了不少,看看效率如何吧!
程序运行结果如下:
```shell
待处理文件数量:432
读取文件耗时:285.993661
共耗时:285.993717
```
天啊发生了什么,我使劲擦了擦眼睛,太不可思议了,居然只消耗了下载日志的耗时,较上一个方案总耗时从 `418.942862` 降低到 `285.993717`,提升了近 2 倍的效率,让我们看看上个方案下载文件耗时 `286.146603` ,而新方案总耗时是 `285.993717` 居然只用了上个优化版本的下载时间,究竟发生了什么?
通过 `iostat -m -x 5 10000` 分析结果如下:
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
43.73 0.00 9.64 0.31 0.00 46.32
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 1.20 4.40 3.80 0.02 0.03 10.93 0.01 1.49 0.59 2.53 0.71 0.58
vdb 0.00 6.80 0.00 24.60 0.00 0.27 22.83 0.02 0.84 0.00 0.84 0.28 0.68
```
通过 `iostat` 结果分析,在程序运行期间没有任何 `io` 开销,CPU 居然还有一半的空闲,前面两个版本 CPU 是没有空闲的啊,由此看来之前 CPU 更多的消耗在 `io` 阻塞上了,并没有用来处理业务逻辑。
由此来看也就不足为奇了,为啥优化后只需要下载日志的时间就能处理完所有日志了,没有了 `io` 阻塞,CPU 更多了用来处理业务,把之前下载时写文件 `io` 的耗时,用来解压缩数据,读取数据,且还有更多的空闲,跑出这样的结果也就很正常了。
## 总结
从优化前耗时 `1375.187261` 秒到 `285.993717` 秒,性能提升 80%, 从 `iowait` `92.19%` 到 `0.31%` 提升近 `100%`,从没有任何 CPU 空闲到有一半空闲,这个过程中有很多值得总结的事情。
`io` 对性能的影响非常大,对 CPU 占用非常严重,导致 CPU 处理业务逻辑的时间片非常少。从 `io` 转移到 CPU 对性能提升非常明显。CPU 计算效率十分的高,从 `io` 密集到密集计算,只要符合业务场景,往往能给我们带来意想不到的效果。
往往优化业务并不需要十分高大上的技术,只是转变一下思路,不仅代码更少,且程序更简短、好维护、逻辑更清晰。
一定要结合实际业务场景进行思考,减少理所当然和业务无关的中间环节,往往就可以极大的提升程序效率。
**转载:**
**本文作者: 戚银([thinkeridea](https://blog.thinkeridea.com/))**
**本文链接: [https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html](https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html)**
**版权声明: 本博客所有文章除特别声明外,均采用 [CC BY 4.0 CN协议](http://creativecommons.org/licenses/by/4.0/deed.zh) 许可协议。转载请注明出处!**
有疑问加站长微信联系(非本文作者)