日志采集项目之logagent开发(一)
项目结构
-
项目分为如下部分:
-
logagent
- conf: 配置文件
- kafka: kafka集成模块
- tailf: 日志读取模块
- main.go: 程序入口
- xlog: 日志打印模块,参考:https://blog.51cto.com/13812615/2490744
- oconfig: 配置文件解析模块,参考:https://blog.51cto.com/13812615/2492150
-
logCollect/
├── logagent
│ ├── conf
│ │ └── config.ini
│ ├── kafka
│ │ └── kafka.go
│ ├── logs
│ │ └── logagent.log
│ ├── main.go
│ └── tailf
│ └── tail.go
├── oconfig
│ └── config.go
└── xlog
├── console.go
├── file.go
├── level.go
├── log.go
├── log_base.go
└── tool.go
logagent代码:
- conf/config.ini
[kafka]
address=192.168.56.11:9092
queue_size=10000
[collect_log_conf]
log_filenames=/Users/wanghui/go/src/oldBoy/day11/my.log
[logs]
#level类型有debug,info,trace,warn,error,fatal
log_level=debug
filename=./logs/logagent.log
#log_type=file,console
log_type=console
module=logagent
- main.go
package main
import (
"fmt"
"oldBoy/logagent/kafka"
"oldBoy/logagent/tailf"
"oldBoy/oconfig"
"oldBoy/xlog"
"strings"
)
// appConfig holds the configuration that initConfig parses from the INI file.
var appConfig AppConfig
// AppConfig is the root structure of the configuration file. Each field is
// tagged with the INI section it is loaded from.
type AppConfig struct {
	KafkaConf      KafkaConfig      `ini:"kafka"`
	CollectLogConf CollectLogConfig `ini:"collect_log_conf"`
	LogConf        LogConfig        `ini:"logs"`
}
// KafkaConfig carries the Kafka settings: Address is the broker address
// string (main splits it on ",") and QueueSize is handed to kafka.Init as the
// capacity of the send queue.
type KafkaConfig struct {
	Address   string `ini:"address"`
	QueueSize int    `ini:"queue_size"`
}
// CollectLogConfig names the log file to collect; main passes LogFilenames
// to tailf.Init as a single path.
type CollectLogConfig struct {
	LogFilenames string `ini:"log_filenames"`
}
// LogConfig describes how the agent's own logging is set up. initLog maps
// LogLevel and LogType onto xlog constants and forwards Filename and Module
// to xlog.Init.
type LogConfig struct {
	LogLevel string `ini:"log_level"`
	Filename string `ini:"filename"`
	LogType  string `ini:"log_type"`
	Module   string `ini:"module"`
}
// initConfig parses the INI file at filename into the package-level appConfig.
// An error from oconfig.UnMarshalFile is returned unchanged; on success the
// parsed configuration is dumped at debug level.
//
// NOTE(review): main calls this before initLog, so the debug dump goes
// through xlog before xlog.Init has run — confirm xlog supports that.
func initConfig(filename string) (err error) {
	if err = oconfig.UnMarshalFile(filename, &appConfig); err == nil {
		// Print the parsed configuration.
		xlog.LogDebug("config:%#v", appConfig)
	}
	return err
}
// initLog translates the textual log settings in appConfig.LogConf into xlog
// constants and initializes the logging library with them. It returns
// whatever xlog.Init returns.
func initLog() (err error) {
	logConf := appConfig.LogConf

	// Only the exact string "console" selects console output; every other
	// value falls back to the file logger.
	var logType int
	switch logConf.LogType {
	case "console":
		logType = xlog.XLogTypeConsole
	default:
		logType = xlog.XLogTypeFile
	}

	// Unknown level names are treated as debug.
	levels := map[string]int{
		"debug": xlog.XLogLevelDebug,
		"trace": xlog.XLogLevelTrace,
		"info":  xlog.XLogLevelInfo,
		"warn":  xlog.XLogLevelWarn,
		"error": xlog.XLogLevelError,
		"fatal": xlog.XLogLevelFatal,
	}
	level, known := levels[logConf.LogLevel]
	if !known {
		level = xlog.XLogLevelDebug
	}

	// Initialize the logging library.
	return xlog.Init(logType, level, logConf.Filename, logConf.Module)
}
// run is the agent's main loop: it reads lines from the tailed log file and
// hands each one to the Kafka send queue under the "nginx_log" topic.
//
// It returns only when tailf.ReadLine fails. A failed enqueue is logged as a
// warning and the line is dropped; the loop then carries on with the next
// line.
func run() (err error) {
	for {
		// 1. Read the next line from the log file.
		line, readErr := tailf.ReadLine()
		if readErr != nil {
			// ReadLine reports an error only once the tail channel has been
			// closed, so the condition is permanent. Retrying here would
			// spin forever at full CPU; let the caller handle it instead.
			return readErr
		}
		xlog.LogDebug("line:%s", line.Text)

		// 2. Send the log line to kafka.
		msg := &kafka.Message{
			Line:  line.Text,
			Topic: "nginx_log",
		}
		if sendErr := kafka.SendLog(msg); sendErr != nil {
			xlog.LogWarn("send log to kafka faild,err:%v", sendErr)
			// Skip the success message below for a line that was dropped.
			continue
		}
		xlog.LogDebug("send to kafka succ")
	}
}
// main wires the agent together: it loads the configuration, then initializes
// logging, the Kafka producer and the file tailer in that order, and finally
// enters the collection loop. Any initialization failure panics with a
// message naming the step that failed.
func main() {
	// Load the configuration.
	if err := initConfig("./conf/config.ini"); err != nil {
		panic(fmt.Sprintf("init config faild,err:%v", err))
	}

	// Initialize the logging library.
	if err := initLog(); err != nil {
		panic(fmt.Sprintf("init logs faild,err:%v", err))
	}
	xlog.LogDebug("init log success")

	// Initialize kafka; the configured address may list several brokers
	// separated by commas.
	brokers := strings.Split(appConfig.KafkaConf.Address, ",")
	if err := kafka.Init(brokers, appConfig.KafkaConf.QueueSize); err != nil {
		panic(fmt.Sprintf("init kafka faild,err:%v", err))
	}
	xlog.LogDebug("init kafka succ")

	// Initialize the tailer.
	if err := tailf.Init(appConfig.CollectLogConf.LogFilenames); err != nil {
		panic(fmt.Sprintf("init tail faild,err:%v", err))
	}
	xlog.LogDebug("init tailf succ")

	if err := run(); err != nil {
		xlog.LogError("run faild,err:%v", err)
		return
	}
	xlog.LogDebug("run finished\n")
}
- kafka/kafka.go
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
"oldBoy/xlog" //这个取决于goPath的配置
)
// client is the synchronous producer created by Init and used by SendKafka.
var client sarama.SyncProducer

// msgChan buffers messages accepted by SendLog until SendKafka publishes them.
var msgChan chan *Message
// Message is one log line together with the Kafka topic it is published to.
type Message struct {
	Line  string
	Topic string
}
// Init connects a synchronous Kafka producer to the brokers in address,
// creates the send queue with capacity chanSize and starts the background
// SendKafka goroutine that drains it.
//
// It returns an error if chanSize is negative or if the producer cannot be
// created; in both cases no queue is created and no goroutine is started.
func Init(address []string, chanSize int) (err error) {
	// make panics on a negative capacity, so reject it before a connection
	// to the brokers is opened.
	if chanSize < 0 {
		return fmt.Errorf("invalid kafka queue size %d", chanSize)
	}

	// Producer configuration.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// The sync producer reads the successes channel, so this must be enabled.
	config.Producer.Return.Successes = true

	// Connect to the brokers.
	client, err = sarama.NewSyncProducer(address, config)
	if err != nil {
		xlog.LogError("create kafka producer faild,error:%v", err)
		return err
	}

	msgChan = make(chan *Message, chanSize)
	go SendKafka()
	return nil
}
// SendKafka drains msgChan and publishes every message through the sync
// producer. A failed send is logged and the message is skipped so that later
// messages still go out. The loop ends only when msgChan is closed.
func SendKafka() {
	// Take messages off the queue and send them.
	for item := range msgChan {
		record := &sarama.ProducerMessage{
			Topic: item.Topic,
			Value: sarama.StringEncoder(item.Line),
		}

		partition, offset, sendErr := client.SendMessage(record)
		if sendErr != nil {
			xlog.LogError("send message faild,err:%v", sendErr)
			continue // keep sending subsequent logs
		}
		xlog.LogDebug("pid:%v,offset:%v", partition, offset)
	}
}
// SendLog queues msg for asynchronous delivery to Kafka without blocking.
//
// A nil message or one with an empty Line is silently dropped and nil is
// returned. If the queue cannot accept the message right now (it is full, or
// it has not been created yet), an error is returned and the message is not
// queued.
func SendLog(msg *Message) (err error) {
	// Filter out nil messages and empty lines.
	if msg == nil || len(msg.Line) == 0 {
		return nil
	}

	select {
	case msgChan <- msg:
		return nil
	default:
		// Never block the reader: report the overflow to the caller.
		return fmt.Errorf("chan is full")
	}
}
- tailf/tail.go
package tailf
import (
"fmt"
"github.com/hpcloud/tail"
"oldBoy/xlog"
)
// tailObj is the tail handle opened by Init; ReadLine receives from its
// Lines channel.
var tailObj *tail.Tail
// Init starts tailing filename and stores the handle in the package-level
// tailObj. If the file cannot be tailed, the error is logged and returned.
func Init(filename string) (err error) {
	cfg := tail.Config{
		ReOpen: true,
		Follow: true,
		// Whence 2 seeks relative to the end of the file, so with offset 0
		// only lines appended after start-up are read.
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: true,
		Poll:      true,
	}

	if tailObj, err = tail.TailFile(filename, cfg); err != nil {
		xlog.LogError("tail file error:%v", err)
	}
	return err
}
// ReadLine blocks until the tailer delivers the next line and returns it.
// It returns an error once the Lines channel has been closed. Init must have
// succeeded first, because the function reads from the package-level tailObj.
func ReadLine() (msg *tail.Line, err error) {
	line, open := <-tailObj.Lines
	if open {
		return line, nil
	}
	// A receive from a closed channel yields the zero value, so line is nil.
	return line, fmt.Errorf("read line faild")
}
有疑问加站长微信联系(非本文作者)