From 3db6a2a1b16c743a5dc5acd856bd59789a9b41f8 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 4 Jan 2022 15:47:55 -0700 Subject: [PATCH 01/11] Support running multiple log groups in cloudwatch input --- x-pack/filebeat/input/awscloudwatch/config.go | 10 +- x-pack/filebeat/input/awscloudwatch/input.go | 175 ++++++++++++------ x-pack/filebeat/input/awss3/s3.go | 5 +- x-pack/filebeat/input/awss3/sqs.go | 5 +- .../awss3 => libbeat/common/aws}/semaphore.go | 16 +- .../common/aws}/semaphore_test.go | 2 +- 6 files changed, 135 insertions(+), 78 deletions(-) rename x-pack/{filebeat/input/awss3 => libbeat/common/aws}/semaphore.go (81%) rename x-pack/{filebeat/input/awss3 => libbeat/common/aws}/semaphore_test.go (98%) diff --git a/x-pack/filebeat/input/awscloudwatch/config.go b/x-pack/filebeat/input/awscloudwatch/config.go index 3f04813e78c4..7744b2007544 100644 --- a/x-pack/filebeat/input/awscloudwatch/config.go +++ b/x-pack/filebeat/input/awscloudwatch/config.go @@ -25,6 +25,7 @@ type config struct { APITimeout time.Duration `config:"api_timeout" validate:"min=0,nonzero"` APISleep time.Duration `config:"api_sleep" validate:"min=0,nonzero"` Latency time.Duration `config:"latency"` + NumberOfWorkers int `config:"number_of_workers"` AwsConfig awscommon.ConfigAWS `config:",inline"` } @@ -33,10 +34,11 @@ func defaultConfig() config { ForwarderConfig: harvester.ForwarderConfig{ Type: "aws-cloudwatch", }, - StartPosition: "beginning", - ScanFrequency: 10 * time.Second, - APITimeout: 120 * time.Second, - APISleep: 200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms + StartPosition: "beginning", + ScanFrequency: 10 * time.Second, + APITimeout: 120 * time.Second, + APISleep: 200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms + NumberOfWorkers: 5, } } diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 22f9efe15c68..651b0a88865b 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -42,21 +42,25 @@ func init() { } } -// awsCloudWatchInput is a input for AWS CloudWatch logs +// awsCloudWatchInput is an input for AWS CloudWatch logs type awsCloudWatchInput struct { config config awsConfig awssdk.Config logger *logp.Logger outlet channel.Outleter // Output of received aws-cloudwatch logs. - inputCtx *channelContext + inputCtx context.Context - workerOnce sync.Once // Guarantees that the worker goroutine is only started once. - workerWg sync.WaitGroup // Waits on aws-cloudwatch worker goroutine. - stopOnce sync.Once - close chan struct{} + workerWg sync.WaitGroup // Waits on aws-cloudwatch worker goroutine. + workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. + workerCancel context.CancelFunc // Used to signal that the worker should stop. + stopOnce sync.Once + close chan struct{} + startTime int64 + endTime int64 prevEndTime int64 // track previous endTime for each iteration. + workerSem *awscommon.Sem } // channelContext implements context.Context by wrapping a channel @@ -77,8 +81,8 @@ func (c *channelContext) Err() error { func (c *channelContext) Value(key interface{}) interface{} { return nil } // NewInput creates a new aws-cloudwatch input -func NewInput(cfg *common.Config, connector channel.Connector, context input.Context) (input.Input, error) { - cfgwarn.Beta("aws-clouwatch input type is used") +func NewInput(cfg *common.Config, connector channel.Connector, ctx input.Context) (input.Input, error) { + cfgwarn.Beta("aws-cloudwatch input type is used") logger := logp.NewLogger(inputName) // Extract and validate the input's configuration. @@ -109,13 +113,33 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con awsConfig.Region = config.RegionName closeChannel := make(chan struct{}) + // Wrap input.Context's Done channel with a context.Context. This goroutine + // stops with the parent closes the Done channel. + inputCtx, cancelInputCtx := context.WithCancel(context.Background()) + go func() { + defer cancelInputCtx() + select { + case <-ctx.Done: + case <-inputCtx.Done(): + } + }() + + // If the input ever needs to be made restartable, then context would need + // to be recreated with each restart. + workerCtx, workerCancel := context.WithCancel(inputCtx) + in := &awsCloudWatchInput{ - config: config, - awsConfig: awsConfig, - logger: logger, - close: closeChannel, - inputCtx: &channelContext{closeChannel}, - prevEndTime: int64(0), + config: config, + awsConfig: awsConfig, + logger: logger, + close: closeChannel, + inputCtx: inputCtx, + workerCtx: workerCtx, + workerCancel: workerCancel, + startTime: int64(0), + endTime: int64(0), + prevEndTime: int64(0), + workerSem: awscommon.NewSem(config.NumberOfWorkers), } // Build outlet for events. @@ -135,48 +159,80 @@ func (in *awsCloudWatchInput) Run() { cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AwsConfig.Endpoint, logsServiceName, in.config.RegionName, in.awsConfig) svc := cloudwatchlogs.New(cwConfig) - var logGroupNames []string - var err error - if in.config.LogGroupNamePrefix != "" { - logGroupNames, err = in.getLogGroupNames(svc) + logGroupNames, err := in.getLogGroupNames(svc) + if err != nil { + in.logger.Error("getLogGroupNames failed: ", err) + return + } + + // This loop tries to keep the workers busy as much as possible while + // honoring the number in config opposed to a simpler loop that does one + // listing, sequentially processes every object and then does another listing + logGroupCount := 0 + start := true + for in.inputCtx.Err() == nil { + if logGroupCount == 0 { + currentTime := time.Now() + in.startTime, in.endTime = getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency, in.config.Latency) + in.logger.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(in.startTime/1000, 0), time.Unix(in.endTime/1000, 0)) + in.prevEndTime = in.endTime + } + + if start == false { + in.logger.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) + time.Sleep(in.config.ScanFrequency) + in.logger.Debug("done sleeping") + } + start = false + + // Determine how many workers are available. + availableWorkers, err := in.workerSem.AcquireContext(in.config.NumberOfWorkers, in.inputCtx) if err != nil { - in.logger.Error("getLogGroupNames failed: ", err) - return + break + } + + if availableWorkers == 0 { + continue } - } else { - logGroupNames = []string{in.config.LogGroupName} - } - for _, logGroup := range logGroupNames { - in.config.LogGroupName = logGroup - in.workerOnce.Do(func() { + // Process each log group name asynchronously with a goroutine. + for i := 0; i < in.config.NumberOfWorkers; i++ { + if logGroupCount >= len(logGroupNames) { + // reset logGroupCount + logGroupCount = 0 + break + } + + lg := logGroupNames[logGroupCount] in.workerWg.Add(1) - go func() { - in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", in.config.LogGroupName) - defer in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", in.config.LogGroupName) - defer in.workerWg.Done() - in.run(svc) - }() - }) + go func(logGroup string, startTime int64, endTime int64) { + defer func() { + in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) + in.workerWg.Done() + in.workerSem.Release(1) + in.workerCancel() + }() + in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) + in.run(svc, logGroup, startTime, endTime) + }(lg, in.startTime, in.endTime) + logGroupCount++ + } } + // Wait for all workers to finish. + in.workerWg.Wait() } -func (in *awsCloudWatchInput) run(svc cloudwatchlogsiface.ClientAPI) { - for in.inputCtx.Err() == nil { - err := in.getLogEventsFromCloudWatch(svc) - if err != nil { - var aerr *awssdk.RequestCanceledError - if errors.As(err, &aerr) { - continue - } - in.logger.Error("getLogEventsFromCloudWatch failed: ", err) - continue +func (in *awsCloudWatchInput) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64) error { + err := in.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime) + if err != nil { + var err *awssdk.RequestCanceledError + if errors.As(err, &err) { + return err } - - in.logger.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) - time.Sleep(in.config.ScanFrequency) - in.logger.Debug("done sleeping") + in.logger.Error("getLogEventsFromCloudWatch failed: ", err) + return err } + return nil } func parseARN(logGroupARN string) (string, string, error) { @@ -196,6 +252,10 @@ func parseARN(logGroupARN string) (string, string, error) { // getLogGroupNames uses DescribeLogGroups API to retrieve all log group names func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI) ([]string, error) { + if in.config.LogGroupNamePrefix == "" { + return []string{in.config.LogGroupName}, nil + } + // construct DescribeLogGroupsInput filterLogEventsInput := &cloudwatchlogs.DescribeLogGroupsInput{ LogGroupNamePrefix: awssdk.String(in.config.LogGroupNamePrefix), @@ -221,16 +281,9 @@ func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI } // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI) error { - currentTime := time.Now() - startTime, endTime := getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency, in.config.Latency) - in.logger.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(startTime/1000, 0), time.Unix(endTime/1000, 0)) - - // overwrite prevEndTime using new endTime - in.prevEndTime = endTime - +func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64) error { // construct FilterLogEventsInput - filterLogEventsInput := in.constructFilterLogEventsInput(startTime, endTime) + filterLogEventsInput := in.constructFilterLogEventsInput(startTime, endTime, logGroup) // make API request req := svc.FilterLogEventsRequest(filterLogEventsInput) @@ -240,7 +293,7 @@ func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface logEvents := page.Events in.logger.Debugf("Processing #%v events", len(logEvents)) - err := in.processLogEvents(logEvents) + err := in.processLogEvents(logEvents, logGroup) if err != nil { err = errors.Wrap(err, "processLogEvents failed") in.logger.Error(err) @@ -258,9 +311,9 @@ func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface return nil } -func (in *awsCloudWatchInput) constructFilterLogEventsInput(startTime int64, endTime int64) *cloudwatchlogs.FilterLogEventsInput { +func (in *awsCloudWatchInput) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: awssdk.String(in.config.LogGroupName), + LogGroupName: awssdk.String(logGroup), StartTime: awssdk.Int64(startTime), EndTime: awssdk.Int64(endTime), } @@ -296,9 +349,9 @@ func getStartPosition(startPosition string, currentTime time.Time, prevEndTime i return } -func (in *awsCloudWatchInput) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent) error { +func (in *awsCloudWatchInput) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent, logGroup string) error { for _, logEvent := range logEvents { - event := createEvent(logEvent, in.config.LogGroupName, in.config.RegionName) + event := createEvent(logEvent, logGroup, in.config.RegionName) err := in.forwardEvent(event) if err != nil { err = errors.Wrap(err, "forwardEvent failed") diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 1688ca7ebc83..d2ad378d6c7e 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/statestore" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/go-concert/timed" ) @@ -44,7 +45,7 @@ type s3Poller struct { region string provider string bucketPollInterval time.Duration - workerSem *sem + workerSem *awscommon.Sem s3 s3API log *logp.Logger metrics *inputMetrics @@ -77,7 +78,7 @@ func newS3Poller(log *logp.Logger, region: awsRegion, provider: provider, bucketPollInterval: bucketPollInterval, - workerSem: newSem(numberOfWorkers), + workerSem: awscommon.NewSem(numberOfWorkers), s3: s3, log: log, metrics: metrics, diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 1f13ec010cf7..f1fc7588e376 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/go-concert/timed" ) @@ -23,7 +24,7 @@ const ( type sqsReader struct { maxMessagesInflight int - workerSem *sem + workerSem *awscommon.Sem sqs sqsAPI msgHandler sqsProcessor log *logp.Logger @@ -36,7 +37,7 @@ func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessag } return &sqsReader{ maxMessagesInflight: maxMessagesInflight, - workerSem: newSem(maxMessagesInflight), + workerSem: awscommon.NewSem(maxMessagesInflight), sqs: sqs, msgHandler: msgHandler, log: log, diff --git a/x-pack/filebeat/input/awss3/semaphore.go b/x-pack/libbeat/common/aws/semaphore.go similarity index 81% rename from x-pack/filebeat/input/awss3/semaphore.go rename to x-pack/libbeat/common/aws/semaphore.go index 2a695f4c6210..28343bcbd32e 100644 --- a/x-pack/filebeat/input/awss3/semaphore.go +++ b/x-pack/libbeat/common/aws/semaphore.go @@ -2,22 +2,22 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package awss3 +package aws import ( "context" "sync" ) -type sem struct { +type Sem struct { mutex *sync.Mutex cond sync.Cond available int } -func newSem(n int) *sem { +func NewSem(n int) *Sem { var m sync.Mutex - return &sem{ + return &Sem{ available: n, mutex: &m, cond: sync.Cond{ @@ -26,7 +26,7 @@ func newSem(n int) *sem { } } -func (s *sem) AcquireContext(n int, ctx context.Context) (int, error) { +func (s *Sem) AcquireContext(n int, ctx context.Context) (int, error) { acquireC := make(chan int, 1) go func() { defer close(acquireC) @@ -41,7 +41,7 @@ func (s *sem) AcquireContext(n int, ctx context.Context) (int, error) { } } -func (s *sem) Acquire(n int) int { +func (s *Sem) Acquire(n int) int { if n <= 0 { return 0 } @@ -63,7 +63,7 @@ func (s *sem) Acquire(n int) int { return n } -func (s *sem) Release(n int) { +func (s *Sem) Release(n int) { if n <= 0 { return } @@ -75,7 +75,7 @@ func (s *sem) Release(n int) { s.cond.Signal() } -func (s *sem) Available() int { +func (s *Sem) Available() int { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/x-pack/filebeat/input/awss3/semaphore_test.go b/x-pack/libbeat/common/aws/semaphore_test.go similarity index 98% rename from x-pack/filebeat/input/awss3/semaphore_test.go rename to x-pack/libbeat/common/aws/semaphore_test.go index d71252ffc788..690371d6efae 100644 --- a/x-pack/filebeat/input/awss3/semaphore_test.go +++ b/x-pack/libbeat/common/aws/semaphore_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package awss3 +package aws import ( "sync" From 263c76a4e0f33bc4904365ef5a34497cbc39a39d Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 5 Jan 2022 11:52:40 -0700 Subject: [PATCH 02/11] fix go vet --- x-pack/filebeat/input/awss3/s3_test.go | 11 ++++------- x-pack/filebeat/input/awss3/sqs_test.go | 4 ++-- x-pack/libbeat/common/aws/semaphore_test.go | 2 +- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index b41349c1c8b5..ef39e085e1fa 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -9,18 +9,15 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/beats/v7/libbeat/statestore/storetest" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" ) func TestS3Poller(t *testing.T) { @@ -135,7 +132,7 @@ func TestS3Poller(t *testing.T) { s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) - assert.Equal(t, numberOfWorkers, receiver.workerSem.available) + assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) }) t.Run("retry after Poll error", func(t *testing.T) { @@ -265,6 +262,6 @@ func TestS3Poller(t *testing.T) { s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) - assert.Equal(t, numberOfWorkers, receiver.workerSem.available) + assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) }) } diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index a8b6e7b5f2a1..a2414736198b 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -70,7 +70,7 @@ func TestSQSReceiver(t *testing.T) { // Execute sqsReader and verify calls/state. receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) require.NoError(t, receiver.Receive(ctx)) - assert.Equal(t, maxMessages, receiver.workerSem.available) + assert.Equal(t, maxMessages, receiver.workerSem.Available()) }) t.Run("retry after ReceiveMessage error", func(t *testing.T) { @@ -103,7 +103,7 @@ func TestSQSReceiver(t *testing.T) { // Execute SQSReceiver and verify calls/state. receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) require.NoError(t, receiver.Receive(ctx)) - assert.Equal(t, maxMessages, receiver.workerSem.available) + assert.Equal(t, maxMessages, receiver.workerSem.Available()) }) } diff --git a/x-pack/libbeat/common/aws/semaphore_test.go b/x-pack/libbeat/common/aws/semaphore_test.go index 690371d6efae..f91831ef8a0b 100644 --- a/x-pack/libbeat/common/aws/semaphore_test.go +++ b/x-pack/libbeat/common/aws/semaphore_test.go @@ -12,7 +12,7 @@ import ( ) func TestSemaphore(t *testing.T) { - s := newSem(5) + s := NewSem(5) assert.Equal(t, s.Acquire(5), 5) From c91a8a88f987490b394d1588624a8a5e50abab03 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 6 Jan 2022 12:40:11 -0700 Subject: [PATCH 03/11] add number_of_workers into doc --- x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc | 8 +++++++- x-pack/filebeat/input/awscloudwatch/config.go | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc index 71f666743879..1df0fb99a2e9 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc @@ -50,12 +50,18 @@ log_group_name is given. ==== `log_group_name_prefix` The prefix for a group of log group names. Note: `region_name` is required when log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix` -cannot be given at the same time. +cannot be given at the same time. The number of workers that will process the +log groups under this prefix is set through the `number_of_workers` config. [float] ==== `region_name` Region that the specified log group or log group prefix belongs to. +[float] +==== `number_of_workers` +Number of workers that will process the log groups with the given `log_group_name_prefix`. +Default value is 1. + [float] ==== `log_streams` A list of strings of log streams names that Filebeat collect log events from. diff --git a/x-pack/filebeat/input/awscloudwatch/config.go b/x-pack/filebeat/input/awscloudwatch/config.go index 7744b2007544..a200dc53d634 100644 --- a/x-pack/filebeat/input/awscloudwatch/config.go +++ b/x-pack/filebeat/input/awscloudwatch/config.go @@ -38,7 +38,7 @@ func defaultConfig() config { ScanFrequency: 10 * time.Second, APITimeout: 120 * time.Second, APISleep: 200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms - NumberOfWorkers: 5, + NumberOfWorkers: 1, } } From b03a459a7c917836ba2c91addee0a49e989ceab2 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 11 Jan 2022 12:26:38 -0700 Subject: [PATCH 04/11] add terraform file --- .../awscloudwatch/_meta/terraform/.gitignore | 3 + .../_meta/terraform/.terraform.lock.hcl | 57 +++++++++++++++++ .../awscloudwatch/_meta/terraform/README.md | 42 +++++++++++++ .../awscloudwatch/_meta/terraform/main.tf | 44 +++++++++++++ .../awscloudwatch/_meta/terraform/outputs.tf | 11 ++++ .../_meta/terraform/variables.tf | 5 ++ x-pack/filebeat/input/awscloudwatch/input.go | 62 ++++++------------- 7 files changed, 181 insertions(+), 43 deletions(-) create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf create mode 100644 x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore new file mode 100644 index 000000000000..0825744a7760 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore @@ -0,0 +1,3 @@ +terraform/ +outputs.yml +*.tfstate* diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl new file mode 100644 index 000000000000..7f6381c60af8 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl @@ -0,0 +1,57 @@ +# This file is maintained automatically by "terraform init". +# Manual edits may be lost in future updates. + +provider "registry.terraform.io/hashicorp/aws" { + version = "3.70.0" + constraints = "~> 3.52" + hashes = [ + "h1:jn4ImGMZJ9rQdaVSbcCBqUqnhRSpyaM1DivqaNuP+eg=", + "zh:0af710e528e21b930899f0ac295b0ceef8ad7b623dd8f38e92c8ec4bc7af0321", + "zh:4cabcd4519c0aae474d91ae67a8e3a4a8c39c3945c289a9cf7c1409f64409abe", + "zh:58da1a436facb4e4f95cd2870d211ed7bcb8cf721a4a61970aa8da191665f2aa", + "zh:6465339475c1cd3c16a5c8fee61304dcad2c4a27740687d29c6cdc90d2e6423d", + "zh:7a821ed053c355d70ebe33185590953fa5c364c1f3d66fe3f9b4aba3961646b1", + "zh:7c3656cc9cc1739dcb298e7930c9a76ccfce738d2070841d7e6c62fbdae74eef", + "zh:9d9da9e3c60a0c977e156da8590f36a219ae91994bb3df5a1208de2ab3ceeba7", + "zh:a3138817c86bf3e4dca7fd3a92e099cd1bf1d45ee7c7cc9e9773ba04fc3b315a", + "zh:a8603044e935dfb3cb9319a46d26276162c6aea75e02c4827232f9c6029a3182", + "zh:aef9482332bf43d0b73317f5909dec9e95b983c67b10d72e75eacc7c4f37d084", + "zh:fc3f3cad84f2eebe566dd0b65904c934093007323b9b85e73d9dd4535ceeb29d", + ] +} + +provider "registry.terraform.io/hashicorp/local" { + version = "2.1.0" + hashes = [ + "h1:KfieWtVyGWwplSoLIB5usKAUnrIkDQBkWaR5TI+4WYg=", + "zh:0f1ec65101fa35050978d483d6e8916664b7556800348456ff3d09454ac1eae2", + "zh:36e42ac19f5d68467aacf07e6adcf83c7486f2e5b5f4339e9671f68525fc87ab", + "zh:6db9db2a1819e77b1642ec3b5e95042b202aee8151a0256d289f2e141bf3ceb3", + "zh:719dfd97bb9ddce99f7d741260b8ece2682b363735c764cac83303f02386075a", + "zh:7598bb86e0378fd97eaa04638c1a4c75f960f62f69d3662e6d80ffa5a89847fe", + "zh:ad0a188b52517fec9eca393f1e2c9daea362b33ae2eb38a857b6b09949a727c1", + "zh:c46846c8df66a13fee6eff7dc5d528a7f868ae0dcf92d79deaac73cc297ed20c", + "zh:dc1a20a2eec12095d04bf6da5321f535351a594a636912361db20eb2a707ccc4", + "zh:e57ab4771a9d999401f6badd8b018558357d3cbdf3d33cc0c4f83e818ca8e94b", + "zh:ebdcde208072b4b0f8d305ebf2bfdc62c926e0717599dcf8ec2fd8c5845031c3", + "zh:ef34c52b68933bedd0868a13ccfd59ff1c820f299760b3c02e008dc95e2ece91", + ] +} + +provider "registry.terraform.io/hashicorp/random" { + version = "3.1.0" + hashes = [ + "h1:rKYu5ZUbXwrLG1w81k7H3nce/Ys6yAxXhWcbtk36HjY=", + "zh:2bbb3339f0643b5daa07480ef4397bd23a79963cc364cdfbb4e86354cb7725bc", + "zh:3cd456047805bf639fbf2c761b1848880ea703a054f76db51852008b11008626", + "zh:4f251b0eda5bb5e3dc26ea4400dba200018213654b69b4a5f96abee815b4f5ff", + "zh:7011332745ea061e517fe1319bd6c75054a314155cb2c1199a5b01fe1889a7e2", + "zh:738ed82858317ccc246691c8b85995bc125ac3b4143043219bd0437adc56c992", + "zh:7dbe52fac7bb21227acd7529b487511c91f4107db9cc4414f50d04ffc3cab427", + "zh:a3a9251fb15f93e4cfc1789800fc2d7414bbc18944ad4c5c98f466e6477c42bc", + "zh:a543ec1a3a8c20635cf374110bd2f87c07374cf2c50617eee2c669b3ceeeaa9f", + "zh:d9ab41d556a48bd7059f0810cf020500635bfc696c9fc3adab5ea8915c1d886b", + "zh:d9e13427a7d011dbd654e591b0337e6074eef8c3b9bb11b2e39eaaf257044fd7", + "zh:f7605bd1437752114baf601bdf6931debe6dc6bfe3006eb7e9bb9080931dca8a", + ] +} diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md new file mode 100644 index 000000000000..2f1add174e93 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md @@ -0,0 +1,42 @@ +# Terraform setup for AWS CloudWatch Input Integration Tests + +This directory contains a Terraform module that creates the AWS resources needed +for executing the integration tests for the `aws-cloudwatch` Filebeat input. It +creates two CloudWatch log groups, and one log stream under each log group. + +It outputs configuration information that is consumed by the tests to +`outputs.yml`. The AWS resources are randomly named to prevent name collisions +between multiple users. + +### Usage + +You must have the appropriate AWS environment variables for authentication set +before running Terraform or the integration tests. The AWS key must be +authorized to create and destroy AWS CloudWatch log groups. + +1. Execute terraform in this directory to create the resources. This will also + write the `outputs.yml`. You can use `export TF_VAR_aws_region=NNNNN` in order + to match the AWS region of the profile you are using. + + `terraform apply` + + +2. (Optional) View the output configuration. + + ```yaml + "aws_region": "us-east-1" + "log_group_name_1": "filebeat-cloudwatch-integtest-1-417koa" + "log_group_name_2": "filebeat-cloudwatch-integtest-2-417koa" + ``` + +3. Execute the integration test. + + ``` + cd x-pack/filebeat/input/awss3 + go test -tags aws,integration -run TestInputRun.+ -v . + ``` + +4. Cleanup AWS resources. Execute terraform to delete the log groups created for +testing. + + `terraform destroy` diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf new file mode 100644 index 000000000000..bb3b24593028 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf @@ -0,0 +1,44 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 3.52" + } + } +} + +provider "aws" { + region = var.aws_region +} + +resource "random_string" "random" { + length = 6 + special = false + upper = false +} + +resource "aws_cloudwatch_log_group" "filebeat-integtest-1" { + name = "filebeat-log-group-integtest-1-${random_string.random.result}" + + tags = { + Environment = "test" + } +} + +resource "aws_cloudwatch_log_group" "filebeat-integtest-2" { + name = "filebeat-log-group-integtest-2-${random_string.random.result}" + + tags = { + Environment = "test" + } +} + +resource "aws_cloudwatch_log_stream" "filebeat-integtest-1" { + name = "filebeat-log-stream-integtest-1-${random_string.random.result}" + log_group_name = aws_cloudwatch_log_group.filebeat-integtest-1.name +} + +resource "aws_cloudwatch_log_stream" "filebeat-integtest-2" { + name = "filebeat-log-stream-integtest-2-${random_string.random.result}" + log_group_name = aws_cloudwatch_log_group.filebeat-integtest-2.name +} diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf new file mode 100644 index 000000000000..1641340f88cd --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf @@ -0,0 +1,11 @@ +resource "local_file" "secrets" { + content = yamlencode({ + "log_group_1" : aws_cloudwatch_log_group.filebeat-integtest-1.name + "log_group_2" : aws_cloudwatch_log_group.filebeat-integtest-2.name + "log_stream_1" : aws_cloudwatch_log_stream.filebeat-integtest-1.name + "log_stream_2" : aws_cloudwatch_log_stream.filebeat-integtest-2.name + "aws_region" : var.aws_region + }) + filename = "${path.module}/outputs.yml" + file_permission = "0644" +} diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf new file mode 100644 index 000000000000..2c4fb00786bc --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf @@ -0,0 +1,5 @@ +variable "aws_region" { + description = "AWS Region" + type = string + default = "us-east-1" +} diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 651b0a88865b..fee04ae0d3b4 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -51,11 +51,8 @@ type awsCloudWatchInput struct { outlet channel.Outleter // Output of received aws-cloudwatch logs. inputCtx context.Context - workerWg sync.WaitGroup // Waits on aws-cloudwatch worker goroutine. - workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. - workerCancel context.CancelFunc // Used to signal that the worker should stop. - stopOnce sync.Once - close chan struct{} + stopOnce sync.Once + close chan struct{} startTime int64 endTime int64 @@ -63,23 +60,6 @@ type awsCloudWatchInput struct { workerSem *awscommon.Sem } -// channelContext implements context.Context by wrapping a channel -type channelContext struct { - done <-chan struct{} -} - -func (c *channelContext) Deadline() (time.Time, bool) { return time.Time{}, false } -func (c *channelContext) Done() <-chan struct{} { return c.done } -func (c *channelContext) Err() error { - select { - case <-c.done: - return context.Canceled - default: - return nil - } -} -func (c *channelContext) Value(key interface{}) interface{} { return nil } - // NewInput creates a new aws-cloudwatch input func NewInput(cfg *common.Config, connector channel.Connector, ctx input.Context) (input.Input, error) { cfgwarn.Beta("aws-cloudwatch input type is used") @@ -126,20 +106,18 @@ func NewInput(cfg *common.Config, connector channel.Connector, ctx input.Context // If the input ever needs to be made restartable, then context would need // to be recreated with each restart. - workerCtx, workerCancel := context.WithCancel(inputCtx) + // workerCtx, workerCancel := context.WithCancel(inputCtx) in := &awsCloudWatchInput{ - config: config, - awsConfig: awsConfig, - logger: logger, - close: closeChannel, - inputCtx: inputCtx, - workerCtx: workerCtx, - workerCancel: workerCancel, - startTime: int64(0), - endTime: int64(0), - prevEndTime: int64(0), - workerSem: awscommon.NewSem(config.NumberOfWorkers), + config: config, + awsConfig: awsConfig, + logger: logger, + close: closeChannel, + inputCtx: inputCtx, + startTime: int64(0), + endTime: int64(0), + prevEndTime: int64(0), + workerSem: awscommon.NewSem(config.NumberOfWorkers), } // Build outlet for events. @@ -170,6 +148,8 @@ func (in *awsCloudWatchInput) Run() { // listing, sequentially processes every object and then does another listing logGroupCount := 0 start := true + workerWg := new(sync.WaitGroup) + for in.inputCtx.Err() == nil { if logGroupCount == 0 { currentTime := time.Now() @@ -204,13 +184,12 @@ func (in *awsCloudWatchInput) Run() { } lg := logGroupNames[logGroupCount] - in.workerWg.Add(1) + workerWg.Add(1) go func(logGroup string, startTime int64, endTime int64) { defer func() { in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) - in.workerWg.Done() + workerWg.Done() in.workerSem.Release(1) - in.workerCancel() }() in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) in.run(svc, logGroup, startTime, endTime) @@ -219,20 +198,18 @@ func (in *awsCloudWatchInput) Run() { } } // Wait for all workers to finish. - in.workerWg.Wait() + workerWg.Wait() } -func (in *awsCloudWatchInput) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64) error { +func (in *awsCloudWatchInput) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64) { err := in.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime) if err != nil { var err *awssdk.RequestCanceledError if errors.As(err, &err) { - return err + in.logger.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) } in.logger.Error("getLogEventsFromCloudWatch failed: ", err) - return err } - return nil } func parseARN(logGroupARN string) (string, string, error) { @@ -408,5 +385,4 @@ func (in *awsCloudWatchInput) Stop() { // Wait is an alias for Stop. func (in *awsCloudWatchInput) Wait() { in.Stop() - in.workerWg.Wait() } From 0411a2fc7f007c677f5f5ce8d727ddba3abe7222 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 11 Jan 2022 15:50:03 -0700 Subject: [PATCH 05/11] move to use Filebeat V2 --- x-pack/filebeat/input/awscloudwatch/config.go | 2 +- x-pack/filebeat/input/awscloudwatch/input.go | 317 ++++++++++-------- .../input/awscloudwatch/input_test.go | 9 - x-pack/filebeat/input/awss3/input.go | 2 +- x-pack/filebeat/input/awss3/s3_objects.go | 7 +- .../input/default-inputs/inputs_other.go | 2 + .../awss3 => libbeat/common/aws}/acker.go | 22 +- .../common/aws}/acker_test.go | 4 +- 8 files changed, 202 insertions(+), 163 deletions(-) rename x-pack/{filebeat/input/awss3 => libbeat/common/aws}/acker.go (80%) rename x-pack/{filebeat/input/awss3 => libbeat/common/aws}/acker_test.go (97%) diff --git a/x-pack/filebeat/input/awscloudwatch/config.go b/x-pack/filebeat/input/awscloudwatch/config.go index a200dc53d634..0d8a225866c0 100644 --- a/x-pack/filebeat/input/awscloudwatch/config.go +++ b/x-pack/filebeat/input/awscloudwatch/config.go @@ -26,7 +26,7 @@ type config struct { APISleep time.Duration `config:"api_sleep" validate:"min=0,nonzero"` Latency time.Duration `config:"latency"` NumberOfWorkers int `config:"number_of_workers"` - AwsConfig awscommon.ConfigAWS `config:",inline"` + AWSConfig awscommon.ConfigAWS `config:",inline"` } func defaultConfig() config { diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index fee04ae0d3b4..88d3e04e8049 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -6,18 +6,23 @@ package awscloudwatch import ( "context" + "fmt" "strings" "sync" "time" + "github.com/elastic/beats/v7/filebeat/beater" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/go-concert/unison" + awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/arn" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" "github.com/pkg/errors" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" @@ -26,54 +31,99 @@ import ( ) const ( - inputName = "aws-cloudwatch" - oldInputName = "awscloudwatch" + inputName = "aws-cloudwatch" ) -func init() { - err := input.Register(inputName, NewInput) - if err != nil { - panic(errors.Wrapf(err, "failed to register %v input", inputName)) +func Plugin(store beater.StateStore) v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Stable, + Deprecated: false, + Info: "Collect logs from cloudwatch", + Manager: &cloudwatchInputManager{store: store}, } +} - err = input.Register(oldInputName, NewInput) - if err != nil { - panic(errors.Wrapf(err, "failed to register %v input", oldInputName)) +type cloudwatchInputManager struct { + store beater.StateStore +} + +func (im *cloudwatchInputManager) Init(grp unison.Group, mode v2.Mode) error { + return nil +} + +func (im *cloudwatchInputManager) Create(cfg *common.Config) (v2.Input, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, err } + + return newInput(config, im.store) } -// awsCloudWatchInput is an input for AWS CloudWatch logs -type awsCloudWatchInput struct { +// cloudwatchInput is an input for reading logs from CloudWatch periodically. +type cloudwatchInput struct { config config awsConfig awssdk.Config + store beater.StateStore +} - logger *logp.Logger - outlet channel.Outleter // Output of received aws-cloudwatch logs. - inputCtx context.Context - - stopOnce sync.Once - close chan struct{} +type cloudwatchPoller struct { + numberOfWorkers int + apiSleep time.Duration + region string + logStreams []string + logStreamPrefix string + startTime int64 + endTime int64 + prevEndTime int64 + workerSem *awscommon.Sem + log *logp.Logger + store *statestore.Store + workersListingMap *sync.Map + workersProcessingMap *sync.Map +} - startTime int64 - endTime int64 - prevEndTime int64 // track previous endTime for each iteration. - workerSem *awscommon.Sem +type logProcessor struct { + log *logp.Logger + publisher beat.Client + ack *awscommon.EventACKTracker } -// NewInput creates a new aws-cloudwatch input -func NewInput(cfg *common.Config, connector channel.Connector, ctx input.Context) (input.Input, error) { - cfgwarn.Beta("aws-cloudwatch input type is used") - logger := logp.NewLogger(inputName) +func newLogProcessor(log *logp.Logger, publisher beat.Client, ctx context.Context) *logProcessor { + return &logProcessor{ + log: log, + publisher: publisher, + ack: awscommon.NewEventACKTracker(ctx), + } +} - // Extract and validate the input's configuration. - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { - return nil, errors.Wrap(err, "failed unpacking config") +func newCloudwatchPoller(log *logp.Logger, + store *statestore.Store, + awsRegion string, apiSleep time.Duration, + numberOfWorkers int, logStreams []string, logStreamPrefix string) *cloudwatchPoller { + return &cloudwatchPoller{ + numberOfWorkers: numberOfWorkers, + apiSleep: apiSleep, + region: awsRegion, + logStreams: logStreams, + logStreamPrefix: logStreamPrefix, + startTime: int64(0), + endTime: int64(0), + prevEndTime: int64(0), + workerSem: awscommon.NewSem(numberOfWorkers), + log: log, + store: store, + workersListingMap: new(sync.Map), + workersProcessingMap: new(sync.Map), } - logger.Debug("aws-cloudwatch input config = ", config) +} - if config.Type == oldInputName { - logger.Warnf("%s input name is deprecated, please use %s instead", oldInputName, inputName) +func newInput(config config, store beater.StateStore) (*cloudwatchInput, error) { + cfgwarn.Beta("aws-cloudwatch input type is used") + awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } if config.LogGroupARN != "" { @@ -86,61 +136,64 @@ func NewInput(cfg *common.Config, connector channel.Connector, ctx input.Context config.RegionName = regionName } - awsConfig, err := awscommon.InitializeAWSConfig(config.AwsConfig) + awsConfig, err = awscommon.InitializeAWSConfig(config.AWSConfig) if err != nil { return nil, errors.Wrap(err, "InitializeAWSConfig failed") } awsConfig.Region = config.RegionName - closeChannel := make(chan struct{}) - // Wrap input.Context's Done channel with a context.Context. This goroutine - // stops with the parent closes the Done channel. - inputCtx, cancelInputCtx := context.WithCancel(context.Background()) + return &cloudwatchInput{ + config: config, + awsConfig: awsConfig, + store: store, + }, nil +} + +func (in *cloudwatchInput) Name() string { return inputName } + +func (in *cloudwatchInput) Test(ctx v2.TestContext) error { + return nil +} + +func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) error { + var err error + + persistentStore, err := in.store.Access() + if err != nil { + return fmt.Errorf("can not access persistent store: %w", err) + } + + defer persistentStore.Close() + + // Wrap input Context's cancellation Done channel a context.Context. This + // goroutine stops with the parent closes the Done channel. + ctx, cancelInputCtx := context.WithCancel(context.Background()) go func() { defer cancelInputCtx() select { - case <-ctx.Done: - case <-inputCtx.Done(): + case <-inputContext.Cancelation.Done(): + case <-ctx.Done(): } }() + defer cancelInputCtx() - // If the input ever needs to be made restartable, then context would need - // to be recreated with each restart. - // workerCtx, workerCancel := context.WithCancel(inputCtx) - - in := &awsCloudWatchInput{ - config: config, - awsConfig: awsConfig, - logger: logger, - close: closeChannel, - inputCtx: inputCtx, - startTime: int64(0), - endTime: int64(0), - prevEndTime: int64(0), - workerSem: awscommon.NewSem(config.NumberOfWorkers), - } - - // Build outlet for events. - in.outlet, err = connector.Connect(cfg) + // Create client for publishing events and receive notification of their ACKs. + client, err := pipeline.ConnectWith(beat.ClientConfig{ + CloseRef: inputContext.Cancelation, + ACKHandler: awscommon.NewEventACKHandler(), + }) if err != nil { - return nil, err + return fmt.Errorf("failed to create pipeline client: %w", err) } + defer client.Close() - in.logger.Info("Initialized AWS CloudWatch input.") - return in, nil -} - -// Run runs the input -func (in *awsCloudWatchInput) Run() { - // Please see https://docs.aws.amazon.com/general/latest/gr/cwl_region.html for more info on Amazon CloudWatch Logs endpoints. - logsServiceName := awscommon.CreateServiceName("logs", in.config.AwsConfig.FIPSEnabled, in.config.RegionName) - cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AwsConfig.Endpoint, logsServiceName, in.config.RegionName, in.awsConfig) + logsServiceName := awscommon.CreateServiceName("logs", in.config.AWSConfig.FIPSEnabled, in.config.RegionName) + cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, logsServiceName, in.config.RegionName, in.awsConfig) svc := cloudwatchlogs.New(cwConfig) - logGroupNames, err := in.getLogGroupNames(svc) + logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName) if err != nil { - in.logger.Error("getLogGroupNames failed: ", err) - return + return fmt.Errorf("failed to get log group names: %w", err) } // This loop tries to keep the workers busy as much as possible while @@ -149,24 +202,34 @@ func (in *awsCloudWatchInput) Run() { logGroupCount := 0 start := true workerWg := new(sync.WaitGroup) - - for in.inputCtx.Err() == nil { + log := inputContext.Logger.With("aws-cloudwatch") + cwPoller := newCloudwatchPoller( + log.Named("cloudwatch_poller"), + persistentStore, + in.awsConfig.Region, + in.config.APISleep, + in.config.NumberOfWorkers, + in.config.LogStreams, + in.config.LogStreamPrefix) + logProcessor := newLogProcessor(log.Named("log_processor"), client, ctx) + + for ctx.Err() == nil { if logGroupCount == 0 { currentTime := time.Now() - in.startTime, in.endTime = getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency, in.config.Latency) - in.logger.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(in.startTime/1000, 0), time.Unix(in.endTime/1000, 0)) - in.prevEndTime = in.endTime + cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.prevEndTime, in.config.ScanFrequency, in.config.Latency) + cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0)) + cwPoller.prevEndTime = cwPoller.endTime } if start == false { - in.logger.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) + cwPoller.log.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) time.Sleep(in.config.ScanFrequency) - in.logger.Debug("done sleeping") + cwPoller.log.Debug("done sleeping") } start = false // Determine how many workers are available. - availableWorkers, err := in.workerSem.AcquireContext(in.config.NumberOfWorkers, in.inputCtx) + availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) if err != nil { break } @@ -187,28 +250,33 @@ func (in *awsCloudWatchInput) Run() { workerWg.Add(1) go func(logGroup string, startTime int64, endTime int64) { defer func() { - in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) + cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) workerWg.Done() - in.workerSem.Release(1) + cwPoller.workerSem.Release(1) }() - in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) - in.run(svc, logGroup, startTime, endTime) - }(lg, in.startTime, in.endTime) + cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) + cwPoller.run(svc, logGroup, startTime, endTime, logProcessor) + }(lg, cwPoller.startTime, cwPoller.endTime) logGroupCount++ } } // Wait for all workers to finish. workerWg.Wait() + if errors.Is(ctx.Err(), context.Canceled) { + // A canceled context is a normal shutdown. + return nil + } + return ctx.Err() } -func (in *awsCloudWatchInput) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64) { - err := in.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime) +func (p *cloudwatchPoller) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { + err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) if err != nil { var err *awssdk.RequestCanceledError if errors.As(err, &err) { - in.logger.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) + p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) } - in.logger.Error("getLogEventsFromCloudWatch failed: ", err) + p.log.Error("getLogEventsFromCloudWatch failed: ", err) } } @@ -228,14 +296,14 @@ func parseARN(logGroupARN string) (string, string, error) { } // getLogGroupNames uses DescribeLogGroups API to retrieve all log group names -func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI) ([]string, error) { - if in.config.LogGroupNamePrefix == "" { - return []string{in.config.LogGroupName}, nil +func getLogGroupNames(svc cloudwatchlogsiface.ClientAPI, logGroupNamePrefix string, logGroupName string) ([]string, error) { + if logGroupNamePrefix == "" { + return []string{logGroupName}, nil } // construct DescribeLogGroupsInput filterLogEventsInput := &cloudwatchlogs.DescribeLogGroupsInput{ - LogGroupNamePrefix: awssdk.String(in.config.LogGroupNamePrefix), + LogGroupNamePrefix: awssdk.String(logGroupNamePrefix), } // make API request @@ -244,23 +312,21 @@ func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI var logGroupNames []string for p.Next(context.TODO()) { page := p.CurrentPage() - in.logger.Debugf("Collecting #%v log group names", len(page.LogGroups)) for _, lg := range page.LogGroups { logGroupNames = append(logGroupNames, *lg.LogGroupName) } } if err := p.Err(); err != nil { - in.logger.Error("failed DescribeLogGroupsRequest: ", err) return logGroupNames, err } return logGroupNames, nil } // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64) error { +func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { // construct FilterLogEventsInput - filterLogEventsInput := in.constructFilterLogEventsInput(startTime, endTime, logGroup) + filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) // make API request req := svc.FilterLogEventsRequest(filterLogEventsInput) @@ -269,11 +335,11 @@ func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface page := paginator.CurrentPage() logEvents := page.Events - in.logger.Debugf("Processing #%v events", len(logEvents)) - err := in.processLogEvents(logEvents, logGroup) + p.log.Debugf("Processing #%v events", len(logEvents)) + err := logProcessor.processLogEvents(logEvents, logGroup, p.region) if err != nil { err = errors.Wrap(err, "processLogEvents failed") - in.logger.Error(err) + p.log.Error(err) } } @@ -282,25 +348,25 @@ func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface } // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). - in.logger.Debugf("sleeping for %v before making FilterLogEvents API call again", in.config.APISleep) - time.Sleep(in.config.APISleep) - in.logger.Debug("done sleeping") + p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) + time.Sleep(p.apiSleep) + p.log.Debug("done sleeping") return nil } -func (in *awsCloudWatchInput) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { +func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ LogGroupName: awssdk.String(logGroup), StartTime: awssdk.Int64(startTime), EndTime: awssdk.Int64(endTime), } - if len(in.config.LogStreams) > 0 { - filterLogEventsInput.LogStreamNames = in.config.LogStreams + if len(p.logStreams) > 0 { + filterLogEventsInput.LogStreamNames = p.logStreams } - if in.config.LogStreamPrefix != "" { - filterLogEventsInput.LogStreamNamePrefix = awssdk.String(in.config.LogStreamPrefix) + if p.logStreamPrefix != "" { + filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix) } return filterLogEventsInput } @@ -326,15 +392,10 @@ func getStartPosition(startPosition string, currentTime time.Time, prevEndTime i return } -func (in *awsCloudWatchInput) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent, logGroup string) error { +func (p *logProcessor) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) error { for _, logEvent := range logEvents { - event := createEvent(logEvent, logGroup, in.config.RegionName) - err := in.forwardEvent(event) - if err != nil { - err = errors.Wrap(err, "forwardEvent failed") - in.logger.Error(err) - return err - } + event := createEvent(logEvent, logGroup, regionName) + p.publish(p.ack, &event) } return nil } @@ -365,24 +426,8 @@ func createEvent(logEvent cloudwatchlogs.FilteredLogEvent, logGroup string, regi return event } -func (in *awsCloudWatchInput) forwardEvent(event beat.Event) error { - ok := in.outlet.OnEvent(event) - if !ok { - return errors.New("OnEvent returned false. Stopping input worker") - } - return nil -} - -// Stop stops the aws-cloudwatch input -func (in *awsCloudWatchInput) Stop() { - in.stopOnce.Do(func() { - defer in.outlet.Close() - close(in.close) - in.logger.Info("Stopping aws-cloudwatch input") - }) -} - -// Wait is an alias for Stop. -func (in *awsCloudWatchInput) Wait() { - in.Stop() +func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { + ack.Add() + event.Private = ack + p.publisher.Publish(*event) } diff --git a/x-pack/filebeat/input/awscloudwatch/input_test.go b/x-pack/filebeat/input/awscloudwatch/input_test.go index 7d8b45f7d443..c094a1cddb5a 100644 --- a/x-pack/filebeat/input/awscloudwatch/input_test.go +++ b/x-pack/filebeat/input/awscloudwatch/input_test.go @@ -15,7 +15,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" - "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/common" ) @@ -197,11 +196,3 @@ func TestParseARN(t *testing.T) { assert.Equal(t, "us-east-1", regionName) assert.NoError(t, err) } - -func TestNewInputDone(t *testing.T) { - config := common.MapStr{ - "log_group_name": "some-group", - "region_name": "eu-west-1", - } - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) -} diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 8d673cabbac4..e1558b552a06 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -111,7 +111,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { // Create client for publishing events and receive notification of their ACKs. client, err := pipeline.ConnectWith(beat.ClientConfig{ CloseRef: inputContext.Cancelation, - ACKHandler: newEventACKHandler(), + ACKHandler: awscommon.NewEventACKHandler(), }) if err != nil { return fmt.Errorf("failed to create pipeline client: %w", err) diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 7fe6b193fa45..7a5fdbd50103 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -13,6 +13,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "io" "io/ioutil" "net/http" @@ -74,7 +75,7 @@ func (f *s3ObjectProcessorFactory) findReaderConfig(key string) *readerConfig { // Create returns a new s3ObjectProcessor. It returns nil when no file selectors // match the S3 object key. -func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *eventACKTracker, obj s3EventV2) s3ObjectHandler { +func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler { log = log.With( "bucket_arn", obj.S3.Bucket.Name, "object_key", obj.S3.Object.Key) @@ -101,7 +102,7 @@ type s3ObjectProcessor struct { log *logp.Logger ctx context.Context - acker *eventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object). + acker *awscommon.EventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object). readerConfig *readerConfig // Config about how to process the object. s3Obj s3EventV2 // S3 object information. s3ObjHash string @@ -314,7 +315,7 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { return nil } -func (p *s3ObjectProcessor) publish(ack *eventACKTracker, event *beat.Event) { +func (p *s3ObjectProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { ack.Add() event.Private = ack p.metrics.s3EventsCreatedTotal.Inc() diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index c31106c3baa2..5a268fa47584 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -12,6 +12,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" @@ -26,5 +27,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 httpjson.Plugin(log, store), o365audit.Plugin(log, store), awss3.Plugin(store), + awscloudwatch.Plugin(store), } } diff --git a/x-pack/filebeat/input/awss3/acker.go b/x-pack/libbeat/common/aws/acker.go similarity index 80% rename from x-pack/filebeat/input/awss3/acker.go rename to x-pack/libbeat/common/aws/acker.go index db88c23f7d18..30dcdbd02401 100644 --- a/x-pack/filebeat/input/awss3/acker.go +++ b/x-pack/libbeat/common/aws/acker.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package awss3 +package aws import ( "context" @@ -12,31 +12,31 @@ import ( "github.com/elastic/beats/v7/libbeat/common/acker" ) -// eventACKTracker tracks the publishing state of S3 objects. Specifically +// EventACKTracker tracks the publishing state of S3 objects. Specifically // it tracks the number of message acknowledgements that are pending from the // output. It can be used to wait until all ACKs have been received for one or // more S3 objects. -type eventACKTracker struct { +type EventACKTracker struct { sync.Mutex pendingACKs int64 ctx context.Context cancel context.CancelFunc } -func newEventACKTracker(ctx context.Context) *eventACKTracker { +func NewEventACKTracker(ctx context.Context) *EventACKTracker { ctx, cancel := context.WithCancel(ctx) - return &eventACKTracker{ctx: ctx, cancel: cancel} + return &EventACKTracker{ctx: ctx, cancel: cancel} } // Add increments the number of pending ACKs. -func (a *eventACKTracker) Add() { +func (a *EventACKTracker) Add() { a.Lock() a.pendingACKs++ a.Unlock() } // ACK decrements the number of pending ACKs. -func (a *eventACKTracker) ACK() { +func (a *EventACKTracker) ACK() { a.Lock() defer a.Unlock() @@ -55,7 +55,7 @@ func (a *eventACKTracker) ACK() { // `Add` calls are made. Failing to do so could reset the pendingACKs // property to 0 and would results in Wait returning after additional // calls to `Add` are made without a corresponding `ACK` call. -func (a *eventACKTracker) Wait() { +func (a *EventACKTracker) Wait() { // If there were never any pending ACKs then cancel the context. (This can // happen when a document contains no events or cannot be read due to an error). a.Lock() @@ -68,15 +68,15 @@ func (a *eventACKTracker) Wait() { <-a.ctx.Done() } -// newEventACKHandler returns a beat ACKer that can receive callbacks when +// NewEventACKHandler returns a beat ACKer that can receive callbacks when // an event has been ACKed an output. If the event contains a private metadata // pointing to an eventACKTracker then it will invoke the trackers ACK() method // to decrement the number of pending ACKs. -func newEventACKHandler() beat.ACKer { +func NewEventACKHandler() beat.ACKer { return acker.ConnectionOnly( acker.EventPrivateReporter(func(_ int, privates []interface{}) { for _, private := range privates { - if ack, ok := private.(*eventACKTracker); ok { + if ack, ok := private.(*EventACKTracker); ok { ack.ACK() } } diff --git a/x-pack/filebeat/input/awss3/acker_test.go b/x-pack/libbeat/common/aws/acker_test.go similarity index 97% rename from x-pack/filebeat/input/awss3/acker_test.go rename to x-pack/libbeat/common/aws/acker_test.go index a038e8a39e44..d9c956bf4106 100644 --- a/x-pack/filebeat/input/awss3/acker_test.go +++ b/x-pack/libbeat/common/aws/acker_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package awss3 +package aws import ( "context" @@ -45,7 +45,7 @@ func TestEventACKHandler(t *testing.T) { acker.Add() // Create an ACK handler and simulate one ACKed event. - ackHandler := newEventACKHandler() + ackHandler := NewEventACKHandler() ackHandler.AddEvent(beat.Event{Private: acker}, true) ackHandler.ACKEvents(1) From 970dab90ebd833330c7d8235906a85d0a91d0455 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 11 Jan 2022 21:56:13 -0700 Subject: [PATCH 06/11] continue refactoring using filebeat/input/v2 --- x-pack/filebeat/input/awscloudwatch/input.go | 61 ++++++++------------ x-pack/filebeat/input/awss3/interfaces.go | 4 +- x-pack/filebeat/input/awss3/s3.go | 2 +- x-pack/filebeat/input/awss3/s3_objects.go | 7 ++- x-pack/filebeat/input/awss3/sqs_s3_event.go | 4 +- 5 files changed, 34 insertions(+), 44 deletions(-) diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 88d3e04e8049..413a1e709084 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -110,7 +110,6 @@ func newCloudwatchPoller(log *logp.Logger, logStreamPrefix: logStreamPrefix, startTime: int64(0), endTime: int64(0), - prevEndTime: int64(0), workerSem: awscommon.NewSem(numberOfWorkers), log: log, store: store, @@ -196,12 +195,6 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) return fmt.Errorf("failed to get log group names: %w", err) } - // This loop tries to keep the workers busy as much as possible while - // honoring the number in config opposed to a simpler loop that does one - // listing, sequentially processes every object and then does another listing - logGroupCount := 0 - start := true - workerWg := new(sync.WaitGroup) log := inputContext.Logger.With("aws-cloudwatch") cwPoller := newCloudwatchPoller( log.Named("cloudwatch_poller"), @@ -212,15 +205,16 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) in.config.LogStreams, in.config.LogStreamPrefix) logProcessor := newLogProcessor(log.Named("log_processor"), client, ctx) + return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames) +} +func (in *cloudwatchInput) Receive(svc cloudwatchlogsiface.ClientAPI, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error { + // This loop tries to keep the workers busy as much as possible while + // honoring the number in config opposed to a simpler loop that does one + // listing, sequentially processes every object and then does another listing + start := true + workerWg := new(sync.WaitGroup) for ctx.Err() == nil { - if logGroupCount == 0 { - currentTime := time.Now() - cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.prevEndTime, in.config.ScanFrequency, in.config.Latency) - cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0)) - cwPoller.prevEndTime = cwPoller.endTime - } - if start == false { cwPoller.log.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) time.Sleep(in.config.ScanFrequency) @@ -228,25 +222,16 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) } start = false - // Determine how many workers are available. - availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) - if err != nil { - break - } - - if availableWorkers == 0 { - continue - } - - // Process each log group name asynchronously with a goroutine. - for i := 0; i < in.config.NumberOfWorkers; i++ { - if logGroupCount >= len(logGroupNames) { - // reset logGroupCount - logGroupCount = 0 - break + currentTime := time.Now() + cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.endTime, in.config.ScanFrequency, in.config.Latency) + cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0)) + for _, lg := range logGroupNames { + // Determine how many workers are available. + availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) + if err != nil || availableWorkers == 0 { + continue } - lg := logGroupNames[logGroupCount] workerWg.Add(1) go func(logGroup string, startTime int64, endTime int64) { defer func() { @@ -257,9 +242,9 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) cwPoller.run(svc, logGroup, startTime, endTime, logProcessor) }(lg, cwPoller.startTime, cwPoller.endTime) - logGroupCount++ } } + // Wait for all workers to finish. workerWg.Wait() if errors.Is(ctx.Err(), context.Canceled) { @@ -371,7 +356,7 @@ func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTim return filterLogEventsInput } -func getStartPosition(startPosition string, currentTime time.Time, prevEndTime int64, scanFrequency time.Duration, latency time.Duration) (startTime int64, endTime int64) { +func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) { if latency != 0 { // add latency if config is not 0 currentTime = currentTime.Add(latency * -1) @@ -379,17 +364,17 @@ func getStartPosition(startPosition string, currentTime time.Time, prevEndTime i switch startPosition { case "beginning": - if prevEndTime != int64(0) { - return prevEndTime, currentTime.UnixNano() / int64(time.Millisecond) + if endTime != int64(0) { + return endTime, currentTime.UnixNano() / int64(time.Millisecond) } return 0, currentTime.UnixNano() / int64(time.Millisecond) case "end": - if prevEndTime != int64(0) { - return prevEndTime, currentTime.UnixNano() / int64(time.Millisecond) + if endTime != int64(0) { + return endTime, currentTime.UnixNano() / int64(time.Millisecond) } return currentTime.Add(-scanFrequency).UnixNano() / int64(time.Millisecond), currentTime.UnixNano() / int64(time.Millisecond) } - return + return 0, 0 } func (p *logProcessor) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) error { diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index c777072c6c93..1cd1dbf807b5 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -10,6 +10,8 @@ import ( "fmt" "time" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" @@ -79,7 +81,7 @@ type s3ObjectHandlerFactory interface { // Create returns a new s3ObjectHandler that can be used to process the // specified S3 object. If the handler is not configured to process the // given S3 object (based on key name) then it will return nil. - Create(ctx context.Context, log *logp.Logger, acker *eventACKTracker, obj s3EventV2) s3ObjectHandler + Create(ctx context.Context, log *logp.Logger, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler } type s3ObjectHandler interface { diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index d2ad378d6c7e..aa6e7be80127 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -192,7 +192,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- event.S3.Bucket.ARN = p.bucket event.S3.Object.Key = filename - acker := newEventACKTracker(ctx) + acker := awscommon.NewEventACKTracker(ctx) s3Processor := p.s3ObjectHandler.Create(ctx, p.log, acker, event) if s3Processor == nil { diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 7a5fdbd50103..9e3ea9657c97 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -13,7 +13,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "io" "io/ioutil" "net/http" @@ -21,6 +20,8 @@ import ( "strings" "time" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/pkg/errors" @@ -103,8 +104,8 @@ type s3ObjectProcessor struct { log *logp.Logger ctx context.Context acker *awscommon.EventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object). - readerConfig *readerConfig // Config about how to process the object. - s3Obj s3EventV2 // S3 object information. + readerConfig *readerConfig // Config about how to process the object. + s3Obj s3EventV2 // S3 object information. s3ObjHash string s3RequestURL string diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index d1865aec9cd5..c17efd87d534 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -14,6 +14,8 @@ import ( "sync" "time" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + "github.com/aws/aws-sdk-go-v2/aws/awserr" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/pkg/errors" @@ -275,7 +277,7 @@ func (p *sqsS3EventProcessor) processS3Events(ctx context.Context, log *logp.Log defer log.Debug("End processing SQS S3 event notifications.") // Wait for all events to be ACKed before proceeding. - acker := newEventACKTracker(ctx) + acker := awscommon.NewEventACKTracker(ctx) defer acker.Wait() var errs []error From 710dc4135173a907c4a2e6d67b1b021c5cfd50f4 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 11 Jan 2022 22:50:46 -0700 Subject: [PATCH 07/11] fix go vet --- .../filebeat/input/awss3/input_benchmark_test.go | 6 +++--- .../filebeat/input/awss3/mock_interfaces_test.go | 3 ++- x-pack/filebeat/input/awss3/s3_objects_test.go | 11 ++++++----- x-pack/filebeat/input/awss3/sqs_s3_event_test.go | 3 ++- x-pack/libbeat/common/aws/acker.go | 12 ++++++------ x-pack/libbeat/common/aws/acker_test.go | 16 ++++++++-------- 6 files changed, 27 insertions(+), 24 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index ecdc1756ce4a..ec7068bb733f 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -15,7 +15,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/dustin/go-humanize" @@ -29,6 +28,7 @@ import ( pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) const cloudtrailTestFile = "testdata/aws-cloudtrail.json.gz" @@ -172,7 +172,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR go func() { for event := range client.Channel { // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() } }() @@ -259,7 +259,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult s3API := newConstantS3(t) s3API.pagerConstant = newS3PagerConstant() client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) { - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() }) defer close(client.Channel) diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index 85c11e0fe806..d315258d1775 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -18,6 +18,7 @@ import ( gomock "github.com/golang/mock/gomock" logp "github.com/elastic/beats/v7/libbeat/logp" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) // MockSQSAPI is a mock of sqsAPI interface. @@ -451,7 +452,7 @@ func (m *MockS3ObjectHandlerFactory) EXPECT() *MockS3ObjectHandlerFactoryMockRec } // Create mocks base method. -func (m *MockS3ObjectHandlerFactory) Create(ctx context.Context, log *logp.Logger, acker *eventACKTracker, obj s3EventV2) s3ObjectHandler { +func (m *MockS3ObjectHandlerFactory) Create(ctx context.Context, log *logp.Logger, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Create", ctx, log, acker, obj) ret0, _ := ret[0].(s3ObjectHandler) diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 952fbb757dc8..4ab3edfaa4bc 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) func newS3Object(t testing.TB, filename, contentType string) (s3EventV2, *s3.GetObjectResponse) { @@ -162,7 +163,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) - ack := newEventACKTracker(ctx) + ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.Error(t, err) assert.True(t, errors.Is(err, errFakeConnectivityFailure), "expected errFakeConnectivityFailure error") @@ -184,7 +185,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) - ack := newEventACKTracker(ctx) + ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.Error(t, err) }) @@ -211,7 +212,7 @@ func TestS3ObjectProcessor(t *testing.T) { ) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, nil) - ack := newEventACKTracker(ctx) + ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() require.NoError(t, err) }) @@ -249,13 +250,13 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int, ) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, mockPublisher, selectors) - ack := newEventACKTracker(ctx) + ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), ack, s3Event).ProcessS3Object() if !expectErr { require.NoError(t, err) assert.Equal(t, numEvents, len(events)) - assert.EqualValues(t, numEvents, ack.pendingACKs) + assert.EqualValues(t, numEvents, ack.PendingACKs) } else { require.Error(t, err) } diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 6100dbe31191..fddfb3d0e74e 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/logp" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/go-concert/timed" ) @@ -104,7 +105,7 @@ func TestSQSS3EventProcessor(t *testing.T) { gomock.InOrder( mockS3HandlerFactory.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Do(func(ctx context.Context, _ *logp.Logger, _ *eventACKTracker, _ s3EventV2) { + Do(func(ctx context.Context, _ *logp.Logger, _ *awscommon.EventACKTracker, _ s3EventV2) { timed.Wait(ctx, 5*visibilityTimeout) }).Return(mockS3Handler), mockS3Handler.EXPECT().ProcessS3Object().Return(nil), diff --git a/x-pack/libbeat/common/aws/acker.go b/x-pack/libbeat/common/aws/acker.go index 30dcdbd02401..347347dde675 100644 --- a/x-pack/libbeat/common/aws/acker.go +++ b/x-pack/libbeat/common/aws/acker.go @@ -18,7 +18,7 @@ import ( // more S3 objects. type EventACKTracker struct { sync.Mutex - pendingACKs int64 + PendingACKs int64 ctx context.Context cancel context.CancelFunc } @@ -31,7 +31,7 @@ func NewEventACKTracker(ctx context.Context) *EventACKTracker { // Add increments the number of pending ACKs. func (a *EventACKTracker) Add() { a.Lock() - a.pendingACKs++ + a.PendingACKs++ a.Unlock() } @@ -40,12 +40,12 @@ func (a *EventACKTracker) ACK() { a.Lock() defer a.Unlock() - if a.pendingACKs <= 0 { + if a.PendingACKs <= 0 { panic("misuse detected: negative ACK counter") } - a.pendingACKs-- - if a.pendingACKs == 0 { + a.PendingACKs-- + if a.PendingACKs == 0 { a.cancel() } } @@ -59,7 +59,7 @@ func (a *EventACKTracker) Wait() { // If there were never any pending ACKs then cancel the context. (This can // happen when a document contains no events or cannot be read due to an error). a.Lock() - if a.pendingACKs == 0 { + if a.PendingACKs == 0 { a.cancel() } a.Unlock() diff --git a/x-pack/libbeat/common/aws/acker_test.go b/x-pack/libbeat/common/aws/acker_test.go index d9c956bf4106..3c470f0b922b 100644 --- a/x-pack/libbeat/common/aws/acker_test.go +++ b/x-pack/libbeat/common/aws/acker_test.go @@ -17,11 +17,11 @@ func TestEventACKTracker(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - acker := newEventACKTracker(ctx) + acker := NewEventACKTracker(ctx) acker.Add() acker.ACK() - assert.EqualValues(t, 0, acker.pendingACKs) + assert.EqualValues(t, 0, acker.PendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } @@ -29,10 +29,10 @@ func TestEventACKTrackerNoACKs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - acker := newEventACKTracker(ctx) + acker := NewEventACKTracker(ctx) acker.Wait() - assert.EqualValues(t, 0, acker.pendingACKs) + assert.EqualValues(t, 0, acker.PendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } @@ -41,7 +41,7 @@ func TestEventACKHandler(t *testing.T) { t.Cleanup(cancel) // Create acker. Add one pending ACK. - acker := newEventACKTracker(ctx) + acker := NewEventACKTracker(ctx) acker.Add() // Create an ACK handler and simulate one ACKed event. @@ -49,7 +49,7 @@ func TestEventACKHandler(t *testing.T) { ackHandler.AddEvent(beat.Event{Private: acker}, true) ackHandler.ACKEvents(1) - assert.EqualValues(t, 0, acker.pendingACKs) + assert.EqualValues(t, 0, acker.PendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } @@ -58,12 +58,12 @@ func TestEventACKHandlerWait(t *testing.T) { t.Cleanup(cancel) // Create acker. Add one pending ACK. - acker := newEventACKTracker(ctx) + acker := NewEventACKTracker(ctx) acker.Add() acker.ACK() acker.Wait() acker.Add() - assert.EqualValues(t, 1, acker.pendingACKs) + assert.EqualValues(t, 1, acker.PendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } From fad9936511f3d33f125fca3803becf83ce92b24c Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 12 Jan 2022 23:34:05 -0700 Subject: [PATCH 08/11] add integration test for aws cloudwatch input --- .../awscloudwatch/_meta/terraform/README.md | 6 +- .../awscloudwatch/_meta/terraform/outputs.tf | 8 +- x-pack/filebeat/input/awscloudwatch/input.go | 29 ++- .../awscloudwatch/input_integration_test.go | 232 ++++++++++++++++++ .../filebeat/input/awscloudwatch/metrics.go | 39 +++ .../input/awss3/input_integration_test.go | 6 +- 6 files changed, 308 insertions(+), 12 deletions(-) create mode 100644 x-pack/filebeat/input/awscloudwatch/input_integration_test.go create mode 100644 x-pack/filebeat/input/awscloudwatch/metrics.go diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md index 2f1add174e93..5d9e4707a4ab 100644 --- a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md @@ -14,7 +14,11 @@ You must have the appropriate AWS environment variables for authentication set before running Terraform or the integration tests. The AWS key must be authorized to create and destroy AWS CloudWatch log groups. -1. Execute terraform in this directory to create the resources. This will also +1. Initialize a working directory containing Terraform configuration files. + + `terraform init` + +2. Execute terraform in this directory to create the resources. This will also write the `outputs.yml`. You can use `export TF_VAR_aws_region=NNNNN` in order to match the AWS region of the profile you are using. diff --git a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf index 1641340f88cd..09e0a07e4a97 100644 --- a/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf +++ b/x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf @@ -1,9 +1,9 @@ resource "local_file" "secrets" { content = yamlencode({ - "log_group_1" : aws_cloudwatch_log_group.filebeat-integtest-1.name - "log_group_2" : aws_cloudwatch_log_group.filebeat-integtest-2.name - "log_stream_1" : aws_cloudwatch_log_stream.filebeat-integtest-1.name - "log_stream_2" : aws_cloudwatch_log_stream.filebeat-integtest-2.name + "log_group_name_1" : aws_cloudwatch_log_group.filebeat-integtest-1.name + "log_group_name_2" : aws_cloudwatch_log_group.filebeat-integtest-2.name + "log_stream_name_1" : aws_cloudwatch_log_stream.filebeat-integtest-1.name + "log_stream_name_2" : aws_cloudwatch_log_stream.filebeat-integtest-2.name "aws_region" : var.aws_region }) filename = "${path.module}/outputs.yml" diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 413a1e709084..91ed45d91172 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/statestore" @@ -79,6 +81,7 @@ type cloudwatchPoller struct { prevEndTime int64 workerSem *awscommon.Sem log *logp.Logger + metrics *inputMetrics store *statestore.Store workersListingMap *sync.Map workersProcessingMap *sync.Map @@ -86,22 +89,31 @@ type cloudwatchPoller struct { type logProcessor struct { log *logp.Logger + metrics *inputMetrics publisher beat.Client ack *awscommon.EventACKTracker } -func newLogProcessor(log *logp.Logger, publisher beat.Client, ctx context.Context) *logProcessor { +func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor { + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } return &logProcessor{ log: log, + metrics: metrics, publisher: publisher, ack: awscommon.NewEventACKTracker(ctx), } } -func newCloudwatchPoller(log *logp.Logger, +func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, store *statestore.Store, awsRegion string, apiSleep time.Duration, numberOfWorkers int, logStreams []string, logStreamPrefix string) *cloudwatchPoller { + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + return &cloudwatchPoller{ numberOfWorkers: numberOfWorkers, apiSleep: apiSleep, @@ -112,6 +124,7 @@ func newCloudwatchPoller(log *logp.Logger, endTime: int64(0), workerSem: awscommon.NewSem(numberOfWorkers), log: log, + metrics: metrics, store: store, workersListingMap: new(sync.Map), workersProcessingMap: new(sync.Map), @@ -195,16 +208,20 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) return fmt.Errorf("failed to get log group names: %w", err) } - log := inputContext.Logger.With("aws-cloudwatch") + log := inputContext.Logger + metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() + metrics := newInputMetrics(metricRegistry, inputContext.ID) cwPoller := newCloudwatchPoller( log.Named("cloudwatch_poller"), + metrics, persistentStore, in.awsConfig.Region, in.config.APISleep, in.config.NumberOfWorkers, in.config.LogStreams, in.config.LogStreamPrefix) - logProcessor := newLogProcessor(log.Named("log_processor"), client, ctx) + logProcessor := newLogProcessor(log.Named("log_processor"), metrics, client, ctx) + cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames))) return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames) } @@ -318,8 +335,10 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.Cl paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req) for paginator.Next(context.TODO()) { page := paginator.CurrentPage() + p.metrics.apiCallsTotal.Inc() logEvents := page.Events + p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) p.log.Debugf("Processing #%v events", len(logEvents)) err := logProcessor.processLogEvents(logEvents, logGroup, p.region) if err != nil { @@ -344,6 +363,7 @@ func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTim LogGroupName: awssdk.String(logGroup), StartTime: awssdk.Int64(startTime), EndTime: awssdk.Int64(endTime), + Limit: awssdk.Int64(100), } if len(p.logStreams) > 0 { @@ -414,5 +434,6 @@ func createEvent(logEvent cloudwatchlogs.FilteredLogEvent, logGroup string, regi func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { ack.Add() event.Private = ack + p.metrics.cloudwatchEventsCreatedTotal.Inc() p.publisher.Publish(*event) } diff --git a/x-pack/filebeat/input/awscloudwatch/input_integration_test.go b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go new file mode 100644 index 000000000000..633a0ddcada9 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/input_integration_test.go @@ -0,0 +1,232 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// See _meta/terraform/README.md for integration test usage instructions. + +//go:build integration && aws +// +build integration,aws + +package awscloudwatch + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "golang.org/x/sync/errgroup" + "gopkg.in/yaml.v2" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/external" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/filebeat/beater" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" +) + +const ( + inputID = "test_id" + message1 = "test1" + message2 = "test2" + terraformOutputYML = "_meta/terraform/outputs.yml" + logGroupNamePrefix = "filebeat-log-group-integtest-" +) + +var ( + cloudwatchConfig = common.MapStr{ + "start_position": "beginning", + "scan_frequency": 10 * time.Second, + "api_timeout": 120 * time.Second, + "number_of_workers": 1, + } +) + +type terraformOutputData struct { + AWSRegion string `yaml:"aws_region"` + LogGroup1 string `yaml:"log_group_name_1"` + LogGroup2 string `yaml:"log_group_name_2"` + LogStream1 string `yaml:"log_stream_name_1"` + LogStream2 string `yaml:"log_stream_name_2"` +} + +func getTerraformOutputs(t *testing.T) terraformOutputData { + t.Helper() + + ymlData, err := ioutil.ReadFile(terraformOutputYML) + if os.IsNotExist(err) { + t.Skipf("Run 'terraform apply' in %v to setup CloudWatch log groups and log streams for the test.", filepath.Dir(terraformOutputYML)) + } + if err != nil { + t.Fatalf("failed reading terraform output data: %v", err) + } + + var rtn terraformOutputData + dec := yaml.NewDecoder(bytes.NewReader(ymlData)) + dec.SetStrict(true) + if err = dec.Decode(&rtn); err != nil { + t.Fatal(err) + } + + return rtn +} + +func assertMetric(t *testing.T, snapshot common.MapStr, name string, value interface{}) { + n, _ := snapshot.GetValue(inputID + "." + name) + assert.EqualValues(t, value, n, name) +} + +func newV2Context() (v2.Context, func()) { + ctx, cancel := context.WithCancel(context.Background()) + return v2.Context{ + Logger: logp.NewLogger(inputName).With("id", inputID), + ID: inputID, + Cancelation: ctx, + }, cancel +} + +type testInputStore struct { + registry *statestore.Registry +} + +func openTestStatestore() beater.StateStore { + return &testInputStore{ + registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), + } +} + +func (s *testInputStore) Close() { + s.registry.Close() +} + +func (s *testInputStore) Access() (*statestore.Store, error) { + return s.registry.Get("filebeat") +} + +func (s *testInputStore) CleanupInterval() time.Duration { + return 24 * time.Hour +} + +func createInput(t *testing.T, cfg *common.Config) *cloudwatchInput { + inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg) + if err != nil { + t.Fatal(err) + } + + return inputV2.(*cloudwatchInput) +} + +func makeTestConfigWithLogGroupNamePrefix(regionName string) *common.Config { + return common.MustNewConfigFrom(fmt.Sprintf(`--- +log_group_name_prefix: %s +region_name: %s +`, logGroupNamePrefix, regionName)) +} + +func uploadLogMessage(t *testing.T, svc cloudwatchlogsiface.ClientAPI, message string, timestamp int64, logGroupName string, logStreamName string) { + describeLogStreamsInput := cloudwatchlogs.DescribeLogStreamsInput{ + LogGroupName: awssdk.String(logGroupName), + LogStreamNamePrefix: awssdk.String(logStreamName), + } + + reqDescribeLogStreams := svc.DescribeLogStreamsRequest(&describeLogStreamsInput) + resp, err := reqDescribeLogStreams.Send(context.TODO()) + if err != nil { + t.Fatalf("Failed to describe log stream %q in log group %q: %v", logStreamName, logGroupName, err) + } + + if len(resp.LogStreams) != 1 { + t.Fatalf("Describe log stream %q in log group %q should return 1 and only 1 value", logStreamName, logGroupName) + } + + inputLogEvent := cloudwatchlogs.InputLogEvent{ + Message: awssdk.String(message), + Timestamp: awssdk.Int64(timestamp), + } + + reqPutLogEvents := svc.PutLogEventsRequest( + &cloudwatchlogs.PutLogEventsInput{ + LogEvents: []cloudwatchlogs.InputLogEvent{inputLogEvent}, + LogGroupName: awssdk.String(logGroupName), + LogStreamName: awssdk.String(logStreamName), + SequenceToken: resp.LogStreams[0].UploadSequenceToken, + }) + _, err = reqPutLogEvents.Send(context.TODO()) + if err != nil { + t.Fatalf("Failed to upload message %q into log stream %q in log group %q: %v", message, logStreamName, logGroupName, err) + } +} + +func TestInputWithLogGroupNamePrefix(t *testing.T) { + logp.TestingSetup() + + // Terraform is used to set up S3 and SQS and must be executed manually. + tfConfig := getTerraformOutputs(t) + + cfg, err := external.LoadDefaultAWSConfig() + if err != nil { + t.Fatal(err) + } + cfg.Region = tfConfig.AWSRegion + + // upload log messages for testing + svc := cloudwatchlogs.New(cfg) + currentTime := time.Now() + timestamp := currentTime.UnixNano() / int64(time.Millisecond) + + uploadLogMessage(t, svc, message1, timestamp, tfConfig.LogGroup1, tfConfig.LogStream1) + uploadLogMessage(t, svc, message2, timestamp, tfConfig.LogGroup2, tfConfig.LogStream2) + + // sleep for 30 seconds to wait for the log messages to show up + time.Sleep(30 * time.Second) + + cloudwatchInput := createInput(t, makeTestConfigWithLogGroupNamePrefix(tfConfig.AWSRegion)) + inputCtx, cancel := newV2Context() + t.Cleanup(cancel) + time.AfterFunc(30*time.Second, func() { + cancel() + }) + + client := pubtest.NewChanClient(0) + defer close(client.Channel) + go func() { + for event := range client.Channel { + // Fake the ACK handling that's not implemented in pubtest. + event.Private.(*awscommon.EventACKTracker).ACK() + } + }() + + var errGroup errgroup.Group + errGroup.Go(func() error { + pipeline := pubtest.PublisherWithClient(client) + return cloudwatchInput.Run(inputCtx, pipeline) + }) + + if err := errGroup.Wait(); err != nil { + t.Fatal(err) + } + + snap := common.MapStr(monitoring.CollectStructSnapshot( + monitoring.GetNamespace("dataset").GetRegistry(), + monitoring.Full, + false)) + t.Log(snap.StringToPrint()) + + assertMetric(t, snap, "log_events_received_total", 2) + assertMetric(t, snap, "log_groups_total", 2) + assertMetric(t, snap, "cloudwatch_events_created_total", 2) +} diff --git a/x-pack/filebeat/input/awscloudwatch/metrics.go b/x-pack/filebeat/input/awscloudwatch/metrics.go new file mode 100644 index 000000000000..8d53ec5700cb --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/metrics.go @@ -0,0 +1,39 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "github.com/elastic/beats/v7/libbeat/monitoring" +) + +type inputMetrics struct { + id string // Input ID. + parent *monitoring.Registry // Parent registry holding this input's ID as a key. + + logEventsReceivedTotal *monitoring.Uint // Number of CloudWatch log events received. + logGroupsTotal *monitoring.Uint // Logs collected from number of CloudWatch log groups. + cloudwatchEventsCreatedTotal *monitoring.Uint // Number of events created from processing logs from CloudWatch. + apiCallsTotal *monitoring.Uint // Number of API calls made total. +} + +// Close removes the metrics from the registry. +func (m *inputMetrics) Close() { + m.parent.Remove(m.id) +} + +func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { + reg := parent.NewRegistry(id) + monitoring.NewString(reg, "input").Set(inputName) + monitoring.NewString(reg, "id").Set(id) + out := &inputMetrics{ + id: id, + parent: reg, + logEventsReceivedTotal: monitoring.NewUint(reg, "log_events_received_total"), + logGroupsTotal: monitoring.NewUint(reg, "log_groups_total"), + cloudwatchEventsCreatedTotal: monitoring.NewUint(reg, "cloudwatch_events_created_total"), + apiCallsTotal: monitoring.NewUint(reg, "api_calls_total"), + } + return out +} diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index a7f4f651c07c..d112e6a4c356 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -211,7 +211,7 @@ func TestInputRunSQS(t *testing.T) { go func() { for event := range client.Channel { // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() } }() @@ -274,7 +274,7 @@ func TestInputRunS3(t *testing.T) { go func() { for event := range client.Channel { // Fake the ACK handling that's not implemented in pubtest. - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() } }() @@ -479,7 +479,7 @@ func TestInputRunSNS(t *testing.T) { defer close(client.Channel) go func() { for event := range client.Channel { - event.Private.(*eventACKTracker).ACK() + event.Private.(*awscommon.EventACKTracker).ACK() } }() From d32d11a8ef8194aa0ff88c0fc45b5f7bce78331f Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 13 Jan 2022 16:27:13 -0700 Subject: [PATCH 09/11] move some functions into separate files --- .../input/awscloudwatch/cloudwatch.go | 125 +++++++++++++ x-pack/filebeat/input/awscloudwatch/input.go | 176 +----------------- .../filebeat/input/awscloudwatch/processor.go | 78 ++++++++ 3 files changed, 207 insertions(+), 172 deletions(-) create mode 100644 x-pack/filebeat/input/awscloudwatch/cloudwatch.go create mode 100644 x-pack/filebeat/input/awscloudwatch/processor.go diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go new file mode 100644 index 000000000000..fbf8365b9406 --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -0,0 +1,125 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "context" + "sync" + "time" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/libbeat/statestore" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" +) + +type cloudwatchPoller struct { + numberOfWorkers int + apiSleep time.Duration + region string + logStreams []string + logStreamPrefix string + startTime int64 + endTime int64 + prevEndTime int64 + workerSem *awscommon.Sem + log *logp.Logger + metrics *inputMetrics + store *statestore.Store + workersListingMap *sync.Map + workersProcessingMap *sync.Map +} + +func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, + store *statestore.Store, + awsRegion string, apiSleep time.Duration, + numberOfWorkers int, logStreams []string, logStreamPrefix string) *cloudwatchPoller { + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + + return &cloudwatchPoller{ + numberOfWorkers: numberOfWorkers, + apiSleep: apiSleep, + region: awsRegion, + logStreams: logStreams, + logStreamPrefix: logStreamPrefix, + startTime: int64(0), + endTime: int64(0), + workerSem: awscommon.NewSem(numberOfWorkers), + log: log, + metrics: metrics, + store: store, + workersListingMap: new(sync.Map), + workersProcessingMap: new(sync.Map), + } +} + +func (p *cloudwatchPoller) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { + err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) + if err != nil { + var err *awssdk.RequestCanceledError + if errors.As(err, &err) { + p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) + } + p.log.Error("getLogEventsFromCloudWatch failed: ", err) + } +} + +// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch +func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { + // construct FilterLogEventsInput + filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) + + // make API request + req := svc.FilterLogEventsRequest(filterLogEventsInput) + paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req) + for paginator.Next(context.TODO()) { + page := paginator.CurrentPage() + p.metrics.apiCallsTotal.Inc() + + logEvents := page.Events + p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) + p.log.Debugf("Processing #%v events", len(logEvents)) + err := logProcessor.processLogEvents(logEvents, logGroup, p.region) + if err != nil { + err = errors.Wrap(err, "processLogEvents failed") + p.log.Error(err) + } + } + + if err := paginator.Err(); err != nil { + return errors.Wrap(err, "error FilterLogEvents with Paginator") + } + + // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). + p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) + time.Sleep(p.apiSleep) + p.log.Debug("done sleeping") + return nil +} + +func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { + filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ + LogGroupName: awssdk.String(logGroup), + StartTime: awssdk.Int64(startTime), + EndTime: awssdk.Int64(endTime), + Limit: awssdk.Int64(100), + } + + if len(p.logStreams) > 0 { + filterLogEventsInput.LogStreamNames = p.logStreams + } + + if p.logStreamPrefix != "" { + filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix) + } + return filterLogEventsInput +} diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 91ed45d91172..c659b04619d4 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -11,25 +11,21 @@ import ( "sync" "time" - "github.com/elastic/beats/v7/libbeat/monitoring" - - "github.com/elastic/beats/v7/filebeat/beater" - "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/statestore" - "github.com/elastic/go-concert/unison" - awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/arn" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" "github.com/pkg/errors" + "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/monitoring" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + "github.com/elastic/go-concert/unison" ) const ( @@ -70,67 +66,6 @@ type cloudwatchInput struct { store beater.StateStore } -type cloudwatchPoller struct { - numberOfWorkers int - apiSleep time.Duration - region string - logStreams []string - logStreamPrefix string - startTime int64 - endTime int64 - prevEndTime int64 - workerSem *awscommon.Sem - log *logp.Logger - metrics *inputMetrics - store *statestore.Store - workersListingMap *sync.Map - workersProcessingMap *sync.Map -} - -type logProcessor struct { - log *logp.Logger - metrics *inputMetrics - publisher beat.Client - ack *awscommon.EventACKTracker -} - -func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor { - if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") - } - return &logProcessor{ - log: log, - metrics: metrics, - publisher: publisher, - ack: awscommon.NewEventACKTracker(ctx), - } -} - -func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, - store *statestore.Store, - awsRegion string, apiSleep time.Duration, - numberOfWorkers int, logStreams []string, logStreamPrefix string) *cloudwatchPoller { - if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") - } - - return &cloudwatchPoller{ - numberOfWorkers: numberOfWorkers, - apiSleep: apiSleep, - region: awsRegion, - logStreams: logStreams, - logStreamPrefix: logStreamPrefix, - startTime: int64(0), - endTime: int64(0), - workerSem: awscommon.NewSem(numberOfWorkers), - log: log, - metrics: metrics, - store: store, - workersListingMap: new(sync.Map), - workersProcessingMap: new(sync.Map), - } -} - func newInput(config config, store beater.StateStore) (*cloudwatchInput, error) { cfgwarn.Beta("aws-cloudwatch input type is used") awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) @@ -271,17 +206,6 @@ func (in *cloudwatchInput) Receive(svc cloudwatchlogsiface.ClientAPI, cwPoller * return ctx.Err() } -func (p *cloudwatchPoller) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { - err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) - if err != nil { - var err *awssdk.RequestCanceledError - if errors.As(err, &err) { - p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) - } - p.log.Error("getLogEventsFromCloudWatch failed: ", err) - } -} - func parseARN(logGroupARN string) (string, string, error) { arnParsed, err := arn.Parse(logGroupARN) if err != nil { @@ -325,57 +249,6 @@ func getLogGroupNames(svc cloudwatchlogsiface.ClientAPI, logGroupNamePrefix stri return logGroupNames, nil } -// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { - // construct FilterLogEventsInput - filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) - - // make API request - req := svc.FilterLogEventsRequest(filterLogEventsInput) - paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req) - for paginator.Next(context.TODO()) { - page := paginator.CurrentPage() - p.metrics.apiCallsTotal.Inc() - - logEvents := page.Events - p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) - p.log.Debugf("Processing #%v events", len(logEvents)) - err := logProcessor.processLogEvents(logEvents, logGroup, p.region) - if err != nil { - err = errors.Wrap(err, "processLogEvents failed") - p.log.Error(err) - } - } - - if err := paginator.Err(); err != nil { - return errors.Wrap(err, "error FilterLogEvents with Paginator") - } - - // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). - p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) - time.Sleep(p.apiSleep) - p.log.Debug("done sleeping") - return nil -} - -func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { - filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: awssdk.String(logGroup), - StartTime: awssdk.Int64(startTime), - EndTime: awssdk.Int64(endTime), - Limit: awssdk.Int64(100), - } - - if len(p.logStreams) > 0 { - filterLogEventsInput.LogStreamNames = p.logStreams - } - - if p.logStreamPrefix != "" { - filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix) - } - return filterLogEventsInput -} - func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) { if latency != 0 { // add latency if config is not 0 @@ -396,44 +269,3 @@ func getStartPosition(startPosition string, currentTime time.Time, endTime int64 } return 0, 0 } - -func (p *logProcessor) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) error { - for _, logEvent := range logEvents { - event := createEvent(logEvent, logGroup, regionName) - p.publish(p.ack, &event) - } - return nil -} - -func createEvent(logEvent cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) beat.Event { - event := beat.Event{ - Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), - Fields: common.MapStr{ - "message": *logEvent.Message, - "log.file.path": logGroup + "/" + *logEvent.LogStreamName, - "event": common.MapStr{ - "id": *logEvent.EventId, - "ingested": time.Now(), - }, - "awscloudwatch": common.MapStr{ - "log_group": logGroup, - "log_stream": *logEvent.LogStreamName, - "ingestion_time": time.Unix(*logEvent.IngestionTime/1000, 0), - }, - "cloud": common.MapStr{ - "provider": "aws", - "region": regionName, - }, - }, - } - event.SetID(*logEvent.EventId) - - return event -} - -func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { - ack.Add() - event.Private = ack - p.metrics.cloudwatchEventsCreatedTotal.Inc() - p.publisher.Publish(*event) -} diff --git a/x-pack/filebeat/input/awscloudwatch/processor.go b/x-pack/filebeat/input/awscloudwatch/processor.go new file mode 100644 index 000000000000..558e91d5da5e --- /dev/null +++ b/x-pack/filebeat/input/awscloudwatch/processor.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awscloudwatch + +import ( + "context" + "time" + + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" +) + +type logProcessor struct { + log *logp.Logger + metrics *inputMetrics + publisher beat.Client + ack *awscommon.EventACKTracker +} + +func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor { + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + return &logProcessor{ + log: log, + metrics: metrics, + publisher: publisher, + ack: awscommon.NewEventACKTracker(ctx), + } +} + +func (p *logProcessor) processLogEvents(logEvents []cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) error { + for _, logEvent := range logEvents { + event := createEvent(logEvent, logGroup, regionName) + p.publish(p.ack, &event) + } + return nil +} + +func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) { + ack.Add() + event.Private = ack + p.metrics.cloudwatchEventsCreatedTotal.Inc() + p.publisher.Publish(*event) +} + +func createEvent(logEvent cloudwatchlogs.FilteredLogEvent, logGroup string, regionName string) beat.Event { + event := beat.Event{ + Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), + Fields: common.MapStr{ + "message": *logEvent.Message, + "log.file.path": logGroup + "/" + *logEvent.LogStreamName, + "event": common.MapStr{ + "id": *logEvent.EventId, + "ingested": time.Now(), + }, + "awscloudwatch": common.MapStr{ + "log_group": logGroup, + "log_stream": *logEvent.LogStreamName, + "ingestion_time": time.Unix(*logEvent.IngestionTime/1000, 0), + }, + "cloud": common.MapStr{ + "provider": "aws", + "region": regionName, + }, + }, + } + event.SetID(*logEvent.EventId) + + return event +} From c7c302a0bad892c2de4f9bdbffa7d476f4fa2e1d Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 13 Jan 2022 20:43:04 -0700 Subject: [PATCH 10/11] add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e78e1bc7bce4..f76357b2697e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -106,6 +106,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `sophos` KV splitting and syslog header handling {issue}24237[24237] {pull}29331[29331] - Undo deletion of endpoint config from cloudtrail fileset in {pull}29415[29415]. {pull}29450[29450] - ibmmq: Fixed `@timestamp` not being populated with correct values. {pull}29773[29773] +- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695] *Heartbeat* From acd260a12d2233e5b516d16483751e9aa9bd5412 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 24 Jan 2022 20:52:52 -0700 Subject: [PATCH 11/11] add workerWg.Add(availableWorkers) --- .../input/awscloudwatch/cloudwatch.go | 11 +++--- x-pack/filebeat/input/awscloudwatch/input.go | 35 +++++++++++++++---- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go index fbf8365b9406..29c119117a09 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -87,6 +87,12 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.Cl logEvents := page.Events p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) + + // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). + p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) + time.Sleep(p.apiSleep) + p.log.Debug("done sleeping") + p.log.Debugf("Processing #%v events", len(logEvents)) err := logProcessor.processLogEvents(logEvents, logGroup, p.region) if err != nil { @@ -98,11 +104,6 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.Cl if err := paginator.Err(); err != nil { return errors.Wrap(err, "error FilterLogEvents with Paginator") } - - // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). - p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) - time.Sleep(p.apiSleep) - p.log.Debug("done sleeping") return nil } diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index c659b04619d4..d11afa77ff5a 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -166,6 +166,7 @@ func (in *cloudwatchInput) Receive(svc cloudwatchlogsiface.ClientAPI, cwPoller * // listing, sequentially processes every object and then does another listing start := true workerWg := new(sync.WaitGroup) + lastLogGroupOffset := 0 for ctx.Err() == nil { if start == false { cwPoller.log.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency) @@ -177,14 +178,36 @@ func (in *cloudwatchInput) Receive(svc cloudwatchlogsiface.ClientAPI, cwPoller * currentTime := time.Now() cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.endTime, in.config.ScanFrequency, in.config.Latency) cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0)) - for _, lg := range logGroupNames { - // Determine how many workers are available. - availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) - if err != nil || availableWorkers == 0 { - continue + availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx) + if err != nil { + break + } + + if availableWorkers == 0 { + continue + } + + workerWg.Add(availableWorkers) + logGroupNamesLength := len(logGroupNames) + runningGoroutines := 0 + + for i := lastLogGroupOffset; i < logGroupNamesLength; i++ { + if runningGoroutines >= availableWorkers { + break + } + + runningGoroutines++ + lastLogGroupOffset = i + 1 + if lastLogGroupOffset >= logGroupNamesLength { + // release unused workers + cwPoller.workerSem.Release(availableWorkers - runningGoroutines) + for j := 0; j < availableWorkers-runningGoroutines; j++ { + workerWg.Done() + } + lastLogGroupOffset = 0 } - workerWg.Add(1) + lg := logGroupNames[i] go func(logGroup string, startTime int64, endTime int64) { defer func() { cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup)