Go:实现秒级读取16GB文件

汪明军_3145 · · 1534 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

【译文】原文地址
如今任何计算机系统每天都会产生大量的日志或数据。随着系统的增长,将调试数据存储到数据库中是不可行的,因为它们是不可变的,主要用于分析和解决故障的目的。因此,企业倾向于将其存储在文件中,并保存在本地磁盘中。

我们将使用Golang从大小为16 GB的.txt或.log文件中提取日志,该文件有数百万行。

直接上代码,首先打开文件,将使用标准Go os.File来读取文件IO:

f, err := os.Open(fileName)
 if err != nil {
   fmt.Println("cannot able to read the file", err)
   return
 }
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

一旦打开文件后,我们有以下两个选择进行下一步处理。
1、逐行读取文件,这有助于减少系统内存压力,但会在IO中花费更多时间。
2、一次性将整个文件都读入内存并处理该文件,这会消耗很大内存,但会节约时间。
由于文件太大例如16GB,无法将整个文件加载到内存中。但是第一个选择也不可行,因为我们希望几秒钟内处理该文件。

但是你猜怎么着,还有第三种选择。不是加载整个文件到内存,而是使用bufio.NewReader()以块的形式加载文件。

r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
   buf = buf[:n]
if n == 0 {
   
     if err != nil {
       fmt.Println(err)
       break
     }
     if err == io.EOF {
       break
     }
     return err
  }
}

一旦我们读取到文件块,我们将fork一个线程,也就是Go协程,来同时处理每个块。上面的代码将改为如下:

//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 500*1024)
        return lines
}}
stringPool := sync.Pool{New: func() interface{} {
          lines := ""
          return lines
}}
slicePool := sync.Pool{New: func() interface{} {
           lines := make([]string, 100)
           return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
     
     buf := linesPool.Get().([]byte)
     n, err := r.Read(buf)
     buf = buf[:n]
if n == 0 {
        if err != nil {
            fmt.Println(err)
            break
        }
        if err == io.EOF {
            break
        }
        return err
     }
nextUntillNewline, err := r.ReadBytes('\n')//read entire line
     
     if err != io.EOF {
         buf = append(buf, nextUntillNewline...)
     }
     
     wg.Add(1)
     go func() { 
      
        //process each chunk concurrently
        //start -> log start time, end -> log end time
        
        ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
wg.Done()
     
     }()
}
wg.Wait()
}

上面的代码引入了两个新的优化:
1、sync.Pool是一个强大的实例池,可以重用实例来减少垃圾收集器的压力。我们将重用分配的内存片。这将减少内存消耗,使代码优化更高效。
2、Go协程并发处理缓冲区块,大大提高了处理速度。
下面来实现ProcessChunk函数,处理如下格式的日志信息:

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我们将根据命令行提供的时间戳提取日志:

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further                             
      var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
      logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow 
         noOfThread++
      }
length := len(logsSlice)
//traverse the chunk
     for i := 0; i < length; i += chunkSize {
         
         wg2.Add(1)
//process each chunk in saperate chunk
         go func(s int, e int) {
            for i:= s; i<e;i++{
               text := logsSlice[i]
if len(text) == 0 {
                  continue
               }
           
            logParts := strings.SplitN(text, ",", 2)
            logCreationTimeString := logParts[0]
            logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
                 fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
                 return
            }
// check if log's timestamp is inbetween our desired period
          if logCreationTime.After(start) && logCreationTime.Before(end) {
          
            fmt.Println(text)
           }
        }
        textSlice = nil
        wg2.Done()
     
     }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
   //passing the indexes for processing
}  
   wg2.Wait() //wait for a chunk to finish
   logsSlice = nil
}

上面的代码使用16GB的日志文件进行基础测试,提取日志所花费的时间约25秒。
下面是完成代码:

func main() {

    s := time.Now()
    args := os.Args[1:]
    if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
        fmt.Println("Please give proper command line arguments")
        return
    }
    startTimeArg := args[1]
    finishTimeArg := args[3]
    fileName := args[5]

    file, err := os.Open(fileName)
    
    if err != nil {
        fmt.Println("cannot able to read the file", err)
        return
    }
    
    defer file.Close() //close after checking err
    
    queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
    if err != nil {
        fmt.Println("Could not able to parse the start time", startTimeArg)
        return
    }

    queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
    if err != nil {
        fmt.Println("Could not able to parse the finish time", finishTimeArg)
        return
    }

    filestat, err := file.Stat()
    if err != nil {
        fmt.Println("Could not able to get the file stat")
        return
    }

    fileSize := filestat.Size()
    offset := fileSize - 1
    lastLineSize := 0

    for {
        b := make([]byte, 1)
        n, err := file.ReadAt(b, offset)
        if err != nil {
            fmt.Println("Error reading file ", err)
            break
        }
        char := string(b[0])
        if char == "\n" {
            break
        }
        offset--
        lastLineSize += n
    }

    lastLine := make([]byte, lastLineSize)
    _, err = file.ReadAt(lastLine, offset+1)

    if err != nil {
        fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
        return
    }

    logSlice := strings.SplitN(string(lastLine), ",", 2)
    logCreationTimeString := logSlice[0]

    lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
    if err != nil {
        fmt.Println("can not able to parse time : ", err)
    }

    if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
        Process(file, queryStartTime, queryFinishTime)
    }

    fmt.Println("\nTime taken - ", time.Since(s))
}

func Process(f *os.File, start time.Time, end time.Time) error {

    linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 250*1024)
        return lines
    }}

    stringPool := sync.Pool{New: func() interface{} {
        lines := ""
        return lines
    }}

    r := bufio.NewReader(f)

    var wg sync.WaitGroup

    for {
        buf := linesPool.Get().([]byte)

        n, err := r.Read(buf)
        buf = buf[:n]

        if n == 0 {
            if err != nil {
                fmt.Println(err)
                break
            }
            if err == io.EOF {
                break
            }
            return err
        }

        nextUntillNewline, err := r.ReadBytes('\n')

        if err != io.EOF {
            buf = append(buf, nextUntillNewline...)
        }

        wg.Add(1)
        go func() {
            ProcessChunk(buf, &linesPool, &stringPool, start, end)
            wg.Done()
        }()

    }

    wg.Wait()
    return nil
}

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

    var wg2 sync.WaitGroup

    logs := stringPool.Get().(string)
    logs = string(chunk)

    linesPool.Put(chunk)

    logsSlice := strings.Split(logs, "\n")

    stringPool.Put(logs)

    chunkSize := 300
    n := len(logsSlice)
    noOfThread := n / chunkSize

    if n%chunkSize != 0 {
        noOfThread++
    }

    for i := 0; i < (noOfThread); i++ {

        wg2.Add(1)
        go func(s int, e int) {
            defer wg2.Done() //to avaoid deadlocks
            for i := s; i < e; i++ {
                text := logsSlice[i]
                if len(text) == 0 {
                    continue
                }
                logSlice := strings.SplitN(text, ",", 2)
                logCreationTimeString := logSlice[0]

                logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                if err != nil {
                    fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
                    return
                }

                if logCreationTime.After(start) && logCreationTime.Before(end) {
                    //fmt.Println(text)
                }
            }
            

        }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
    }

    wg2.Wait()
    logsSlice = nil
}

欢迎您提出任何疑问和改进。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:汪明军_3145

查看原文:Go:实现秒级读取16GB文件

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1534 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传