Loki被限流了,Limits_Config到底限了个啥?

云原生小白 · · 960 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

road-690347_1920.jpg

Loki中拥有这众多的limit策略,有的已经开放到配置文件中,还有的配置代码中已经实现但还没开放出来。大部分情况下开发者给了出一些默认参数足够优秀,不过有的时候我们也不免需要微调。那么小白这次先简单捡几个比较重要的策略来说明下Limits_Config中到底限制了什么。

1. 限流器

小白前段时间无意间重启升级了下Loki的服务,由于过程持续了一段时间,当服务恢复时客户端在push日志时总是会收到如下的报错,

image.png
429 Too Many Requests Ingestion rate limit exceeded (limit 10485760 bytes/sec)

这直接导致我们部分服务器上的日志采集器缓冲区一直处于拥堵,引起日志采集的延迟。

看起来这个是触发了Loki的限流策略了,查了下官方文档,发现在limits_config中两个参数控制Loki Distributor的日志接收的速率:

limits_config:
  #令牌桶注入token的速率
  ingestion_rate_mb: | default = 4]
 
  #令牌桶的容量
  ingestion_burst_size_mb: | default = 6]

经过摸索发现Loki的Distributor中关于限速的方法也是采用Golang标准库的time/rate限流器来实现的。它的大致逻辑如下:

首先distributor处理日志push请求时声明了protobuf的编码,其中可以包括多个日志流,以及每个流里面的label信息和Entry中的日志时间戳和条目信息。


type PushRequest struct {
    Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"`
}

type Stream struct {
    Labels  string  `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
    Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
}

type Entry struct {
    Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
    Line      string    `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
}

从日志流的Entry中取出日志条目并计算长度得到validatedSamplesSize,之后将本次日志的长度传给限流器

func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {

    for _, stream := range req.Streams {
        if err := d.validator.ValidateLabels(userID, stream); err != nil {
            validationErr = err
            continue
        }

        entries := make([]logproto.Entry, 0, len(stream.Entries))
        for _, entry := range stream.Entries {
            if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil {
                validationErr = err
                continue
            }
            entries = append(entries, entry)
            validatedSamplesSize += len(entry.Line)
            validatedSamplesCount++
        }

获取当前时间,以及将租户ID和validatedSamplesSize传给ingestionRateLimiter.AllowN限流器处理,如果限流成功则忽略这次提交,同时打印上述截图中的报错,

now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
    validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
    validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
    //抛出429 Too Many Requests异常
    return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize))
    }

接下来就是具体的限流器内的处理,我们看到Distributor主要通过RateLimiter.AllowN的方式获取令牌。也就是说截止当前时间,判断限流器桶中令牌数与日志条目容量数N(validatedSamplesSize),满足则返回true,同时从桶中消费N个令牌,反正则返回失败。这部分我们可以在这里面找到

https://github.com/cortexproject/cortex/blob/master/pkg/util/limiter/rate_limiter.go

到这里,我们需要改变的就是令牌桶的大小和速率即可,也就是一开始limits_config中的两个参数。我们将其适当调大就可以立马得到效果:

limits_config:
  ingestion_rate_mb: 32
  ingestion_burst_size_mb: 64
image.png

可以看到在调整了rate和burst之后,绿色框框内的客户端的日志缓冲区很快的就被消费清空了。

扩展

除此之外,Loki的限流器还有一个全局的动态配置

limits_config:
  ingestion_rate_strategy: | default = "local"

默认情况下为local,如果你的Loki是一个分布式系统话,local会将上述的限流器的令牌桶作用在每个distributor中,这会极大提高Loki日志收取的吞吐量。如果配置的为global的话,distributor则会用
ingestion_rate_mb / ring.HealthyInstancesCount得到全局的每个distributor的租户限制速率。这部分代码在ingestion_rate_strategy.go得以实现

type globalStrategy struct {
    limits *validation.Overrides
    ring   ReadLifecycler
}

func newGlobalIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
    return &globalStrategy{
        limits: limits,
        ring:   ring,
    }
}

func (s *globalStrategy) Limit(userID string) float64 {
    numDistributors := s.ring.HealthyInstancesCount()

    if numDistributors == 0 {
        return s.limits.IngestionRateBytes(userID)
    }

    return s.limits.IngestionRateBytes(userID) / float64(numDistributors)
}

根据租户开启全局的日志限速策略,目前应该还没有人用于实践吧

总结:

限流器是后台服务中非常重要的一个组件,它可以通过限制请求数或流量的方式来保护后台服务避免过载,令牌桶是一个常见的实现方法。Loki显然在这里是利用令牌桶的方式来对日志流的容量进行限制。

更多关于Golang标准库中time/rate的使用,可以查考https://godoc.org/golang.org/x/time/rate

2. 标签长度限制

Loki的Limit_Config关于长度的限制包括对label键值对的大小限制等,其主要包含以下几个:

limits_config:
  # label的key最大长度
  max_label_name_length:   | default = 1024
 
  # label的value最大长度
  max_label_value_length:  | default = 2048
 
  # 每个流中的最大label个数
  max_label_names_per_series: | default = 30

标签限制的这部分代码在distributor的validator.go中,比如在这里计算labels内的键值对个数,超过配置最大数则放弃同时返回BadRequest

ls, err := util.ToClientLabels(stream.Labels)
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
   validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
   bytes := 0
   for _, e := range stream.Entries {
       bytes += len(e.Line)
   }
   validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
   return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(cortex_client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, v.MaxLabelNamesPerSeries(userID)))
   }

比如下面这里distributor会取出labels中的name和value的长度分别和配置中的最大长度进行对比,以及对label中的name进行比较,防止重名。

maxLabelNameLength := v.MaxLabelNameLength(userID)
maxLabelValueLength := v.MaxLabelValueLength(userID)
lastLabelName := ""
for _, l := range ls {
   if len(l.Name) > maxLabelNameLength {
       updateMetrics(validation.LabelNameTooLong, userID, stream)
       return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name))       } else if len(l.Value) > maxLabelValueLength {
       updateMetrics(validation.LabelValueTooLong, userID, stream)
       return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
   } else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
       updateMetrics(validation.DuplicateLabelNames, userID, stream)
       return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
   }
   lastLabelName = l.Name
}

总结:

loki中对于日志标签的长度限制并没有特别的强调,不过秉着无界的label会拖垮Loki性能的准则上,我们不宜将max_label_names_per_series配置得过大

3. 租户限制

默认情况下我们部署的Loki都只有一个租户,它使用的是一个叫fake的租户id。那么在Loki的限制策略中,涉及到租户的包含如下几个参数:

limits_config:
  # 每个租户的最大日志流个数
  max_streams_per_user: | default = 10000
  
  #启用全局的租户最大日志流个数,默认0关闭
  #一旦配置,每个租户日志流将有ingester注册到hash环上同时状态为
  #HEALTH的个数动态计算得出,任何ingester的数量变化都会动态生效到这个值
  max_global_streams_per_user:  default = 0

在这里我们可以看到max_streams_per_user的local和global之间的计算方法,算法如下

(max_global_streams_per_user / max_streams_per_user) * replication_factor

复制因子的配置来至ingest_config.replication_factor

以下是limiter.go中对于全局最大日志流数的代码实现:

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
    if globalLimit == 0 {
        return 0
    }
    //从环中得到健康的ingester数
    numIngesters := l.ring.HealthyInstancesCount()
    if numIngesters > 0 {
    
    //全局的最大值除以健康的ingester个数,再乘个复制因子
        return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
    }
    return 0
}

在AssertMaxStreamsPerUser函数中简单处理local和global的限制数据后,对日志流个数判断,低于计算后的limit则返回空,否则抛出超过租户最大限制数的异常

func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
    localLimit := l.limits.MaxLocalStreamsPerUser(userID)

    globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
    //根据上述算法计算出全局的限制
    adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)

   //当adjustedGlobalLimit不为0且localLimit大于adjustedGlobalLimit时
   //calculatedLimit就为动态生成的limit,反之则为localLimit的限制数
    calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)
    
    if calculatedLimit == 0 {
        calculatedLimit = math.MaxInt32
    }

   //如果日志流小于calculatedLimit则返回空,否则返回超过租户日志流个数异常
    if streams < calculatedLimit {
        return nil
    }
    return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}

func (l *Limiter) minNonZero(first, second int) int {
    if first == 0 || (second != 0 && first > second) {
        return second
    }
    return first
}

在ingester的instance.go中,loki对调用上述AssertMaxStreamsPerUser方法来判断租户流的限制,如果返回不为空,则放弃本次处理,并抛出TooManyRequests异常

err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
    validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
    bytes := 0
    for _, e := range pushReqStream.Entries {
        bytes += len(e.Line)
    }
    validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
    return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
    }

总结:

可以看到Loki对于全局的动态配置计算还是比较简单,如果启用max_global_streams_per_user的话,应要合理的配置复制因子的个数。如果复制因子的个数 > 健康的ingester个数,则计算出来的租户限制会比配置文件中定义要大许多。

4. 日志条目大小限制

在Loki中,对于客户端push到distributor中产生的每条日志流是可以对其做条目的大小限制的,这个在配置里面默认是不限制,也就是说每行的日志大小可以是无限😄,当然大部分情况下我们都不会去限制这个,如果有的同学环境特殊,可以考虑开启对每行日志的大小限制。

limits_config:
# 日志条目的大小限制,默认不限制
  max_line_size:  | default = none

这部分的实现也比较简单,在distributor的validator.go中,ValidateEntry函数会对获取日志流中的条目长度进行比较,当MaxLineSize不为0且日志流中的条目长度大于MaxLineSize则放弃本次处理,并抛出LineTooLong异常

func (v Validator) ValidateEntry(userID string, labels string, entry logproto.Entry) error {

    if maxSize := v.MaxLineSize(userID); maxSize != 0 && len(entry.Line) > maxSize {
       validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, userID).Inc()
       validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, userID).Add(float64(len(entry.Line)))
       return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
    }
}

5. 查询限制

在Loki的查询中有一个限制日志返回行数的限制,它直接控制了你在grafana或者其他平台里面能够从loki里面拿到的日志行数,大部分情况下5000行可以满足需求,如果你觉得不够可以将其调大或者设置成0

limits_config:
  # 查询返回条目的限制
  max_entries_limit_per_query:  | default = 5000 

除此之外,对于单次查询的限制还有的chunk、stream和series的,不过大部分场景我们不会去对此做调整,小白这里就不在详细分析。

limits_config:
  # 单个查询最多匹配的chunk个数
  max_chunks_per_query: | default = 2000000
  
  # 限制查询是匹配到的chunk大小,默认0为不限制
  max_query_length:  | default = 0
  
  # 单词查询最多匹配到的日志流个数
  max_streams_matchers_per_query: | default = 1000
  
  # 限制查询时最大的日志度量个数
  max_query_series: | default = 500
  
  # 查询的并发数
  max_query_parallelism  | default = 14
  
  # 允许租户缓存结果的有效时间
  max_cache_freshness_per_query   |default = 1m.

总结

事实上Loki中关于limits的部分大都不需要进行额外的调整,除非我们对Loki的日志存储有明确的场景定位,这才有机会去调整这些参数。

比如你的Loki是一个内部的日志,不涉及到多租户,那你的limit策略值大都配置的比较高,如果你的loki是一个多租户场景下日志,每个租户都有自己明确的日志,这个时候你可能关注的就是合理限制单个租户的日志接受和查询,避免单个租户的行为影响集群。不过目前Loki似乎还没有针对租户身份信息来做shard服务,或许后面还有更好的方式来应对多租户的场景


关注公众号「云原生小白」,获取更多精彩内容


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:云原生小白

查看原文:Loki被限流了,Limits_Config到底限了个啥?

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

960 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传