聊聊golang的zap的ZapKafkaWriter

codecraft · · 489 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本文主要研究一下golang的zap的ZapKafkaWriter

ZapKafkaWriter

package logger

import (
    "errors"
    "sync"
    "sync/atomic"
    "syscall"
)

// ZapKafkaWriter is a zap WriteSyncer (io.Writer) that writes messages to Kafka
type ZapKafkaWriter struct {
    kp        *KafkaProducer
    ce        *CloudEvents
    closed    int32          // Nonzero if closing, must access atomically
    pendingWg sync.WaitGroup // WaitGroup for pending messages
    closeMut  sync.Mutex
}

// newZapKafkaWriter returns a kafka io.writer instance
func newZapKafkaWriter(
    kpCfg ProducerConfiguration, cloudEvents *CloudEvents,
    ceCfg CloudEventsConfiguration) (*ZapKafkaWriter, error) {

    // create an async producer
    kp, err := newKafkaProducer(kpCfg, cloudEvents, ceCfg)
    if err != nil {
        return nil, err
    }

    zw := &ZapKafkaWriter{
        kp: kp,
        ce: cloudEvents,
    }
    return zw, nil
}
ZapKafkaWriter定义了KafkaProducer、CloudEvents、closed、pendingWg、closeMut属性,其newZapKafkaWriter方法根据ProducerConfiguration、cloudEvents、CloudEventsConfiguration来创建KafkaProducer,然后根据KafkaProducer来创建ZapKafkaWriter

zapcore.WriteSyncer

// Sync satisfies zapcore.WriteSyncer interface, zapcore.AddSync works as well
func (zw *ZapKafkaWriter) Sync() error {
    return nil
}

// Write sends byte slices to Kafka ignoring error responses (Thread-safe)
// Write might block if the Input() channel of the AsyncProducer is full
func (zw *ZapKafkaWriter) Write(msg []byte) (int, error) {
    if zw.Closed() {
        return 0, syscall.EINVAL
    }

    if zw.kp.producer == nil {
        return 0, errors.New("No producer defined")
    }

    zw.pendingWg.Add(1)
    defer zw.pendingWg.Done()

    err := zw.kp.sendMessage(msg)
    return len(msg), err
}

// Closed returns true if the writer is closed, false otherwise (Thread-safe)
func (zw *ZapKafkaWriter) Closed() bool {
    return atomic.LoadInt32(&zw.closed) != 0
}

// Close must be called when the writer is no longer needed (Thread-safe)
func (zw *ZapKafkaWriter) Close() (err error) {
    zw.closeMut.Lock()
    defer zw.closeMut.Unlock()

    if zw.Closed() {
        return syscall.EINVAL
    }

    atomic.StoreInt32(&zw.closed, 1)

    zw.pendingWg.Wait()
    return nil
}
ZapKafkaWriter实现了zapcore.WriteSyncer接口,其Write方法使用KafkaProducer发送消息,其Sync方法目前不做任何操作,它还提供了Close方法,也就是也实现了Sink接口

小结

WriteSyncer内嵌了io.Writer接口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。

doc


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:codecraft

查看原文:聊聊golang的zap的ZapKafkaWriter

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

489 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传