Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions pkg/translator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,16 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
return s
}

// validateMetrics returns a bool representing whether the metric has a valid type and temporality combination and a
// matching metric type and field
func validateMetrics(metric pmetric.Metric) bool {
// isValidAggregationTemporality checks whether an OTel metric has a valid
// aggregation temporality for conversion to a Prometheus metric.
func isValidAggregationTemporality(metric pmetric.Metric) bool {
switch metric.Type() {
case pmetric.MetricTypeGauge:
return metric.Gauge().DataPoints().Len() != 0
case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
return true
case pmetric.MetricTypeSum:
return metric.Sum().DataPoints().Len() != 0 && metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
case pmetric.MetricTypeHistogram:
return metric.Histogram().DataPoints().Len() != 0 && metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
case pmetric.MetricTypeSummary:
return metric.Summary().DataPoints().Len() != 0
return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
}
return false
}
Expand Down
84 changes: 52 additions & 32 deletions pkg/translator/prometheusremotewrite/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,44 +29,64 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
)

// Test_validateMetrics checks validateMetrics return true if a type and temporality combination is valid, false
// otherwise.
func Test_validateMetrics(t *testing.T) {
func Test_isValidAggregationTemporality(t *testing.T) {
l := pcommon.NewMap()

// define a single test
type combTest struct {
tests := []struct {
name string
metric pmetric.Metric
want bool
}{
{
name: "summary",
metric: func() pmetric.Metric {
quantiles := pmetric.NewSummaryDataPointValueAtQuantileSlice()
quantiles.AppendEmpty().SetValue(1)
return getSummaryMetric("", l, 0, 0, 0, quantiles)
}(),
want: true,
},
{
name: "gauge",
metric: getIntGaugeMetric("", l, 0, 0),
want: true,
},
{
name: "cumulative sum",
metric: getIntSumMetric("", l, pmetric.AggregationTemporalityCumulative, 0, 0),
want: true,
},
{
name: "cumulative histogram",
metric: getHistogramMetric(
"", l, pmetric.AggregationTemporalityCumulative, 0, 0, 0, []float64{}, []uint64{}),
want: true,
},
{
name: "missing type",
metric: pmetric.NewMetric(),
want: false,
},
{
name: "unspecified sum temporality",
metric: getIntSumMetric("", l, pmetric.AggregationTemporalityUnspecified, 0, 0),
want: false,
},
{
name: "delta sum",
metric: getIntSumMetric("", l, pmetric.AggregationTemporalityDelta, 0, 0),
want: false,
},
{
name: "delta histogram",
metric: getHistogramMetric(
"", l, pmetric.AggregationTemporalityDelta, 0, 0, 0, []float64{}, []uint64{}),
want: false,
},
}

var tests []combTest

// append true cases
for k, validMetric := range validMetrics1 {
name := "valid_" + k

tests = append(tests, combTest{
name,
validMetric,
true,
})
}

for k, invalidMetric := range invalidMetrics {
name := "invalid_" + k

tests = append(tests, combTest{
name,
invalidMetric,
false,
})
}

// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := validateMetrics(tt.metric)
got := isValidAggregationTemporality(tt.metric)
assert.Equal(t, tt.want, got)
})
}
Expand Down Expand Up @@ -163,7 +183,7 @@ func Test_timeSeriesSignature(t *testing.T) {
validMetrics1[validHistogram],
validMetrics1[validHistogram].Type().String() + lb2Sig,
},
// descriptor type cannot be nil, as checked by validateMetrics
// descriptor type cannot be nil, as checked by validateAggregationTemporality
{
"nil_case",
nil,
Expand Down
3 changes: 1 addition & 2 deletions pkg/translator/prometheusremotewrite/metrics_to_prw.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp
metric := metricSlice.At(k)
mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric))

// check for valid type and temporality combination and for matching data field and type
if ok := validateMetrics(metric); !ok {
if !isValidAggregationTemporality(metric) {
errs = multierr.Append(errs, errors.New("invalid temporality and type combination"))
continue
}
Expand Down
142 changes: 28 additions & 114 deletions pkg/translator/prometheusremotewrite/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,53 +106,24 @@ var (
lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12),
nil...),
}
bounds = []float64{0.1, 0.5, 0.99}
buckets = []uint64{1, 2, 3}

quantileBounds = []float64{0.15, 0.9, 0.99}
quantileValues = []float64{7, 8, 9}
quantiles = getQuantiles(quantileBounds, quantileValues)

validIntGauge = "valid_IntGauge"
validDoubleGauge = "valid_DoubleGauge"
validIntSum = "valid_IntSum"
validSum = "valid_Sum"
validHistogram = "valid_Histogram"
validSummary = "valid_Summary"
suffixedCounter = "valid_IntSum_total"

// valid metrics as input should not return error
validMetrics1 = map[string]pmetric.Metric{
validIntGauge: getIntGaugeMetric(validIntGauge, lbs1, intVal1, time1),
validDoubleGauge: getDoubleGaugeMetric(validDoubleGauge, lbs1, floatVal1, time1),
validIntSum: getIntSumMetric(validIntSum, lbs1, intVal1, time1),
suffixedCounter: getIntSumMetric(suffixedCounter, lbs1, intVal1, time1),
validSum: getSumMetric(validSum, lbs1, floatVal1, time1),
validHistogram: getHistogramMetric(validHistogram, lbs1, time1, floatVal1, uint64(intVal1), bounds, buckets),
validSummary: getSummaryMetric(validSummary, lbs1, time1, floatVal1, uint64(intVal1), quantiles),
}

empty = "empty"

// Category 1: type and data field doesn't match
emptyGauge = "emptyGauge"
emptySum = "emptySum"
emptyHistogram = "emptyHistogram"
emptySummary = "emptySummary"

// Category 2: invalid type and temporality combination
emptyCumulativeSum = "emptyCumulativeSum"
emptyCumulativeHistogram = "emptyCumulativeHistogram"

// different metrics that will not pass validate metrics and will cause the exporter to return an error
invalidMetrics = map[string]pmetric.Metric{
empty: pmetric.NewMetric(),
emptyGauge: getEmptyGaugeMetric(emptyGauge),
emptySum: getEmptySumMetric(emptySum),
emptyHistogram: getEmptyHistogramMetric(emptyHistogram),
emptySummary: getEmptySummaryMetric(emptySummary),
emptyCumulativeSum: getEmptyCumulativeSumMetric(emptyCumulativeSum),
emptyCumulativeHistogram: getEmptyCumulativeHistogramMetric(emptyCumulativeHistogram),
validHistogram: getHistogramMetric(
validHistogram,
lbs1,
pmetric.AggregationTemporalityCumulative,
time1,
floatVal1,
uint64(intVal1),
[]float64{0.1, 0.5, 0.99}, []uint64{1, 2, 3},
),
}
)

Expand Down Expand Up @@ -240,26 +211,6 @@ func getHistogramDataPointWithExemplars(t *testing.T, time time.Time, value floa
return h
}

func getQuantiles(bounds []float64, values []float64) pmetric.SummaryDataPointValueAtQuantileSlice {
quantiles := pmetric.NewSummaryDataPointValueAtQuantileSlice()
quantiles.EnsureCapacity(len(bounds))

for i := 0; i < len(bounds); i++ {
quantile := quantiles.AppendEmpty()
quantile.SetQuantile(bounds[i])
quantile.SetValue(values[i])
}

return quantiles
}

func getEmptyGaugeMetric(name string) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptyGauge()
return metric
}

func getIntGaugeMetric(name string, attributes pcommon.Map, value int64, ts uint64) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
Expand Down Expand Up @@ -290,17 +241,16 @@ func getDoubleGaugeMetric(name string, attributes pcommon.Map, value float64, ts
return metric
}

func getEmptySumMetric(name string) pmetric.Metric {
func getIntSumMetric(
name string,
attributes pcommon.Map,
temporality pmetric.AggregationTemporality,
value int64,
ts uint64,
) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptySum()
return metric
}

func getIntSumMetric(name string, attributes pcommon.Map, value int64, ts uint64) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.SetEmptySum().SetAggregationTemporality(temporality)
dp := metric.Sum().DataPoints().AppendEmpty()
if strings.HasPrefix(name, "staleNaN") {
dp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
Expand All @@ -313,48 +263,19 @@ func getIntSumMetric(name string, attributes pcommon.Map, value int64, ts uint64
return metric
}

func getEmptyCumulativeSumMetric(name string) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
return metric
}

func getSumMetric(name string, attributes pcommon.Map, value float64, ts uint64) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := metric.Sum().DataPoints().AppendEmpty()
if strings.HasPrefix(name, "staleNaN") {
dp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
}
dp.SetDoubleValue(value)
attributes.CopyTo(dp.Attributes())

dp.SetStartTimestamp(pcommon.Timestamp(0))
dp.SetTimestamp(pcommon.Timestamp(ts))
return metric
}

func getEmptyHistogramMetric(name string) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptyHistogram()
return metric
}

func getEmptyCumulativeHistogramMetric(name string) pmetric.Metric {
func getHistogramMetric(
name string,
attributes pcommon.Map,
temporality pmetric.AggregationTemporality,
ts uint64,
sum float64,
count uint64,
bounds []float64,
buckets []uint64,
) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
return metric
}

func getHistogramMetric(name string, attributes pcommon.Map, ts uint64, sum float64, count uint64, bounds []float64,
buckets []uint64) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.SetEmptyHistogram().SetAggregationTemporality(temporality)
dp := metric.Histogram().DataPoints().AppendEmpty()
if strings.HasPrefix(name, "staleNaN") {
dp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
Expand All @@ -369,13 +290,6 @@ func getHistogramMetric(name string, attributes pcommon.Map, ts uint64, sum floa
return metric
}

func getEmptySummaryMetric(name string) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetEmptySummary()
return metric
}

func getSummaryMetric(name string, attributes pcommon.Map, ts uint64, sum float64, count uint64, quantiles pmetric.SummaryDataPointValueAtQuantileSlice) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
Expand Down