Skip to content

Commit

Permalink
Added support for prometheus metrics in protocol v4 (#141)
Browse files Browse the repository at this point in the history
* feat:added prometheus-histogram and prometheus-summary support in v4
  • Loading branch information
cristianciutea authored Oct 13, 2020
1 parent 4137501 commit 5aec3bb
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 13 deletions.
5 changes: 4 additions & 1 deletion pkg/integrations/v4/dm/emitter_no_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,8 @@ func (e *nonRegisterEmitter) Send(dto fwrequest.FwRequest) {
}

// TODO error handling
elog.Error(composeEmitError(emitErrs, len(integrationData.DataSets)).Error())
composedError := composeEmitError(emitErrs, len(integrationData.DataSets))
if composedError != nil {
elog.Error(composedError.Error())
}
}
31 changes: 23 additions & 8 deletions pkg/integrations/v4/dm/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) {
c = Conversion{toTelemetry: Gauge{calculate: &Rate{get: s.calculator.rate.GetCumulativeRate}}}
case "cumulative-count":
c = Conversion{toTelemetry: Count{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
case "prometheus-summary":
c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
case "prometheus-histogram":
c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
default:
logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type")
continue
Expand All @@ -108,12 +112,23 @@ func (s *sender) SendMetrics(metrics []protocol.Metric) {
continue
}

s.harvester.RecordMetric(recMetric)
for _, m := range recMetric {
s.harvester.RecordMetric(m)
}
}
}

func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Common, metrics []protocol.Metric) error {
dMetrics := s.convertMetrics(metrics)
if len(dMetrics) > 0 {
return s.harvester.RecordInfraMetrics(commonAttributes.Attributes, dMetrics)
}
return nil
}

func (s *sender) convertMetrics(metrics []protocol.Metric) []telemetry.Metric {
var dMetrics []telemetry.Metric

for _, metric := range metrics {

var c Conversion
Expand All @@ -131,12 +146,16 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo
c = Conversion{toTelemetry: Gauge{calculate: &Rate{get: s.calculator.rate.GetCumulativeRate}}}
case "cumulative-count":
c = Conversion{toTelemetry: Count{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
case "prometheus-summary":
c = Conversion{toMultipleTelemetry: PrometheusSummary{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
case "prometheus-histogram":
c = Conversion{toMultipleTelemetry: PrometheusHistogram{calculate: &Cumulative{get: s.calculator.delta.CountMetric}}}
default:
logger.WithField("name", metric.Name).WithField("metric-type", metric.Name).Warn("received an unknown metric type")
continue
}

recMetric, err := c.convert(metric)
recMetrics, err := c.convert(metric)

if err != nil {
if err != errNoCalculation {
Expand All @@ -145,11 +164,7 @@ func (s *sender) SendMetricsWithCommonAttributes(commonAttributes protocol.Commo
}
continue
}
dMetrics = append(dMetrics, recMetric)
dMetrics = append(dMetrics, recMetrics...)
}

if len(dMetrics) > 0 {
return s.harvester.RecordInfraMetrics(commonAttributes.Attributes, dMetrics)
}
return nil
return dMetrics
}
172 changes: 172 additions & 0 deletions pkg/integrations/v4/dm/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package dm

import (
"encoding/json"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/cumulative"
"io/ioutil"
"math"
"os"
"testing"
"time"
Expand Down Expand Up @@ -160,6 +162,176 @@ func Test_sender_SendMetrics(t *testing.T) {
}
}

func Test_sender_SendPrometheusSummary(t *testing.T) {
cannedDuration, _ := time.ParseDuration("1m7s")
cannedDurationInt := int64(cannedDuration.Seconds() * 1000)
cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location())
cannedDateUnix := cannedDate.Unix()

protocolMetrics := []protocol.Metric{
{
Name: "PrometheusSummaryMetric",
Type: "prometheus-summary",
Attributes: map[string]interface{}{"att_key": "att_value"},
Timestamp: &cannedDateUnix,
Interval: &cannedDurationInt,
Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"quantiles\":[{\"quantile\":95,\"value\":3}]}"),
},
}

expectedMetrics := []telemetry.Metric{
telemetry.Summary{
Name: "PrometheusSummaryMetric_sum",
Attributes: map[string]interface{}{"att_key": "att_value"},
Sum: float64(2),
Min: math.NaN(),
Max: math.NaN(),
Count: 1,
Timestamp: cannedDate.Add(time.Second),
Interval: cannedDuration,
},
telemetry.Count{
Name: "PrometheusSummaryMetric_count",
AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\"}"),
Value: float64(0),
Timestamp: cannedDate,
Interval: 1 * time.Second,
},
telemetry.Gauge{
Name: "PrometheusSummaryMetric",
Attributes: map[string]interface{}{"att_key": "att_value", "quantile": "95"},
Value: float64(3),
Timestamp: cannedDate.Add(time.Second),
},
}

s := &sender{
calculator: Calculator{delta: cumulative.NewDeltaCalculator()},
}

_ = s.convertMetrics(protocolMetrics)

// After one second call convert metrics again to check deltas.
cannedDateUnix += 1
converted := s.convertMetrics(protocolMetrics)
assert.Len(t, converted, len(expectedMetrics))

actualSummaryMetric, ok := converted[0].(telemetry.Summary)
assert.True(t, ok)

expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary)
assert.True(t, ok)

assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count)
assert.Equal(t, expectedSummaryMetric.Attributes, actualSummaryMetric.Attributes)
assert.Equal(t, expectedSummaryMetric.Sum, actualSummaryMetric.Sum)
assert.True(t, math.IsNaN(actualSummaryMetric.Min))
assert.True(t, math.IsNaN(actualSummaryMetric.Max))
assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count)
assert.Equal(t, expectedSummaryMetric.Timestamp, actualSummaryMetric.Timestamp)
assert.Equal(t, expectedSummaryMetric.Interval, actualSummaryMetric.Interval)

actualCountMetric, ok := converted[1].(telemetry.Count)
assert.True(t, ok)

expectedCountMetric, ok := expectedMetrics[1].(telemetry.Count)
assert.True(t, ok)
assert.Equal(t, actualCountMetric, expectedCountMetric)

actualGaugeMetric, ok := converted[2].(telemetry.Gauge)
assert.True(t, ok)

expectedGaugeMetric, ok := expectedMetrics[2].(telemetry.Gauge)
assert.True(t, ok)
assert.Equal(t, expectedGaugeMetric, actualGaugeMetric)
assert.Equal(t, expectedGaugeMetric, actualGaugeMetric)
}

func Test_sender_SendPrometheusHistogram(t *testing.T) {
cannedDuration, _ := time.ParseDuration("1m7s")
cannedDurationInt := int64(cannedDuration.Seconds() * 1000)
cannedDate := time.Date(1980, time.January, 12, 1, 2, 0, 0, time.Now().Location())
cannedDateUnix := cannedDate.Unix()

protocolMetrics := []protocol.Metric{
{
Name: "PrometheusHistogramMetric",
Type: "prometheus-histogram",
Attributes: map[string]interface{}{"att_key": "att_value"},
Timestamp: &cannedDateUnix,
Interval: &cannedDurationInt,
Value: json.RawMessage("{\"sample_count\":1,\"sample_sum\":2,\"buckets\":[{\"cumulative_count\":95,\"upper_bound\":3},{\"cumulative_count\":90,\"upper_bound\":4}]}"),
},
}

expectedMetrics := []telemetry.Metric{
telemetry.Summary{
Name: "PrometheusHistogramMetric_sum",
Attributes: map[string]interface{}{"att_key": "att_value"},
Sum: float64(0),
Min: math.NaN(),
Max: math.NaN(),
Count: 1,
Timestamp: cannedDate.Add(time.Second),
},
telemetry.Count{
Name: "PrometheusHistogramMetric_bucket",
AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\",\"le\":\"3\"}"),

Value: float64(0),
Interval: 1 * time.Second,
Timestamp: cannedDate,
},
telemetry.Count{
Name: "PrometheusHistogramMetric_bucket",
AttributesJSON: json.RawMessage("{\"att_key\":\"att_value\",\"le\":\"4\"}"),

Value: float64(0),
Interval: 1 * time.Second,
Timestamp: cannedDate,
},
}

s := &sender{
calculator: Calculator{delta: cumulative.NewDeltaCalculator()},
}

_ = s.convertMetrics(protocolMetrics)

// After one second call convert metrics again to check deltas.
cannedDateUnix += 1
converted := s.convertMetrics(protocolMetrics)
assert.Len(t, converted, len(expectedMetrics))

actualSummaryMetric, ok := converted[0].(telemetry.Summary)
assert.True(t, ok)

expectedSummaryMetric, ok := expectedMetrics[0].(telemetry.Summary)
assert.True(t, ok)

assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count)
assert.Equal(t, expectedSummaryMetric.Attributes, actualSummaryMetric.Attributes)
assert.Equal(t, expectedSummaryMetric.Sum, actualSummaryMetric.Sum)
assert.True(t, math.IsNaN(actualSummaryMetric.Min))
assert.True(t, math.IsNaN(actualSummaryMetric.Max))
assert.Equal(t, expectedSummaryMetric.Count, actualSummaryMetric.Count)
assert.Equal(t, expectedSummaryMetric.Timestamp, actualSummaryMetric.Timestamp)

actualCountMetric, ok := converted[1].(telemetry.Count)
assert.True(t, ok)

expectedCountMetric, ok := expectedMetrics[1].(telemetry.Count)
assert.True(t, ok)
assert.Equal(t, expectedCountMetric, actualCountMetric)

actualCountMetric2, ok := converted[2].(telemetry.Count)
assert.True(t, ok)

expectedCountMetric2, ok := expectedMetrics[2].(telemetry.Count)
assert.True(t, ok)
assert.Equal(t, expectedCountMetric2, actualCountMetric2)
}

func Test_sender_SenderMetric_cumulative_CountCalculator(t *testing.T) {
cannedDuration, _ := time.ParseDuration("1m7s")
cannedDurationInt := int64(cannedDuration.Seconds() * 1000)
Expand Down
Loading

0 comments on commit 5aec3bb

Please sign in to comment.