diff --git a/pkg/integrations/v4/dm/emitter_no_register.go b/pkg/integrations/v4/dm/emitter_no_register.go index ba405a824..e215e59d0 100644 --- a/pkg/integrations/v4/dm/emitter_no_register.go +++ b/pkg/integrations/v4/dm/emitter_no_register.go @@ -117,5 +117,8 @@ func (e *nonRegisterEmitter) Send(dto fwrequest.FwRequest) { } // TODO error handling - elog.Error(composeEmitError(emitErrs, len(integrationData.DataSets)).Error()) + composedError := composeEmitError(emitErrs, len(integrationData.DataSets)) + if composedError != nil { + elog.Error(composedError.Error()) + } } diff --git a/pkg/integrations/v4/dm/sender.go b/pkg/integrations/v4/dm/sender.go index 135ef6532..b771a4fee 100644 --- a/pkg/integrations/v4/dm/sender.go +++ b/pkg/integrations/v4/dm/sender.go @@ -94,6 +94,10 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) { c = Conversion{toTelemetry: Gauge{calculate: &Rate{get: s.calculator.rate.GetCumulativeRate}}} case "cumulative-count": c = Conversion{toTelemetry: Count{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} + case "prometheus-summary": + c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} + case "prometheus-histogram": + c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} default: logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type") continue @@ -108,12 +112,23 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) { continue } - s.harvester.RecordMetric(recMetric) + for _, m := range recMetric { + s.harvester.RecordMetric(m) + } } } func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Common, metrics []protocol.Metric) error { + dMetrics := s.convertMetrics(metrics) + if len(dMetrics) > 0 { + return s.harvester.RecordInfraMetrics(commonAttributes.Attributes, dMetrics) + } + return nil +} + +func (s *sender) 
convertMetrics(metrics []protocol.Metric) []telemetry.Metric { var dMetrics []telemetry.Metric + for _, metric := range metrics { var c Conversion @@ -131,12 +146,16 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo c = Conversion{toTelemetry: Gauge{calculate: &Rate{get: s.calculator.rate.GetCumulativeRate}}} case "cumulative-count": c = Conversion{toTelemetry: Count{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} + case "prometheus-summary": + c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} + case "prometheus-histogram": + c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}} default: logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type") continue } - recMetric, err := c.convert(metric) + recMetrics, err := c.convert(metric) if err != nil { if err != errNoCalculation { @@ -145,11 +164,7 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo } continue } - dMetrics = append(dMetrics, recMetric) + dMetrics = append(dMetrics, recMetrics...) 
} - - if len(dMetrics) > 0 { - return s.harvester.RecordInfraMetrics(commonAttributes.Attributes, dMetrics) - } - return nil + return dMetrics } diff --git a/pkg/integrations/v4/dm/sender_test.go b/pkg/integrations/v4/dm/sender_test.go index 469505e43..d689e03b9 100644 --- a/pkg/integrations/v4/dm/sender_test.go +++ b/pkg/integrations/v4/dm/sender_test.go @@ -4,7 +4,9 @@ package dm import ( "encoding/json" + "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/cumulative" "io/ioutil" + "math" "os" "testing" "time" @@ -160,6 +162,176 @@ func Test_sender_SendMetrics(t *testing.T) { } } +func Test_sender_SendPrometheusSummary(t *testing.T) { + cannedDuration, _ := time.ParseDuration("1m7s") + cannedDurationInt := int64(cannedDuration.Seconds() * 1000) + cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location()) + cannedDateUnix := cannedDate.Unix() + + protocolMetrics := []protocol.Metric{ + { + Name: "PrometheusSummaryMetric", + Type: "prometheus-summary", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Timestamp: &cannedDateUnix, + Interval: &cannedDurationInt, + Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"quantiles\":[{\"quantile\":95,\"value\":3}]}"), + }, + } + + expectedMetrics := []telemetry.Metric{ + telemetry.Summary{ + Name: "PrometheusSummaryMetric_sum", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Sum: float64(2), + Min: math.NaN(), + Max: math.NaN(), + Count: 1, + Timestamp: cannedDate.Add(time.Second), + Interval: cannedDuration, + }, + telemetry.Count{ + Name: "PrometheusSummaryMetric_count", + AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\"}"), + Value: float64(0), + Timestamp: cannedDate, + Interval: 1 * time.Second, + }, + telemetry.Gauge{ + Name: "PrometheusSummaryMetric", + Attributes: map[string]interface{}{"att_key": "att_value", "quantile": "95"}, + Value: float64(3), + Timestamp: cannedDate.Add(time.Second), + }, + } + + s := &sender{ + 
calculator: Calculator{delta: cumulative.NewDeltaCalculator()}, + } + + _ = s.convertMetrics(protocolMetrics) + + // After one second call convert metrics again to check deltas. + cannedDateUnix += 1 + converted := s.convertMetrics(protocolMetrics) + assert.Len(t, converted, len(expectedMetrics)) + + actualSummaryMetric, ok := converted[0].(telemetry.Summary) + assert.True(t, ok) + + expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary) + assert.True(t, ok) + + assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count) + assert.Equal(t, expectedSummaryMetric.Attributes, actualSummaryMetric.Attributes) + assert.Equal(t, expectedSummaryMetric.Sum, actualSummaryMetric.Sum) + assert.True(t, math.IsNaN(actualSummaryMetric.Min)) + assert.True(t, math.IsNaN(actualSummaryMetric.Max)) + assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count) + assert.Equal(t, expectedSummaryMetric.Timestamp, actualSummaryMetric.Timestamp) + assert.Equal(t, expectedSummaryMetric.Interval, actualSummaryMetric.Interval) + + actualCountMetric, ok := converted[1].(telemetry.Count) + assert.True(t, ok) + + expectedCountMetric, ok := expectedMetrics[1].(telemetry.Count) + assert.True(t, ok) + assert.Equal(t, actualCountMetric, expectedCountMetric) + + actualGaugeMetric, ok := converted[2].(telemetry.Gauge) + assert.True(t, ok) + + expectedGaugeMetric, ok := expectedMetrics[2].(telemetry.Gauge) + assert.True(t, ok) + assert.Equal(t, expectedGaugeMetric, actualGaugeMetric) + assert.Equal(t, expectedGaugeMetric, actualGaugeMetric) +} + +func Test_sender_SendPrometheusHistogram(t *testing.T) { + cannedDuration, _ := time.ParseDuration("1m7s") + cannedDurationInt := int64(cannedDuration.Seconds() * 1000) + cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location()) + cannedDateUnix := cannedDate.Unix() + + protocolMetrics := []protocol.Metric{ + { + Name: "PrometheusHistogramMetric", + Type: "prometheus-histogram", + Attributes: 
map[string]interface{}{"att_key": "att_value"}, + Timestamp: &cannedDateUnix, + Interval: &cannedDurationInt, + Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"buckets\":[{\"cumulative_count\":95,\"upper_bound\":3},{\"cumulative_count\":90,\"upper_bound\":4}]}"), + }, + } + + expectedMetrics := []telemetry.Metric{ + telemetry.Summary{ + Name: "PrometheusHistogramMetric_sum", + Attributes: map[string]interface{}{"att_key": "att_value"}, + Sum: float64(0), + Min: math.NaN(), + Max: math.NaN(), + Count: 1, + Timestamp: cannedDate.Add(time.Second), + }, + telemetry.Count{ + Name: "PrometheusHistogramMetric_bucket", + AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\",\"le\":\"3\"}"), + + Value: float64(0), + Interval: 1 * time.Second, + Timestamp: cannedDate, + }, + telemetry.Count{ + Name: "PrometheusHistogramMetric_bucket", + AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\",\"le\":\"4\"}"), + + Value: float64(0), + Interval: 1 * time.Second, + Timestamp: cannedDate, + }, + } + + s := &sender{ + calculator: Calculator{delta: cumulative.NewDeltaCalculator()}, + } + + _ = s.convertMetrics(protocolMetrics) + + // After one second call convert metrics again to check deltas. 
+ cannedDateUnix += 1 + converted := s.convertMetrics(protocolMetrics) + assert.Len(t, converted, len(expectedMetrics)) + + actualSummaryMetric, ok := converted[0].(telemetry.Summary) + assert.True(t, ok) + + expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary) + assert.True(t, ok) + + assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count) + assert.Equal(t, expectedSummaryMetric.Attributes, actualSummaryMetric.Attributes) + assert.Equal(t, expectedSummaryMetric.Sum, actualSummaryMetric.Sum) + assert.True(t, math.IsNaN(actualSummaryMetric.Min)) + assert.True(t, math.IsNaN(actualSummaryMetric.Max)) + assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count) + assert.Equal(t, expectedSummaryMetric.Timestamp, actualSummaryMetric.Timestamp) + + actualCountMetric, ok := converted[1].(telemetry.Count) + assert.True(t, ok) + + expectedCountMetric, ok := expectedMetrics[1].(telemetry.Count) + assert.True(t, ok) + assert.Equal(t, expectedCountMetric, actualCountMetric) + + actualCountMetric2, ok := converted[2].(telemetry.Count) + assert.True(t, ok) + + expectedCountMetric2, ok := expectedMetrics[2].(telemetry.Count) + assert.True(t, ok) + assert.Equal(t, expectedCountMetric2, actualCountMetric2) +} + func Test_sender_SenderMetric_cumulative_CountCalculator(t *testing.T) { cannedDuration, _ := time.ParseDuration("1m7s") cannedDurationInt := int64(cannedDuration.Seconds() * 1000) diff --git a/pkg/integrations/v4/dm/telemetry.go b/pkg/integrations/v4/dm/telemetry.go index df1a41660..a8df3fc02 100644 --- a/pkg/integrations/v4/dm/telemetry.go +++ b/pkg/integrations/v4/dm/telemetry.go @@ -4,7 +4,9 @@ package dm import ( "errors" + "fmt" "io" + "math" "net/http" "time" @@ -68,17 +70,39 @@ func telemetryHarvesterWithMetricApiUrl(metricApiUrl string) func(*telemetry.Con } type Conversion struct { - toTelemetry Converter + toTelemetry Converter + toMultipleTelemetry DerivingConvertor } -func (c *Conversion) convert(metric protocol.Metric) 
(telemetry.Metric, error) { - return c.toTelemetry.from(metric) +func (c *Conversion) convert(metric protocol.Metric) ([]telemetry.Metric, error) { + var result []telemetry.Metric + if c.toTelemetry != nil { + converted, err := c.toTelemetry.from(metric) + if err != nil { + return nil, err + } + result = append(result, converted) + } + + if c.toMultipleTelemetry != nil { + derivedMetrics, err := c.toMultipleTelemetry.derivedFrom(metric) + if err != nil { + return nil, err + } + result = append(result, derivedMetrics...) + } + return result, nil } type Converter interface { from(metric protocol.Metric) (telemetry.Metric, error) } +type DerivingConvertor interface { + // derivedFrom is used when a metric is transformed into multiple telemetry metrics. + derivedFrom(metric protocol.Metric) ([]telemetry.Metric, error) +} + type Count struct { calculate *Cumulative } @@ -130,7 +154,8 @@ func (g *Gauge) shouldCalculate() bool { return g.calculate != nil } -type Summary struct{} +type Summary struct { +} func (Summary) from(metric protocol.Metric) (telemetry.Metric, error) { value, err := metric.SummaryValue() @@ -151,6 +176,102 @@ func (Summary) from(metric protocol.Metric) (telemetry.Metric, error) { }, nil } +type PrometheusHistogram struct { + calculate *Cumulative +} + +func (ph PrometheusHistogram) derivedFrom(metric protocol.Metric) ([]telemetry.Metric, error) { + var result []telemetry.Metric + value, err := metric.GetPrometheusHistogramValue() + + if err != nil { + return nil, err + } + + if ph.calculate != nil { + metricName := metric.Name + "_sum" + sumCount, ok := ph.calculate.get(metricName, metric.Attributes, float64(*value.SampleCount), metric.Time()) + if ok { + result = append(result, telemetry.Summary{ + Name: metricName, + Attributes: metric.Attributes, + Count: 1, + Sum: sumCount.Value, + Min: math.NaN(), + Max: math.NaN(), + Timestamp: metric.Time(), + }) + } else { + telemetryLogger.WithField("name", metricName).WithField("metric-type", 
metric.Type).Debug(noCalculationMadeErrMsg) + } + + metricName = metric.Name + "_bucket" + for _, b := range value.Buckets { + bucketAttrs := metric.CopyAttrs() + bucketAttrs["le"] = fmt.Sprintf("%g", *b.UpperBound) + + bucketCount, ok := ph.calculate.get( + metricName, + bucketAttrs, + float64(*b.CumulativeCount), + metric.Time(), + ) + if ok { + result = append(result, bucketCount) + } else { + telemetryLogger.WithField("name", metricName).WithField("metric-type", metric.Type).Debug(noCalculationMadeErrMsg) + } + } + } + return result, nil +} + +type PrometheusSummary struct { + calculate *Cumulative +} + +func (p PrometheusSummary) derivedFrom(metric protocol.Metric) ([]telemetry.Metric, error) { + var result []telemetry.Metric + value, err := metric.GetPrometheusSummaryValue() + + if err != nil { + return nil, err + } + + result = append(result, telemetry.Summary{ + Name: metric.Name + "_sum", + Attributes: metric.Attributes, + Count: 1, + Sum: value.SampleSum, + Min: math.NaN(), + Max: math.NaN(), + Timestamp: metric.Time(), + Interval: metric.IntervalDuration(), + }) + + if p.calculate != nil { + metricName := metric.Name + "_count" + countMetric, ok := p.calculate.get(metricName, metric.Attributes, value.SampleCount, metric.Time()) + if ok { + result = append(result, countMetric) + } else { + telemetryLogger.WithField("name", metricName).WithField("metric-type", metric.Type).Debug(noCalculationMadeErrMsg) + } + } + + for _, q := range value.Quantiles { + quantileAttrs := metric.CopyAttrs() + quantileAttrs["quantile"] = fmt.Sprintf("%g", q.Quantile) + result = append(result, telemetry.Gauge{ + Name: metric.Name, + Attributes: quantileAttrs, + Value: q.Value, + Timestamp: metric.Time(), + }) + } + return result, nil +} + type Rate struct { get func(name string, attributes map[string]interface{}, diff --git a/pkg/integrations/v4/protocol/types.go b/pkg/integrations/v4/protocol/types.go index 54b279ea4..f4f21935a 100644 --- 
a/pkg/integrations/v4/protocol/types.go +++ b/pkg/integrations/v4/protocol/types.go @@ -19,6 +19,9 @@ const ( MetricTypeSummary MetricType = "summary" MetricTypeGauge MetricType = "gauge" MetricTypeRate MetricType = "rate" + + MetricTypePrometheusSummary MetricType = "prometheus-summary" + MetricTypePrometheusHistogram MetricType = "prometheus-histogram" ) const millisSinceJanuaryFirst1978 = 252489600000 @@ -73,6 +76,33 @@ type SummaryValue struct { Sum float64 `json:"sum"` } +// PrometheusHistogramValue represents a Prometheus histogram. +type PrometheusHistogramValue struct { + SampleCount *uint64 `json:"sample_count,omitempty"` + SampleSum *float64 `json:"sample_sum,omitempty"` + // Buckets defines the buckets into which observations are counted. Each + // element in the slice is the upper inclusive bound of a bucket. The + // values must be sorted in strictly increasing order. + Buckets []*bucket `json:"buckets,omitempty"` +} + +type bucket struct { + CumulativeCount *float64 `json:"cumulative_count,omitempty"` + UpperBound *float64 `json:"upper_bound,omitempty"` +} + +// PrometheusSummaryValue represents a Prometheus summary. +type PrometheusSummaryValue struct { + SampleCount float64 `json:"sample_count,omitempty"` + SampleSum float64 `json:"sample_sum,omitempty"` + Quantiles []quantile `json:"quantiles,omitempty"` +} + +type quantile struct { + Quantile float64 `json:"quantile,omitempty"` + Value float64 `json:"value,omitempty"` +} + // PluginDataV1 supports a single data set for a single entity type PluginDataV1 struct { PluginOutputIdentifier @@ -272,3 +302,34 @@ func (m *Metric) SummaryValue() (SummaryValue, error) { return SummaryValue{}, fmt.Errorf("metric type %v is not summary", m.Type) } + +func (m *Metric) GetPrometheusSummaryValue() (PrometheusSummaryValue, error) { + if m.Type == MetricTypePrometheusSummary { + var value PrometheusSummaryValue + err := json.Unmarshal(m.Value, &value) + + return value, err + } + + return PrometheusSummaryValue{}, 
fmt.Errorf("metric type %v is not prometheus-summary", m.Type) +} + +func (m *Metric) GetPrometheusHistogramValue() (PrometheusHistogramValue, error) { + if m.Type == MetricTypePrometheusHistogram { + var value PrometheusHistogramValue + err := json.Unmarshal(m.Value, &value) + + return value, err + } + + return PrometheusHistogramValue{}, fmt.Errorf("metric type %v is not prometheus-histogram", m.Type) +} + +// CopyAttrs returns a (shallow) copy of the passed attrs. +func (m *Metric) CopyAttrs() map[string]interface{} { + duplicate := make(map[string]interface{}, len(m.Attributes)) + for k, v := range m.Attributes { + duplicate[k] = v + } + return duplicate +}