原文连接:https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html
最近遇到一个日志备份 io 过高的问题,业务日志每十分钟备份一次,本来是用 Python 写一个根据规则扫描备份日志问题不大,但是随着业务越来越多,单机上的日志文件越来越大,文件数量也越来越多,导致每每备份的瞬间 io 阻塞严重, CPU 和 load 异常的高,好在备份速度很快,对业务影响不是很大,这个问题会随着业务增长,越来越明显,这段时间抽空对备份方式做了优化,效果十分显著,整理篇文章记录一下。
## 背景说明
> 服务器配置:4 核 8G; 磁盘:500G
>
> 每十分钟需要上传:18 个文件,高峰时期约 10 G 左右
业务日志为了保证可靠性,会先写入磁盘文件,每10分钟切分日志文件,然后在下十分钟第一分时备份日志到 OSS,数据分析服务会从在备份完成后拉取日志进行分析,日志备份需要高效快速,在最短的时间内备份完,一般备份均能在几十秒内完成。
备份的速度和效率并不是问题,足够的快,但是在备份时 io 阻塞严重导致的 CPU 和 load 异常,成为业务服务的瓶颈,在高峰期业务服务仅消耗一半的系统资源,但是备份时 CPU 经常 100%,且 iowait 可以达到 70 多,空闲资源非常少,这样随着业务扩展,日志备份虽然时间很短,却成为了系统的瓶颈。
后文中会详细描述优化前后的方案,并用 go 编写测试,使用一台 2 核4G的服务器进行测试,测试数据集大小为:
- 文件数:336
- 原始文件:96G
- 压缩文件:24G
- 压缩方案:lzo
- Goroutine 数量:4
## 优化前
优化前日志备份流程:
- 根据备份规则扫描需要备份的文件
- 使用 `lzop` 命令压缩日志
- 上传压缩后的日志到 OSS
下面是代码实现,这里不再包含备份文件规则,仅演示压缩上传逻辑部分,程序接受文件列表,并对文件列表压缩上传至 OSS 中。
`.../pkg/aliyun_oss` 是我自己封装的基于阿里云 OSS 操作的包,这个路径是错误的,仅做演示,想运行下面的代码,OSS 交互这部分需要自己实现。
```go
package main
import (
"bytes"
"fmt"
"os"
"os/exec"
"path/filepath"
"sync"
"time"
".../pkg/aliyun_oss"
)
func main() {
var oss *aliyun_oss.AliyunOSS
files := os.Args[1:]
if len(files) < 1 {
fmt.Println("请输入要上传的文件")
os.Exit(1)
}
fmt.Printf("待备份文件数量:%d\n", len(files))
startTime := time.Now()
defer func(startTime time.Time) {
fmt.Printf("共耗时:%s\n", time.Now().Sub(startTime).String())
}(startTime)
var wg sync.WaitGroup
n := 4
c := make(chan string)
// 压缩日志
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for file := range c {
cmd := exec.Command("lzop", file)
cmd.Stderr = &bytes.Buffer{}
err := cmd.Run()
if err != nil {
panic(cmd.Stderr.(*bytes.Buffer).String())
}
}
}()
}
for _, file := range files {
c <- file
}
close(c)
wg.Wait()
fmt.Printf("压缩耗时:%s\n", time.Now().Sub(startTime).String())
// 上传压缩日志
startTime = time.Now()
c = make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for file := range c {
name := filepath.Base(file)
err := oss.PutObjectFromFile("tmp/"+name+".lzo", file+".lzo")
if err != nil {
panic(err)
}
}
}()
}
for _, file := range files {
c <- file
}
close(c)
wg.Wait()
fmt.Printf("上传耗时:%s\n", time.Now().Sub(startTime).String())
}
```
程序运行时输出:
```shell
待备份文件数量:336
压缩耗时:19m44.125314226s
上传耗时:6m14.929371103s
共耗时:25m59.118002969s
```
从运行结果中可以看出压缩文件耗时很久,实际通过 `iostat` 命令分析也发现,压缩时资源消耗比较高,下面是 `iostat -m -x 5 10000` 命令采集各个阶段数据。
- 程序运行前
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
2.35 0.00 2.86 0.00 0.00 94.79
Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
vdb 0.00 0.60 0.00 0.60 0.00 4.80 16.00 0.00 0.67 0.00 0.67 0.67 0.04
```
- 压缩日志时
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
10.84 0.00 6.85 80.88 0.00 1.43
Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 0.60 0.00 2.40 0.00 8.00 0.00 0.67 0.67 0.00 0.67 0.04
vdb 14.80 5113.80 1087.60 60.60 78123.20 20697.60 172.13 123.17 106.45 106.26 109.87 0.87 100.00
avg-cpu: %user %nice %system %iowait %steal %idle
10.06 0.00 7.19 79.06 0.00 3.70
Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 1.60 0.00 103.20 0.00 129.00 0.01 3.62 3.62 0.00 0.50 0.08
vdb 14.20 4981.20 992.80 52.60 79682.40 20135.20 190.97 120.34 112.19 110.60 142.17 0.96 100.00
```
- 上传日志时
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
6.98 0.00 7.81 7.71 0.00 77.50
Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 13.40 0.00 242.40 0.00 36.18 0.02 1.63 1.63 0.00 0.19 0.26
vdb 0.40 2.40 269.60 1.20 67184.80 14.40 496.30 4.58 15.70 15.77 0.33 1.39 37.74
avg-cpu: %user %nice %system %iowait %steal %idle
7.06 0.00 8.00 4.57 0.00 80.37
Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 0.60 0.00 75.20 0.00 250.67 0.00 2.67 2.67 0.00 2.00 0.12
vdb 0.20 0.00 344.80 0.00 65398.40 0.00 379.34 5.66 16.42 16.42 0.00 1.27 43.66
```
从 `iostat` 的结果中发现,压缩时程序 `r_await` 和 `w_await` 都到了一百多,且 `iowait` 高达 `80.88%`,几乎耗尽了所有的 CPU,上传时 `iowait` 是可以接受的,因为只是单纯的读取压缩文件,且压缩文件也很小。
### 分析问题
上述结果中发现程序主要运行消耗在压缩日志,那优化也着重日志压缩的逻辑上。
压缩时日志会先压缩成 `lzo` 文件,然后再上传 `lzo` 文件到阿里云 OSS 上,这中间发生了几个过程:
- 读取原始日志文件
- 压缩数据
- 写入 `lzo` 文件
- 读取 `lzo` 文件
- `http` 发送读取的内容
压缩时 `r_await` 和 `w_await` 都很高,主要发生在读取原始日志文件,写入 `lzo` 文件, 怎么优化呢?
先想一下原始需求,读取原始文件 -> 上传数据。但是直接上传原始文件,文件比较大,网络传输慢,而且存储费用也比较高,怎么办呢?
这个时候我们期望可以上传的是压缩文件,所以就有了优化前的逻辑,这里面产生了一个中间过程,即使用 `lzop` 命令压缩文件,而且产生了一个中间文件 `lzo` 文件。
读取原始文件和上传数据是必须的,那么可以优化的就是压缩的流程了,所以 `r_await` 是没有办法优化的,那么只能优化 `w_await`,`w_await` 是怎么产生的呢,恰恰是写入`lzo` 时产生的,可以不要 `lzo` 文件吗?这个文件有什么作用?
如果我们压缩文件数据流,在 读取原始文件 -> 上传数据 流程中对上传的数据流进行实时压缩,把压缩的内容给上传了,实现边读边压缩,对数据流进行处理,像是一个中间件,这样就不用写 `lzo` 文件了,那么 `w_await` 就被完全优化没了。
`lzo` 文件有什么作用?我想只有在上传失败之后可以节省一次文件压缩的消耗。上传失败的次数多吗?我用阿里云 OSS 好几年了,除了一次内网故障,再也没有遇到过上传失败的文件,我想是不需要这个文件的,而且生成 `lzo` 文件还需要占用磁盘空间,定时清理等等,增加了资源消耗和维护成本。
### 优化后
根据之前的分析看一下优化之后备份文件需要哪些过程:
- 读取原始日志
- 在内存中压缩数据流
- http 发送压缩后的内容
这个流程节省了两个步骤,写入 `lzo` 文件和 读取 `lzo` 文件,不仅没有 `w_await`,就连 `r_await` 也得到了小幅度的优化。
优化方案确定了,可是怎么实现 `lzo` 对文件流进行压缩呢,去 `Github` 上找一下看看有没有 `lzo` 的压缩算法库,发现 `github.com/cyberdelia/lzo` ,虽然是引用 C 库实现的,但是经典的两个算法(`lzo1x_1` 和 `lzo1x_999`)都提供了接口,貌似 Go 可以直接用了也就这一个库了。
发现这个库实现了 `io.Reader` 和 `io.Writer` 接口,`io.Reader` 读取压缩文件流,输出解压缩数据,`io.Writer` 实现输入原始数据,并写入到输入的 `io.Writer`。
想实现压缩数据流,看来需要使用 `io.Writer` 接口了,但是这个输入和输出都是 `io.Writer`,这可为难了,因为我们读取文件获得是 `io.Reader`,http 接口输入也是 `io.Reader`,貌似没有可以直接用的接口,没有办法实现了吗,不会我们自已封装一下,下面是封装的 `lzo` 数据流压缩方法:
```go
package lzo
import (
"bytes"
"io"
"github.com/cyberdelia/lzo"
)
type Reader struct {
r io.Reader
rb []byte
buff *bytes.Buffer
lzo *lzo.Writer
err error
}
func NewReader(r io.Reader) *Reader {
z := &Reader{
r: r,
rb: make([]byte, 256*1024),
buff: bytes.NewBuffer(make([]byte, 0, 256*1024)),
}
z.lzo, _ = lzo.NewWriterLevel(z.buff, lzo.BestSpeed)
return z
}
func (z *Reader) compress() {
if z.err != nil {
return
}
var nr, nw int
nr, z.err = z.r.Read(z.rb)
if z.err == io.EOF {
if err := z.lzo.Close(); err != nil {
z.err = err
}
}
if nr > 0 {
nw, z.err = z.lzo.Write(z.rb[:nr])
if z.err == nil && nr != nw {
z.err = io.ErrShortWrite
}
}
}
func (z *Reader) Read(p []byte) (n int, err error) {
if z.err != nil {
return 0, z.err
}
if z.buff.Len() <= 0 {
z.compress()
}
n, err = z.buff.Read(p)
if err == io.EOF {
err = nil
} else if err != nil {
z.err = err
}
return
}
func (z *Reader) Reset(r io.Reader) {
z.r = r
z.buff.Reset()
z.err = nil
z.lzo, _ = lzo.NewWriterLevel(z.buff, lzo.BestSpeed)
}
```
这个库会固定消耗 512k 内存,并不是很大,我们需要创建一个读取 buf 和一个压缩缓冲 buf, 都是256k的大小,实际压缩缓冲的 buf 并不需要 256k,毕竟压缩后数据会比原始数据小,考虑空间并不是很大,直接分配 256k 避免运行时分配。
实现原理当 http 从输入的 `io.Reader` (实际就是我们上面封装的 `lzo` 库), 读取数据时,这个库检查压缩缓冲是否为空,为空的情况会从文件读取 256k 数据并压缩输入到压缩缓冲中,然后从压缩缓冲读取数据给 http 的 `io.Reader`,如果压缩缓冲区有数据就直接从压缩缓冲区读取压缩数据。
这并不是线程安全的,并且固定分配 512k 的缓冲,所以也提供了一个 `Reset` 方法,来复用这个对象,避免重复分配内存,但是需要保证一个` lzo` 对象实例只能被一个 Goroutine 访问, 这可以使用 `sync.Pool` 来保证,下面的代码我用另一种方法来保证。
```go
package main
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
".../pkg/aliyun_oss"
".../pkg/lzo"
)
func main() {
var oss *aliyun_oss.AliyunOSS
files := os.Args[1:]
if len(files) < 1 {
fmt.Println("请输入要上传的文件")
os.Exit(1)
}
fmt.Printf("待备份文件数量:%d\n", len(files))
startTime := time.Now()
defer func() {
fmt.Printf("共耗时:%s\n", time.Now().Sub(startTime).String())
}()
var wg sync.WaitGroup
n := 4
c := make(chan string)
// 压缩日志
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
var compress *lzo.Reader
for file := range c {
r, err := os.Open(file)
if err != nil {
panic(err)
}
if compress == nil {
compress = lzo.NewReader(r)
} else {
compress.Reset(r)
}
name := filepath.Base(file)
err = oss.PutObject("tmp/"+name+"1.lzo", compress)
r.Close()
if err != nil {
panic(err)
}
}
}()
}
for _, file := range files {
c <- file
}
close(c)
wg.Wait()
}
```
程序为每个 Goroutine 分配一个固定的 `compress` ,当需要压缩文件的时候判断是创建还是重置,来达到复用的效果。
该程序运行输出:
```shell
待备份文件数量:336
共耗时 18m20.162441931s
```
实际耗时比优化前提升了 28%, 实际通过 `iostat` 命令分析也发现,资源消耗也有了明显的改善,下面是 `iostat -m -x 5 10000` 命令采集各个阶段数据。
```shell
avg-cpu: %user %nice %system %iowait %steal %idle
15.72 0.00 6.58 74.10 0.00 3.60
Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
vdb 3.80 3.40 1374.20 1.20 86484.00 18.40 125.79 121.57 87.24 87.32 1.00 0.73 100.00
avg-cpu: %user %nice %system %iowait %steal %idle
26.69 0.00 8.42 64.27 0.00 0.62
Device: rrqm/s wrqm/s r/s w/s rkB/s wkB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.20 426.80 0.80 9084.80 4.00 42.51 2.69 6.29 6.30 1.00 0.63 26.92
vdb 1.80 0.00 1092.60 0.00 72306.40 0.00 132.36 122.06 108.45 108.45 0.00 0.92 100.02
```
通过 `iostat` 发现只有 `r_await`, `w_await` 被完全优化,`iowait` 有明显的改善,运行时间更短了,效率更高了,对 io 产生影响的时间也更短了。
## 优化期间遇到的问题
首先对找到的 `lzo` 算法库进行测试,确保压缩和解压缩没有问题,并且和 `lzop` 命令兼容。
在这期间发现使用压缩的数据比 `lzop` 压缩数据大了很多,之后阅读了源码实现,并没有发现任何问题,尝试调整缓冲区大小,发现对生成的压缩文件大小有明显改善。
这个发现让我也很为难,究竟多大的缓冲区合适呢,只能去看 `lzop` 的实现了,发现 lzop 默认压缩块大小为 256k, 实际 lzo 算法支持的最大块大小就是 256k,所以实现 `lzo` 算法包装是创建的是 256k 的缓冲区的,这个缓冲区的大小就是压缩块的大小,大家使用的时候建议不要调整了。
## 总结
这个方案上线之后,由原来需要近半分钟上传的,改善到大约只有十秒(Go 语言本身效率也有很大帮助),而且 load 有了明显的改善。
优化前每当运行日志备份,CPU 经常爆表,优化后备份时 CPU 增幅 20%,可以从容应对业务扩展问题了。
测试是在一台空闲的机器上进行的,实际生产服务器本身 `w_await` 会有 20 左右,如果使用固态硬盘,全双工模式,读和写是分离的,那么优化掉 `w_await` 对业务的帮助是非常大的,不会阻塞业务日志写通道了。
当然我们服务器是高速云盘(机械盘),由于机械盘物理特征只能是半双工,要么读、要么写,所以优化掉 `w_await` 确实效率会提升很多,但是依然会对业务服务写有影响。
**转载:**
**本文作者: 戚银([thinkeridea](https://blog.thinkeridea.com/))**
**本文链接: [https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html](https://blog.thinkeridea.com/201906/go/compress_file_io_optimization1.html)**
**版权声明: 本博客所有文章除特别声明外,均采用 [CC BY 4.0 CN协议](http://creativecommons.org/licenses/by/4.0/deed.zh) 许可协议。转载请注明出处!**
有疑问加站长微信联系(非本文作者)