
Commit

fix
Abingcbc committed Feb 26, 2024
1 parent d153f30 commit cb0c1aa
Showing 3 changed files with 9 additions and 12 deletions.
4 changes: 2 additions & 2 deletions docs/cn/plugins/processor/extended/processor-rate-limit.md
@@ -2,8 +2,8 @@

## 简介

- The `processor_rate_limit processor` plugin rate-limits logs. Within the configured time window, only logs at or below the configured rate for a given index value are collected; the rest are dropped.
- For example, when the index field is "ip", `'{"ip": "10.**.**.**", "method": "POST", "brower": "aliyun-sdk-java"}'` and `'{"ip": "10.**.**.**", "method": "GET", "brower": "aliyun-sdk-c++"}'` share the same index value, so the number of logs with `"ip": "10.**.**.**"` is accumulated together.
+ The `processor_rate_limit processor` plugin rate-limits logs, ensuring that within the configured time window the number of log entries sharing the same index value does not exceed the configured rate. Entries that exceed the limit for an index value are dropped and not collected.
+ Taking the "ip" field as the index, consider two logs: `{"ip": "10.**.**.**", "method": "POST", "browser": "aliyun-sdk-java"}` and `{"ip": "10.**.**.**", "method": "GET", "browser": "aliyun-sdk-c++"}`. They share the same "ip" index value ("10.**.**.**"), so all log entries with that "ip" are counted together and kept within the configured rate over the time window.

## 版本

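To make the per-index-value accounting described above concrete, here is a minimal, self-contained Go sketch of the idea: logs are grouped by the value of a key field and each group is counted against a fixed per-window limit. The `windowCounter` type and its `allow` method are illustrative only and are not part of the plugin's actual API.

```go
package main

import (
	"fmt"
	"time"
)

// windowCounter counts events per key inside a fixed time window.
// It is a simplified illustration of per-index-value rate limiting,
// not the algorithm used by processor_rate_limit itself.
type windowCounter struct {
	limit  int
	window time.Duration
	start  time.Time
	counts map[string]int
}

func newWindowCounter(limit int, window time.Duration) *windowCounter {
	return &windowCounter{limit: limit, window: window, start: time.Now(), counts: map[string]int{}}
}

// allow reports whether one more log with the given index value
// may be collected in the current window.
func (w *windowCounter) allow(key string) bool {
	if time.Since(w.start) > w.window {
		w.start = time.Now() // start a new window and reset all counters
		w.counts = map[string]int{}
	}
	if w.counts[key] >= w.limit {
		return false // over the limit for this index value: drop
	}
	w.counts[key]++
	return true
}

func main() {
	rl := newWindowCounter(1, time.Second) // at most 1 log per "ip" value per second
	fmt.Println(rl.allow("10.**.**.**"))   // true: first log for this ip
	fmt.Println(rl.allow("10.**.**.**"))   // false: same ip, over the limit
	fmt.Println(rl.allow("10.0.0.2"))      // true: different index value
}
```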
11 changes: 4 additions & 7 deletions plugins/processor/ratelimit/processor_rate_limit.go
@@ -59,10 +59,7 @@ func (p *ProcessorRateLimit) ProcessLogs(logArray []*protocol.Log) []*protocol.L
	totalLen := len(logArray)
	nextIdx := 0
	for idx := 0; idx < totalLen; idx++ {
-		key, err := p.makeKey(logArray[idx])
-		if err != nil {
-			continue
-		}
+		key := p.makeKey(logArray[idx])
		if p.Algorithm.IsAllowed(key) {
			if idx != nextIdx {
				logArray[nextIdx] = logArray[idx]
@@ -77,9 +74,9 @@ func (p *ProcessorRateLimit) ProcessLogs(logArray []*protocol.Log) []*protocol.L
	return logArray
}
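The loop above filters the slice in place: entries accepted by the rate limiter are copied forward to `nextIdx`, so no second slice has to be allocated. A stand-alone sketch of the same compaction pattern, with a generic `keep` predicate standing in for `p.Algorithm.IsAllowed`:

```go
package main

import "fmt"

// filterInPlace keeps only the elements for which keep returns true,
// compacting them toward the front of the slice without allocating
// a second slice. This mirrors the nextIdx pattern shown above.
func filterInPlace(items []string, keep func(string) bool) []string {
	nextIdx := 0
	for idx := 0; idx < len(items); idx++ {
		if keep(items[idx]) {
			if idx != nextIdx {
				items[nextIdx] = items[idx]
			}
			nextIdx++
		}
	}
	return items[:nextIdx] // drop the tail that was filtered out
}

func main() {
	logs := []string{"a", "bb", "c", "dd"}
	short := filterInPlace(logs, func(s string) bool { return len(s) == 1 })
	fmt.Println(short) // [a c]
}
```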

-func (p *ProcessorRateLimit) makeKey(log *protocol.Log) (string, error) {
+func (p *ProcessorRateLimit) makeKey(log *protocol.Log) string {
	if len(p.Fields) == 0 {
-		return "", nil
+		return ""
	}

	sort.Strings(p.Fields)
@@ -98,7 +95,7 @@ func (p *ProcessorRateLimit) makeKey(log *protocol.Log) (string, error) {
		}
	}

-	return strings.Join(values, "_"), nil
+	return strings.Join(values, "_")
}

func init() {
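For reference, the key built by `makeKey` is simply the values of the configured `Fields` (sorted by field name) joined with `_`; with no fields configured the key is now the empty string, so all logs are counted against a single shared limit. A simplified sketch of that keying scheme, reading from a plain map instead of `protocol.Log`:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// makeKey builds a rate-limit key in the spirit of ProcessorRateLimit.makeKey:
// sort the configured field names, look each one up in the log, and join the
// values with "_". Missing fields contribute empty strings, and an empty field
// list yields an empty key.
func makeKey(fields []string, log map[string]string) string {
	if len(fields) == 0 {
		return "" // no fields configured: all logs share a single key
	}
	sort.Strings(fields)
	values := make([]string, 0, len(fields))
	for _, field := range fields {
		values = append(values, log[field]) // "" if the field is absent
	}
	return strings.Join(values, "_")
}

func main() {
	log := map[string]string{"ip": "10.**.**.**", "method": "POST", "browser": "aliyun-sdk-java"}
	fmt.Println(makeKey([]string{"ip"}, log))           // 10.**.**.**
	fmt.Println(makeKey([]string{"method", "ip"}, log)) // 10.**.**.**_POST (fields sorted first)
	fmt.Println(makeKey(nil, log))                      // empty key: one shared bucket
}
```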
6 changes: 3 additions & 3 deletions plugins/processor/ratelimit/token_bucket.go
@@ -17,10 +17,10 @@ import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/alibaba/ilogtail/pkg/logger"
"go.uber.org/atomic"
)

type bucket struct {
Expand Down Expand Up @@ -144,10 +144,10 @@ func (t *tokenBucket) runGC() {

	// Add tokens to all buckets according to the rate limit
	// and flag full buckets for deletion.
-	toDelete := make([]uint64, 0)
+	toDelete := make([]string, 0)
	numBucketsBefore := 0
	t.buckets.Range(func(k, v interface{}) bool {
-		key := k.(uint64)
+		key := k.(string)
		b := v.(*bucket)

		bucketFull := b.replenish(t.limit)
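The GC pass above walks a `sync.Map` of per-key buckets, tops each one up, and collects the keys of buckets that are already full so they can be dropped afterwards. Below is a minimal sketch of that replenish-and-sweep pattern with string keys; the `bucket` type and its `replenish` method are simplified stand-ins, not the plugin's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// bucket is a simplified token bucket: replenish adds tokens up to the burst
// size and reports whether the bucket is already full, meaning its key has
// been idle and can be garbage-collected.
type bucket struct {
	tokens float64
	burst  float64
}

func (b *bucket) replenish(rate float64) bool {
	b.tokens += rate
	if b.tokens >= b.burst {
		b.tokens = b.burst
		return true // full: no traffic consumed tokens since the last pass
	}
	return false
}

func runGC(buckets *sync.Map, rate float64) {
	// Collect keys of full buckets while ranging, then delete them afterwards.
	toDelete := make([]string, 0)
	buckets.Range(func(k, v interface{}) bool {
		key := k.(string)
		b := v.(*bucket)
		if b.replenish(rate) {
			toDelete = append(toDelete, key)
		}
		return true // keep ranging
	})
	for _, key := range toDelete {
		buckets.Delete(key)
	}
}

func main() {
	var buckets sync.Map
	buckets.Store("10.**.**.**", &bucket{tokens: 1, burst: 3})
	buckets.Store("10.0.0.2", &bucket{tokens: 3, burst: 3})
	runGC(&buckets, 1) // the already-full 10.0.0.2 bucket is swept
	_, stillThere := buckets.Load("10.0.0.2")
	fmt.Println(stillThere) // false
}
```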
