
Update grouping key, add event.created, drop ID()
# Update grouping key

The dimensionsKey contains all the dimension field values we want to use
to group the time series.

We need to add the timestamp to the key, so we only group time
series with the same timestamp.
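
A minimal sketch of the idea (the helper name and field set are illustrative;
the actual key is built by `createGroupingKey` in timeseries.go below):

```go
package main

import (
	"fmt"
	"time"
)

// groupingKey is an illustrative stand-in for createGroupingKey: the real
// key folds in more dimensions (availability zone, labels, and so on).
func groupingKey(ts time.Time, accountID, instanceID string) string {
	// Leading with the timestamp guarantees that points observed at
	// different instants never land in the same group.
	return fmt.Sprintf("%d,%s,%s", ts.UnixMilli(), accountID, instanceID)
}

func main() {
	ts := time.Date(2023, 11, 2, 10, 0, 0, 0, time.UTC)
	fmt.Println(groupingKey(ts, "my-project", "instance-1"))
	// Output: 1698919200000,my-project,instance-1
}
```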

# Add `event.created` field

We need to add an extra dimension to avoid data loss on TSDB, since GCP
metrics with the same @timestamp become visible with different ingest
delays.

For the full context, read elastic/integrations#6568 (comment)
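
A hedged sketch of the effect, assuming two collection runs that fetch points
sharing the same @timestamp and dimensions (the run timestamps are made up):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// GCP reports the same metric @timestamp in both runs...
	metricTS := time.Date(2023, 11, 2, 10, 0, 0, 0, time.UTC)

	// ...but each run stamps its own event.created, so TSDB keeps two
	// distinct documents instead of dropping the second as a duplicate.
	runs := []struct {
		name    string
		created time.Time
	}{
		{"first collection (metric visible immediately)", metricTS.Add(1 * time.Minute)},
		{"second collection (metric visible after delay)", metricTS.Add(3 * time.Minute)},
	}
	for _, r := range runs {
		fmt.Printf("%s: @timestamp=%s event.created=%s\n",
			r.name, metricTS.Format(time.RFC3339), r.created.Format(time.RFC3339))
	}
}
```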

# Drop ID() function

Remove the `ID()` function from the Metadata Collector.

Since we are unifying the metric grouping logic for all metric types, we
don't need to keep the `ID()` function anymore.

# Renaming

I also renamed some structs, functions, and variables to make their roles
clearer.

We can remove this part if it does not improve clarity.
zmoog committed Nov 2, 2023
1 parent e329b58 commit 3d02db0
Showing 9 changed files with 186 additions and 186 deletions.
9 changes: 0 additions & 9 deletions x-pack/metricbeat/module/gcp/metadata.go
@@ -17,7 +17,6 @@ import (
// metricset. For example, Compute instance labels.
type MetadataService interface {
MetadataCollector
Identity
}

// MetadataCollector must be implemented by services that have special code needs that aren't fulfilled by the Stackdriver
@@ -65,11 +64,3 @@ type MetadataCollectorData struct {
Labels mapstr.M
ECS mapstr.M
}

// Identity must be implemented by GCP services that can add some sort of data to group their metrics (like instance
// id on Compute or topic in PubSub)
type Identity interface {

// ID returns a unique identifier to group many metrics into a single event
ID(ctx context.Context, in *MetadataCollectorInputData) (string, error)
}
19 changes: 0 additions & 19 deletions x-pack/metricbeat/module/gcp/metrics/cloudsql/metadata.go
@@ -6,7 +6,6 @@ package cloudsql

import (
"context"
"errors"
"fmt"
"strings"

@@ -158,24 +157,6 @@ func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, re
return cloudsqlMetadata, nil
}

func (s *metadataCollector) ID(ctx context.Context, in *gcp.MetadataCollectorInputData) (string, error) {
metadata, err := s.Metadata(ctx, in.TimeSeries)
if err != nil {
return "", err
}

metadata.ECS.Update(metadata.Labels)
if in.Timestamp != nil {
_, _ = metadata.ECS.Put("timestamp", in.Timestamp)
} else if in.Point != nil {
_, _ = metadata.ECS.Put("timestamp", in.Point.Interval.EndTime)
} else {
return "", errors.New("no timestamp information found")
}

return metadata.ECS.String(), nil
}

func (s *metadataCollector) instance(ctx context.Context, instanceName string) (*sqladmin.DatabaseInstance, error) {
s.getInstances(ctx)

32 changes: 0 additions & 32 deletions x-pack/metricbeat/module/gcp/metrics/compute/identity.go

This file was deleted.

57 changes: 45 additions & 12 deletions x-pack/metricbeat/module/gcp/metrics/metricset.go
@@ -182,45 +182,78 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) (err erro
for _, v := range sdc.MetricTypes {
metricsToCollect[sdc.AddPrefixTo(v)] = m.metricsMeta[sdc.AddPrefixTo(v)]
}
responses, err := m.requester.Metrics(ctx, sdc.ServiceName, sdc.Aligner, metricsToCollect)

// Collect time series values from Google Cloud Monitoring API
timeSeries, err := m.requester.Metrics(ctx, sdc.ServiceName, sdc.Aligner, metricsToCollect)
if err != nil {
err = fmt.Errorf("error trying to get metrics for project '%s' and zone '%s' or region '%s': %w", m.config.ProjectID, m.config.Zone, m.config.Region, err)
m.Logger().Error(err)
return err
}

events, err := m.eventMapping(ctx, responses, sdc)
events, err := m.mapToEvents(ctx, timeSeries, sdc)
if err != nil {
err = fmt.Errorf("eventMapping failed: %w", err)
err = fmt.Errorf("mapToEvents failed: %w", err)
m.Logger().Error(err)
return err
}

// Publish events to Elasticsearch
m.Logger().Debugf("Total %d of events are created for service name = %s and metric type = %s.", len(events), sdc.ServiceName, sdc.MetricTypes)
for _, event := range events {
reporter.Event(event)
}
}

return nil
}

func (m *MetricSet) eventMapping(ctx context.Context, tss []timeSeriesWithAligner, sdc metricsConfig) ([]mb.Event, error) {
e := newIncomingFieldExtractor(m.Logger(), sdc)
// mapToEvents maps time series data from GCP into events for Elasticsearch.
func (m *MetricSet) mapToEvents(ctx context.Context, timeSeries []timeSeriesWithAligner, sdc metricsConfig) ([]mb.Event, error) {
mapper := newIncomingFieldMapper(m.Logger(), sdc)

var gcpService = gcp.NewStackdriverMetadataServiceForTimeSeries(nil)
var metadataService = gcp.NewStackdriverMetadataServiceForTimeSeries(nil)
var err error

if !m.config.ExcludeLabels {
if gcpService, err = NewMetadataServiceForConfig(m.config, sdc.ServiceName); err != nil {
if metadataService, err = NewMetadataServiceForConfig(m.config, sdc.ServiceName); err != nil {
return nil, fmt.Errorf("error trying to create metadata service: %w", err)
}
}

tsGrouped := m.timeSeriesGrouped(ctx, gcpService, tss, e)

//Create single events for each group of data that matches some common patterns like labels and timestamp

events := createEventsFromGroups(sdc.ServiceName, tsGrouped)
// Group the time series values by common traits.
timeSeriesGroups := m.groupTimeSeries(ctx, timeSeries, metadataService, mapper)

// Generate a `created` event timestamp for all events collected in this batch.
//
// Why do we need to keep track of the event creation timestamp?
// -------------------------------------------------------------
//
// GCP metrics have different ingestion delays; after GCP collects a metric from
// a resource, it takes some time to be available for ingestion.
//
// Some metrics have no ingestion delay, while others have a delay of up to multiple
// minutes.
//
// For example,
// - `container/memory/limit_bytes` has no ingest delay, while
// - `container/memory/request_bytes` has a two-minute ingest delay.
//
// So, even if the metricset collects these metrics two minutes apart, the metrics
// will have the same timestamp.
//
// When metrics have the same timestamp and dimensions, the metricset groups
// them into a single event. However, it cannot group metrics collected at
// different times, so we add an `event.created` field to avoid having two
// documents with the same timestamp and dimensions.
//
eventCreatedTime := time.Now().UTC()

// Create single events for each time series group.
events := createEventsFromGroups(sdc.ServiceName, timeSeriesGroups, eventCreatedTime)

return events, nil
}
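
To make the ingest-delay comment above concrete, a small illustration; the
delays mirror the example in the comment, and the collection times are
hypothetical:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// GCP stamped both samples at the same instant.
	sampleTS := time.Date(2023, 11, 2, 10, 0, 0, 0, time.UTC)

	// Ingest delays from the example above: the first metric is visible
	// immediately, the second only two minutes later.
	visibleAt := []struct {
		metric string
		at     time.Time
	}{
		{"container/memory/limit_bytes", sampleTS},
		{"container/memory/request_bytes", sampleTS.Add(2 * time.Minute)},
	}

	// Two collection runs, two minutes apart, each pick up one of the
	// points: they share @timestamp but arrive in different batches, so
	// the metricset cannot group them into a single event.
	for _, v := range visibleAt {
		fmt.Printf("%s: @timestamp=%s, collectable from %s\n",
			v.metric, sampleTS.Format(time.RFC3339), v.at.Format(time.RFC3339))
	}
}
```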
18 changes: 0 additions & 18 deletions x-pack/metricbeat/module/gcp/metrics/redis/metadata.go
@@ -59,24 +59,6 @@ type metadataCollector struct {
logger *logp.Logger
}

func (s *metadataCollector) ID(ctx context.Context, in *gcp.MetadataCollectorInputData) (string, error) {
metadata, err := s.Metadata(ctx, in.TimeSeries)
if err != nil {
return "", err
}

metadata.ECS.Update(metadata.Labels)
if in.Timestamp != nil {
_, _ = metadata.ECS.Put("timestamp", in.Timestamp)
} else if in.Point != nil {
_, _ = metadata.ECS.Put("timestamp", in.Point.Interval.EndTime)
} else {
return "", fmt.Errorf("no timestamp information found")
}

return metadata.ECS.String(), nil
}

// Metadata implements googlecloud.MetadataCollector, extracting the known set of labels from a Redis TimeSeries single point of data.
func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.TimeSeries) (gcp.MetadataCollectorData, error) {
metadata, err := s.instanceMetadata(ctx, s.instanceID(resp), s.instanceRegion(resp))
23 changes: 16 additions & 7 deletions x-pack/metricbeat/module/gcp/metrics/response_parser.go
@@ -16,16 +16,22 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

func newIncomingFieldExtractor(l *logp.Logger, mc metricsConfig) *incomingFieldExtractor {
return &incomingFieldExtractor{logger: l, mc: mc}
// newIncomingFieldMapper creates a new incomingFieldMapper.
func newIncomingFieldMapper(l *logp.Logger, mc metricsConfig) *incomingFieldMapper {
return &incomingFieldMapper{logger: l, mc: mc}
}

type incomingFieldExtractor struct {
// incomingFieldMapper is a component that maps the incoming data from GCP to a slice
// of KeyValuePoint structs.
type incomingFieldMapper struct {
logger *logp.Logger
mc metricsConfig
}

// KeyValuePoint is a struct to capture the information parsed in an instant of a single metric
// KeyValuePoint is a struct to capture the information parsed in an instant of a single metric.
//
// The metricset uses the KeyValuePoint struct internally to represent a single metric value,
// @timestamp, and metadata.
type KeyValuePoint struct {
Key string
Value interface{}
@@ -34,8 +40,11 @@ type KeyValuePoint struct {
Timestamp time.Time
}

// extractTimeSeriesMetricValues valuable to send to Elasticsearch. This includes, for example, metric values, labels and timestamps
func (e *incomingFieldExtractor) extractTimeSeriesMetricValues(resp *monitoringpb.TimeSeries, aligner string) (points []KeyValuePoint) {
// mapTimeSeriesToKeyValuesPoints maps TimeSeries data from GCP to a slice of KeyValuePoint structs.
//
// The metricset uses the KeyValuePoint struct internally to represent a single metric value with
// its corresponding metadata.
func (e *incomingFieldMapper) mapTimeSeriesToKeyValuesPoints(resp *monitoringpb.TimeSeries, aligner string) (points []KeyValuePoint) {
points = make([]KeyValuePoint, 0)

for _, point := range resp.Points {
@@ -58,7 +67,7 @@ func (e *incomingFieldExtractor) extractTimeSeriesMetricValues(resp *monitoringp
return points
}

func (e *incomingFieldExtractor) getTimestamp(p *monitoringpb.Point) (ts time.Time, err error) {
func (e *incomingFieldMapper) getTimestamp(p *monitoringpb.Point) (ts time.Time, err error) {
// Don't add point intervals that can't be "stated" at some timestamp.
if p.Interval != nil {
return p.Interval.EndTime.AsTime(), nil
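
A minimal sketch of the mapping step, with a simplified point type standing in
for the monitoringpb structures (`rawPoint` and `mapPoints` are illustrative
names, not part of the module):

```go
package main

import (
	"fmt"
	"time"
)

// KeyValuePoint mirrors the struct above (metadata fields omitted); rawPoint
// is a stand-in for a single *monitoringpb.Point, which carries a value and
// an interval.
type KeyValuePoint struct {
	Key       string
	Value     interface{}
	Timestamp time.Time
}

type rawPoint struct {
	value   float64
	endTime time.Time
}

// mapPoints is illustrative: the real mapTimeSeriesToKeyValuesPoints also
// remaps the metric type into an Elasticsearch-friendly key, and the ECS and
// Labels fields are filled in later by the metadata service.
func mapPoints(metricType string, pts []rawPoint) []KeyValuePoint {
	out := make([]KeyValuePoint, 0, len(pts))
	for _, p := range pts {
		out = append(out, KeyValuePoint{
			Key:       metricType,
			Value:     p.value,
			Timestamp: p.endTime, // the interval end time becomes @timestamp
		})
	}
	return out
}

func main() {
	pts := []rawPoint{{value: 0.42, endTime: time.Now().UTC()}}
	fmt.Printf("%+v\n", mapPoints("instance/cpu/utilization", pts))
}
```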
50 changes: 33 additions & 17 deletions x-pack/metricbeat/module/gcp/metrics/timeseries.go
@@ -8,9 +8,9 @@ import (
"context"
"fmt"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/x-pack/metricbeat/module/gcp"
"github.com/elastic/elastic-agent-libs/mapstr"
"time"
)

func getKeyValue(m mapstr.M, field string) string {
@@ -27,9 +27,12 @@ func getKeyValue(m mapstr.M, field string) string {
return strVal
}

func createDimensionsKey(kv KeyValuePoint) string {
// Figure the list of dimensions that we want to group by from kv.ECS and kv.Labels

// createGroupingKey returns a key to group metrics by dimensions.
//
// At a high level, the key is made of the following components:
// - @timestamp
// - list of dimension values
func createGroupingKey(kv KeyValuePoint) string {
accountID := getKeyValue(kv.ECS, "cloud.account.id")
az := getKeyValue(kv.ECS, "cloud.availability_zone")
instanceID := getKeyValue(kv.ECS, "cloud.instance.id")
@@ -49,18 +52,29 @@ func createDimensionsKey(kv KeyValuePoint) string {
return dimensionsKey
}

// groupMetricsByDimensions returns a map of metrics grouped by dimensions.
func groupMetricsByDimensions(keyValues []KeyValuePoint) map[string][]KeyValuePoint {
groupedMetrics := make(map[string][]KeyValuePoint)

for _, kv := range keyValues {
dimensionsKey := createDimensionsKey(kv)
dimensionsKey := createGroupingKey(kv)
groupedMetrics[dimensionsKey] = append(groupedMetrics[dimensionsKey], kv)
}

return groupedMetrics
}

func createEventsFromGroups(service string, groups map[string][]KeyValuePoint) []mb.Event {
// createEventsFromGroups returns a slice of events from the metric groups.
//
// Each group is made of one or more metrics, so the function collapses the
// metrics in each group into a single event:
//
// []KeyValuePoint -> mb.Event
//
// Collapsing the metrics in each group into a single event should not cause
// any loss of information, since all metrics in a group share the same timestamp
// and dimensions.
func createEventsFromGroups(service string, groups map[string][]KeyValuePoint, eventCreatedTime time.Time) []mb.Event {
events := make([]mb.Event, 0, len(groups))

for _, group := range groups {
@@ -82,28 +96,31 @@ func createEventsFromGroups(service string, groups map[string][]KeyValuePoint) [
event.RootFields = group[0].ECS
}

_, _ = event.RootFields.Put("event.created", eventCreatedTime)

events = append(events, event)
}

return events
}

// timeSeriesGrouped groups TimeSeries responses into common Elasticsearch friendly events. This is to avoid sending
// events with a single metric that shares info (like timestamp) with another event with a single metric too
func (m *MetricSet) timeSeriesGrouped(ctx context.Context, gcpService gcp.MetadataService, tsas []timeSeriesWithAligner, e *incomingFieldExtractor) map[string][]KeyValuePoint {
metadataService := gcpService
// groupTimeSeries groups TimeSeries into Elasticsearch friendly events.
//
// By grouping multiple TimeSeries (according to @timestamp and dimensions) into a single event,
// we can avoid sending events with a single metric.
func (m *MetricSet) groupTimeSeries(ctx context.Context, timeSeries []timeSeriesWithAligner, defaultMetadataService gcp.MetadataService, mapper *incomingFieldMapper) map[string][]KeyValuePoint {
metadataService := defaultMetadataService

var kvs []KeyValuePoint

for _, tsa := range tsas {
for _, tsa := range timeSeries {
aligner := tsa.aligner
for _, ts := range tsa.timeSeries {
keyValues := e.extractTimeSeriesMetricValues(ts, aligner)

sdCollectorInputData := gcp.NewStackdriverCollectorInputData(ts, m.config.ProjectID, m.config.Zone, m.config.Region, m.config.Regions)
if gcpService == nil {
if defaultMetadataService == nil {
metadataService = gcp.NewStackdriverMetadataServiceForTimeSeries(ts)
}
sdCollectorInputData := gcp.NewStackdriverCollectorInputData(ts, m.config.ProjectID, m.config.Zone, m.config.Region, m.config.Regions)
keyValues := mapper.mapTimeSeriesToKeyValuesPoints(ts, aligner)

for i := range keyValues {
sdCollectorInputData.Timestamp = &keyValues[i].Timestamp
@@ -116,7 +133,6 @@ func (m *MetricSet) timeSeriesGrouped(ctx context.Context, gcpService gcp.Metada

keyValues[i].ECS = metadataCollectorData.ECS
keyValues[i].Labels = metadataCollectorData.Labels

}

kvs = append(kvs, keyValues...)
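
Putting the pieces together, a hedged end-to-end sketch of the
group-then-collapse flow (simplified types and an illustrative key format, not
the module's actual code):

```go
package main

import (
	"fmt"
	"time"
)

type point struct {
	key   string // metric name, e.g. "instance/cpu/utilization"
	value float64
	ts    time.Time
	dims  string // stand-in for the account/zone/instance/label values
}

func main() {
	ts := time.Date(2023, 11, 2, 10, 0, 0, 0, time.UTC)
	points := []point{
		{"instance/cpu/utilization", 0.42, ts, "proj-a/zone-1/vm-1"},
		{"instance/network/sent_bytes_count", 1024, ts, "proj-a/zone-1/vm-1"},
	}

	// Group by timestamp + dimensions, as createGroupingKey does.
	groups := make(map[string][]point)
	for _, p := range points {
		k := fmt.Sprintf("%d,%s", p.ts.UnixMilli(), p.dims)
		groups[k] = append(groups[k], p)
	}

	// Collapse each group into a single event: all metrics in a group share
	// the timestamp and dimensions, so one document can carry every value.
	for k, g := range groups {
		fields := make(map[string]float64, len(g))
		for _, p := range g {
			fields[p.key] = p.value
		}
		fmt.Printf("event %s -> %v\n", k, fields)
	}
}
```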
