diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go index 71bf51fc1735c..2df273cd2943f 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -15,12 +15,14 @@ package internal import ( + "fmt" "sort" "strings" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" ) // MetricFamilyPdata is unit which is corresponding to the metrics items which shared the same TYPE/UNIT/... metadata from @@ -51,7 +53,7 @@ type metricGroupPdata struct { family *metricFamilyPdata } -func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamilyPdata { +func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamilyPdata { familyName := normalizeMetricName(metricName) // lookup metadata based on familyName @@ -67,6 +69,13 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTime metadata.Metric = familyName metadata.Type = textparse.MetricTypeUnknown } + } else if !ok && isInternalMetric(metricName) { + metadata = defineInternalMetric(metricName, metadata, logger) + } + + mtype := convToPdataMetricType(metadata.Type) + if mtype == pdata.MetricDataTypeNone { + logger.Debug(fmt.Sprintf("Invalid metric : %s %+v", metricName, metadata)) } return &metricFamilyPdata{ @@ -85,6 +94,12 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTime } } +func (mf *metricFamilyPdata) IsSameFamily(metricName string) bool { + // trim known suffix if necessary + familyName := normalizeMetricName(metricName) + return mf.name == familyName || familyName != metricName && mf.name == metricName +} + // updateLabelKeys is used to store all the label keys of a same metric family in observed order. since prometheus // receiver removes any label with empty value before feeding it to an appender, in order to figure out all the labels // from the same metric family we will need to keep track of what labels have ever been observed. @@ -109,6 +124,12 @@ func (mf *metricFamilyPdata) getGroupKey(ls labels.Labels) string { return dpgSignature(mf.labelKeysOrdered, ls) } +func (mg *metricGroupPdata) sortPoints() { + sort.Slice(mg.complexValue, func(i, j int) bool { + return mg.complexValue[i].boundary < mg.complexValue[j].boundary + }) +} + func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest *pdata.HistogramDataPointSlice) bool { if !mg.hasCount || len(mg.complexValue) == 0 { return false @@ -273,6 +294,9 @@ func (mf *metricFamilyPdata) getGroups() []*metricGroupPdata { func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int) { metric := pdata.NewMetric() + metric.SetDataType(mf.mtype) + metric.SetName(mf.name) + pointCount := 0 switch mf.mtype { @@ -307,6 +331,8 @@ func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int pointCount = sdpL.Len() default: + // Everything else should be a gauge. + metric.SetDataType(pdata.MetricDataTypeGauge) gauge := metric.Gauge() gdpL := gauge.DataPoints() for _, mg := range mf.getGroups() { diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go index 33592c6717521..5567ad8bf81e8 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go @@ -94,7 +94,7 @@ func TestIsCumulativeEquivalence(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily) - mfp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata) + mfp := newMetricFamilyPdata(tt.name, mc, testLogger, 1).(*metricFamilyPdata) assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative") assert.Equal(t, mf.isCumulativeType(), tt.want, "isCumulative does not match for regular metricFamily") assert.Equal(t, mfp.isCumulativeTypePdata(), tt.want, "isCumulative does not match for pdata metricFamily") @@ -145,7 +145,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.metricName, mc, tt.intervalStartTimeMs).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.metricName, mc, testLogger, tt.intervalStartTimeMs).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) } @@ -191,7 +191,7 @@ func TestMetricGroupData_toDistributionPointEquivalence(t *testing.T) { intervalStartTimeMs := int64(i + 1) t.Run(tt.name, func(t *testing.T) { mf := newMetricFamily(tt.name, mc, zap.NewNop(), intervalStartTimeMs).(*metricFamily) - mp := newMetricFamilyPdata(tt.name, mc, intervalStartTimeMs).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.name, mc, testLogger, intervalStartTimeMs).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) @@ -354,7 +354,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.name, mc, testLogger, 1).(*metricFamilyPdata) for _, lbs := range tt.labelsScrapes { for _, scrape := range lbs.scrapes { require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value)) @@ -405,7 +405,7 @@ func TestMetricGroupData_toSummaryPointEquivalence(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily) - mp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.name, mc, testLogger, 1).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) @@ -507,7 +507,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.metricKind, mc, tt.intervalStartTimestampMs).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.metricKind, mc, testLogger, tt.intervalStartTimestampMs).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) } @@ -553,7 +553,7 @@ func TestMetricGroupData_toNumberDataPointEquivalence(t *testing.T) { intervalStartTimeMs := int64(11 + i) t.Run(tt.name, func(t *testing.T) { mf := newMetricFamily(tt.name, mc, zap.NewNop(), intervalStartTimeMs).(*metricFamily) - mp := newMetricFamilyPdata(tt.name, mc, intervalStartTimeMs).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.name, mc, testLogger, intervalStartTimeMs).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) diff --git a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go index 9aa519ffd26b3..36246a099882b 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go +++ b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go @@ -163,9 +163,9 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) ts, dts := b.currentMf.ToMetricPdata(&b.metrics) b.numTimeseries += ts b.droppedTimeseries += dts - b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs) + b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.logger, b.intervalStartTimeMs) } else if b.currentMf == nil { - b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs) + b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.logger, b.intervalStartTimeMs) } return b.currentMf.Add(metricName, ls, t, v)