Skip to content

Commit

Permalink
feat: processor rate limit (#1321)
Browse files Browse the repository at this point in the history
* feat: processor rate limit

* fix racing
  • Loading branch information
Abingcbc authored Feb 27, 2024
1 parent ec95c66 commit ddf8f6f
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 0 deletions.
1 change: 1 addition & 0 deletions core/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ PluginRegistry::PluginRegistry() {
"processor_otel_trace",
"processor_packjson",
"processor_pick_key",
"processor_rate_limit",
"processor_regex",
"processor_rename",
"processor_split_char",
Expand Down
63 changes: 63 additions & 0 deletions docs/cn/plugins/processor/extended/processor-rate-limit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# 日志限速

## 简介

`processor_rate_limit`插件用于对日志进行限速处理,确保在设定的时间窗口内,具有相同索引值的日志条目的数量不超过预定的速率限制。若某索引值下的日志条目数量超出限定速率,则超额的日志条目将被丢弃不予采集。
以"ip"字段作为索引的情况为例,考虑两条日志:`{"ip": "10.**.**.**", "method": "POST", "browser": "aliyun-sdk-java"}` 和 `{"ip": "10.**.**.**", "method": "GET", "browser": "aliyun-sdk-c++"}`。这两条日志有相同的"ip"索引值(即 "10.**.**.**")。在此情形下,系统将对所有"ip"为"10.**.**.**"的日志条目进行累计,确保其数量在限定时间窗口内不超过设定的速率限制。

## 版本

[Stable](../stability-level.md)

## 配置参数

| 参数 | 类型,默认值 | 说明 |
| ---------------------- | ------- | ------------------------------------------------- |
| Fields | []string,`[]` | 限速的索引字段。processor会根据这些字段的值所组合得到的结果,进行分别限速。|
| Limit | string,`""` | 限速速率。格式为 `数字/时间单位`。支持的时间单位为 `s`(每秒),`m`(每分钟),`h`(每小时)。|

## 样例

* 输入

```bash
echo '{"ip": "10.**.**.**", "method": "POST", "brower": "aliyun-sdk-java"}' >> /home/test-log/proccessor-rate-limit.log
echo '{"ip": "10.**.**.**", "method": "POST", "brower": "aliyun-sdk-java"}' >> /home/test-log/proccessor-rate-limit.log
echo '{"ip": "10.**.**.**", "method": "POST", "brower": "aliyun-sdk-java"}' >> /home/test-log/proccessor-rate-limit.log
```

* 采集配置

```yaml
enable: true
inputs:
  - Type: input_file
    FilePaths:
      - /home/test-log/*.log
processors:
  - Type: processor_json
    SourceKey: content
    KeepSource: false
    ExpandDepth: 1
    ExpandConnector: ""
  - Type: processor_rate_limit
    Fields:
      - "ip"
    Limit: "1/s"
flushers:
  - Type: flusher_stdout
    OnlyStdout: true
```

* 输出

```json
{
"__tag__:__path__": "/home/test-log/proccessor-rate-limit.log",
"__time__": "1658837955",
"brower": "aliyun-sdk-java",
"ip": "10.**.**.**",
"method": "POST"
}
```
1 change: 1 addition & 0 deletions plugins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ plugins:
- import: "github.com/alibaba/ilogtail/plugins/processor/pickkey"
- import: "github.com/alibaba/ilogtail/plugins/processor/regex"
- import: "github.com/alibaba/ilogtail/plugins/processor/rename"
- import: "github.com/alibaba/ilogtail/plugins/processor/ratelimit"
- import: "github.com/alibaba/ilogtail/plugins/processor/split/char"
- import: "github.com/alibaba/ilogtail/plugins/processor/split/keyvalue"
- import: "github.com/alibaba/ilogtail/plugins/processor/split/logregex"
Expand Down
85 changes: 85 additions & 0 deletions plugins/processor/ratelimit/algorithm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ratelimit

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// Time-unit suffixes accepted as the unit part of a "number/unit" rate string.
const (
// unitPerSecond denotes a rate expressed per second.
unitPerSecond string = "s"
// unitPerMinute denotes a rate expressed per minute.
unitPerMinute string = "m"
// unitPerHour denotes a rate expressed per hour.
unitPerHour string = "h"
)

// algorithm is the rate-limiting strategy the processor consults for each log.
type algorithm interface {
// IsAllowed reports whether a log with the given key may pass. The processor
// keeps the log when it returns true and drops it otherwise.
IsAllowed(string) bool
}

// rate is a rate limit parsed from a "number/unit" string by Unpack.
type rate struct {
// value is the numeric part of the rate, expressed in the configured unit.
value float64
// unit is the time-unit suffix: "s", "m" or "h".
unit string
// valuePerSecond is value converted to a per-second rate.
valuePerSecond float64
}

// Unpack parses str, formatted as "number/unit", into r.
//
// The number must be a finite, non-negative decimal and the unit one of "s"
// (per second), "m" (per minute) or "h" (per hour). Whitespace around either
// part is ignored. On success value, unit and valuePerSecond are all set. On
// failure r is left untouched and the returned error names the offending part.
func (r *rate) Unpack(str string) error {
	parts := strings.Split(str, "/")
	if len(parts) != 2 {
		return fmt.Errorf(`rate in invalid format: %v. Must be specified as "number/unit"`, str)
	}

	valueStr := strings.TrimSpace(parts[0])
	unitStr := strings.TrimSpace(parts[1])

	// Parse at full float64 precision: the result is stored in a float64, so
	// rounding it through float32 would only distort values such as 0.1.
	v, err := strconv.ParseFloat(valueStr, 64)
	if err != nil {
		return fmt.Errorf(`rate's value component is not numeric: %v`, valueStr)
	}
	// ParseFloat also accepts "NaN", "Inf" and negative numbers, none of which
	// is a meaningful rate.
	if math.IsNaN(v) || math.IsInf(v, 0) || v < 0 {
		return fmt.Errorf(`rate's value component must be a finite, non-negative number: %v`, valueStr)
	}

	if !IsValidUnit(unitStr) {
		return fmt.Errorf(`rate's unit component is not valid: %v`, unitStr)
	}

	r.value = v
	r.unit = unitStr
	r.valuePerSecond = r.getValuePerSecond()

	return nil
}

// getValuePerSecond converts r.value from r.unit into a per-second rate.
// It returns 0 when r.unit is not one of the known unit suffixes.
func (r *rate) getValuePerSecond() float64 {
	var secondsPerUnit float64
	switch r.unit {
	case unitPerSecond:
		secondsPerUnit = 1
	case unitPerMinute:
		secondsPerUnit = 60
	case unitPerHour:
		secondsPerUnit = 60 * 60
	default:
		return 0
	}

	return r.value / secondsPerUnit
}

// IsValidUnit reports whether candidate is one of the supported time-unit
// suffixes: per second, per minute or per hour.
func IsValidUnit(candidate string) bool {
	switch candidate {
	case unitPerSecond, unitPerMinute, unitPerHour:
		return true
	default:
		return false
	}
}
106 changes: 106 additions & 0 deletions plugins/processor/ratelimit/processor_rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ratelimit

import (
"fmt"
"sort"
"strings"

"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
)

// ProcessorRateLimit drops logs that exceed a configured rate. Logs are grouped
// by the combined values of Fields and each group is limited independently.
type ProcessorRateLimit struct {
// Fields lists the log content keys whose values are combined into the
// rate-limit key. When empty, every log shares the single key "".
Fields []string `comment:"Optional. Fields of value to be limited, for each unique result from combining these field values."`
// Limit is the rate in "number/unit" form; Init fails if it cannot be parsed.
Limit string `comment:"Optional. Limit rate in the format of (number)/(time unit). Supported time unit: 's' (per second), 'm' (per minute), and 'h' (per hour)."`

// Algorithm decides, per key, whether a log is allowed; set by Init.
Algorithm algorithm
// limitMetric counts logs dropped by the limiter.
limitMetric pipeline.CounterMetric
// processedMetric counts every log seen, whether kept or dropped.
processedMetric pipeline.CounterMetric
// context is the pipeline context handed to Init.
context pipeline.Context
}

// pluginName is the key under which the processor is registered in
// pipeline.Processors and the prefix of its counter metric names.
const pluginName = "processor_rate_limit"

// Init stores the pipeline context, parses Limit, installs the limiter
// returned by newTokenBucket and registers the "limited" and "processed"
// counter metrics on the context.
//
// It returns the parse error unchanged when Limit is not a valid
// "number/unit" rate; in that case no limiter or metric is set up.
func (p *ProcessorRateLimit) Init(context pipeline.Context) error {
	p.context = context

	var parsed rate
	if err := parsed.Unpack(p.Limit); err != nil {
		return err
	}
	p.Algorithm = newTokenBucket(parsed)

	// Counter of logs rejected by the limiter.
	limitedName := fmt.Sprintf("%v_limited", pluginName)
	p.limitMetric = helper.NewCounterMetric(limitedName)
	p.context.RegisterCounterMetric(p.limitMetric)

	// Counter of all logs seen, kept or dropped.
	processedName := fmt.Sprintf("%v_processed", pluginName)
	p.processedMetric = helper.NewCounterMetric(processedName)
	p.context.RegisterCounterMetric(p.processedMetric)

	return nil
}

// Description returns a short human-readable summary of the plugin.
func (*ProcessorRateLimit) Description() string {
	const description = "rate limit processor for logtail"
	return description
}

// ProcessLogs is the V1 entry point. It filters logArray in place, keeping
// only the logs the limiter allows for their key, and returns the shortened
// slice, which shares logArray's backing array.
//
// Every log increments the processed counter; each dropped log also
// increments the limited counter.
func (p *ProcessorRateLimit) ProcessLogs(logArray []*protocol.Log) []*protocol.Log {
	// Reuse the input's storage: the write position never overtakes the read position.
	kept := logArray[:0]
	for _, entry := range logArray {
		if p.Algorithm.IsAllowed(p.makeKey(entry)) {
			kept = append(kept, entry)
		} else {
			p.limitMetric.Add(1)
		}
		p.processedMetric.Add(1)
	}
	return kept
}

// makeKey builds the rate-limit key for log by joining, with "_", the values
// of the configured Fields taken in sorted field-name order.
//
// A field that is absent from the log contributes an empty string. For a
// field that appears more than once, the first occurrence wins. With no
// Fields configured the key is "", so all logs share one limit.
func (p *ProcessorRateLimit) makeKey(log *protocol.Log) string {
	if len(p.Fields) == 0 {
		return ""
	}

	// Keep the key independent of configuration order. After the first call
	// the slice is already sorted, so skip the needless re-sort and rewrite.
	if !sort.StringsAreSorted(p.Fields) {
		sort.Strings(p.Fields)
	}

	// Length 0 with reserved capacity: a non-zero length here would leave
	// len(p.Fields) blank entries in front of the appended values.
	values := make([]string, 0, len(p.Fields))
	for _, field := range p.Fields {
		value := ""
		for _, logContent := range log.Contents {
			if field == logContent.GetKey() {
				value = fmt.Sprintf("%v", logContent.GetValue())
				break
			}
		}
		values = append(values, value)
	}

	return strings.Join(values, "_")
}

// init registers a factory for the processor under pluginName so the pipeline
// can create a fresh, unconfigured instance on demand.
func init() {
	newProcessor := func() pipeline.Processor {
		return new(ProcessorRateLimit)
	}
	pipeline.Processors[pluginName] = newProcessor
}
Loading

0 comments on commit ddf8f6f

Please sign in to comment.