From cb0c1aab803b99c581bd2be07e6f1739d0257623 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Sun, 25 Feb 2024 23:57:52 +0800 Subject: [PATCH] fix --- .../processor/extended/processor-rate-limit.md | 4 ++-- plugins/processor/ratelimit/processor_rate_limit.go | 11 ++++------- plugins/processor/ratelimit/token_bucket.go | 6 +++--- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/docs/cn/plugins/processor/extended/processor-rate-limit.md b/docs/cn/plugins/processor/extended/processor-rate-limit.md index aa7d945135..f21f1fecb1 100644 --- a/docs/cn/plugins/processor/extended/processor-rate-limit.md +++ b/docs/cn/plugins/processor/extended/processor-rate-limit.md @@ -2,8 +2,8 @@ ## 简介 -`processor_rate_limit processor`插件可以实现对日志的限速。在限定时间内,相同索引值下的日志只有低于等于限定速率数量的日志才会被采集,否则直接丢弃。 -例如,当索引字段为"ip"时,`'{"ip": "10.**.**.**", "method": "POST", "brower": "aliyun-sdk-java"}'`和`'{"ip": "10.**.**.**", "method": "GET", "brower": "aliyun-sdk-c++"}'`具有相同的索引值。因此,会累积计算`"ip": "10.**.**.**"`的日志数量。 +`processor_rate_limit processor`插件用于对日志进行限速处理,确保在设定的时间窗口内,具有相同索引值的日志条目的数量不超过预定的速率限制。若某索引值下的日志条目数量超出限定速率,则超额的日志条目将被丢弃不予采集。 +以"ip"字段作为索引的情况为例,考虑两条日志:`{"ip": "10.**.**.**", "method": "POST", "browser": "aliyun-sdk-java"}` 和`{"ip": "10.**.**.**", "method": "GET", "browser": "aliyun-sdk-c++"}`。这两条日志有相同的"ip"索引值(即 "10...")。在此情形下,系统将对所有"ip"为"10..."的日志条目进行累计,确保其数量在限定时间窗口内不超过设定的速率限制。 ## 版本 diff --git a/plugins/processor/ratelimit/processor_rate_limit.go b/plugins/processor/ratelimit/processor_rate_limit.go index 1f5441a6e9..9bc928cfa1 100644 --- a/plugins/processor/ratelimit/processor_rate_limit.go +++ b/plugins/processor/ratelimit/processor_rate_limit.go @@ -59,10 +59,7 @@ func (p *ProcessorRateLimit) ProcessLogs(logArray []*protocol.Log) []*protocol.L totalLen := len(logArray) nextIdx := 0 for idx := 0; idx < totalLen; idx++ { - key, err := p.makeKey(logArray[idx]) - if err != nil { - continue - } + key := p.makeKey(logArray[idx]) if p.Algorithm.IsAllowed(key) { if idx != nextIdx { logArray[nextIdx] = logArray[idx] @@ -77,9 +74,9 @@ func (p *ProcessorRateLimit) ProcessLogs(logArray []*protocol.Log) []*protocol.L return logArray } -func (p *ProcessorRateLimit) makeKey(log *protocol.Log) (string, error) { +func (p *ProcessorRateLimit) makeKey(log *protocol.Log) string { if len(p.Fields) == 0 { - return "", nil + return "" } sort.Strings(p.Fields) @@ -98,7 +95,7 @@ func (p *ProcessorRateLimit) makeKey(log *protocol.Log) (string, error) { } } - return strings.Join(values, "_"), nil + return strings.Join(values, "_") } func init() { diff --git a/plugins/processor/ratelimit/token_bucket.go b/plugins/processor/ratelimit/token_bucket.go index 1ffa21c70d..c643289f72 100644 --- a/plugins/processor/ratelimit/token_bucket.go +++ b/plugins/processor/ratelimit/token_bucket.go @@ -17,10 +17,10 @@ import ( "context" "math" "sync" + "sync/atomic" "time" "github.com/alibaba/ilogtail/pkg/logger" - "go.uber.org/atomic" ) type bucket struct { @@ -144,10 +144,10 @@ func (t *tokenBucket) runGC() { // Add tokens to all buckets according to the rate limit // and flag full buckets for deletion. - toDelete := make([]uint64, 0) + toDelete := make([]string, 0) numBucketsBefore := 0 t.buckets.Range(func(k, v interface{}) bool { - key := k.(uint64) + key := k.(string) b := v.(*bucket) bucketFull := b.replenish(t.limit)