Skip to content

Commit

Permalink
feat: add flusher_http queue buffer and async interceptor settings (#…
Browse files Browse the repository at this point in the history
…1203)

* set buffer size of the queue and allow to use interceptor asynchronously

* update doc

* remove log

* fix lint issue
shunjiazhu authored Oct 26, 2023
1 parent e3fee8e commit d68ae5f
Showing 3 changed files with 118 additions and 9 deletions.
2 changes: 2 additions & 0 deletions docs/cn/data-pipeline/flusher/flusher-http.md
Original file line number Diff line number Diff line change
@@ -29,6 +29,8 @@
| Convert.TagFieldsRename | Map<String,String> || 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map<String,String> || ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags``time` |
| Concurrency | Int || 向url发起请求的并发数,默认为`1` |
| QueueCapacity | Int | 否 | 内部channel的缓存大小,默认为1024
| AsyncIntercept | Boolean | 否 | 异步过滤数据,默认为否

## 样例

24 changes: 19 additions & 5 deletions plugins/flusher/http/flusher_http.go
Original file line number Diff line number Diff line change
@@ -67,7 +67,9 @@ type FlusherHTTP struct {
Concurrency int // How many requests can be performed in concurrent
Authenticator *extensions.ExtensionConfig // name and options of the extensions.ClientAuthenticator extension to use
FlushInterceptor *extensions.ExtensionConfig // name and options of the extensions.FlushInterceptor extension to use
AsyncIntercept bool // intercept the event asynchronously
RequestInterceptors []extensions.ExtensionConfig // custom request interceptor settings
QueueCapacity int // capacity of channel

varKeys []string

@@ -127,7 +129,10 @@ func (f *FlusherHTTP) Init(context pipeline.Context) error {
return err
}

f.queue = make(chan interface{})
if f.QueueCapacity <= 0 {
f.QueueCapacity = 1024
}
f.queue = make(chan interface{}, f.QueueCapacity)
for i := 0; i < f.Concurrency; i++ {
go f.runFlushTask()
}
@@ -148,12 +153,14 @@ func (f *FlusherHTTP) Flush(projectName string, logstoreName string, configName

func (f *FlusherHTTP) Export(groupEventsArray []*models.PipelineGroupEvents, ctx pipeline.PipelineContext) error {
for _, groupEvents := range groupEventsArray {
if f.interceptor != nil {
if !f.AsyncIntercept && f.interceptor != nil {
groupEvents = f.interceptor.Intercept(groupEvents)
if groupEvents == nil {
// skip groupEvents that is nil or empty.
if groupEvents == nil || len(groupEvents.Events) == 0 {
continue
}
}

f.addTask(groupEvents)
}
return nil
@@ -269,6 +276,12 @@ func (f *FlusherHTTP) convertAndFlush(data interface{}) error {
case *protocol.LogGroup:
logs, varValues, err = f.converter.ToByteStreamWithSelectedFields(v, f.varKeys)
case *models.PipelineGroupEvents:
if f.AsyncIntercept && f.interceptor != nil {
v = f.interceptor.Intercept(v)
if v == nil || len(v.Events) == 0 {
return nil
}
}
logs, varValues, err = f.converter.ToByteStreamWithSelectedFieldsV2(v, f.varKeys)
default:
return fmt.Errorf("unsupport data type")
@@ -452,8 +465,9 @@ func (f *FlusherHTTP) fillRequestContentType() {
func init() {
pipeline.Flushers["flusher_http"] = func() pipeline.Flusher {
return &FlusherHTTP{
Timeout: defaultTimeout,
Concurrency: 1,
QueueCapacity: 1024,
Timeout: defaultTimeout,
Concurrency: 1,
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolCustomSingle,
Encoding: converter.EncodingJSON,
101 changes: 97 additions & 4 deletions plugins/flusher/http/flusher_http_test.go
Original file line number Diff line number Diff line change
@@ -669,16 +669,86 @@ func TestGetNextRetryDelay(t *testing.T) {
}
}

func TestHttpFlusherFlushWithInterceptor(t *testing.T) {
Convey("Given a http flusher with sync intercepter", t, func() {
mockIntercepter := &mockInterceptor{}
flusher := &FlusherHTTP{
RemoteURL: "http://test.com/write",
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolInfluxdb,
Encoding: converter.EncodingCustom,
},
interceptor: mockIntercepter,
AsyncIntercept: false,
Timeout: defaultTimeout,
Concurrency: 1,
queue: make(chan interface{}, 10),
}

Convey("should discard all events", func() {
groupEvents := models.PipelineGroupEvents{
Events: []models.PipelineEvent{&models.Metric{
Name: "cpu.load.short",
Timestamp: 1672321328000000000,
Tags: models.NewTagsWithKeyValues("host", "server01", "region", "cn"),
Value: &models.MetricSingleValue{Value: 0.64},
}},
}
err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil)
So(err, ShouldBeNil)
So(flusher.queue, ShouldBeEmpty)
})
})

Convey("Given a http flusher with async intercepter", t, func() {
mockIntercepter := &mockInterceptor{}
flusher := &FlusherHTTP{
RemoteURL: "http://test.com/write",
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolInfluxdb,
Encoding: converter.EncodingCustom,
},
interceptor: mockIntercepter,
AsyncIntercept: true,
Timeout: defaultTimeout,
Concurrency: 1,
queue: make(chan interface{}, 10),
}

Convey("should discard all events", func() {
groupEvents := models.PipelineGroupEvents{
Events: []models.PipelineEvent{&models.Metric{
Name: "cpu.load.short",
Timestamp: 1672321328000000000,
Tags: models.NewTagsWithKeyValues("host", "server01", "region", "cn"),
Value: &models.MetricSingleValue{Value: 0.64},
}},
}
err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil)
So(err, ShouldBeNil)
So(len(flusher.queue), ShouldEqual, 1)
err = flusher.convertAndFlush(<-flusher.queue)
So(err, ShouldBeNil)
})

})
}

type mockContext struct {
pipeline.Context
basicAuth *basicAuth
basicAuth *basicAuth
interceptor *mockInterceptor
}

func (c mockContext) GetExtension(name string, cfg any) (pipeline.Extension, error) {
if c.basicAuth == nil {
return nil, fmt.Errorf("basicAuth not set")
if c.basicAuth != nil {
return c.basicAuth, nil
}
return c.basicAuth, nil

if c.interceptor != nil {
return c.interceptor, nil
}
return nil, fmt.Errorf("basicAuth not set")
}

func (c mockContext) GetConfigName() string {
@@ -723,3 +793,26 @@ func (b *basicAuthRoundTripper) RoundTrip(request *http.Request) (*http.Response
request.SetBasicAuth(b.auth.Username, b.auth.Password)
return b.base.RoundTrip(request)
}

type mockInterceptor struct {
}

func (mi *mockInterceptor) Description() string {
return "a filter that discard all events"
}

func (mi *mockInterceptor) Init(context pipeline.Context) error {
return nil
}

func (mi *mockInterceptor) Stop() error {
return nil
}

func (mi *mockInterceptor) Intercept(group *models.PipelineGroupEvents) *models.PipelineGroupEvents {
if group == nil {
return nil
}
group.Events = group.Events[:0]
return group
}

0 comments on commit d68ae5f

Please sign in to comment.