prometheus tsdb部分源码阅读1.0

zwb_f754 · · 1051 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

从两周前我就开始阅读prometheus时序数据库存储部分。此前我采用了顺序阅读的方式,虽然这种方式遵循着代码执行顺序,但经常由于不理解各模块的作用,以及各模块调用过程又比较复杂,整个阅读过程困难重重。必须改变阅读方式,模块化地阅读代码,或者才是正道。也决心写点文章,记录下阅读代码的过程。
今天首先阅读的wal.go的代码,这部分代码是WriteAheadLog部分的代码实现。

// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
type WAL interface {
    Reader() WALReader
    LogSeries([]RefSeries) error
    LogSamples([]RefSample) error
    LogDeletes([]Stone) error
    Truncate(mint int64, keep func(uint64) bool) error
    Close() error
}

该接口有两个实现noWAL和SegmentWAL。noWAL即什么也不干的wal,所有方法几乎都是直接return,重点来看下SegmentWAL。

// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
    mtx     sync.Mutex
    metrics *walMetrics

    dirFile *os.File
    files   []*segmentFile

    logger        log.Logger
    flushInterval time.Duration
    segmentSize   int64

    crc32 hash.Hash32
    cur   *bufio.Writer
    curN  int64

    stopc   chan struct{}
    donec   chan struct{}
    actorc  chan func() error // sequentialized background operations
    buffers sync.Pool
}

先不要去管这些结构体成员, 等后面方法中用到了再去理解,这里贴图只是为了后面代码提到的时候方便翻阅。首先来看看SegementWAL实现的func (w *SegmentWAL)LogSeries(series []RefSeries)error 方法。这个方法接收一个slice,类型为结构体RefSeries,看名字,这个结构体似乎是用户指向一个Series,下图给出了这个结构体的定义。

// RefSeries is the series labels with the series ID.
type RefSeries struct {
    Ref    uint64
    Labels labels.Labels
}

结构很简单,由一个uint64和lables.labels组成。Labels是时序数据的标签,其实质是KV组成的数组,prometheus里面将时序数据的metric也包含在Labels里面,即name: $name的形式。也就是说Lables能够唯一地表示一条时间序列。uint64则是这条时间序列的id。也就是说LogSeries方法就是存储标签名称以及id使用的。接下来看看这个方法是如何实现的。

// LogSeries writes a batch of new series labels to the log.
// The series have to be ordered.
func (w *SegmentWAL) LogSeries(series []RefSeries) error {
    //获得一个buffer,至于buffer如何实现,后面再表
    buf := w.getBuffer()
    //将series存储到buffer,返回一个uint8类型的flag
    flag := w.encodeSeries(buf, series)
    //加锁,读写可能会竞争
    w.mtx.Lock()
    defer w.mtx.Unlock()
    //将buf内的内容写入到xx
    err := w.write(WALEntrySeries, flag, buf.get())

    w.putBuffer(buf)

    if err != nil {
        return errors.Wrap(err, "log series")
    }

    tf := w.head()

    for _, s := range series {
        if tf.minSeries > s.Ref {
            tf.minSeries = s.Ref
        }
    }
    return nil
}

这里用到了几个SegmentWAL结构体成员变量,首先是

type SegmentWAL struct {
    ......
    buffers sync.Pool
}

sync.Pool是官方的包,这里给出一段介绍

众所周知,go是自动垃圾回收的(garbage collector),这大大减少了程序编程负担。但gc是一把双刃剑,带来了编程的方便但同时也增加了运行时开销,使用不当甚至会严重影响程序的性能。因此性能要求高的场景不能任意产生太多的垃圾(有gc但又不能完全依赖它挺恶心的),如何解决呢?那就是要重用对象了,我们可以简单的使用一个chan把这些可重用的对象缓存起来,但如果很多goroutine竞争一个chan性能肯定是问题.....由于golang团队认识到这个问题普遍存在,为了避免大家重造车轮,因此官方统一出了一个包Pool。原文:https://blog.csdn.net/yongjian_lian/article/details/42058893

简单地说,就是缓存对象, 不会被gc清理掉。需要用的时候,再取出来。这里重用的对象或者说结构体是encbuf,是一个结构体,

// encbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
    b []byte
    c [binary.MaxVarintLen64]byte
}

encbuf使用了binary包,简单的数字与字节序列的转换以及变长值的编解码,prometheus采用了BigEndian方式进行编解码。下面来看下encodeSeries方法

func (w *SegmentWAL) encodeSeries(buf *encbuf, series []RefSeries) uint8 {
    for _, s := range series {
        buf.putBE64(s.Ref)
        buf.putUvarint(len(s.Labels))

        for _, l := range s.Labels {
            buf.putUvarintStr(l.Name)
            buf.putUvarintStr(l.Value)
        }
    }
    return walSeriesSimple
}

对于多组RefSeries,首先写入series的RefId,然后用变长编码写入labels个数,最后用变长字符串的形式分别写入标签的名字和值。
写到buf里面之后,调用SegmentWAL的write方法

func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
    // Cut to the next segment if the entry exceeds the file size unless it would also
    // exceed the size of a new segment.
    // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
    var (
        sz    = int64(len(buf)) + 6
        newsz = w.curN + sz
    )
    // XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
    // Probably fine in general but may yield a lot of short files in some cases.
    if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
        if err := w.cut(); err != nil {
            return err
        }
    }
    n, err := w.writeTo(w.cur, w.crc32, t, flag, buf)

    w.curN += int64(n)

    return err
}

func (w *SegmentWAL) writeTo(wr io.Writer, crc32 hash.Hash, t WALEntryType, flag uint8, buf []byte) (int, error) {
    if len(buf) == 0 {
        return 0, nil
    }
    crc32.Reset()
    wr = io.MultiWriter(crc32, wr)

    var b [6]byte
    b[0] = byte(t)
    b[1] = flag

    binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))

    n1, err := wr.Write(b[:])
    if err != nil {
        return n1, err
    }
    n2, err := wr.Write(buf)
    if err != nil {
        return n1 + n2, err
    }
    n3, err := wr.Write(crc32.Sum(b[:0]))

    return n1 + n2 + n3, err
}

首先来看一下一个结构体成员

type SegmentWAL struct {
    ......
    cur   *bufio.Writer
    curN  int64
    segmentSize   int64 // default 256 * 1024 * 1024 // 256 MB
}

先将这个bufio.Writer如何创建的过程暂且不表,只要知道这是一个bufio.Writer类型的即可。回到SegmentWAL的write方法中。这个方法主要执行的是w.writeTo()方法,但是当条件一成立的时候,先执行w.cut()方法。这个条件解释如下:

w.cur为nil
m.curN超过了最大的segmentSize,
加上新数据的长度超过segmentSize且新数据规模小于segmentSize

只要符合其中一个条件,就会执行w.cut(),接着再执行wr.WriteTo()方法。在wr.WriteTo()方法中主要有几个知识点:

crc32包实现了32位循环冗余校验(CRC-32)的校验和算法,参见:http://en.wikipedia.org/wiki/Cyclic_redundancy_check
wr = io.MultiWriter(crc32, wr), 类似于linux tee命令,详见http://man.linuxde.net/tee
写入到wr中的是这么五段 WALEntryType类型+ flag + buf长度 + buf本身 + 校验和
返回写入的总长度


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:zwb_f754

查看原文:prometheus tsdb部分源码阅读1.0

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1051 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传