Golang:将日志以Json格式输出到Kafka

bosima · · 697 次点击 · · 开始浏览    

在上一篇文章中我实现了一个支持Debug、Info、Error等多个级别的日志库,并将日志写到了磁盘文件中,代码比较简单,适合练手。有兴趣的可以通过这个链接前往:https://github.com/bosima/ylog/releases/tag/v1.0.3 工程实践中,我们往往还需要对日志进行采集,将日志归集到一起,然后用于各种处理分析,比如生产环境上的错误分析、异常告警等等。在日志消息系统领域,Kafka久负盛名,这篇文章就以将日志发送到Kafka来实现日志的采集;同时考虑到日志分析时对结构化数据的需求,这篇文章还会提供一种输出Json格式日志的方法。 这个升级版的日志库还要保持向前兼容,即还能够使用普通文本格式,以及写日志到磁盘文件,这两个特性和要新增的两个功能分别属于同类处理,因此我这里对它们进行抽象,形成两个接口:格式化接口、写日志接口。 # 格式化接口 所谓格式化,就是日志的格式处理。这个日志库目前要支持两种格式:普通文本和Json。 为了在不同格式之上提供一个统一的抽象,ylog中定义 logEntry 来代表一条日志: ``` type logEntry struct { Ts time.Time `json:"ts"` File string `json:"file"` Line int `json:"line"` Level LogLevel `json:"level"` Msg string `json:"msg"` } ``` 格式化接口的能力就是将日志从logEntry格式转化为其它某种数据格式。ylog中对它的定义是: ```go type LoggerFormatter interface { Format(*logEntry, *[]byte) error } ``` 第1个参数是一个logEntry实例,也就是要被格式化的日志,第2个参数是日志格式化之后要写入的容器。 ## 普通文本格式化器 其实现是这样的: ```go type textFormatter struct { } func NewTextFormatter() *textFormatter { return &textFormatter{} } func (f *textFormatter) Format(entry *logEntry, buf *[]byte) error { formatTime(buf, entry.Ts) *buf = append(*buf, ' ') file := toShort(entry.File) *buf = append(*buf, file...) *buf = append(*buf, ':') itoa(buf, entry.Line, -1) *buf = append(*buf, ' ') *buf = append(*buf, levelNames[entry.Level]...) *buf = append(*buf, ' ') *buf = append(*buf, entry.Msg...) return nil } ``` 可以看到它的主要功能就是将logEntry中的各个字段按照某种顺序平铺开来,中间用空格分隔。 其中的很多数据处理方法参考了Golang标准日志库中的数据格式化处理代码,有兴趣的可以去Github中详细查看。 这里对日期时间格式化为字符串做了特别的优化,在标准日志库中为了将年、月、日、时、分、秒、毫秒、微秒等格式化指定长度的字符串,使用了一个函数: ```go func itoa(buf *[]byte, i int, wid int) { // Assemble decimal in reverse order. var b [20]byte bp := len(b) - 1 for i >= 10 || wid > 1 { wid-- q := i / 10 b[bp] = byte('0' + i - q*10) bp-- i = q } // i < 10 b[bp] = byte('0' + i) *buf = append(*buf, b[bp:]...) } ``` 其逻辑大概就是将数字中的每一位转换为字符并存入byte中,注意这里初始化byte数组的时候是20位,这是int64最大的数字位数。 其实时间字符串中的每个部分位数都是固定的,比如年是4位、月日时分秒都是2位,根本不需要20位,所以这个空间可以节省;还有这里用了循环,这对于CPU的分支预测可能有那么点影响,所以我这里分别对不同位数写了专门的格式化方法,以2位数为例: ```go func itoa2(buf *[]byte, i int) { q := i / 10 s := byte('0' + i - q*10) f := byte('0' + q) *buf = append(*buf, f, s) } ``` ## Json文本格式化器 其实现是这样的: ```go type jsonFormatter struct { } func NewJsonFormatter() *jsonFormatter { return &jsonFormatter{} } func (f *jsonFormatter) Format(entry *logEntry, buf *[]byte) (err error) { entry.File = toShortFile(entry.File) jsonBuf, err := json.Marshal(entry) *buf = append(*buf, jsonBuf...) return } ``` 代码也很简单,使用标准库的json序列化方法将logEntry实例转化为Json格式的数据。 对于Json格式,后续考虑支持用户自定义Json字段,这里暂时先简单处理。 # 写日志接口 写日志就是将日志输出到别的目标,比如ylog要支持的输出到磁盘文件、输出到Kafka等。 前边格式化接口将格式化后的数据封装到了 []byte 中,写日志接口就是将格式化处理的输出 []byte 写到某种输出目标中。参考Golang中各种Writer的定义,ylog中对它的定义是: ```go type LoggerWriter interface { Ensure(*logEntry) error Write([]byte) error Sync() error Close() error } ``` 这里有4个方法: * Ensure 确保输出目标已经准备好接收数据,比如打开要写入的文件、创建Kafka连接等等。 * Write 向输出目标写数据。 * Sync 要求输出目标将缓存持久化,比如写数据到磁盘时,操作系统会有缓存,通过这个方法要求缓存数据写入磁盘。 * Close 写日志结束,关闭输出目标。 ## 写日志到文件 这里定义一个名为fileWriter的类型,它需要实现LoggerWriter的接口。 先看类型的定义: ```go type fileWriter struct { file *os.File lastHour int64 Path string } ``` 包含四个字段: * file 要输出的文件对象。 * lastHour 按照小时创建文件的需要。 * Path 日志文件的根路径。 再看其实现的接口: ```go func (w *fileWriter) Ensure(entry *logEntry) (err error) { if w.file == nil { f, err := w.createFile(w.Path, entry.Ts) if err != nil { return err } w.lastHour = w.getTimeHour(entry.Ts) w.file = f return nil } currentHour := w.getTimeHour(entry.Ts) if w.lastHour != currentHour { _ = w.file.Close() f, err := w.createFile(w.Path, entry.Ts) if err != nil { return err } w.lastHour = currentHour w.file = f } return } func (w *fileWriter) Write(buf []byte) (err error) { buf = append(buf, '\n') _, err = w.file.Write(buf) return } func (w *fileWriter) Sync() error { return w.file.Sync() } func (w *fileWriter) Close() error { return w.file.Close() } ``` Ensure 中的主要逻辑是创建当前要写入的文件对象,如果小时数变了,先把之前的关闭,再创建一个新的文件。 Write 把数据写入到文件对象,这里加了一个换行符,也就是说对于文件日志,其每条日志最后都会有一个换行符,这样比较方便阅读。 Sync 调用文件对象的Sync方法,将日志从操作系统缓存刷到磁盘。 Close 关闭当前文件对象。 ## 写日志到Kafka 这里定义一个名为kafkaWriter的类型,它也需要实现LoggerWriter的接口。 先看其结构体定义: ```go type kafkaWriter struct { Topic string Address string writer *kafka.Writer batchSize int } ``` 这里包含四个字段: Topic 写Kafka时需要一个主题,这里默认当前Logger中所有日志使用同一个主题。 Address Kafka的访问地址。 writer 向Kafka写数据时使用的Writer,这里集成的是:github.com/segmentio/kafka-go,支持自动重试和重连。 batchSize Kafka写日志的批次大小,批量写可以提高日志的写效率。 再看其实现的接口: ```go func (w *kafkaWriter) Ensure(curTime time.Time) (err error) { if w.writer == nil { w.writer = &kafka.Writer{ Addr: kafka.TCP(w.Address), Topic: w.Topic, BatchSize: w.batchSize, Async: true, } } return } func (w *kafkaWriter) Write(buf []byte) (err error) { // buf will be reused by ylog when this method return, // with aysnc write, we need copy data to a new slice kbuf := append([]byte(nil), buf...) err = w.writer.WriteMessages(context.Background(), kafka.Message{Value: kbuf}, ) return } func (w *kafkaWriter) Sync() error { return nil } func (w *kafkaWriter) Close() error { return w.writer.Close() } ``` 这里采用的是异步发送到Kafka的方式,WriteMessages方法不会阻塞,因为传入的buf要被ylog重用,所以这里copy了一下。异步还会存在的一个问题就是不会返回错误,可能丢失数据,不过对于日志这种数据,没有那么严格的要求,也可以接受。 如果采用同步发送,因为批量发送比较有效率,这里可以攒几条再发,但日志比较稀疏时,可能短时间很难攒够,就会出现长时间等不到日志的情况,所以还要有个超时机制,这有点麻烦,不过我也写了一个版本,有兴趣的可以去看看:https://github.com/bosima/ylog/blob/main/examples/kafka-writer.go # 接口的组装 有了格式化接口和写日志接口,下一步就是将它们组装起来,以实现相应的处理能力。 首先是创建它们,因为我这里也没有动态配置的需求,所以就放到创建Logger实例的时候了,这样比较简单。 ```go func NewYesLogger(opts ...Option) (logger *YesLogger) { logger = &YesLogger{} ... logger.writer = NewFileWriter("logs") logger.formatter = NewTextFormatter() for _, opt := range opts { opt(logger) } ... return } ``` 可以看到默认的formatter是textFormatter,默认的writer是fileWriter。这个函数传入的Option其实是个函数,在下边的opt(logger)中会执行它们,所以使用其它的Formatter或者Writer可以这样做: ```go logger := ylog.NewYesLogger( ... ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)), ylog.Formatter(ylog.NewJsonFormatter()), ) ``` 这里 ylog.Writer 和 ylog.Formatter 就是符合Option类型的函数,调用它们可以设置不同的Formatter和Writer。 然后怎么使用它们呢? ```go ... l.formatter.Format(entry, &buf) l.writer.Ensure(entry) err := l.writer.Write(buf) ... ``` 当 logEntry 进入消息处理环节后,首先调用formatter的Format方法格式化logEntry;然后调用了writer的Ensure方法确保writer已经准备好,最后调用writer的Write方法将格式化之后的数据输出到对应的目标。 为什么不将Ensure方法放到Write中呢?这是因为目前写文本日志的时候需要根据logEntry中的日志时间创建日志文件,这样就需要给Writer传递两个参数,有点别扭,所以这里将它们分开了。 # 如何提高日志处理的吞吐量 Kafka的吞吐量是很高的,那么如果放到ylog自身来说,如何提高它的吞吐量呢? 首先想到的就是Channel,可以使用有缓冲的Channel模拟一个队列,生产者不停的向Channel发送数据,如果Writer可以一直在缓冲被填满之前将数据取走,那么理论上说生产者就是非阻塞的,相比同步输出到某个Writer,没有直接磁盘IO、网络IO,日志处理的吞吐量必将大幅提升。 定义一个Channel,其容量默认为当前机器逻辑处理器的数量: ```go logger.pipe = make(chan *logEntry, runtime.NumCPU()) ``` 发送数据的代码: ```go entry := &logEntry{ Level: level, Msg: s, File: file, Line: line, Ts: now, } l.pipe <- entry ``` 接收数据的代码: ```go for { select { case entry := <-l.pipe: // reuse the slice memory buf = buf[:0] l.formatter.Format(entry, &buf) l.writer.Ensure(entry.Ts) err := l.writer.Write(buf) ... } } ``` 实际效果怎么样呢?看下Benchmark: ```go goos: darwin goarch: amd64 pkg: github.com/bosima/ylog cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz BenchmarkInfo-8 1332333 871.6 ns/op 328 B/op 4 allocs/op ``` 这个结果可以和zerolog、zap等高性能日志库一较高下了,当然目前可以做的事情要比它们简单很多。 如果对Java有所了解的同学应该听说过log4j,在log4j2中引入了一个名为Disruptor的组件,它让日志处理飞快了起来,受到很多Java开发者的追捧。Disruptor之所以这么厉害,是因为它使用了无锁并发、环形队列、缓存行填充等多种高级技术。 相比之下,Golang的Channel虽然也使用了环形缓冲,但是还是使用了锁,作为队列来说性能并不是最优的。 Golang中有没有类似的东西呢?最近出来的ZenQ可能是一个不错的选择,不过看似还不太稳定,过段时间再尝试下。有兴趣的可以去看看:https://github.com/alphadose/ZenQ 。 --- 好了,以上就是本文的主要内容。关于ylog的介绍也告一段落了,后续会在Github上持续更新,增加更多有用的功能,并不断优化处理性能,欢迎关注:https://github.com/bosima/ylog 。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

697 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传