From 6371886cce2d0e22a00df9d87afd218cbe1b8ddf Mon Sep 17 00:00:00 2001 From: Cristian Ciutea Date: Mon, 5 Oct 2020 10:41:29 +0200 Subject: [PATCH 1/3] feat:added prometheus-histogram and prometheus-summary support in v4 --- pkg/integrations/v4/dm/emitter_no_register.go | 5 +- pkg/integrations/v4/dm/sender.go | 23 +++- pkg/integrations/v4/dm/sender_test.go | 80 +++++++++++ pkg/integrations/v4/dm/telemetry.go | 129 +++++++++++++++++- pkg/integrations/v4/protocol/types.go | 61 +++++++++ 5 files changed, 287 insertions(+), 11 deletions(-) diff --git a/pkg/integrations/v4/dm/emitter_no_register.go b/pkg/integrations/v4/dm/emitter_no_register.go index ba405a824..e215e59d0 100644 --- a/pkg/integrations/v4/dm/emitter_no_register.go +++ b/pkg/integrations/v4/dm/emitter_no_register.go @@ -117,5 +117,8 @@ func (e *nonRegisterEmitter) Send(dto fwrequest.FwRequest) { } // TODO error handling - elog.Error(composeEmitError(emitErrs, len(integrationData.DataSets)).Error()) + composedError := composeEmitError(emitErrs, len(integrationData.DataSets)) + if composedError != nil { + elog.Error(composedError.Error()) + } } diff --git a/pkg/integrations/v4/dm/sender.go b/pkg/integrations/v4/dm/sender.go index 7bc6d3c86..369763695 100644 --- a/pkg/integrations/v4/dm/sender.go +++ b/pkg/integrations/v4/dm/sender.go @@ -70,8 +70,8 @@ type sender struct { } type Calculator struct { - delta deltaCalculator - rate rate.Calculator + delta deltaCalculator + rate rate.Calculator } type deltaCalculator interface { @@ -109,6 +109,10 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) { c = Conversion{toTelemetry: Gauge{calculate: &Rate{get: s.calculator.rate.GetCumulativeRate}}} case "cumulative-count": c = Conversion{toTelemetry: Count{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} + case "prometheus-summary": + c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} + case "prometheus-histogram": + c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} default: logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type") continue @@ -123,12 +127,15 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) { continue } - s.harvester.RecordMetric(recMetric) + for _, m := range recMetric { + s.harvester.RecordMetric(m) + } } } func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Common, metrics []protocol.Metric) error { var dMetrics []telemetry.Metric + for _, metric := range metrics { var c Conversion @@ -146,12 +153,16 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo c = Conversion{toTelemetry: Gauge{calculate: &Rate{get: s.calculator.rate.GetCumulativeRate}}} case "cumulative-count": c = Conversion{toTelemetry: Count{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} - default: + case "prometheus-summary": + c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} + case "prometheus-histogram": + c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} + default: logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type") continue } - recMetric, err := c.convert(metric) + recMetrics, err := c.convert(metric) if err != nil { if err != errNoCalculation { @@ -160,7 +171,7 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo } continue } - dMetrics = append(dMetrics, recMetric) + dMetrics = append(dMetrics, recMetrics...) } if len(dMetrics) > 0 { diff --git a/pkg/integrations/v4/dm/sender_test.go b/pkg/integrations/v4/dm/sender_test.go index 122659a75..2dfb67a8d 100644 --- a/pkg/integrations/v4/dm/sender_test.go +++ b/pkg/integrations/v4/dm/sender_test.go @@ -4,7 +4,9 @@ package dm import ( "encoding/json" + "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/cumulative" "io/ioutil" + "math" "os" "testing" "time" @@ -146,6 +148,84 @@ func Test_sender_SendMetrics(t *testing.T) { } } +func Test_sender_SendPrometheusMetrics(t *testing.T) { + cannedDuration, _ := time.ParseDuration("1m7s") + cannedDurationInt := int64(cannedDuration.Seconds() * 1000) + cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location()) + cannedDateUnix := cannedDate.Unix() + type fields struct { + harvester *mockHarvester + } + + type args struct { + metrics []protocol.Metric + } + tests := []struct { + name string + fields fields + args args + expectedMetrics []telemetry.Metric + }{ + { + name: "prometheus-summary", + fields: fields{ + harvester: &mockHarvester{}, + }, + args: args{ + metrics: []protocol.Metric{ + { + Name: "PrometheusSummaryMetric", + Type: "prometheus-summary", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Timestamp: &cannedDateUnix, + Interval: &cannedDurationInt, + Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"quantiles\":[{\"quantile\":95,\"value\":3}]}"), + }, + }, + }, + expectedMetrics: []telemetry.Metric{ + telemetry.Summary{ + Name: "PrometheusSummaryMetric_sum", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Sum: float64(2), + Min: math.NaN(), + Max: math.NaN(), + Count: 1, + Timestamp: cannedDate, + Interval: cannedDuration, + }, + telemetry.Gauge{ + Name: "PrometheusSummaryMetric", + Attributes: map[string]interface{}{"att_key": "att_value", "quantile": "95"}, + Value: float64(3), + Timestamp: cannedDate, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &sender{ + harvester: tt.fields.harvester, + calculator: Calculator{delta: cumulative.NewDeltaCalculator()}, + } + + tt.fields.harvester.On("RecordInfraMetrics", telemetry.Attributes{"one":1, "two":"two"}, tt.expectedMetrics).Return(nil) + err := s.SendMetricsWithCommonAttributes(protocol.Common{ + Timestamp: &cannedDateUnix, + Interval: &cannedDurationInt, + Attributes: map[string]interface{}{ + "one": 1, + "two": "two", + }, + }, tt.args.metrics) + require.NoError(t, err) + tt.fields.harvester.AssertExpectations(t) + }) + } +} + func Test_sender_SenderMetric_cumulative_CountCalculator(t *testing.T) { cannedDuration, _ := time.ParseDuration("1m7s") cannedDurationInt := int64(cannedDuration.Seconds() * 1000) diff --git a/pkg/integrations/v4/dm/telemetry.go b/pkg/integrations/v4/dm/telemetry.go index df1a41660..a8df3fc02 100644 --- a/pkg/integrations/v4/dm/telemetry.go +++ b/pkg/integrations/v4/dm/telemetry.go @@ -4,7 +4,9 @@ package dm import ( "errors" + "fmt" "io" + "math" "net/http" "time" @@ -68,17 +70,39 @@ func telemetryHarvesterWithMetricApiUrl(metricApiUrl string) func(*telemetry.Con } type Conversion struct { - toTelemetry Converter + toTelemetry Converter + toMultipleTelemetry DerivingConvertor } -func (c *Conversion) convert(metric protocol.Metric) (telemetry.Metric, error) { - return c.toTelemetry.from(metric) +func (c *Conversion) convert(metric protocol.Metric) ([]telemetry.Metric, error) { + var result []telemetry.Metric + if c.toTelemetry != nil { + converted, err := c.toTelemetry.from(metric) + if err != nil { + return nil, err + } + result = append(result, converted) + } + + if c.toMultipleTelemetry != nil { + derivedMetrics, err := c.toMultipleTelemetry.derivedFrom(metric) + if err != nil { + return nil, err + } + result = append(result, derivedMetrics...) + } + return result, nil } type Converter interface { from(metric protocol.Metric) (telemetry.Metric, error) } +type DerivingConvertor interface { + // derivedFrom is used when a metric is transformed into multiple telemetry metrics. + derivedFrom(metric protocol.Metric) ([]telemetry.Metric, error) +} + type Count struct { calculate *Cumulative } @@ -130,7 +154,8 @@ func (g *Gauge) shouldCalculate() bool { return g.calculate != nil } -type Summary struct{} +type Summary struct { +} func (Summary) from(metric protocol.Metric) (telemetry.Metric, error) { value, err := metric.SummaryValue() @@ -151,6 +176,102 @@ func (Summary) from(metric protocol.Metric) (telemetry.Metric, error) { }, nil } +type PrometheusHistogram struct { + calculate *Cumulative +} + +func (ph PrometheusHistogram) derivedFrom(metric protocol.Metric) ([]telemetry.Metric, error) { + var result []telemetry.Metric + value, err := metric.GetPrometheusHistogramValue() + + if err != nil { + return nil, err + } + + if ph.calculate != nil { + metricName := metric.Name + "_sum" + sumCount, ok := ph.calculate.get(metricName, metric.Attributes, float64(*value.SampleCount), metric.Time()) + if ok { + result = append(result, telemetry.Summary{ + Name: metricName, + Attributes: metric.Attributes, + Count: 1, + Sum: sumCount.Value, + Min: math.NaN(), + Max: math.NaN(), + Timestamp: metric.Time(), + }) + } else { + telemetryLogger.WithField("name", metricName).WithField("metric-type", metric.Type).Debug(noCalculationMadeErrMsg) + } + + metricName = metric.Name + "_bucket" + for _, b := range value.Buckets { + bucketAttrs := metric.CopyAttrs() + bucketAttrs["le"] = fmt.Sprintf("%g", *b.UpperBound) + + bucketCount, ok := ph.calculate.get( + metricName, + bucketAttrs, + float64(*b.CumulativeCount), + metric.Time(), + ) + if ok { + result = append(result, bucketCount) + } else { + telemetryLogger.WithField("name", metricName).WithField("metric-type", metric.Type).Debug(noCalculationMadeErrMsg) + } + } + } + return result, nil +} + +type PrometheusSummary struct { + calculate *Cumulative +} + +func (p PrometheusSummary) derivedFrom(metric protocol.Metric) ([]telemetry.Metric, error) { + var result []telemetry.Metric + value, err := metric.GetPrometheusSummaryValue() + + if err != nil { + return nil, err + } + + result = append(result, telemetry.Summary{ + Name: metric.Name + "_sum", + Attributes: metric.Attributes, + Count: 1, + Sum: value.SampleSum, + Min: math.NaN(), + Max: math.NaN(), + Timestamp: metric.Time(), + Interval: metric.IntervalDuration(), + }) + + if p.calculate != nil { + metricName := metric.Name + "_count" + countMetric, ok := p.calculate.get(metricName, metric.Attributes, value.SampleCount, metric.Time()) + if ok { + result = append(result, countMetric) + } else { + telemetryLogger.WithField("name", metricName).WithField("metric-type", metric.Type).Debug(noCalculationMadeErrMsg) + } + } + + for _, q := range value.Quantiles { + quantileAttrs := metric.CopyAttrs() + quantileAttrs["quantile"] = fmt.Sprintf("%g", q.Quantile) + result = append(result, telemetry.Gauge{ + Name: metric.Name, + Attributes: quantileAttrs, + Value: q.Value, + Timestamp: metric.Time(), + }) + } + return result, nil +} + type Rate struct { get func(name string, attributes map[string]interface{}, diff --git a/pkg/integrations/v4/protocol/types.go b/pkg/integrations/v4/protocol/types.go index 4c65d7303..8e407cd9d 100644 --- a/pkg/integrations/v4/protocol/types.go +++ b/pkg/integrations/v4/protocol/types.go @@ -18,6 +18,9 @@ const ( MetricTypeSummary MetricType = "summary" MetricTypeGauge MetricType = "gauge" MetricTypeRate MetricType = "rate" + + MetricTypePrometheusSummary MetricType = "prometheus-summary" + MetricTypePrometheusHistogram MetricType = "prometheus-histogram" ) const millisSinceJanuaryFirst1978 = 252489600000 @@ -70,6 +73,33 @@ type SummaryValue struct { Sum float64 `json:"sum"` } +// PrometheusHistogram represents a Prometheus histogram +type PrometheusHistogramValue struct { + SampleCount *uint64 `json:"sample_count,omitempty"` + SampleSum *float64 `json:"sample_sum,omitempty"` + // Buckets defines the buckets into which observations are counted. Each + // element in the slice is the upper inclusive bound of a bucket. The + // values must are sorted in strictly increasing order. + Buckets []*bucket `json:"buckets,omitempty"` +} + +type bucket struct { + CumulativeCount *float64 `json:"cumulative_count,omitempty"` + UpperBound *float64 `json:"upper_bound,omitempty"` +} + +// PrometheusSummary represents a Prometheus summary +type PrometheusSummaryValue struct { + SampleCount float64 `json:"sample_count,omitempty"` + SampleSum float64 `json:"sample_sum,omitempty"` + Quantiles []quantile `json:"quantiles,omitempty"` +} + +type quantile struct { + Quantile float64 `json:"quantile,omitempty"` + Value float64 `json:"value,omitempty"` +} + // PluginDataV1 supports a single data set for a single entity type PluginDataV1 struct { PluginOutputIdentifier @@ -195,3 +225,34 @@ func (m *Metric) SummaryValue() (SummaryValue, error) { return SummaryValue{}, fmt.Errorf("metric type %v is not summary", m.Type) } + +func (m *Metric) GetPrometheusSummaryValue() (PrometheusSummaryValue, error) { + if m.Type == MetricTypePrometheusSummary { + var value PrometheusSummaryValue + err := json.Unmarshal(m.Value, &value) + + return value, err + } + + return PrometheusSummaryValue{}, fmt.Errorf("metric type %v is not prometheus-summary", m.Type) +} + +func (m *Metric) GetPrometheusHistogramValue() (PrometheusHistogramValue, error) { + if m.Type == MetricTypePrometheusHistogram { + var value PrometheusHistogramValue + err := json.Unmarshal(m.Value, &value) + + return value, err + } + + return PrometheusHistogramValue{}, fmt.Errorf("metric type %v is not prometheus-histogram", m.Type) +} + +// CopyAttrs returns a (shallow) copy of the passed attrs. +func (m *Metric) CopyAttrs() map[string]interface{} { + duplicate := make(map[string]interface{}, len(m.Attributes)) + for k, v := range m.Attributes { + duplicate[k] = v + } + return duplicate +} \ No newline at end of file From 01c75e537949f4c07bb9ccaafc04cbf38dd791b2 Mon Sep 17 00:00:00 2001 From: Cristian Ciutea Date: Mon, 5 Oct 2020 14:18:55 +0200 Subject: [PATCH 2/3] test: added test for prometheus summary --- pkg/integrations/v4/dm/sender.go | 20 +++-- pkg/integrations/v4/dm/sender_test.go | 117 ++++++++++++-------------- 2 files changed, 65 insertions(+), 72 deletions(-) diff --git a/pkg/integrations/v4/dm/sender.go b/pkg/integrations/v4/dm/sender.go index 369763695..08424866c 100644 --- a/pkg/integrations/v4/dm/sender.go +++ b/pkg/integrations/v4/dm/sender.go @@ -70,8 +70,8 @@ type sender struct { } type Calculator struct { - delta deltaCalculator - rate rate.Calculator + delta deltaCalculator + rate rate.Calculator } type deltaCalculator interface { @@ -134,6 +134,14 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) { } func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Common, metrics []protocol.Metric) error { + dMetrics := s.convertMetrics(metrics) + if len(dMetrics) > 0 { + return s.harvester.RecordInfraMetrics(commonAttributes.Attributes, dMetrics) + } + return nil +} + +func (s *sender) convertMetrics(metrics []protocol.Metric) []telemetry.Metric { var dMetrics []telemetry.Metric for _, metric := range metrics { @@ -157,7 +165,7 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} case "prometheus-histogram": c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} - default: + default: logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type") continue } @@ -173,9 +181,5 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo } dMetrics = append(dMetrics, recMetrics...) } - - if len(dMetrics) > 0 { - return s.harvester.RecordInfraMetrics(commonAttributes.Attributes, dMetrics) - } - return nil + return dMetrics } diff --git a/pkg/integrations/v4/dm/sender_test.go b/pkg/integrations/v4/dm/sender_test.go index 2dfb67a8d..f888a64ea 100644 --- a/pkg/integrations/v4/dm/sender_test.go +++ b/pkg/integrations/v4/dm/sender_test.go @@ -153,77 +153,66 @@ func Test_sender_SendPrometheusMetrics(t *testing.T) { cannedDurationInt := int64(cannedDuration.Seconds() * 1000) cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location()) cannedDateUnix := cannedDate.Unix() - type fields struct { - harvester *mockHarvester - } - type args struct { - metrics []protocol.Metric - } - tests := []struct { - name string - fields fields - args args - expectedMetrics []telemetry.Metric - }{ + protocolMetrics := []protocol.Metric{ { - name: "prometheus-summary", - fields: fields{ - harvester: &mockHarvester{}, - }, - args: args{ - metrics: []protocol.Metric{ - { - Name: "PrometheusSummaryMetric", - Type: "prometheus-summary", - Attributes: map[string]interface{}{"att_key": "att_value"}, - Timestamp: &cannedDateUnix, - Interval: &cannedDurationInt, - Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"quantiles\":[{\"quantile\":95,\"value\":3}]}"), - }, - }, - }, - expectedMetrics: []telemetry.Metric{ - telemetry.Summary{ - Name: "PrometheusSummaryMetric_sum", - Attributes: map[string]interface{}{"att_key": "att_value"}, - Sum: float64(2), - Min: math.NaN(), - Max: math.NaN(), - Count: 1, - Timestamp: cannedDate, - Interval: cannedDuration, - }, - telemetry.Gauge{ - Name: "PrometheusSummaryMetric", - Attributes: map[string]interface{}{"att_key": "att_value", "quantile": "95"}, - Value: float64(3), - Timestamp: cannedDate, - }, - }, + Name: "PrometheusSummaryMetric", + Type: "prometheus-summary", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Timestamp: &cannedDateUnix, + Interval: &cannedDurationInt, + Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"quantiles\":[{\"quantile\":95,\"value\":3}]}"), }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := &sender{ - harvester: tt.fields.harvester, - calculator: Calculator{delta: cumulative.NewDeltaCalculator()}, - } + expectedMetrics := []telemetry.Metric{ + telemetry.Summary{ + Name: "PrometheusSummaryMetric_sum", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Sum: float64(2), + Min: math.NaN(), + Max: math.NaN(), + Count: 1, + Timestamp: cannedDate, + Interval: cannedDuration, + }, + telemetry.Gauge{ + Name: "PrometheusSummaryMetric", + Attributes: map[string]interface{}{"att_key": "att_value", "quantile": "95"}, + Value: float64(3), + Timestamp: cannedDate, + }, + } - tt.fields.harvester.On("RecordInfraMetrics", telemetry.Attributes{"one":1, "two":"two"}, tt.expectedMetrics).Return(nil) - err := s.SendMetricsWithCommonAttributes(protocol.Common{ - Timestamp: &cannedDateUnix, - Interval: &cannedDurationInt, - Attributes: map[string]interface{}{ - "one": 1, - "two": "two", - }, - }, tt.args.metrics) - require.NoError(t, err) - tt.fields.harvester.AssertExpectations(t) - }) + s := &sender{ + calculator: Calculator{delta: cumulative.NewDeltaCalculator()}, } + + // Call convert twice to check deltas. + converted := s.convertMetrics(protocolMetrics) + assert.Len(t, converted, len(expectedMetrics)) + + actualSummaryMetric, ok := converted[0].(telemetry.Summary) + assert.True(t, ok) + + expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary) + assert.True(t, ok) + + assert.Equal(t, actualSummaryMetric.Count, expectedSummaryMetric.Count) + assert.Equal(t, actualSummaryMetric.Attributes, expectedSummaryMetric.Attributes) + assert.Equal(t, actualSummaryMetric.Sum, expectedSummaryMetric.Sum) + assert.True(t, math.IsNaN(actualSummaryMetric.Min)) + assert.True(t, math.IsNaN(actualSummaryMetric.Max)) + assert.Equal(t, actualSummaryMetric.Count, expectedSummaryMetric.Count) + assert.Equal(t, actualSummaryMetric.Timestamp, expectedSummaryMetric.Timestamp) + assert.Equal(t, actualSummaryMetric.Interval, expectedSummaryMetric.Interval) + + actualGaugeMetric, ok := converted[1].(telemetry.Gauge) + assert.True(t, ok) + + expectedGaugeMetric, ok := expectedMetrics[1].(telemetry.Gauge) + assert.True(t, ok) + assert.Equal(t, expectedGaugeMetric, actualGaugeMetric) } func Test_sender_SenderMetric_cumulative_CountCalculator(t *testing.T) { From 95b8a98b406972c0db180c598032b8606cdb28a1 Mon Sep 17 00:00:00 2001 From: Cristian Ciutea Date: Thu, 8 Oct 2020 16:59:33 +0200 Subject: [PATCH 3/3] added unit test for prometheus histogram --- pkg/integrations/v4/dm/sender_test.go | 127 +++++++++++++++++++++++--- pkg/integrations/v4/protocol/types.go | 8 +- 2 files changed, 119 insertions(+), 16 deletions(-) diff --git a/pkg/integrations/v4/dm/sender_test.go b/pkg/integrations/v4/dm/sender_test.go index 015957c80..d689e03b9 100644 --- a/pkg/integrations/v4/dm/sender_test.go +++ b/pkg/integrations/v4/dm/sender_test.go @@ -162,7 +162,7 @@ func Test_sender_SendMetrics(t *testing.T) { } } -func Test_sender_SendPrometheusMetrics(t *testing.T) { +func Test_sender_SendPrometheusSummary(t *testing.T) { cannedDuration, _ := time.ParseDuration("1m7s") cannedDurationInt := int64(cannedDuration.Seconds() * 1000) cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location()) @@ -187,14 +187,21 @@ func Test_sender_SendPrometheusMetrics(t *testing.T) { Min: math.NaN(), Max: math.NaN(), Count: 1, - Timestamp: cannedDate, + Timestamp: cannedDate.Add(time.Second), Interval: cannedDuration, }, + telemetry.Count{ + Name: "PrometheusSummaryMetric_count", + AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\"}"), + Value: float64(0), + Timestamp: cannedDate, + Interval: 1 * time.Second, + }, telemetry.Gauge{ Name: "PrometheusSummaryMetric", Attributes: map[string]interface{}{"att_key": "att_value", "quantile": "95"}, Value: float64(3), - Timestamp: cannedDate, + Timestamp: cannedDate.Add(time.Second), }, } @@ -202,7 +209,10 @@ func Test_sender_SendPrometheusMetrics(t *testing.T) { calculator: Calculator{delta: cumulative.NewDeltaCalculator()}, } - // Call convert twice to check deltas. + _ = s.convertMetrics(protocolMetrics) + + // After one second call convert metrics again to check deltas. + cannedDateUnix += 1 converted := s.convertMetrics(protocolMetrics) assert.Len(t, converted, len(expectedMetrics)) @@ -212,21 +222,114 @@ func Test_sender_SendPrometheusMetrics(t *testing.T) { expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary) assert.True(t, ok) - assert.Equal(t, actualSummaryMetric.Count, expectedSummaryMetric.Count) - assert.Equal(t, actualSummaryMetric.Attributes, expectedSummaryMetric.Attributes) - assert.Equal(t, actualSummaryMetric.Sum, expectedSummaryMetric.Sum) + assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count) + assert.Equal(t, expectedSummaryMetric.Attributes, actualSummaryMetric.Attributes) + assert.Equal(t, expectedSummaryMetric.Sum, actualSummaryMetric.Sum) assert.True(t, math.IsNaN(actualSummaryMetric.Min)) assert.True(t, math.IsNaN(actualSummaryMetric.Max)) - assert.Equal(t, actualSummaryMetric.Count, expectedSummaryMetric.Count) - assert.Equal(t, actualSummaryMetric.Timestamp, expectedSummaryMetric.Timestamp) - assert.Equal(t, actualSummaryMetric.Interval, expectedSummaryMetric.Interval) + assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count) + assert.Equal(t, expectedSummaryMetric.Timestamp, actualSummaryMetric.Timestamp) + assert.Equal(t, expectedSummaryMetric.Interval, actualSummaryMetric.Interval) + + actualCountMetric, ok := converted[1].(telemetry.Count) + assert.True(t, ok) - actualGaugeMetric, ok := converted[1].(telemetry.Gauge) + expectedCountMetric, ok := expectedMetrics[1].(telemetry.Count) assert.True(t, ok) + assert.Equal(t, actualCountMetric, expectedCountMetric) - expectedGaugeMetric, ok := expectedMetrics[1].(telemetry.Gauge) + actualGaugeMetric, ok := converted[2].(telemetry.Gauge) + assert.True(t, ok) + + expectedGaugeMetric, ok := expectedMetrics[2].(telemetry.Gauge) assert.True(t, ok) assert.Equal(t, expectedGaugeMetric, actualGaugeMetric) + assert.Equal(t, expectedGaugeMetric, actualGaugeMetric) +} + +func Test_sender_SendPrometheusHistogram(t *testing.T) { + cannedDuration, _ := time.ParseDuration("1m7s") + cannedDurationInt := int64(cannedDuration.Seconds() * 1000) + cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location()) + cannedDateUnix := cannedDate.Unix() + + protocolMetrics := []protocol.Metric{ + { + Name: "PrometheusHistogramMetric", + Type: "prometheus-histogram", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Timestamp: &cannedDateUnix, + Interval: &cannedDurationInt, + Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"buckets\":[{\"cumulative_count\":95,\"upper_bound\":3},{\"cumulative_count\":90,\"upper_bound\":4}]}"), + }, + } + + expectedMetrics := []telemetry.Metric{ + telemetry.Summary{ + Name: "PrometheusHistogramMetric_sum", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Sum: float64(0), + Min: math.NaN(), + Max: math.NaN(), + Count: 1, + Timestamp: cannedDate.Add(time.Second), + }, + telemetry.Count{ + Name: "PrometheusHistogramMetric_bucket", + AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\",\"le\":\"3\"}"), + + Value: float64(0), + Interval: 1 * time.Second, + Timestamp: cannedDate, + }, + telemetry.Count{ + Name: "PrometheusHistogramMetric_bucket", + AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\",\"le\":\"4\"}"), + + Value: float64(0), + Interval: 1 * time.Second, + Timestamp: cannedDate, + }, + } + + s := &sender{ + calculator: Calculator{delta: cumulative.NewDeltaCalculator()}, + } + + _ = s.convertMetrics(protocolMetrics) + + // After one second call convert metrics again to check deltas. + cannedDateUnix += 1 + converted := s.convertMetrics(protocolMetrics) + assert.Len(t, converted, len(expectedMetrics)) + + actualSummaryMetric, ok := converted[0].(telemetry.Summary) + assert.True(t, ok) + + expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary) + assert.True(t, ok) + + assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count) + assert.Equal(t, expectedSummaryMetric.Attributes, actualSummaryMetric.Attributes) + assert.Equal(t, expectedSummaryMetric.Sum, actualSummaryMetric.Sum) + assert.True(t, math.IsNaN(actualSummaryMetric.Min)) + assert.True(t, math.IsNaN(actualSummaryMetric.Max)) + assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count) + assert.Equal(t, expectedSummaryMetric.Timestamp, actualSummaryMetric.Timestamp) + + actualCountMetric, ok := converted[1].(telemetry.Count) + assert.True(t, ok) + + expectedCountMetric, ok := expectedMetrics[1].(telemetry.Count) + assert.True(t, ok) + assert.Equal(t, expectedCountMetric, actualCountMetric) + + actualCountMetric2, ok := converted[2].(telemetry.Count) + assert.True(t, ok) + + expectedCountMetric2, ok := expectedMetrics[2].(telemetry.Count) + assert.True(t, ok) + assert.Equal(t, expectedCountMetric2, actualCountMetric2) } func Test_sender_SenderMetric_cumulative_CountCalculator(t *testing.T) { diff --git a/pkg/integrations/v4/protocol/types.go b/pkg/integrations/v4/protocol/types.go index 8e407cd9d..61c6a2ec9 100644 --- a/pkg/integrations/v4/protocol/types.go +++ b/pkg/integrations/v4/protocol/types.go @@ -19,7 +19,7 @@ const ( MetricTypeGauge MetricType = "gauge" MetricTypeRate MetricType = "rate" - MetricTypePrometheusSummary MetricType = "prometheus-summary" + MetricTypePrometheusSummary MetricType = "prometheus-summary" MetricTypePrometheusHistogram MetricType = "prometheus-histogram" ) @@ -84,13 +84,13 @@ type PrometheusHistogramValue struct { } type bucket struct { - CumulativeCount *float64 `json:"cumulative_count,omitempty"` + CumulativeCount *float64 `json:"cumulative_count,omitempty"` UpperBound *float64 `json:"upper_bound,omitempty"` } // PrometheusSummary represents a Prometheus summary type PrometheusSummaryValue struct { - SampleCount float64 `json:"sample_count,omitempty"` + SampleCount float64 `json:"sample_count,omitempty"` SampleSum float64 `json:"sample_sum,omitempty"` Quantiles []quantile `json:"quantiles,omitempty"` } @@ -255,4 +255,4 @@ func (m *Metric) CopyAttrs() map[string]interface{} { duplicate[k] = v } return duplicate -} \ No newline at end of file +}