diff --git a/core/plugin/PluginRegistry.cpp b/core/plugin/PluginRegistry.cpp index 836d767401..b2320df97b 100644 --- a/core/plugin/PluginRegistry.cpp +++ b/core/plugin/PluginRegistry.cpp @@ -142,6 +142,7 @@ PluginRegistry::PluginRegistry() { "processor_otel_trace", "processor_packjson", "processor_pick_key", + "processor_rate_limit", "processor_regex", "processor_rename", "processor_split_char", diff --git a/docs/cn/plugins/processor/extended/processor-rate-limit.md b/docs/cn/plugins/processor/extended/processor-rate-limit.md new file mode 100644 index 0000000000..f21f1fecb1 --- /dev/null +++ b/docs/cn/plugins/processor/extended/processor-rate-limit.md @@ -0,0 +1,63 @@ +# 日志限速 + +## 简介 + +`processor_rate_limit`插件用于对日志进行限速处理,确保在设定的时间窗口内,具有相同索引值的日志条目的数量不超过预定的速率限制。若某索引值下的日志条目数量超出限定速率,则超额的日志条目将被丢弃不予采集。 +以"ip"字段作为索引的情况为例,考虑两条日志:`{"ip": "10.**.**.**", "method": "POST", "browser": "aliyun-sdk-java"}` 和`{"ip": "10.**.**.**", "method": "GET", "browser": "aliyun-sdk-c++"}`。这两条日志有相同的"ip"索引值(即 `10.**.**.**`)。在此情形下,系统将对所有"ip"为 `10.**.**.**` 的日志条目进行累计,确保其数量在限定时间窗口内不超过设定的速率限制。 + +## 版本 + +[Stable](../stability-level.md) + +## 配置参数 + +| 参数 | 类型,默认值 | 说明 | +| ---------------------- | ------- | ------------------------------------------------- | +| Fields | []string,`[]` | 限速的索引字段。processor会根据这些字段的值所组合得到的结果,进行分别限速。| +| Limit | string,无默认值(必填) | 限速速率。格式为 `数字/时间单位`。支持的时间单位为 `s`(每秒),`m`(每分钟),`h`(每小时)。 | + + +## 样例 + +* 输入 + +```bash +echo '{"ip": "10.**.**.**", "method": "POST", "brower": "aliyun-sdk-java"}' >> /home/test-log/proccessor-rate-limit.log +echo '{"ip": "10.**.**.**", "method": "POST", "brower": "aliyun-sdk-java"}' >> /home/test-log/proccessor-rate-limit.log +echo '{"ip": "10.**.**.**", "method": "POST", "brower": "aliyun-sdk-java"}' >> /home/test-log/proccessor-rate-limit.log +``` + +* 采集配置 + +```yaml +enable: true +inputs: + - Type: input_file + FilePaths: + - /home/test-log/*.log +processors: + - Type: processor_json + SourceKey: content + KeepSource: false + ExpandDepth: 1 +
ExpandConnector: "" + - Type: processor_rate_limit + Fields: + - "ip" + Limit: "1/s" +flushers: + - Type: flusher_stdout + OnlyStdout: true +``` + +* 输出 + +```json +{ + "__tag__:__path__": "/home/test-log/proccessor-rate-limit.log", + "__time__": "1658837955", + "brower": "aliyun-sdk-java", + "ip": "10.**.**.**", + "method": "POST" +} +``` diff --git a/plugins.yml b/plugins.yml index b3e5644a2f..03981c5d55 100644 --- a/plugins.yml +++ b/plugins.yml @@ -100,6 +100,7 @@ plugins: - import: "github.com/alibaba/ilogtail/plugins/processor/pickkey" - import: "github.com/alibaba/ilogtail/plugins/processor/regex" - import: "github.com/alibaba/ilogtail/plugins/processor/rename" + - import: "github.com/alibaba/ilogtail/plugins/processor/ratelimit" - import: "github.com/alibaba/ilogtail/plugins/processor/split/char" - import: "github.com/alibaba/ilogtail/plugins/processor/split/keyvalue" - import: "github.com/alibaba/ilogtail/plugins/processor/split/logregex" diff --git a/plugins/processor/ratelimit/algorithm.go b/plugins/processor/ratelimit/algorithm.go new file mode 100644 index 0000000000..59952a657b --- /dev/null +++ b/plugins/processor/ratelimit/algorithm.go @@ -0,0 +1,85 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// Time units accepted after the "/" in a rate expression such as "10/s".
const (
	unitPerSecond string = "s"
	unitPerMinute string = "m"
	unitPerHour   string = "h"
)

// algorithm decides, per key, whether one more event may pass.
type algorithm interface {
	IsAllowed(string) bool
}

// rate is a parsed limit expression of the form "number/unit".
type rate struct {
	value          float64 // number of events allowed per unit
	unit           string  // one of unitPerSecond, unitPerMinute, unitPerHour
	valuePerSecond float64 // value normalized to events per second
}

// Unpack parses str, formatted as "number/unit" (for example "10/s" or
// "0.5 / m"), into r. Whitespace around either component is ignored.
//
// It returns an error, leaving r untouched, when str does not contain exactly
// one "/", when the number is not a finite, non-negative float, or when the
// unit is not "s", "m" or "h".
func (r *rate) Unpack(str string) error {
	parts := strings.Split(str, "/")
	if len(parts) != 2 {
		return fmt.Errorf(`rate in invalid format: %v. Must be specified as "number/unit"`, str)
	}

	valueStr := strings.TrimSpace(parts[0])
	unitStr := strings.TrimSpace(parts[1])

	// Parse at full float64 precision: the result is stored in a float64, and
	// a 32-bit parse would silently round limits such as 16777217 or 0.1.
	v, err := strconv.ParseFloat(valueStr, 64)
	if err != nil {
		return fmt.Errorf(`rate's value component is not numeric: %v`, valueStr)
	}

	// ParseFloat accepts "NaN" and "Inf". A NaN limit makes every "tokens < 1"
	// comparison false, which would let all events through. v*0 is 0 only for
	// finite v (it is NaN for NaN and for ±Inf).
	if v < 0 || v*0 != 0 {
		return fmt.Errorf(`rate's value component must be a finite, non-negative number: %v`, valueStr)
	}

	if !IsValidUnit(unitStr) {
		return fmt.Errorf(`rate's unit component is not valid: %v`, unitStr)
	}

	r.value = v
	r.unit = unitStr
	r.valuePerSecond = r.getValuePerSecond()

	return nil
}

// getValuePerSecond converts r.value to events per second according to
// r.unit. It returns 0 for an unrecognized unit.
func (r *rate) getValuePerSecond() float64 {
	switch r.unit {
	case unitPerSecond:
		return r.value
	case unitPerMinute:
		return r.value / 60
	case unitPerHour:
		return r.value / (60 * 60)
	}

	return 0
}

// IsValidUnit reports whether candidate is one of the supported time units:
// "s", "m" or "h".
func IsValidUnit(candidate string) bool {
	switch candidate {
	case unitPerSecond, unitPerMinute, unitPerHour:
		return true
	}

	return false
}
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package ratelimit + +import ( + "fmt" + "sort" + "strings" + + "github.com/alibaba/ilogtail/pkg/helper" + "github.com/alibaba/ilogtail/pkg/pipeline" + "github.com/alibaba/ilogtail/pkg/protocol" +) + +type ProcessorRateLimit struct { + Fields []string `comment:"Optional. Fields of value to be limited, for each unique result from combining these field values."` + Limit string `comment:"Optional. Limit rate in the format of (number)/(time unit). Supported time unit: 's' (per second), 'm' (per minute), and 'h' (per hour)."` + + Algorithm algorithm + limitMetric pipeline.CounterMetric + processedMetric pipeline.CounterMetric + context pipeline.Context +} + +const pluginName = "processor_rate_limit" + +func (p *ProcessorRateLimit) Init(context pipeline.Context) error { + p.context = context + + limit := rate{} + err := limit.Unpack(p.Limit) + if err != nil { + return err + } + p.Algorithm = newTokenBucket(limit) + p.limitMetric = helper.NewCounterMetric(fmt.Sprintf("%v_limited", pluginName)) + p.context.RegisterCounterMetric(p.limitMetric) + p.processedMetric = helper.NewCounterMetric(fmt.Sprintf("%v_processed", pluginName)) + p.context.RegisterCounterMetric(p.processedMetric) + return nil +} + +func (*ProcessorRateLimit) Description() string { + return "rate limit processor for logtail" +} + +// V1 +func (p *ProcessorRateLimit) ProcessLogs(logArray []*protocol.Log) []*protocol.Log { + totalLen := len(logArray) + nextIdx := 0 + for idx := 0; idx < totalLen; idx++ { + key := p.makeKey(logArray[idx]) + if p.Algorithm.IsAllowed(key) { + if idx != nextIdx { 
+ logArray[nextIdx] = logArray[idx] + } + nextIdx++ + } else { + p.limitMetric.Add(1) + } + p.processedMetric.Add(1) + } + logArray = logArray[:nextIdx] + return logArray +} + +func (p *ProcessorRateLimit) makeKey(log *protocol.Log) string { + if len(p.Fields) == 0 { + return "" + } + + sort.Strings(p.Fields) + values := make([]string, len(p.Fields)) + for _, field := range p.Fields { + exist := false + for _, logContent := range log.Contents { + if field == logContent.GetKey() { + values = append(values, fmt.Sprintf("%v", logContent.GetValue())) + exist = true + break + } + } + if !exist { + values = append(values, "") + } + } + + return strings.Join(values, "_") +} + +func init() { + pipeline.Processors[pluginName] = func() pipeline.Processor { + return &ProcessorRateLimit{} + } +} diff --git a/plugins/processor/ratelimit/processor_rate_limit_test.go b/plugins/processor/ratelimit/processor_rate_limit_test.go new file mode 100644 index 0000000000..2021cdc240 --- /dev/null +++ b/plugins/processor/ratelimit/processor_rate_limit_test.go @@ -0,0 +1,181 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package ratelimit + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/check" + + "github.com/alibaba/ilogtail/pkg/logger" + "github.com/alibaba/ilogtail/pkg/pipeline" + "github.com/alibaba/ilogtail/pkg/protocol" + "github.com/alibaba/ilogtail/plugins/test" + "github.com/alibaba/ilogtail/plugins/test/mock" +) + +var _ = check.Suite(&processorTestSuite{}) + +func Test(t *testing.T) { + logger.InitTestLogger() + check.TestingT(t) +} + +type processorTestSuite struct { + processor pipeline.ProcessorV1 +} + +func (s *processorTestSuite) SetUpTest(c *check.C) { + s.processor = pipeline.Processors["processor_rate_limit"]().(pipeline.ProcessorV1) + _ = s.processor.Init(mock.NewEmptyContext("p", "l", "c")) + logger.Info(context.Background(), "set up", s.processor.Description()) +} + +func (s *processorTestSuite) TearDownTest(c *check.C) { + +} + +func (s *processorTestSuite) TestDefault(c *check.C) { + { + // case: no configuration + var log = "xxxx\nyyyy\nzzzz" + processor, _ := s.processor.(*ProcessorRateLimit) + processor.Limit = "3/s" + _ = s.processor.Init(mock.NewEmptyContext("p", "l", "c")) + logArray := []*protocol.Log{ + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value2", "key2", "value1"), + } + outLogs := s.processor.ProcessLogs(logArray) + c.Assert(len(outLogs), check.Equals, 3) + c.Assert(len(outLogs[0].Contents), check.Equals, 3) + time.Sleep(time.Second) + outLogs = s.processor.ProcessLogs(logArray) + c.Assert(len(outLogs), check.Equals, 3) + c.Assert(len(outLogs[0].Contents), check.Equals, 3) + // metric + c.Assert(processor.limitMetric.Get(), check.Equals, int64(2)) + c.Assert(processor.processedMetric.Get(), check.Equals, int64(8)) + } +} + +func (s *processorTestSuite) TestField(c *check.C) { + { + // case: single field + var log 
= "xxxx\nyyyy\nzzzz" + processor, _ := s.processor.(*ProcessorRateLimit) + processor.Limit = "3/s" + processor.Fields = []string{"key1"} + _ = s.processor.Init(mock.NewEmptyContext("p", "l", "c")) + logArray := []*protocol.Log{ + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + + test.CreateLogs("content", log, "key1", "value2", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value2", "key2", "value2"), + } + outLogs := s.processor.ProcessLogs(logArray) + c.Assert(len(outLogs), check.Equals, 5) + // metric + c.Assert(processor.limitMetric.Get(), check.Equals, int64(1)) + c.Assert(processor.processedMetric.Get(), check.Equals, int64(6)) + } + { + // case: multiple fields + var log = "xxxx\nyyyy\nzzzz" + processor, _ := s.processor.(*ProcessorRateLimit) + processor.Limit = "3/s" + processor.Fields = []string{"key1", "key2"} + _ = s.processor.Init(mock.NewEmptyContext("p", "l", "c")) + logArray := []*protocol.Log{ + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value1", "key2", "value2"), + + test.CreateLogs("content", log, "key1", "value2", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value2", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value2", "key2", "value2"), + test.CreateLogs("content", log, "key1", "value2", "key2", "value2"), + + test.CreateLogs("content", log, "key1", "value2", "key2", "value1"), + test.CreateLogs("content", log, "key1", "value2", "key2", "value1"), + test.CreateLogs("content", log, "key1", "value2", "key2", "value1"), + test.CreateLogs("content", log, 
"key1", "value2", "key2", "value1"), + } + outLogs := s.processor.ProcessLogs(logArray) + c.Assert(len(outLogs), check.Equals, 9) + // metric + c.Assert(processor.limitMetric.Get(), check.Equals, int64(3)) + c.Assert(processor.processedMetric.Get(), check.Equals, int64(12)) + } +} + +func (s *processorTestSuite) TestGC(c *check.C) { + { + // case: gc in single process + var log = "xxxx\nyyyy\nzzzz" + processor, _ := s.processor.(*ProcessorRateLimit) + processor.Limit = "3/s" + processor.Fields = []string{"key1"} + _ = s.processor.Init(mock.NewEmptyContext("p", "l", "c")) + logArray := make([]*protocol.Log, 10010) + for i := 0; i < 5; i++ { + logArray[i] = test.CreateLogs("content", log, "key1", "value1", "key2", "value2") + } + for i := 5; i < 10005; i++ { + logArray[i] = test.CreateLogs("content", log, "key1", "value2", "key2", "value2") + } + for i := 10005; i < 10010; i++ { + logArray[i] = test.CreateLogs("content", log, "key1", "value1", "key2", "value2") + } + outLogs := s.processor.ProcessLogs(logArray) + c.Assert(len(outLogs), check.Equals, 6) + // metric + c.Assert(processor.limitMetric.Get(), check.Equals, int64(10004)) + c.Assert(processor.processedMetric.Get(), check.Equals, int64(10010)) + } + { + // case: gc in multiple process + var log = "xxxx\nyyyy\nzzzz" + processor, _ := s.processor.(*ProcessorRateLimit) + processor.Limit = "3/s" + processor.Fields = []string{"key1"} + _ = s.processor.Init(mock.NewEmptyContext("p", "l", "c")) + logArray := make([]*protocol.Log, 10005) + for i := 0; i < 5; i++ { + logArray[i] = test.CreateLogs("content", log, "key1", "value1", "key2", "value2") + } + for i := 5; i < 10005; i++ { + logArray[i] = test.CreateLogs("content", log, "key1", "value2", "key2", "value2") + } + outLogs := s.processor.ProcessLogs(logArray) + c.Assert(len(outLogs), check.Equals, 6) + logArray = make([]*protocol.Log, 5) + for i := 0; i < 5; i++ { + logArray[i] = test.CreateLogs("content", log, "key1", "value1", "key2", "value2") + } + outLogs = 
s.processor.ProcessLogs(logArray) + c.Assert(len(outLogs), check.Equals, 0) + // metric + c.Assert(processor.limitMetric.Get(), check.Equals, int64(10004)) + c.Assert(processor.processedMetric.Get(), check.Equals, int64(10010)) + } +} diff --git a/plugins/processor/ratelimit/token_bucket.go b/plugins/processor/ratelimit/token_bucket.go new file mode 100644 index 0000000000..1a58547fef --- /dev/null +++ b/plugins/processor/ratelimit/token_bucket.go @@ -0,0 +1,176 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package ratelimit + +import ( + "context" + "math" + "sync" + "time" + + "github.com/alibaba/ilogtail/pkg/logger" + + "go.uber.org/atomic" +) + +type bucket struct { + mu sync.Mutex // Avoid replenish racing + tokens atomic.Float64 + lastReplenish time.Time +} + +type tokenBucket struct { + mu sync.Mutex // Avoid conflict GC + + limit rate + buckets sync.Map + + gc gcConfig +} + +// GC thresholds and metrics +type gcConfig struct { + thresholds tokenBucketGCConfig + metrics struct { + numCalls atomic.Int32 + } +} + +type tokenBucketGCConfig struct { + // NumCalls is the number of calls made to IsAllowed. When more than + // the specified number of calls are made, GC is performed. + NumCalls int32 `config:"num_calls"` +} + +type tokenBucketConfig struct { + // GC governs when completely filled token buckets must be deleted + // to free up memory. GC is performed when _any_ of the GC conditions + // below are met. 
After each GC, counters corresponding to _each_ of + // the GC conditions below are reset. + GC tokenBucketGCConfig `config:"gc"` +} + +func newTokenBucket(rate rate) algorithm { + cfg := tokenBucketConfig{ + GC: tokenBucketGCConfig{ + NumCalls: 10000, + }, + } + + return &tokenBucket{ + limit: rate, + buckets: sync.Map{}, + gc: gcConfig{ + thresholds: tokenBucketGCConfig{ + NumCalls: cfg.GC.NumCalls, + }, + }, + mu: sync.Mutex{}, + } +} + +func (t *tokenBucket) IsAllowed(key string) bool { + t.runGC() + + b := t.getBucket(key) + allowed := b.withdraw() + + t.gc.metrics.numCalls.Add(1) + return allowed +} + +func (t *tokenBucket) getBucket(key string) *bucket { + v, exists := t.buckets.LoadOrStore(key, &bucket{ + tokens: *atomic.NewFloat64(t.limit.value), + lastReplenish: time.Now(), + }) + b := v.(*bucket) + + if exists { + b.replenish(t.limit) + return b + } + + return b +} + +func (b *bucket) withdraw() bool { + if b.tokens.Load() < 1 { + return false + } + b.tokens.Add(-1) + return true +} + +// Replenish token to the bucket +// Return true if the bucket is full. +func (b *bucket) replenish(rate rate) bool { + b.mu.Lock() + defer b.mu.Unlock() + secsSinceLastReplenish := time.Since(b.lastReplenish).Seconds() + tokensToReplenish := secsSinceLastReplenish * rate.valuePerSecond + + b.tokens.Store(math.Min(b.tokens.Load()+tokensToReplenish, rate.value)) + b.lastReplenish = time.Now() + + return b.tokens.Load() >= rate.value +} + +func (t *tokenBucket) runGC() { + // Don't run GC if thresholds haven't been crossed. + if t.gc.metrics.numCalls.Load() < t.gc.thresholds.NumCalls { + return + } + + if !t.mu.TryLock() { + return + } + + go func() { + defer t.mu.Unlock() + gcStartTime := time.Now() + + // Add tokens to all buckets according to the rate limit + // and flag full buckets for deletion. 
+ toDelete := make([]string, 0) + numBucketsBefore := 0 + t.buckets.Range(func(k, v interface{}) bool { + key := k.(string) + b := v.(*bucket) + + bucketFull := b.replenish(t.limit) + + if bucketFull { + toDelete = append(toDelete, key) + } + + numBucketsBefore++ + return true + }) + + // Cleanup full buckets to free up memory + for _, key := range toDelete { + t.buckets.Delete(key) + } + + // Reset GC metrics + t.gc.metrics.numCalls = atomic.Int32{} + + gcDuration := time.Since(gcStartTime) + numBucketsDeleted := len(toDelete) + numBucketsAfter := numBucketsBefore - numBucketsDeleted + logger.Debugf(context.Background(), "gc duration: %v, buckets: (before: %v, deleted: %v, after: %v)", + gcDuration, numBucketsBefore, numBucketsDeleted, numBucketsAfter) + }() +}