-
Notifications
You must be signed in to change notification settings - Fork 5k
Support running multiple log groups in cloudwatch input #29695
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
kaiyan-sheng
merged 13 commits into
elastic:master
from
kaiyan-sheng:fix_cloudwatch_input
Jan 25, 2022
Merged
Changes from 11 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
3db6a2a
Support running multiple log groups in cloudwatch input
263c76a
fix go vet
c91a8a8
add number_of_workers into doc
b03a459
add terraform file
0411a2f
move to use Filebeat V2
970dab9
continue refactoring using filebeat/input/v2
710dc41
fix go vet
fad9936
add integration test for aws cloudwatch input
c64cdb1
Merge remote-tracking branch 'upstream/master' into fix_cloudwatch_input
d32d11a
move some functions into separate files
c7c302a
add changelog
acd260a
add workerWg.Add(availableWorkers)
3e4060f
Merge remote-tracking branch 'upstream/master' into fix_cloudwatch_input
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
3 changes: 3 additions & 0 deletions
3
x-pack/filebeat/input/awscloudwatch/_meta/terraform/.gitignore
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| terraform/ | ||
| outputs.yml | ||
| *.tfstate* |
57 changes: 57 additions & 0 deletions
57
x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
46 changes: 46 additions & 0 deletions
46
x-pack/filebeat/input/awscloudwatch/_meta/terraform/README.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| # Terraform setup for AWS CloudWatch Input Integration Tests | ||
|
|
||
| This directory contains a Terraform module that creates the AWS resources needed | ||
| for executing the integration tests for the `aws-cloudwatch` Filebeat input. It | ||
| creates two CloudWatch log groups, and one log stream under each log group. | ||
|
|
||
| It outputs configuration information that is consumed by the tests to | ||
| `outputs.yml`. The AWS resources are randomly named to prevent name collisions | ||
| between multiple users. | ||
|
|
||
| ### Usage | ||
|
|
||
| You must have the appropriate AWS environment variables for authentication set | ||
| before running Terraform or the integration tests. The AWS key must be | ||
| authorized to create and destroy AWS CloudWatch log groups. | ||
|
|
||
| 1. Initialize a working directory containing Terraform configuration files. | ||
|
|
||
| `terraform init` | ||
|
|
||
| 2. Execute terraform in this directory to create the resources. This will also | ||
| write the `outputs.yml`. You can use `export TF_VAR_aws_region=NNNNN` in order | ||
| to match the AWS region of the profile you are using. | ||
|
|
||
| `terraform apply` | ||
|
|
||
|
|
||
| 2. (Optional) View the output configuration. | ||
|
|
||
| ```yaml | ||
| "aws_region": "us-east-1" | ||
| "log_group_name_1": "filebeat-cloudwatch-integtest-1-417koa" | ||
| "log_group_name_2": "filebeat-cloudwatch-integtest-2-417koa" | ||
| ``` | ||
|
|
||
| 3. Execute the integration test. | ||
|
|
||
| ``` | ||
| cd x-pack/filebeat/input/awss3 | ||
| go test -tags aws,integration -run TestInputRun.+ -v . | ||
| ``` | ||
|
|
||
| 4. Cleanup AWS resources. Execute terraform to delete the log groups created for | ||
| testing. | ||
|
|
||
| `terraform destroy` |
44 changes: 44 additions & 0 deletions
44
x-pack/filebeat/input/awscloudwatch/_meta/terraform/main.tf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| terraform { | ||
| required_providers { | ||
| aws = { | ||
| source = "hashicorp/aws" | ||
| version = "~> 3.52" | ||
| } | ||
| } | ||
| } | ||
|
|
||
| provider "aws" { | ||
| region = var.aws_region | ||
| } | ||
|
|
||
| resource "random_string" "random" { | ||
| length = 6 | ||
| special = false | ||
| upper = false | ||
| } | ||
|
|
||
| resource "aws_cloudwatch_log_group" "filebeat-integtest-1" { | ||
| name = "filebeat-log-group-integtest-1-${random_string.random.result}" | ||
|
|
||
| tags = { | ||
| Environment = "test" | ||
| } | ||
| } | ||
|
|
||
| resource "aws_cloudwatch_log_group" "filebeat-integtest-2" { | ||
| name = "filebeat-log-group-integtest-2-${random_string.random.result}" | ||
|
|
||
| tags = { | ||
| Environment = "test" | ||
| } | ||
| } | ||
|
|
||
| resource "aws_cloudwatch_log_stream" "filebeat-integtest-1" { | ||
| name = "filebeat-log-stream-integtest-1-${random_string.random.result}" | ||
| log_group_name = aws_cloudwatch_log_group.filebeat-integtest-1.name | ||
| } | ||
|
|
||
| resource "aws_cloudwatch_log_stream" "filebeat-integtest-2" { | ||
| name = "filebeat-log-stream-integtest-2-${random_string.random.result}" | ||
| log_group_name = aws_cloudwatch_log_group.filebeat-integtest-2.name | ||
| } |
11 changes: 11 additions & 0 deletions
11
x-pack/filebeat/input/awscloudwatch/_meta/terraform/outputs.tf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| resource "local_file" "secrets" { | ||
| content = yamlencode({ | ||
| "log_group_name_1" : aws_cloudwatch_log_group.filebeat-integtest-1.name | ||
| "log_group_name_2" : aws_cloudwatch_log_group.filebeat-integtest-2.name | ||
| "log_stream_name_1" : aws_cloudwatch_log_stream.filebeat-integtest-1.name | ||
| "log_stream_name_2" : aws_cloudwatch_log_stream.filebeat-integtest-2.name | ||
| "aws_region" : var.aws_region | ||
| }) | ||
| filename = "${path.module}/outputs.yml" | ||
| file_permission = "0644" | ||
| } | ||
5 changes: 5 additions & 0 deletions
5
x-pack/filebeat/input/awscloudwatch/_meta/terraform/variables.tf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| variable "aws_region" { | ||
| description = "AWS Region" | ||
| type = string | ||
| default = "us-east-1" | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,125 @@ | ||
| // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| // or more contributor license agreements. Licensed under the Elastic License; | ||
| // you may not use this file except in compliance with the Elastic License. | ||
|
|
||
| package awscloudwatch | ||
|
|
||
| import ( | ||
| "context" | ||
| "sync" | ||
| "time" | ||
|
|
||
| awssdk "github.com/aws/aws-sdk-go-v2/aws" | ||
| "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" | ||
| "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface" | ||
| "github.com/pkg/errors" | ||
|
|
||
| "github.com/elastic/beats/v7/libbeat/logp" | ||
| "github.com/elastic/beats/v7/libbeat/monitoring" | ||
| "github.com/elastic/beats/v7/libbeat/statestore" | ||
| awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" | ||
| ) | ||
|
|
||
| type cloudwatchPoller struct { | ||
| numberOfWorkers int | ||
| apiSleep time.Duration | ||
| region string | ||
| logStreams []string | ||
| logStreamPrefix string | ||
| startTime int64 | ||
| endTime int64 | ||
| prevEndTime int64 | ||
| workerSem *awscommon.Sem | ||
| log *logp.Logger | ||
| metrics *inputMetrics | ||
| store *statestore.Store | ||
| workersListingMap *sync.Map | ||
| workersProcessingMap *sync.Map | ||
| } | ||
|
|
||
| func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, | ||
| store *statestore.Store, | ||
| awsRegion string, apiSleep time.Duration, | ||
| numberOfWorkers int, logStreams []string, logStreamPrefix string) *cloudwatchPoller { | ||
| if metrics == nil { | ||
| metrics = newInputMetrics(monitoring.NewRegistry(), "") | ||
| } | ||
|
|
||
| return &cloudwatchPoller{ | ||
| numberOfWorkers: numberOfWorkers, | ||
| apiSleep: apiSleep, | ||
| region: awsRegion, | ||
| logStreams: logStreams, | ||
| logStreamPrefix: logStreamPrefix, | ||
| startTime: int64(0), | ||
| endTime: int64(0), | ||
| workerSem: awscommon.NewSem(numberOfWorkers), | ||
| log: log, | ||
| metrics: metrics, | ||
| store: store, | ||
| workersListingMap: new(sync.Map), | ||
| workersProcessingMap: new(sync.Map), | ||
| } | ||
| } | ||
|
|
||
| func (p *cloudwatchPoller) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { | ||
| err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) | ||
| if err != nil { | ||
| var err *awssdk.RequestCanceledError | ||
| if errors.As(err, &err) { | ||
| p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err) | ||
| } | ||
| p.log.Error("getLogEventsFromCloudWatch failed: ", err) | ||
| } | ||
| } | ||
|
|
||
| // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch | ||
| func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { | ||
| // construct FilterLogEventsInput | ||
| filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) | ||
|
|
||
| // make API request | ||
| req := svc.FilterLogEventsRequest(filterLogEventsInput) | ||
| paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req) | ||
| for paginator.Next(context.TODO()) { | ||
| page := paginator.CurrentPage() | ||
| p.metrics.apiCallsTotal.Inc() | ||
|
|
||
| logEvents := page.Events | ||
| p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents))) | ||
| p.log.Debugf("Processing #%v events", len(logEvents)) | ||
| err := logProcessor.processLogEvents(logEvents, logGroup, p.region) | ||
| if err != nil { | ||
| err = errors.Wrap(err, "processLogEvents failed") | ||
| p.log.Error(err) | ||
| } | ||
| } | ||
|
|
||
| if err := paginator.Err(); err != nil { | ||
| return errors.Wrap(err, "error FilterLogEvents with Paginator") | ||
| } | ||
|
|
||
| // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region). | ||
| p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep) | ||
|
francescayeye marked this conversation as resolved.
Outdated
|
||
| time.Sleep(p.apiSleep) | ||
| p.log.Debug("done sleeping") | ||
| return nil | ||
| } | ||
|
|
||
| func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { | ||
| filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ | ||
| LogGroupName: awssdk.String(logGroup), | ||
| StartTime: awssdk.Int64(startTime), | ||
| EndTime: awssdk.Int64(endTime), | ||
| Limit: awssdk.Int64(100), | ||
| } | ||
|
|
||
| if len(p.logStreams) > 0 { | ||
| filterLogEventsInput.LogStreamNames = p.logStreams | ||
| } | ||
|
|
||
| if p.logStreamPrefix != "" { | ||
| filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix) | ||
| } | ||
| return filterLogEventsInput | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.