请教下golang goroutine泄露问题?

breadHood · 2023-03-13 13:51:47 · 3353 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2023-03-13 13:51:47 的主题,其中的信息可能已经有所发展或是发生改变。

golang新手, 最近在写个练手的小项目, 需求是收集指定日志目录下最新的日志文件, 通过正则过滤日志内容, 把想要的信息存到一个map里.

但是这个程序在实际运行中我发现, 由于在catLog包里读取文件内容时里面我必须使用for死循环实现实时读取文件, 导致在getNewFile包里监控到的系统事件信息无法发送给catLog包,这样如果有新的日志文件产生, 它无法切换文件. 于是我在getNewFile包调用catLog包时启用goroutine, 这样就可以收到通知了.

结果导致每次读取文件新内容时, 程序都会启动一个新的goroutine调用catLog包, 并且不会自动退出,因为catLog包里是for死循环, 一直在累积增加.

请大佬们帮忙看下下面的代码, 该怎么修改可以控制goroutine增长, 并且还能实时监控最新的日志文件.

package getNewFile

import (
    "fmt"
    "log"
    "regexp"
    catlog "study/prometheus/exporter_watcherLog/catLog"
    "time"

    "github.com/fsnotify/fsnotify"
)

// 实时监控指定目录下文件并读取
func GetNewFile(pattren, logDir string) {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        fmt.Println(err)
        return
    }
    defer watcher.Close()

    done := make(chan bool)

    re := regexp.MustCompile(pattren) // Compile 解析一个正则表达式,如果成功则返回一个可用于匹配文本的 Regexp 对象。如错误会panic
    go func() {
        for {
            select {
            case event := <-watcher.Events:
                isMatch := re.MatchString(event.Name)
                if isMatch {
                    if event.Op&fsnotify.Create == fsnotify.Create { // 监控创建文件动作
                    fmt.Println(event.Name)
                    go catlog.DealLog(event.Name)
                    }

                    if event.Op&fsnotify.Write == fsnotify.Write { // 监控写入文件动作
                    fmt.Println(event.Name)
                    go catlog.DealLog(event.Name)
                    }
                }

            case err := <-watcher.Errors:
                log.Println(err)
            }
            time.Sleep(time.Second * 1)
        }
    }()

    err = watcher.Add(logDir) // 监控指定目录
    if err != nil {
        log.Fatal(err)
    }
    <-done
}
package catlog

import (
    "fmt"
    "regexp"
    "runtime"
    "time"

    "sync"

    "github.com/hpcloud/tail"
)

var AllAddrLoglist map[string]ClassLogInfo
var Sm sync.Map
var Notify = false

type ClassLogInfo struct {
    Host string
    Time string
    Date string
    Cfc  string
}

// 实时读取文件内容, 并筛选出host, date, time, cfc存入map中
func DealLog(file string) {
    config := tail.Config{
        ReOpen:    true,                                 // 重新打开
        Follow:    true,                                 // 是否跟随
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件哪个地方开始读
        MustExist: false,                                // 文件不存在报错
        Poll:      true,
        // Logger:    tail.DiscardingLogger, // 禁用日志记录
    }
    tails, err := tail.TailFile(file, config) // 打开文件, 并用上面的配置
    if err != nil {
        fmt.Printf("tail file failed, err: %v\n", err)
        return
    }
    var (
        line *tail.Line
        ok   bool
    )

    ch := make(chan struct{}, 1) // 创建缓冲区大小, 控制并发, 最多发送x个消息就阻塞

    // 正则匹配, 最后只筛选出host;date;time;cfc, 其余的跳过.
    pattern := `(^[0-9]+/[0-9]+/[0-9]+)\s+([0-9]+:[0-9]+:[0-9]+:[0-9]+)\s+[A-Z]+\s+\[.*\]\s+INFO\s+LSG.*LSG\s+svr\s+update:\s+cfc=([0-9]+),cpfc=[0-9]+,clc=[0-9]+,cplc=[0-9]+,load=[0-9]+,status=[0-9]+,addr=(\d+\.\d+\.\d+\.\d+-\d),inst=\d+\.\d+\.\d+\.\d+-\d`
    // re := regexp.MustCompile(pattern) // 两个唯一区别, 如错误直接panic
    re, err := regexp.Compile(pattern) // Compile 解析一个正则表达式,如果成功则返回一个可用于匹配文本的 Regexp 对象。如错误会返回一个错误
    if err != nil {
        fmt.Println(err)
    }

    // 读取每行数据,最后写入到AllAddrLoglist中.
    for {
        ch <- struct{}{} // 写入消息到缓冲区

        line, ok = <-tails.Lines
        if !ok {
            fmt.Printf("tail file close reopen, filename: %s\n", tails.Filename)
            time.Sleep(time.Second)
            continue
        }
        matchArr := re.FindStringSubmatch(line.Text)
        if matchArr != nil { // 新增判断,替代re.MatchString(line.Text),原有的影响性能
            // 信息存入结构体
            testI := ClassLogInfo{
                Host: matchArr[4],
                Date: matchArr[1],
                Time: matchArr[2],
                Cfc:  matchArr[3],
            }
            // 利用sync.map, 直接使用, 无需声明, 避免读写同一个map冲突
            Sm.Store(matchArr[4], testI)
            <-ch // 从缓冲区读取消息
        }
    }
}

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3353 次点击  
加入收藏 微博
10 回复  |  直到 2023-03-21 23:46:54
GGXXLL
GGXXLL · #1 · 2年之前

简单改一下,不知道理解的对不对哈,始终只监听最新的文件,所以 eventName 每次替换

    go func() {
        var ctx,cancel = context.WithCancel(context.Background())
        var eventName string
        for {
            select {
            case event := <-watcher.Events:
                // 如果是同一文件,跳过
                if eventName == event.Name {
                    continue
                }

                isMatch := re.MatchString(event.Name)
                if isMatch {
                    // 更换文件了,cancel 上次的 go DealLog
                    cancel()
                    // 文件名更新
                    eventName = event.Name
                    if event.Op&fsnotify.Create == fsnotify.Create { // 监控创建文件动作
                        fmt.Println(event.Name)
                        // 重新赋值
                        ctx, cancel = context.WithCancel(context.Background())
                        go DealLog(ctx, event.Name)
                    }

                    if event.Op&fsnotify.Write == fsnotify.Write { // 监控写入文件动作
                        fmt.Println(event.Name)
                        ctx, cancel = context.WithCancel(context.Background())
                        go DealLog(ctx, event.Name)
                    }
                }

            case err := <-watcher.Errors:
                log.Println(err)
            }
            time.Sleep(time.Second * 1)
        }
    }()
    // 读取每行数据,最后写入到AllAddrLoglist中.
    for {
        select {
        case line, ok = <-tails.Lines:
            ....
        case <-ctx.Done():
            return

        }

    }
buguang01
buguang01 · #2 · 2年之前

从楼主的问题中我理解,你当前只希望监听一个文件,只是这个文件随时可能会更换新的;同时新的内容可能是写在新文件中,也可能是旧文件的修改;

但是我看到楼主使用的是tail进行文件的读取,其实存在一个问题,就是对方文件都写完了,你再tail其实会丢失一些数据;

一般第三方的日志系统都是写同一个文件,然后文件写到一定大小的时候,修改文件名字,然后再创建一个原来的文件;

这样监听文件的系统就不用频繁换文件了;


但如果只是就楼主的问题进行探讨的话,我先说一下,楼主当前程序的几个问题:

  1. DealLog函数的ch 是没有意义的。因为他是局部变量不会起到锁的作用;
  2. 假设你理解的是对的,这样使用tail不会丢数据,那也不应该是每收到一个event就开一个协程,而是在收到event的时候,直接把那个文件的数据读到文件结束,然后保存到你的map中,然后继续等一次的event过来;

因为watcher底层开着一个协程监听你的目录中所有文件的变化;每次收到变化的时候,他就会发一个event给你;你直接处理就好了。

总结:如果是这样修改,你不会有开多个协程,也就不存在楼主关于协程泄漏的问题了;


再假设楼主的环境是可能同时多个文件被写入,希望可以并发的获取数据;方案如下:

  1. 首先程序启动的时候,应该对目录下所有的文件,启动协程进行tail;
  2. 然后watcher目录是否有新的文件生成,如果有,就为此启动协程进行tail;

总结:如果是这样修改,也就是需求需要一个文件一个协程。你的额外的协程数就是你的目录下的文件数,且还都不能关闭;

breadHood
breadHood · #3 · 2年之前
GGXXLLGGXXLL #1 回复

简单改一下,不知道理解的对不对哈,始终只监听最新的文件,所以 `eventName` 每次替换 ```go go func() { var ctx,cancel = context.WithCancel(context.Background()) var eventName string for { select { case event := <-watcher.Events: // 如果是同一文件,跳过 if eventName == event.Name { continue } isMatch := re.MatchString(event.Name) if isMatch { // 更换文件了,cancel 上次的 go DealLog cancel() // 文件名更新 eventName = event.Name if event.Op&fsnotify.Create == fsnotify.Create { // 监控创建文件动作 fmt.Println(event.Name) // 重新赋值 ctx, cancel = context.WithCancel(context.Background()) go DealLog(ctx, event.Name) } if event.Op&fsnotify.Write == fsnotify.Write { // 监控写入文件动作 fmt.Println(event.Name) ctx, cancel = context.WithCancel(context.Background()) go DealLog(ctx, event.Name) } } case err := <-watcher.Errors: log.Println(err) } time.Sleep(time.Second * 1) } }() ``` ```go // 读取每行数据,最后写入到AllAddrLoglist中. for { select { case line, ok = <-tails.Lines: .... case <-ctx.Done(): return } } ```

谢谢大佬, 这个方法可以的, 学习了!

breadHood
breadHood · #4 · 2年之前
buguang01buguang01 #2 回复

从楼主的问题中我理解,你当前只希望监听一个文件,只是这个文件随时可能会更换新的;同时新的内容可能是写在新文件中,也可能是旧文件的修改; 但是我看到楼主使用的是tail进行文件的读取,其实存在一个问题,就是对方文件都写完了,你再tail其实会丢失一些数据; 一般第三方的日志系统都是写同一个文件,然后文件写到一定大小的时候,修改文件名字,然后再创建一个原来的文件; 这样监听文件的系统就不用频繁换文件了; --- 但如果只是就楼主的问题进行探讨的话,我先说一下,楼主当前程序的几个问题: 1. DealLog函数的ch 是没有意义的。因为他是局部变量不会起到锁的作用; 2. 假设你理解的是对的,这样使用tail不会丢数据,那也不应该是每收到一个event就开一个协程,而是在收到event的时候,直接把那个文件的数据读到文件结束,然后保存到你的map中,然后继续等一次的event过来; 因为watcher底层开着一个协程监听你的目录中所有文件的变化;每次收到变化的时候,他就会发一个event给你;你直接处理就好了。 总结:如果是这样修改,你不会有开多个协程,也就不存在楼主关于协程泄漏的问题了; --- 再假设楼主的环境是可能同时多个文件被写入,希望可以并发的获取数据;方案如下: 1. 首先程序启动的时候,应该对目录下所有的文件,启动协程进行tail; 2. 然后watcher目录是否有新的文件生成,如果有,就为此启动协程进行tail; 总结:如果是这样修改,也就是需求需要一个文件一个协程。你的额外的协程数就是你的目录下的文件数,且还都不能关闭;

感谢楼主建议! 其实我这个就是监控最新生成的日志文件, 日志文件到达指定大小时会, 业务程序会生成一个新的日志文件, 并将旧日志文件归档, 所以要实现自动切换日志文件.

关于您提出的程序几个问题:

  1. DealLog函数的ch 是没有意义的。因为他是局部变量不会起到锁的作用;===> 是的, 这个是没有什么用, 已经去掉了.
  2. 假设你理解的是对的,这样使用tail不会丢数据,那也不应该是每收到一个event就开一个协程,而是在收到event的时候,直接把那个文件的数据读到文件结束,然后保存到你的map中,然后继续等一次的event过来;====> 我这个逻辑确实有问题, 已经采用1楼的方法解决了.

目前已经采用1楼大佬的方法解决问题了, 其实之前写的逻辑确实存在这两个问题, 一个是没有通知tail里的协程什么时候可以退出, 还有一个是每次有write事件时, 都会去启动一个协程, 这样是不对的, 按照1楼提供的方法已经解决, 感谢!

EddyLee1010
EddyLee1010 · #5 · 2年之前

我是来看大佬秀的。

buguang01
buguang01 · #6 · 2年之前
breadHoodbreadHood #4 回复

#2楼 @buguang01 感谢楼主建议! 其实我这个就是监控最新生成的日志文件, 日志文件到达指定大小时会, 业务程序会生成一个新的日志文件, 并将旧日志文件归档, 所以要实现自动切换日志文件. 关于您提出的程序几个问题: 1. DealLog函数的ch 是没有意义的。因为他是局部变量不会起到锁的作用;===> 是的, 这个是没有什么用, 已经去掉了. 2. 假设你理解的是对的,这样使用tail不会丢数据,那也不应该是每收到一个event就开一个协程,而是在收到event的时候,直接把那个文件的数据读到文件结束,然后保存到你的map中,然后继续等一次的event过来;====> 我这个逻辑确实有问题, 已经采用1楼的方法解决了. 目前已经采用1楼大佬的方法解决问题了, 其实之前写的逻辑确实存在这两个问题, 一个是没有通知tail里的协程什么时候可以退出, 还有一个是每次有write事件时, 都会去启动一个协程, 这样是不对的, 按照1楼提供的方法已经解决, 感谢!

他那个方法中还会存在一个极限情况下的bug;

就是在你已在监听一个文件的时候,那个文件在某一刻写满了,然后系统开始写新的文件了;而你的程序可能第一个文件还没有读完呢,这个时候你收到了ctx.Done(),会导致最后数据没有采集完整;

for {
    select {
    case line, ok := <-tails.Lines:
        ...
    case <-ctx.Done():
        select {
        case line, ok := <-tails.Lines:
            ...
        default:
            return
        }
    }
}

在收到ctx.Done()的时候,继续读数据,如果读完了,就会运行default分支,就可以退出了。

GGXXLL
GGXXLL · #7 · 2年之前
breadHoodbreadHood #3 回复

#1楼 @GGXXLL 谢谢大佬, 这个方法可以的, 学习了!

@buguang01 说的也对,看你在不在意那点数据了。记得调用 defer tails.Stop(), 如果你采用 @buguang01 说的了,记得 ctx.Done 下面直接调用Stop,然后 for line := range tails.Lines ,不然文件一直在写的话,其实tails.Lines读不完。

breadHood
breadHood · #8 · 2年之前
GGXXLLGGXXLL #7 回复

#3楼 @breadHood @buguang01 说的也对,看你在不在意那点数据了。记得调用 `defer tails.Stop()`, 如果你采用 @buguang01 说的了,记得 `ctx.Done` 下面直接调用`Stop`,然后 `for line := range tails.Lines` ,不然文件一直在写的话,其实`tails.Lines`读不完。

是的, 谢谢大佬, 调试的时候发现了, 已加入tails.stop()

breadHood
breadHood · #9 · 2年之前
buguang01buguang01 #6 回复

#4楼 @breadHood 他那个方法中还会存在一个极限情况下的bug; 就是在你已在监听一个文件的时候,那个文件在某一刻写满了,然后系统开始写新的文件了;而你的程序可能第一个文件还没有读完呢,这个时候你收到了ctx.Done(),会导致最后数据没有采集完整; ```go for { select { case line, ok := <-tails.Lines: ... case <-ctx.Done(): select { case line, ok := <-tails.Lines: ... default: return } } } ``` 在收到ctx.Done()的时候,继续读数据,如果读完了,就会运行default分支,就可以退出了。

大佬们考虑非常细致, 感谢!

starlion
starlion · #10 · 2年之前

:bowtie:

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传