golang新手, 最近在写个练手的小项目, 需求是收集指定日志目录下最新的日志文件, 通过正则过滤日志内容, 把想要的信息存到一个map里.
但是这个程序在实际运行中我发现, 由于在catLog包里读取文件内容时里面我必须使用for死循环实现实时读取文件, 导致在getNewFile包里监控到的系统事件信息无法发送给catLog包,这样如果有新的日志文件产生, 它无法切换文件. 于是我在getNewFile包调用catLog包时启用goroutine, 这样就可以收到通知了.
结果导致每次读取文件新内容时, 程序都会启动一个新的goroutine调用catLog包, 并且不会自动退出,因为catLog包里是for死循环, 一直在累积增加.
请大佬们帮忙看下下面的代码, 该怎么修改可以控制goroutine增长, 并且还能实时监控最新的日志文件.
```golang
package getNewFile
import (
"fmt"
"log"
"regexp"
catlog "study/prometheus/exporter_watcherLog/catLog"
"time"
"github.com/fsnotify/fsnotify"
)
// 实时监控指定目录下文件并读取
func GetNewFile(pattren, logDir string) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
fmt.Println(err)
return
}
defer watcher.Close()
done := make(chan bool)
re := regexp.MustCompile(pattren) // Compile 解析一个正则表达式,如果成功则返回一个可用于匹配文本的 Regexp 对象。如错误会panic
go func() {
for {
select {
case event := <-watcher.Events:
isMatch := re.MatchString(event.Name)
if isMatch {
if event.Op&fsnotify.Create == fsnotify.Create { // 监控创建文件动作
fmt.Println(event.Name)
go catlog.DealLog(event.Name)
}
if event.Op&fsnotify.Write == fsnotify.Write { // 监控写入文件动作
fmt.Println(event.Name)
go catlog.DealLog(event.Name)
}
}
case err := <-watcher.Errors:
log.Println(err)
}
time.Sleep(time.Second * 1)
}
}()
err = watcher.Add(logDir) // 监控指定目录
if err != nil {
log.Fatal(err)
}
<-done
}
```
```golang
package catlog
import (
"fmt"
"regexp"
"runtime"
"time"
"sync"
"github.com/hpcloud/tail"
)
var AllAddrLoglist map[string]ClassLogInfo
var Sm sync.Map
var Notify = false
type ClassLogInfo struct {
Host string
Time string
Date string
Cfc string
}
// 实时读取文件内容, 并筛选出host, date, time, cfc存入map中
func DealLog(file string) {
config := tail.Config{
ReOpen: true, // 重新打开
Follow: true, // 是否跟随
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件哪个地方开始读
MustExist: false, // 文件不存在报错
Poll: true,
// Logger: tail.DiscardingLogger, // 禁用日志记录
}
tails, err := tail.TailFile(file, config) // 打开文件, 并用上面的配置
if err != nil {
fmt.Printf("tail file failed, err: %v\n", err)
return
}
var (
line *tail.Line
ok bool
)
ch := make(chan struct{}, 1) // 创建缓冲区大小, 控制并发, 最多发送x个消息就阻塞
// 正则匹配, 最后只筛选出host;date;time;cfc, 其余的跳过.
pattern := `(^[0-9]+/[0-9]+/[0-9]+)\s+([0-9]+:[0-9]+:[0-9]+:[0-9]+)\s+[A-Z]+\s+\[.*\]\s+INFO\s+LSG.*LSG\s+svr\s+update:\s+cfc=([0-9]+),cpfc=[0-9]+,clc=[0-9]+,cplc=[0-9]+,load=[0-9]+,status=[0-9]+,addr=(\d+\.\d+\.\d+\.\d+-\d),inst=\d+\.\d+\.\d+\.\d+-\d`
// re := regexp.MustCompile(pattern) // 两个唯一区别, 如错误直接panic
re, err := regexp.Compile(pattern) // Compile 解析一个正则表达式,如果成功则返回一个可用于匹配文本的 Regexp 对象。如错误会返回一个错误
if err != nil {
fmt.Println(err)
}
// 读取每行数据,最后写入到AllAddrLoglist中.
for {
ch <- struct{}{} // 写入消息到缓冲区
line, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename: %s\n", tails.Filename)
time.Sleep(time.Second)
continue
}
matchArr := re.FindStringSubmatch(line.Text)
if matchArr != nil { // 新增判断,替代re.MatchString(line.Text),原有的影响性能
// 信息存入结构体
testI := ClassLogInfo{
Host: matchArr[4],
Date: matchArr[1],
Time: matchArr[2],
Cfc: matchArr[3],
}
// 利用sync.map, 直接使用, 无需声明, 避免读写同一个map冲突
Sm.Store(matchArr[4], testI)
<-ch // 从缓冲区读取消息
}
}
}
```
他那个方法中还会存在一个极限情况下的bug;
就是在你已在监听一个文件的时候,那个文件在某一刻写满了,然后系统开始写新的文件了;而你的程序可能第一个文件还没有读完呢,这个时候你收到了ctx.Done(),会导致最后数据没有采集完整;
```go
for {
select {
case line, ok := <-tails.Lines:
...
case <-ctx.Done():
select {
case line, ok := <-tails.Lines:
...
default:
return
}
}
}
```
在收到ctx.Done()的时候,继续读数据,如果读完了,就会运行default分支,就可以退出了。
#6
更多评论
简单改一下,不知道理解的对不对哈,始终只监听最新的文件,所以 `eventName` 每次替换
```go
go func() {
var ctx,cancel = context.WithCancel(context.Background())
var eventName string
for {
select {
case event := <-watcher.Events:
// 如果是同一文件,跳过
if eventName == event.Name {
continue
}
isMatch := re.MatchString(event.Name)
if isMatch {
// 更换文件了,cancel 上次的 go DealLog
cancel()
// 文件名更新
eventName = event.Name
if event.Op&fsnotify.Create == fsnotify.Create { // 监控创建文件动作
fmt.Println(event.Name)
// 重新赋值
ctx, cancel = context.WithCancel(context.Background())
go DealLog(ctx, event.Name)
}
if event.Op&fsnotify.Write == fsnotify.Write { // 监控写入文件动作
fmt.Println(event.Name)
ctx, cancel = context.WithCancel(context.Background())
go DealLog(ctx, event.Name)
}
}
case err := <-watcher.Errors:
log.Println(err)
}
time.Sleep(time.Second * 1)
}
}()
```
```go
// 读取每行数据,最后写入到AllAddrLoglist中.
for {
select {
case line, ok = <-tails.Lines:
....
case <-ctx.Done():
return
}
}
```
#1
从楼主的问题中我理解,你当前只希望监听一个文件,只是这个文件随时可能会更换新的;同时新的内容可能是写在新文件中,也可能是旧文件的修改;
但是我看到楼主使用的是tail进行文件的读取,其实存在一个问题,就是对方文件都写完了,你再tail其实会丢失一些数据;
一般第三方的日志系统都是写同一个文件,然后文件写到一定大小的时候,修改文件名字,然后再创建一个原来的文件;
这样监听文件的系统就不用频繁换文件了;
---
但如果只是就楼主的问题进行探讨的话,我先说一下,楼主当前程序的几个问题:
1. DealLog函数的ch 是没有意义的。因为他是局部变量不会起到锁的作用;
2. 假设你理解的是对的,这样使用tail不会丢数据,那也不应该是每收到一个event就开一个协程,而是在收到event的时候,直接把那个文件的数据读到文件结束,然后保存到你的map中,然后继续等一次的event过来;
因为watcher底层开着一个协程监听你的目录中所有文件的变化;每次收到变化的时候,他就会发一个event给你;你直接处理就好了。
总结:如果是这样修改,你不会有开多个协程,也就不存在楼主关于协程泄漏的问题了;
---
再假设楼主的环境是可能同时多个文件被写入,希望可以并发的获取数据;方案如下:
1. 首先程序启动的时候,应该对目录下所有的文件,启动协程进行tail;
2. 然后watcher目录是否有新的文件生成,如果有,就为此启动协程进行tail;
总结:如果是这样修改,也就是需求需要一个文件一个协程。你的额外的协程数就是你的目录下的文件数,且还都不能关闭;
#2