Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for prometheus metrics in protocol v4 #141

Merged
merged 7 commits into from
Oct 13, 2020
5 changes: 4 additions & 1 deletion pkg/integrations/v4/dm/emitter_no_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,8 @@ func (e *nonRegisterEmitter) Send(dto fwrequest.FwRequest) {
}

// TODO error handling
elog.Error(composeEmitError(emitErrs, len(integrationData.DataSets)).Error())
composedError := composeEmitError(emitErrs, len(integrationData.DataSets))
if composedError != nil {
elog.Error(composedError.Error())
}
}
31 changes: 23 additions & 8 deletions pkg/integrations/v4/dm/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) {
c = Conversion{toTelemetry: Gauge{calculate: &Rate{get: s.calculator.rate.GetCumulativeRate}}}
case "cumulative-count":
c = Conversion{toTelemetry: Count{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
case "prometheus-summary":
c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
case "prometheus-histogram":
c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
default:
logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type")
continue
Expand All @@ -108,12 +112,23 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) {
continue
}

s.harvester.RecordMetric(recMetric)
for _, m := range recMetric {
s.harvester.RecordMetric(m)
}
}
}

func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Common, metrics []protocol.Metric) error {
dMetrics := s.convertMetrics(metrics)
if len(dMetrics) > 0 {
return s.harvester.RecordInfraMetrics(commonAttributes.Attributes, dMetrics)
}
return nil
}

func (s *sender) convertMetrics(metrics []protocol.Metric) []telemetry.Metric {
var dMetrics []telemetry.Metric

for _, metric := range metrics {

var c Conversion
Expand All @@ -131,12 +146,16 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo
c = Conversion{toTelemetry: Gauge{calculate: &Rate{get: s.calculator.rate.GetCumulativeRate}}}
case "cumulative-count":
c = Conversion{toTelemetry: Count{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
case "prometheus-summary":
c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
case "prometheus-histogram":
c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
default:
logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type")
continue
}

recMetric, err := c.convert(metric)
recMetrics, err := c.convert(metric)

if err != nil {
if err != errNoCalculation {
Expand All @@ -145,11 +164,7 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo
}
continue
}
dMetrics = append(dMetrics, recMetric)
dMetrics = append(dMetrics, recMetrics...)
}

if len(dMetrics) > 0 {
return s.harvester.RecordInfraMetrics(commonAttributes.Attributes, dMetrics)
}
return nil
return dMetrics
}
172 changes: 172 additions & 0 deletions pkg/integrations/v4/dm/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package dm

import (
"encoding/json"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/cumulative"
"io/ioutil"
"math"
"os"
"testing"
"time"
Expand Down Expand Up @@ -160,6 +162,176 @@ func Test_sender_SendMetrics(t *testing.T) {
}
}

func Test_sender_SendPrometheusSummary(t *testing.T) {
cannedDuration, _ := time.ParseDuration("1m7s")
cannedDurationInt := int64(cannedDuration.Seconds() * 1000)
cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location())
cannedDateUnix := cannedDate.Unix()

protocolMetrics := []protocol.Metric{
{
Name: "PrometheusSummaryMetric",
Type: "prometheus-summary",
Attributes: map[string]interface{}{"att_key": "att_value"},
Timestamp: &cannedDateUnix,
Interval: &cannedDurationInt,
Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"quantiles\":[{\"quantile\":95,\"value\":3}]}"),
},
}

expectedMetrics := []telemetry.Metric{
telemetry.Summary{
Name: "PrometheusSummaryMetric_sum",
Attributes: map[string]interface{}{"att_key": "att_value"},
Sum: float64(2),
Min: math.NaN(),
Max: math.NaN(),
Count: 1,
Timestamp: cannedDate.Add(time.Second),
Interval: cannedDuration,
},
telemetry.Count{
Name: "PrometheusSummaryMetric_count",
AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\"}"),
Value: float64(0),
Timestamp: cannedDate,
Interval: 1 * time.Second,
},
telemetry.Gauge{
Name: "PrometheusSummaryMetric",
Attributes: map[string]interface{}{"att_key": "att_value", "quantile": "95"},
Value: float64(3),
Timestamp: cannedDate.Add(time.Second),
},
}

s := &sender{
calculator: Calculator{delta: cumulative.NewDeltaCalculator()},
}

_ = s.convertMetrics(protocolMetrics)

// After one second call convert metrics again to check deltas.
cannedDateUnix += 1
converted := s.convertMetrics(protocolMetrics)
assert.Len(t, converted, len(expectedMetrics))

actualSummaryMetric, ok := converted[0].(telemetry.Summary)
assert.True(t, ok)

expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary)
assert.True(t, ok)

assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count)
assert.Equal(t, expectedSummaryMetric.Attributes, actualSummaryMetric.Attributes)
assert.Equal(t, expectedSummaryMetric.Sum, actualSummaryMetric.Sum)
assert.True(t, math.IsNaN(actualSummaryMetric.Min))
assert.True(t, math.IsNaN(actualSummaryMetric.Max))
assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count)
assert.Equal(t, expectedSummaryMetric.Timestamp, actualSummaryMetric.Timestamp)
assert.Equal(t, expectedSummaryMetric.Interval, actualSummaryMetric.Interval)

actualCountMetric, ok := converted[1].(telemetry.Count)
assert.True(t, ok)

expectedCountMetric, ok := expectedMetrics[1].(telemetry.Count)
assert.True(t, ok)
assert.Equal(t, actualCountMetric, expectedCountMetric)

actualGaugeMetric, ok := converted[2].(telemetry.Gauge)
assert.True(t, ok)

expectedGaugeMetric, ok := expectedMetrics[2].(telemetry.Gauge)
assert.True(t, ok)
assert.Equal(t, expectedGaugeMetric, actualGaugeMetric)
assert.Equal(t, expectedGaugeMetric, actualGaugeMetric)
}

func Test_sender_SendPrometheusHistogram(t *testing.T) {
cannedDuration, _ := time.ParseDuration("1m7s")
cannedDurationInt := int64(cannedDuration.Seconds() * 1000)
cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location())
cannedDateUnix := cannedDate.Unix()

protocolMetrics := []protocol.Metric{
{
Name: "PrometheusHistogramMetric",
Type: "prometheus-histogram",
Attributes: map[string]interface{}{"att_key": "att_value"},
Timestamp: &cannedDateUnix,
Interval: &cannedDurationInt,
Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"buckets\":[{\"cumulative_count\":95,\"upper_bound\":3},{\"cumulative_count\":90,\"upper_bound\":4}]}"),
},
}

expectedMetrics := []telemetry.Metric{
telemetry.Summary{
Name: "PrometheusHistogramMetric_sum",
Attributes: map[string]interface{}{"att_key": "att_value"},
Sum: float64(0),
Min: math.NaN(),
Max: math.NaN(),
Count: 1,
Timestamp: cannedDate.Add(time.Second),
},
telemetry.Count{
Name: "PrometheusHistogramMetric_bucket",
AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\",\"le\":\"3\"}"),

Value: float64(0),
Interval: 1 * time.Second,
Timestamp: cannedDate,
},
telemetry.Count{
Name: "PrometheusHistogramMetric_bucket",
AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\",\"le\":\"4\"}"),

Value: float64(0),
Interval: 1 * time.Second,
Timestamp: cannedDate,
},
}

s := &sender{
calculator: Calculator{delta: cumulative.NewDeltaCalculator()},
}

_ = s.convertMetrics(protocolMetrics)

// After one second call convert metrics again to check deltas.
cannedDateUnix += 1
converted := s.convertMetrics(protocolMetrics)
assert.Len(t, converted, len(expectedMetrics))

actualSummaryMetric, ok := converted[0].(telemetry.Summary)
assert.True(t, ok)

expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary)
assert.True(t, ok)

assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count)
assert.Equal(t, expectedSummaryMetric.Attributes, actualSummaryMetric.Attributes)
assert.Equal(t, expectedSummaryMetric.Sum, actualSummaryMetric.Sum)
assert.True(t, math.IsNaN(actualSummaryMetric.Min))
assert.True(t, math.IsNaN(actualSummaryMetric.Max))
assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count)
assert.Equal(t, expectedSummaryMetric.Timestamp, actualSummaryMetric.Timestamp)

actualCountMetric, ok := converted[1].(telemetry.Count)
assert.True(t, ok)

expectedCountMetric, ok := expectedMetrics[1].(telemetry.Count)
assert.True(t, ok)
assert.Equal(t, expectedCountMetric, actualCountMetric)

actualCountMetric2, ok := converted[2].(telemetry.Count)
assert.True(t, ok)

expectedCountMetric2, ok := expectedMetrics[2].(telemetry.Count)
assert.True(t, ok)
assert.Equal(t, expectedCountMetric2, actualCountMetric2)
}

func Test_sender_SenderMetric_cumulative_CountCalculator(t *testing.T) {
cannedDuration, _ := time.ParseDuration("1m7s")
cannedDurationInt := int64(cannedDuration.Seconds() * 1000)
Expand Down
Loading