diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7d13a487f608..15cf420510dc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -82,6 +82,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di - Fix parsing error using GET in Jolokia module. {pull}11075[11075] {issue}11071[11071] - Add documentation about jolokia autodiscover fields. {issue}10925[10925] {pull}10979[10979] - Add missing aws.ec2.instance.state.name into fields.yml. {issue}11219[11219] {pull}11221[11221] +- Fix ec2 metricset to collect metrics from Cloudwatch with the same timestamp. {pull}11142[11142] *Packetbeat* diff --git a/metricbeat/docs/modules/aws.asciidoc b/metricbeat/docs/modules/aws.asciidoc index cd398a68f55f..a16e7d8f25d5 100644 --- a/metricbeat/docs/modules/aws.asciidoc +++ b/metricbeat/docs/modules/aws.asciidoc @@ -31,8 +31,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600 ---- -Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify -the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new +Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify +the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading] +feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order +to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable aws module making AWS api calls without have to generate new temporary credentials and update the config frequently. diff --git a/x-pack/metricbeat/module/aws/_meta/docs.asciidoc b/x-pack/metricbeat/module/aws/_meta/docs.asciidoc index 15229e679b61..a6d3427f94ef 100644 --- a/x-pack/metricbeat/module/aws/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/aws/_meta/docs.asciidoc @@ -24,8 +24,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600 ---- -Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify -the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new +Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify +the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading] +feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order +to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable aws module making AWS api calls without have to generate new temporary credentials and update the config frequently. diff --git a/x-pack/metricbeat/module/aws/ec2/ec2.go b/x-pack/metricbeat/module/aws/ec2/ec2.go index 9033195516c8..1716cd87e1c9 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2.go @@ -166,12 +166,22 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, // AWS EC2 Metrics mapOfMetricSetFieldResults := make(map[string]interface{}) - for _, output := range getMetricDataResults { - if len(output.Values) == 0 { - continue + + // Find a timestamp for all metrics in output + timestamp := aws.FindTimestamp(getMetricDataResults) + if !timestamp.IsZero() { + for _, output := range getMetricDataResults { + if len(output.Values) == 0 { + continue + } + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, " ") + if len(output.Values) > timestampIdx { + mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx]) + } + } } - labels := strings.Split(*output.Label, " ") - mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[0]) } resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields) @@ -181,8 +191,9 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, } if len(mapOfMetricSetFieldResults) <= 11 { - info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for a new instance during the " + - "first data collection. If this shows up multiple times, please recheck the period setting in config." + info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for non-running instances or " + + "a new instance during the first data collection. If this shows up multiple times, please recheck the period " + + "setting in config." } instanceStateName, err := instanceOutput.State.Name.MarshalValue() diff --git a/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go b/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go index 6525e3f3b7c6..d4b72af18680 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go @@ -37,11 +37,17 @@ func TestFetch(t *testing.T) { mtest.CheckEventField("service.name", "string", event, t) mtest.CheckEventField("cloud.availability_zone", "string", event, t) mtest.CheckEventField("cloud.provider", "string", event, t) - mtest.CheckEventField("cloud.image.id", "string", event, t) mtest.CheckEventField("cloud.instance.id", "string", event, t) mtest.CheckEventField("cloud.machine.type", "string", event, t) mtest.CheckEventField("cloud.provider", "string", event, t) mtest.CheckEventField("cloud.region", "string", event, t) + mtest.CheckEventField("instance.image.id", "string", event, t) + mtest.CheckEventField("instance.state.name", "string", event, t) + mtest.CheckEventField("instance.state.code", "int", event, t) + mtest.CheckEventField("instance.monitoring.state", "string", event, t) + mtest.CheckEventField("instance.core.count", "int", event, t) + mtest.CheckEventField("instance.threads_per_core", "int", event, t) + // MetricSetField mtest.CheckEventField("cpu.total.pct", "float", event, t) mtest.CheckEventField("cpu.credit_usage", "float", event, t) diff --git a/x-pack/metricbeat/module/aws/ec2/ec2_test.go b/x-pack/metricbeat/module/aws/ec2/ec2_test.go index ddb49b884550..1f0afc5b431b 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2_test.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2_test.go @@ -8,6 +8,7 @@ package ec2 import ( "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" @@ -171,27 +172,32 @@ func TestCreateCloudWatchEvents(t *testing.T) { assert.Equal(t, 1, len(instanceIDs)) instanceID := instanceIDs[0] assert.Equal(t, instanceID, instanceID) + timestamp := time.Now() getMetricDataOutput := []cloudwatch.MetricDataResult{ { - Id: &id1, - Label: &label1, - Values: []float64{0.25}, + Id: &id1, + Label: &label1, + Values: []float64{0.25}, + Timestamps: []time.Time{timestamp}, }, { - Id: &id2, - Label: &label2, - Values: []float64{0.0}, + Id: &id2, + Label: &label2, + Values: []float64{0.0}, + Timestamps: []time.Time{timestamp}, }, { - Id: &id3, - Label: &label3, - Values: []float64{0.0}, + Id: &id3, + Label: &label3, + Values: []float64{0.0}, + Timestamps: []time.Time{timestamp}, }, { - Id: &id4, - Label: &label4, - Values: []float64{0.0}, + Id: &id4, + Label: &label4, + Values: []float64{0.0}, + Timestamps: []time.Time{timestamp}, }, } diff --git a/x-pack/metricbeat/module/aws/mtest/integration.go b/x-pack/metricbeat/module/aws/mtest/integration.go index 117ec5e5d424..8c52df79fb29 100644 --- a/x-pack/metricbeat/module/aws/mtest/integration.go +++ b/x-pack/metricbeat/module/aws/mtest/integration.go @@ -5,6 +5,7 @@ package mtest import ( + "errors" "os" "testing" @@ -48,36 +49,43 @@ func GetConfigForTest(metricSetName string) (map[string]interface{}, string) { // CheckEventField function checks a given field type and compares it with the expected type for integration tests. func CheckEventField(metricName string, expectedType string, event mb.Event, t *testing.T) { - if ok, err := event.MetricSetFields.HasKey(metricName); ok { - assert.NoError(t, err) - metricValue, err := event.MetricSetFields.GetValue(metricName) - assert.NoError(t, err) - compareType(metricValue, expectedType, t) - } else if ok, err := event.RootFields.HasKey(metricName); ok { - assert.NoError(t, err) - rootValue, err := event.RootFields.GetValue(metricName) - assert.NoError(t, err) - compareType(rootValue, expectedType, t) + ok1, err1 := event.MetricSetFields.HasKey(metricName) + ok2, err2 := event.RootFields.HasKey(metricName) + if ok1 || ok2 { + if ok1 { + assert.NoError(t, err1) + metricValue, err := event.MetricSetFields.GetValue(metricName) + assert.NoError(t, err) + err = compareType(metricValue, expectedType, metricName) + assert.NoError(t, err) + t.Log("Succeed: Field " + metricName + " matches type " + expectedType) + } else if ok2 { + assert.NoError(t, err2) + rootValue, err := event.RootFields.GetValue(metricName) + assert.NoError(t, err) + err = compareType(rootValue, expectedType, metricName) + assert.NoError(t, err) + t.Log("Succeed: Field " + metricName + " matches type " + expectedType) + } + } else { + t.Log("Field " + metricName + " does not exist in metric set fields") } } -func compareType(metricValue interface{}, expectedType string, t *testing.T) { +func compareType(metricValue interface{}, expectedType string, metricName string) (err error) { switch metricValue.(type) { case float64: if expectedType != "float" { - t.Log("Failed: Field is not in type " + expectedType) - t.Fail() + err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType) } case string: if expectedType != "string" { - t.Log("Failed: Field is not in type " + expectedType) - t.Fail() + err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType) } case int64: if expectedType != "int" { - t.Log("Failed: Field is not in type " + expectedType) - t.Fail() + err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType) } } - t.Log("Succeed: Field matches type " + expectedType) + return } diff --git a/x-pack/metricbeat/module/aws/utils.go b/x-pack/metricbeat/module/aws/utils.go index e66ad82d6057..2249f82e2266 100644 --- a/x-pack/metricbeat/module/aws/utils.go +++ b/x-pack/metricbeat/module/aws/utils.go @@ -84,3 +84,55 @@ func GetMetricDataResults(metricDataQueries []cloudwatch.MetricDataQuery, svc cl func EventMapping(input map[string]interface{}, schema s.Schema) (common.MapStr, error) { return schema.Apply(input, s.FailOnRequired) } + +// CheckTimestampInArray checks if input timestamp exists in timestampArray and if it exists, return the position. +func CheckTimestampInArray(timestamp time.Time, timestampArray []time.Time) (bool, int) { + for i := 0; i < len(timestampArray); i++ { + if timestamp.Equal(timestampArray[i]) { + return true, i + } + } + return false, -1 +} + +// FindTimestamp function checks MetricDataResults and find the timestamp to collect metrics from. +// For example, MetricDataResults might look like: +// metricDataResults = [{ +// Id: "sqs0", +// Label: "testName SentMessageSize", +// StatusCode: Complete, +// Timestamps: [2019-03-11 17:45:00 +0000 UTC], +// Values: [981] +// } { +// Id: "sqs1", +// Label: "testName NumberOfMessagesSent", +// StatusCode: Complete, +// Timestamps: [2019-03-11 17:45:00 +0000 UTC,2019-03-11 17:40:00 +0000 UTC], +// Values: [0.5,0] +// }] +// This case, we are collecting values for both metrics from timestamp 2019-03-11 17:45:00 +0000 UTC. +func FindTimestamp(getMetricDataResults []cloudwatch.MetricDataResult) time.Time { + timestamp := time.Time{} + for _, output := range getMetricDataResults { + // When there are outputs with one timestamp, use this timestamp. + if output.Timestamps != nil && len(output.Timestamps) == 1 { + // Use the first timestamp from Timestamps field to collect the latest data. + timestamp = output.Timestamps[0] + return timestamp + } + } + + // When there is no output with one timestamp, use the latest timestamp from timestamp list. + if timestamp.IsZero() { + for _, output := range getMetricDataResults { + // When there are outputs with one timestamp, use this timestamp + if output.Timestamps != nil && len(output.Timestamps) > 1 { + // Example Timestamps: [2019-03-11 17:36:00 +0000 UTC,2019-03-11 17:31:00 +0000 UTC] + timestamp = output.Timestamps[0] + return timestamp + } + } + } + + return timestamp +} diff --git a/x-pack/metricbeat/module/aws/utils_test.go b/x-pack/metricbeat/module/aws/utils_test.go index 9b57ecd3e67e..00bab25d80c2 100644 --- a/x-pack/metricbeat/module/aws/utils_test.go +++ b/x-pack/metricbeat/module/aws/utils_test.go @@ -7,6 +7,7 @@ package aws import ( "fmt" "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" @@ -169,3 +170,108 @@ func TestGetMetricDataResults(t *testing.T) { assert.Equal(t, label4, *getMetricDataResults[3].Label) assert.Equal(t, 0.0, getMetricDataResults[3].Values[0]) } + +func TestCheckTimestampInArray(t *testing.T) { + timestamp1 := time.Now() + timestamp2 := timestamp1.Add(5 * time.Minute) + timestamp3 := timestamp1.Add(10 * time.Minute) + + cases := []struct { + targetTimestamp time.Time + expectedExists bool + expectedIndex int + }{ + { + targetTimestamp: timestamp1, + expectedExists: true, + expectedIndex: 0, + }, + { + targetTimestamp: timestamp3, + expectedExists: false, + expectedIndex: -1, + }, + } + + timestampArray := []time.Time{timestamp1, timestamp2} + for _, c := range cases { + exists, index := CheckTimestampInArray(c.targetTimestamp, timestampArray) + assert.Equal(t, c.expectedExists, exists) + assert.Equal(t, c.expectedIndex, index) + } +} + +func TestFindTimestamp(t *testing.T) { + timestamp1 := time.Now() + timestamp2 := timestamp1.Add(5 * time.Minute) + cases := []struct { + getMetricDataResults []cloudwatch.MetricDataResult + expectedTimestamp time.Time + }{ + { + getMetricDataResults: []cloudwatch.MetricDataResult{ + { + Id: &id1, + Label: &label1, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp1, timestamp2}, + Values: []float64{0, 1}, + }, + { + Id: &id2, + Label: &label2, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp1}, + Values: []float64{2, 3}, + }, + }, + expectedTimestamp: timestamp1, + }, + { + getMetricDataResults: []cloudwatch.MetricDataResult{ + { + Id: &id1, + Label: &label1, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp1, timestamp2}, + Values: []float64{0, 1}, + }, + { + Id: &id2, + Label: &label2, + StatusCode: cloudwatch.StatusCodeComplete, + }, + }, + expectedTimestamp: timestamp1, + }, + { + getMetricDataResults: []cloudwatch.MetricDataResult{ + { + Id: &id1, + Label: &label1, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp1, timestamp2}, + Values: []float64{0, 1}, + }, + { + Id: &id2, + Label: &label2, + StatusCode: cloudwatch.StatusCodeComplete, + }, + { + Id: &id3, + Label: &label2, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp2}, + Values: []float64{2, 3}, + }, + }, + expectedTimestamp: timestamp2, + }, + } + + for _, c := range cases { + outputTimestamp := FindTimestamp(c.getMetricDataResults) + assert.Equal(t, c.expectedTimestamp, outputTimestamp) + } +}