diff --git a/receiver/prometheusreceiver/internal/metricfamily.go b/receiver/prometheusreceiver/internal/metricfamily.go index b735bb0fcfb5..9898a0b9f498 100644 --- a/receiver/prometheusreceiver/internal/metricfamily.go +++ b/receiver/prometheusreceiver/internal/metricfamily.go @@ -37,18 +37,19 @@ type MetricFamily interface { } type metricFamily struct { - name string - mtype metricspb.MetricDescriptor_Type - mc MetadataCache - droppedTimeseries int - labelKeys map[string]bool - labelKeysOrdered []string - metadata *scrape.MetricMetadata - groupOrders map[string]int - groups map[string]*metricGroup + name string + mtype metricspb.MetricDescriptor_Type + mc MetadataCache + droppedTimeseries int + labelKeys map[string]bool + labelKeysOrdered []string + metadata *scrape.MetricMetadata + groupOrders map[string]int + groups map[string]*metricGroup + intervalStartTimeMs int64 } -func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) MetricFamily { +func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamily { familyName := normalizeMetricName(metricName) // lookup metadata based on familyName @@ -73,15 +74,16 @@ func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) Me } return &metricFamily{ - name: familyName, - mtype: ocaMetricType, - mc: mc, - droppedTimeseries: 0, - labelKeys: make(map[string]bool), - labelKeysOrdered: make([]string, 0), - metadata: &metadata, - groupOrders: make(map[string]int), - groups: make(map[string]*metricGroup), + name: familyName, + mtype: ocaMetricType, + mc: mc, + droppedTimeseries: 0, + labelKeys: make(map[string]bool), + labelKeysOrdered: make([]string, 0), + metadata: &metadata, + groupOrders: make(map[string]int), + groups: make(map[string]*metricGroup), + intervalStartTimeMs: intervalStartTimeMs, } } @@ -164,10 +166,11 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey string, ls labels.Label mg, ok := mf.groups[groupKey] if !ok { mg = &metricGroup{ - family: mf, - ts: ts, - ls: ls, - complexValue: make([]*dataPoint, 0), + family: mf, + ts: ts, + ls: ls, + complexValue: make([]*dataPoint, 0), + intervalStartTimeMs: mf.intervalStartTimeMs, } mf.groups[groupKey] = mg // maintaining data insertion order is helpful to generate stable/reproducible metric output @@ -279,15 +282,16 @@ type dataPoint struct { // a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for // simple types like counter and gauge, each data point is a group of itself type metricGroup struct { - family *metricFamily - ts int64 - ls labels.Labels - count float64 - hasCount bool - sum float64 - hasSum bool - value float64 - complexValue []*dataPoint + family *metricFamily + ts int64 + ls labels.Labels + count float64 + hasCount bool + sum float64 + hasSum bool + value float64 + complexValue []*dataPoint + intervalStartTimeMs int64 } func (mg *metricGroup) sortPoints() { @@ -388,9 +392,7 @@ func (mg *metricGroup) toDoubleValueTimeSeries(orderedLabelKeys []string) *metri var startTs *timestamppb.Timestamp // gauge/undefined types has no start time if mg.family.isCumulativeType() { - // TODO(@odeke-em): use the actual interval start time as reported in - // https://github.com/open-telemetry/opentelemetry-collector/issues/3691 - startTs = timestampFromMs(mg.ts) + startTs = timestampFromMs(mg.intervalStartTimeMs) } return &metricspb.TimeSeries{ diff --git a/receiver/prometheusreceiver/internal/metricsbuilder.go b/receiver/prometheusreceiver/internal/metricsbuilder.go index 7210db9b41d9..600be819dffa 100644 --- a/receiver/prometheusreceiver/internal/metricsbuilder.go +++ b/receiver/prometheusreceiver/internal/metricsbuilder.go @@ -57,6 +57,7 @@ type metricBuilder struct { useStartTimeMetric bool startTimeMetricRegex *regexp.Regexp startTime float64 + intervalStartTimeMs int64 logger *zap.Logger currentMf MetricFamily stalenessStore *stalenessStore @@ -65,7 +66,7 @@ type metricBuilder struct { // newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus // scraped page by calling its AddDataPoint function, and turn them into an opencensus data.MetricsData object // by calling its Build function -func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilder { +func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore, intervalStartTimeMs int64) *metricBuilder { var regex *regexp.Regexp if startTimeMetricRegex != "" { regex, _ = regexp.Compile(startTimeMetricRegex) @@ -79,6 +80,7 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric useStartTimeMetric: useStartTimeMetric, startTimeMetricRegex: regex, stalenessStore: stalenessStore, + intervalStartTimeMs: intervalStartTimeMs, } } @@ -153,9 +155,9 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr if m != nil { b.metrics = append(b.metrics, m) } - b.currentMf = newMetricFamily(metricName, b.mc, b.logger) + b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs) } else if b.currentMf == nil { - b.currentMf = newMetricFamily(metricName, b.mc, b.logger) + b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs) } return b.currentMf.Add(metricName, ls, t, v) diff --git a/receiver/prometheusreceiver/internal/metricsbuilder_test.go b/receiver/prometheusreceiver/internal/metricsbuilder_test.go index 68bb9b70d390..d398ace689a3 100644 --- a/receiver/prometheusreceiver/internal/metricsbuilder_test.go +++ b/receiver/prometheusreceiver/internal/metricsbuilder_test.go @@ -99,7 +99,7 @@ func runBuilderTests(t *testing.T, tests []buildTestData) { mc := newMockMetadataCache(testMetadata) st := startTs for i, page := range tt.inputs { - b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore()) + b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), startTs) b.startTime = defaultBuilderStartTime // set to a non-zero value for _, pt := range page.pts { // set ts for testing @@ -123,7 +123,7 @@ func runBuilderStartTimeTests(t *testing.T, tests []buildTestData, st := startTs for _, page := range tt.inputs { b := newMetricBuilder(mc, true, startTimeMetricRegex, - testLogger, dummyStalenessStore()) + testLogger, dummyStalenessStore(), 0) b.startTime = defaultBuilderStartTime // set to a non-zero value for _, pt := range page.pts { // set ts for testing @@ -1201,7 +1201,7 @@ func Test_metricBuilder_summary(t *testing.T) { func Test_metricBuilder_baddata(t *testing.T) { t.Run("empty-metric-name", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore()) + b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound { t.Error("expecting errMetricNameNotFound error, but get nil") @@ -1215,7 +1215,7 @@ func Test_metricBuilder_baddata(t *testing.T) { t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore()) + b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel { t.Error("expecting errEmptyBoundaryLabel error, but get nil") @@ -1224,7 +1224,7 @@ func Test_metricBuilder_baddata(t *testing.T) { t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore()) + b := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel { t.Error("expecting errEmptyBoundaryLabel error, but get nil") @@ -1452,7 +1452,7 @@ func Test_heuristicalMetricAndKnownUnits(t *testing.T) { // Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44. func TestMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) { mc := newMockMetadataCache(testMetadata) - mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore()) + mb := newMetricBuilder(mc, true, "", testLogger, dummyStalenessStore(), 0) dupLabels := labels.Labels{ {Name: "__name__", Value: "test"}, diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go index f1ad9fcbb631..dcff1e9ff902 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -44,7 +44,7 @@ type metricGroupPdata struct { family *metricFamilyPdata } -func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily { +func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamily { familyName := normalizeMetricName(metricName) // lookup metadata based on familyName @@ -66,13 +66,14 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache) MetricFamily { mtype: convToPdataMetricType(metadata.Type), groups: make(map[string]*metricGroupPdata), metricFamily: metricFamily{ - name: familyName, - mc: mc, - droppedTimeseries: 0, - labelKeys: make(map[string]bool), - labelKeysOrdered: make([]string, 0), - metadata: &metadata, - groupOrders: make(map[string]int), + name: familyName, + mc: mc, + droppedTimeseries: 0, + labelKeys: make(map[string]bool), + labelKeysOrdered: make([]string, 0), + metadata: &metadata, + groupOrders: make(map[string]int), + intervalStartTimeMs: intervalStartTimeMs, }, } } @@ -177,9 +178,7 @@ func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *p tsNanos := pdata.Timestamp(mg.ts * 1e6) // gauge/undefined types have no start time. if mg.family.isCumulativeTypePdata() { - // TODO(@odeke-em): use the actual interval start time as reported in - // https://github.com/open-telemetry/opentelemetry-collector/issues/3691 - startTsNanos = tsNanos + startTsNanos = pdata.Timestamp(mg.intervalStartTimeMs * 1e6) } point := dest.AppendEmpty() @@ -213,9 +212,10 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels. mg = &metricGroupPdata{ family: mf, metricGroup: metricGroup{ - ts: ts, - ls: ls, - complexValue: make([]*dataPoint, 0), + ts: ts, + ls: ls, + complexValue: make([]*dataPoint, 0), + intervalStartTimeMs: mf.intervalStartTimeMs, }, } mf.groups[groupKey] = mg diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go index f789918c91b0..b5c0d2c0c422 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go @@ -94,8 +94,8 @@ func TestIsCumulativeEquivalence(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily) - mfp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata) + mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily) + mfp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata) assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative") assert.Equal(t, mf.isCumulativeType(), tt.want, "isCumulative does not match for regular metricFamily") assert.Equal(t, mfp.isCumulativeTypePdata(), tt.want, "isCumulative does not match for pdata metricFamily") @@ -110,14 +110,18 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { metric string } tests := []struct { - name string - labels labels.Labels - scrapes []*scrape - want func() pdata.HistogramDataPoint + name string + metricName string + labels labels.Labels + scrapes []*scrape + want func() pdata.HistogramDataPoint + intervalStartTimeMs int64 }{ { - name: "histogram", - labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}}, + name: "histogram with startTimestamp of 11", + metricName: "histogram", + intervalStartTimeMs: 1717, + labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "le", Value: "0.75"}, {Name: "b", Value: "B"}}, scrapes: []*scrape{ {at: 11, value: 10, metric: "histogram_count"}, {at: 11, value: 1004.78, metric: "histogram_sum"}, @@ -142,7 +146,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.metricName, mc, tt.intervalStartTimeMs).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) } @@ -183,11 +187,12 @@ func TestMetricGroupData_toDistributionPointEquivalence(t *testing.T) { }, } - for _, tt := range tests { + for i, tt := range tests { tt := tt + intervalStartTimeMs := int64(i + 1) t.Run(tt.name, func(t *testing.T) { - mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily) - mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata) + mf := newMetricFamily(tt.name, mc, zap.NewNop(), intervalStartTimeMs).(*metricFamily) + mp := newMetricFamilyPdata(tt.name, mc, intervalStartTimeMs).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) @@ -350,7 +355,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata) for _, lbs := range tt.labelsScrapes { for _, scrape := range lbs.scrapes { require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value)) @@ -400,8 +405,8 @@ func TestMetricGroupData_toSummaryPointEquivalence(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily) - mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata) + mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily) + mp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) @@ -453,22 +458,45 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) { metric string } tests := []struct { - name string - labels labels.Labels - scrapes []*scrape - want func() pdata.NumberDataPoint + name string + metricKind string + labels labels.Labels + scrapes []*scrape + intervalStartTimestampMs int64 + want func() pdata.NumberDataPoint }{ { - name: "counter", - labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "b", Value: "B"}}, + metricKind: "counter", + name: "counter:: startTimestampMs of 11", + intervalStartTimestampMs: 11, + labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "b", Value: "B"}}, scrapes: []*scrape{ - {at: 38, value: 39.9, metric: "value"}, + {at: 13, value: 33.7, metric: "value"}, }, want: func() pdata.NumberDataPoint { point := pdata.NewNumberDataPoint() - point.SetDoubleVal(39.9) - point.SetTimestamp(38 * 1e6) // the time in milliseconds -> nanoseconds. - point.SetStartTimestamp(38 * 1e6) + point.SetDoubleVal(33.7) + point.SetTimestamp(13 * 1e6) // the time in milliseconds -> nanoseconds. + point.SetStartTimestamp(11 * 1e6) + labelsMap := point.LabelsMap() + labelsMap.Insert("a", "A") + labelsMap.Insert("b", "B") + return point + }, + }, + { + name: "counter:: startTimestampMs of 0", + metricKind: "counter", + intervalStartTimestampMs: 0, + labels: labels.Labels{{Name: "a", Value: "A"}, {Name: "b", Value: "B"}}, + scrapes: []*scrape{ + {at: 28, value: 99.9, metric: "value"}, + }, + want: func() pdata.NumberDataPoint { + point := pdata.NewNumberDataPoint() + point.SetDoubleVal(99.9) + point.SetTimestamp(28 * 1e6) // the time in milliseconds -> nanoseconds. + point.SetStartTimestamp(0) labelsMap := point.LabelsMap() labelsMap.Insert("a", "A") labelsMap.Insert("b", "B") @@ -480,7 +508,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata) + mp := newMetricFamilyPdata(tt.metricKind, mc, tt.intervalStartTimestampMs).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) } @@ -506,9 +534,10 @@ func TestMetricGroupData_toNumberDataPointEquivalence(t *testing.T) { metric string } tests := []struct { - name string - labels labels.Labels - scrapes []*scrape + name string + labels labels.Labels + scrapes []*scrape + wantValue float64 }{ { name: "counter", @@ -516,14 +545,16 @@ func TestMetricGroupData_toNumberDataPointEquivalence(t *testing.T) { scrapes: []*scrape{ {at: 13, value: 33.7, metric: "value"}, }, + wantValue: 33.7, }, } - for _, tt := range tests { + for i, tt := range tests { tt := tt + intervalStartTimeMs := int64(11 + i) t.Run(tt.name, func(t *testing.T) { - mf := newMetricFamily(tt.name, mc, zap.NewNop()).(*metricFamily) - mp := newMetricFamilyPdata(tt.name, mc).(*metricFamilyPdata) + mf := newMetricFamily(tt.name, mc, zap.NewNop(), intervalStartTimeMs).(*metricFamily) + mp := newMetricFamilyPdata(tt.name, mc, intervalStartTimeMs).(*metricFamilyPdata) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) @@ -537,9 +568,11 @@ func TestMetricGroupData_toNumberDataPointEquivalence(t *testing.T) { ocPoint := ocTimeseries.Points[0] pdataPoint := ddpL.At(0) // 1. Ensure that the startTimestamps are equal. - require.Equal(t, ocTimeseries.GetStartTimestamp().AsTime(), pdataPoint.Timestamp().AsTime(), "The timestamp must be equal") + require.Equal(t, ocTimeseries.GetStartTimestamp().AsTime(), pdataPoint.StartTimestamp().AsTime(), "The timestamp must be equal") + require.Equal(t, intervalStartTimeMs*1e6, pdataPoint.StartTimestamp().AsTime().UnixNano(), "intervalStartTimeMs must be the same") // 2. Ensure that the value is equal. - require.Equal(t, ocPoint.GetDoubleValue(), pdataPoint.DoubleVal(), "Count must be equal") + require.Equal(t, ocPoint.GetDoubleValue(), pdataPoint.DoubleVal(), "Values must be equal") + require.Equal(t, tt.wantValue, pdataPoint.DoubleVal(), "Values must be equal") // 4. Ensure that the point's timestamp is equal to that from the OpenCensusProto data point. require.Equal(t, ocPoint.GetTimestamp().AsTime(), pdataPoint.Timestamp().AsTime(), "Point timestamps must be equal") // 5. Ensure that the labels all match up. diff --git a/receiver/prometheusreceiver/internal/transaction.go b/receiver/prometheusreceiver/internal/transaction.go index 48cc54796276..90449c0e9cf4 100644 --- a/receiver/prometheusreceiver/internal/transaction.go +++ b/receiver/prometheusreceiver/internal/transaction.go @@ -166,7 +166,7 @@ func (tr *transaction) initTransaction(ls labels.Labels) error { tr.instance = instance } tr.node, tr.resource = createNodeAndResource(job, instance, mc.SharedLabels().Get(model.SchemeLabel)) - tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger, tr.stalenessStore) + tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger, tr.stalenessStore, tr.startTimeMs) tr.isNew = false return nil }