diff --git a/processor/metricstarttimeprocessor/go.mod b/processor/metricstarttimeprocessor/go.mod index 3bb33954739f7..d86b4dcd9099c 100644 --- a/processor/metricstarttimeprocessor/go.mod +++ b/processor/metricstarttimeprocessor/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/metri go 1.23.0 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.129.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.129.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v1.35.1-0.20250703115036-26a1aed9c04b @@ -66,3 +67,7 @@ require ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/processor/metricstarttimeprocessor/internal/subtractinitial/adjuster.go b/processor/metricstarttimeprocessor/internal/subtractinitial/adjuster.go index 2bdfeb0abdfe4..c904262dd15c1 100644 --- a/processor/metricstarttimeprocessor/internal/subtractinitial/adjuster.go +++ b/processor/metricstarttimeprocessor/internal/subtractinitial/adjuster.go @@ -10,7 +10,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/datapointstorage" @@ -30,7 +29,7 @@ type Adjuster struct { // timeseries. Subsequent points are normalized against this point. referenceCache *datapointstorage.Cache // previousValueCache to store the previous value of each - // timeseries for reset detection. + // timeseries provided to the adjuster for reset detection. previousValueCache *datapointstorage.Cache set component.TelemetrySettings } @@ -58,16 +57,8 @@ func NewAdjuster(set component.TelemetrySettings, gcInterval time.Duration) *Adj // updated. The function returns a new pmetric.Metrics containing the adjusted // metrics. func (a *Adjuster) AdjustMetrics(_ context.Context, metrics pmetric.Metrics) (pmetric.Metrics, error) { - // Create a copy of metrics to store the results. - resultMetrics := pmetric.NewMetrics() for i := 0; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) - - // Copy over resource info to the result. - resResource := resultMetrics.ResourceMetrics().AppendEmpty() - resResource.SetSchemaUrl(rm.SchemaUrl()) - rm.Resource().CopyTo(resResource.Resource()) - attrHash := pdatautil.MapHash(rm.Resource().Attributes()) referenceTsm, _ := a.referenceCache.Get(attrHash) previousValueTsm, _ := a.previousValueCache.Get(attrHash) @@ -78,44 +69,20 @@ func (a *Adjuster) AdjustMetrics(_ context.Context, metrics pmetric.Metrics) (pm previousValueTsm.Lock() for j := 0; j < rm.ScopeMetrics().Len(); j++ { ilm := rm.ScopeMetrics().At(j) - - // Copy over scope info to the result. - resScope := resResource.ScopeMetrics().AppendEmpty() - resScope.SetSchemaUrl(ilm.SchemaUrl()) - ilm.Scope().CopyTo(resScope.Scope()) - for k := range ilm.Metrics().Len() { metric := ilm.Metrics().At(k) - - // Copy over metric info to the result. - resMetric := resScope.Metrics().AppendEmpty() - resMetric.SetName(metric.Name()) - resMetric.SetDescription(metric.Description()) - resMetric.SetUnit(metric.Unit()) - metric.Metadata().CopyTo(resMetric.Metadata()) - switch dataType := metric.Type(); dataType { - case pmetric.MetricTypeGauge: - // gauges don't need to be adjusted so no additional processing is necessary - metric.CopyTo(resMetric) - case pmetric.MetricTypeHistogram: - adjustMetricHistogram(referenceTsm, previousValueTsm, metric, resMetric.SetEmptyHistogram()) + adjustMetricHistogram(referenceTsm, previousValueTsm, metric) case pmetric.MetricTypeSummary: - adjustMetricSummary(referenceTsm, previousValueTsm, metric, resMetric.SetEmptySummary()) + adjustMetricSummary(referenceTsm, previousValueTsm, metric) case pmetric.MetricTypeSum: - adjustMetricSum(referenceTsm, previousValueTsm, metric, resMetric.SetEmptySum()) + adjustMetricSum(referenceTsm, previousValueTsm, metric) case pmetric.MetricTypeExponentialHistogram: - adjustMetricExponentialHistogram(referenceTsm, previousValueTsm, metric, resMetric.SetEmptyExponentialHistogram()) - - case pmetric.MetricTypeEmpty: - fallthrough - - default: - a.set.Logger.Error("Adjust - skipping unexpected point", zap.String("type", dataType.String())) + adjustMetricExponentialHistogram(referenceTsm, previousValueTsm, metric) } } } @@ -123,258 +90,205 @@ func (a *Adjuster) AdjustMetrics(_ context.Context, metrics pmetric.Metrics) (pm previousValueTsm.Unlock() } - return resultMetrics, nil + return metrics, nil } -func adjustMetricHistogram(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, current pmetric.Metric, resHistogram pmetric.Histogram) { - resHistogram.SetAggregationTemporality(current.Histogram().AggregationTemporality()) - - histogram := current.Histogram() +func adjustMetricHistogram(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, metric pmetric.Metric) { + histogram := metric.Histogram() if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { // Only dealing with CumulativeDistributions. - histogram.CopyTo(resHistogram) return } - currentPoints := histogram.DataPoints() - for i := 0; i < currentPoints.Len(); i++ { - currentDist := currentPoints.At(i) + histogram.DataPoints().RemoveIf(func(currentDist pmetric.HistogramDataPoint) bool { pointStartTime := currentDist.StartTimestamp() if pointStartTime != 0 && pointStartTime != currentDist.Timestamp() { // Report point as is. - currentDist.CopyTo(resHistogram.DataPoints().AppendEmpty()) - continue + return false } - referenceTsi, found := referenceTsm.Get(current, currentDist.Attributes()) + referenceTsi, found := referenceTsm.Get(metric, currentDist.Attributes()) + // previousTsi always exists when reference Tsi is found + previousTsi, _ := previousValueTsm.Get(metric, currentDist.Attributes()) if !found { // First time we see this point. Skip it and use as a reference point for the next points. referenceTsi.Histogram = pmetric.NewHistogramDataPoint() - currentDist.CopyTo(referenceTsi.Histogram) - continue + minimalHistogramCopyTo(currentDist, referenceTsi.Histogram) + previousTsi.Histogram = pmetric.NewHistogramDataPoint() + minimalHistogramCopyTo(currentDist, previousTsi.Histogram) + return true } // Adjust the datapoint based on the reference value. - adjustedPoint := pmetric.NewHistogramDataPoint() - currentDist.CopyTo(adjustedPoint) - adjustedPoint.SetStartTimestamp(referenceTsi.Histogram.StartTimestamp()) - if adjustedPoint.Flags().NoRecordedValue() { - adjustedPoint.CopyTo(resHistogram.DataPoints().AppendEmpty()) - continue + currentDist.SetStartTimestamp(referenceTsi.Histogram.StartTimestamp()) + if currentDist.Flags().NoRecordedValue() { + return false } - isReset := datapointstorage.IsResetHistogram(adjustedPoint, referenceTsi.Histogram) - subtractHistogramDataPoint(adjustedPoint, referenceTsi.Histogram) - previousTsi, found := previousValueTsm.Get(current, currentDist.Attributes()) - if isReset || (found && datapointstorage.IsResetHistogram(adjustedPoint, previousTsi.Histogram)) { + if datapointstorage.IsResetHistogram(currentDist, previousTsi.Histogram) { // reset re-initialize everything and use the non adjusted points start time. - resetStartTimeStamp := pcommon.NewTimestampFromTime(currentDist.StartTimestamp().AsTime().Add(-1 * time.Millisecond)) + resetStartTimeStamp := pcommon.NewTimestampFromTime(pointStartTime.AsTime().Add(-1 * time.Millisecond)) currentDist.SetStartTimestamp(resetStartTimeStamp) - // Update the reference value with the current point. + // Update the reference value with the metric point. referenceTsi.Histogram = pmetric.NewHistogramDataPoint() previousTsi.Histogram = pmetric.NewHistogramDataPoint() - referenceTsi.Histogram.SetStartTimestamp(currentDist.StartTimestamp()) + referenceTsi.Histogram.SetStartTimestamp(resetStartTimeStamp) currentDist.ExplicitBounds().CopyTo(referenceTsi.Histogram.ExplicitBounds()) referenceTsi.Histogram.BucketCounts().FromRaw(make([]uint64, currentDist.BucketCounts().Len())) - - currentDist.CopyTo(resHistogram.DataPoints().AppendEmpty()) - currentDist.CopyTo(previousTsi.Histogram) - continue - } else if !found { - // First point after the reference. Not a reset. - previousTsi.Histogram = pmetric.NewHistogramDataPoint() + minimalHistogramCopyTo(currentDist, previousTsi.Histogram) + } else { + minimalHistogramCopyTo(currentDist, previousTsi.Histogram) + subtractHistogramDataPoint(currentDist, referenceTsi.Histogram) } - - // Update previous values with the current point. - adjustedPoint.CopyTo(previousTsi.Histogram) - adjustedPoint.CopyTo(resHistogram.DataPoints().AppendEmpty()) - } + return false + }) } -func adjustMetricExponentialHistogram(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, current pmetric.Metric, resExpHistogram pmetric.ExponentialHistogram) { - resExpHistogram.SetAggregationTemporality(current.ExponentialHistogram().AggregationTemporality()) - - histogram := current.ExponentialHistogram() +func adjustMetricExponentialHistogram(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, metric pmetric.Metric) { + histogram := metric.ExponentialHistogram() if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { // Only dealing with CumulativeDistributions. - histogram.CopyTo(resExpHistogram) return } - currentPoints := histogram.DataPoints() - for i := 0; i < currentPoints.Len(); i++ { - currentDist := currentPoints.At(i) + histogram.DataPoints().RemoveIf(func(currentDist pmetric.ExponentialHistogramDataPoint) bool { pointStartTime := currentDist.StartTimestamp() if pointStartTime != 0 && pointStartTime != currentDist.Timestamp() { // Report point as is. - currentDist.CopyTo(resExpHistogram.DataPoints().AppendEmpty()) - continue + return false } - referenceTsi, found := referenceTsm.Get(current, currentDist.Attributes()) + referenceTsi, found := referenceTsm.Get(metric, currentDist.Attributes()) + // previousTsi always exists when reference Tsi is found + previousTsi, _ := previousValueTsm.Get(metric, currentDist.Attributes()) if !found { // First time we see this point. Skip it and use as a reference point for the next points. referenceTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint() - currentDist.CopyTo(referenceTsi.ExponentialHistogram) - continue + minimalExponentialHistogramCopyTo(currentDist, referenceTsi.ExponentialHistogram) + previousTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint() + minimalExponentialHistogramCopyTo(currentDist, previousTsi.ExponentialHistogram) + return true } // Adjust the datapoint based on the reference value. - adjustedPoint := pmetric.NewExponentialHistogramDataPoint() - currentDist.CopyTo(adjustedPoint) - adjustedPoint.SetStartTimestamp(referenceTsi.ExponentialHistogram.StartTimestamp()) - if adjustedPoint.Flags().NoRecordedValue() { - adjustedPoint.CopyTo(resExpHistogram.DataPoints().AppendEmpty()) - continue + currentDist.SetStartTimestamp(referenceTsi.ExponentialHistogram.StartTimestamp()) + if currentDist.Flags().NoRecordedValue() { + return false } - isReset := datapointstorage.IsResetExponentialHistogram(adjustedPoint, referenceTsi.ExponentialHistogram) - subtractExponentialHistogramDataPoint(adjustedPoint, referenceTsi.ExponentialHistogram) - - previousTsi, found := previousValueTsm.Get(current, currentDist.Attributes()) - if isReset || (found && datapointstorage.IsResetExponentialHistogram(adjustedPoint, previousTsi.ExponentialHistogram)) { + if datapointstorage.IsResetExponentialHistogram(currentDist, previousTsi.ExponentialHistogram) { // reset re-initialize everything and use the non adjusted points start time. - resetStartTimeStamp := pcommon.NewTimestampFromTime(currentDist.StartTimestamp().AsTime().Add(-1 * time.Millisecond)) + resetStartTimeStamp := pcommon.NewTimestampFromTime(pointStartTime.AsTime().Add(-1 * time.Millisecond)) currentDist.SetStartTimestamp(resetStartTimeStamp) referenceTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint() previousTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint() - referenceTsi.ExponentialHistogram.SetStartTimestamp(currentDist.StartTimestamp()) + referenceTsi.ExponentialHistogram.SetStartTimestamp(resetStartTimeStamp) referenceTsi.ExponentialHistogram.SetScale(currentDist.Scale()) referenceTsi.ExponentialHistogram.Positive().BucketCounts().FromRaw(make([]uint64, currentDist.Positive().BucketCounts().Len())) referenceTsi.ExponentialHistogram.Negative().BucketCounts().FromRaw(make([]uint64, currentDist.Negative().BucketCounts().Len())) - - currentDist.CopyTo(resExpHistogram.DataPoints().AppendEmpty()) - currentDist.CopyTo(previousTsi.ExponentialHistogram) - continue - } else if !found { - // First point after the reference. Not a reset. - previousTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint() + minimalExponentialHistogramCopyTo(currentDist, previousTsi.ExponentialHistogram) + } else { + minimalExponentialHistogramCopyTo(currentDist, previousTsi.ExponentialHistogram) + subtractExponentialHistogramDataPoint(currentDist, referenceTsi.ExponentialHistogram) } - - // Update previous values with the current point. - adjustedPoint.CopyTo(previousTsi.ExponentialHistogram) - adjustedPoint.CopyTo(resExpHistogram.DataPoints().AppendEmpty()) - } + return false + }) } -func adjustMetricSum(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, current pmetric.Metric, resSum pmetric.Sum) { - sum := current.Sum() +func adjustMetricSum(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, metric pmetric.Metric) { + sum := metric.Sum() if sum.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { - sum.CopyTo(resSum) + // Only handle cumulative temporality sums return } - resSum.SetAggregationTemporality(sum.AggregationTemporality()) - resSum.SetIsMonotonic(sum.IsMonotonic()) - currentPoints := sum.DataPoints() - for i := 0; i < currentPoints.Len(); i++ { - currentSum := currentPoints.At(i) + sum.DataPoints().RemoveIf(func(currentSum pmetric.NumberDataPoint) bool { pointStartTime := currentSum.StartTimestamp() if pointStartTime != 0 && pointStartTime != currentSum.Timestamp() { // Report point as is. - currentSum.CopyTo(resSum.DataPoints().AppendEmpty()) - continue + return false } - referenceTsi, found := referenceTsm.Get(current, currentSum.Attributes()) + referenceTsi, found := referenceTsm.Get(metric, currentSum.Attributes()) + // previousTsi always exists when reference Tsi is found + previousTsi, _ := previousValueTsm.Get(metric, currentSum.Attributes()) if !found { // First time we see this point. Skip it and use as a reference point for the next points. referenceTsi.Number = pmetric.NewNumberDataPoint() - currentSum.CopyTo(referenceTsi.Number) - continue + minimalSumCopyTo(currentSum, referenceTsi.Number) + previousTsi.Number = pmetric.NewNumberDataPoint() + minimalSumCopyTo(currentSum, previousTsi.Number) + return true } // Adjust the datapoint based on the reference value. - adjustedPoint := pmetric.NewNumberDataPoint() - currentSum.CopyTo(adjustedPoint) - adjustedPoint.SetStartTimestamp(referenceTsi.Number.StartTimestamp()) - if adjustedPoint.Flags().NoRecordedValue() { - adjustedPoint.CopyTo(resSum.DataPoints().AppendEmpty()) - continue + currentSum.SetStartTimestamp(referenceTsi.Number.StartTimestamp()) + if currentSum.Flags().NoRecordedValue() { + return false } - isReset := datapointstorage.IsResetSum(adjustedPoint, referenceTsi.Number) - adjustedPoint.SetDoubleValue(adjustedPoint.DoubleValue() - referenceTsi.Number.DoubleValue()) - previousTsi, found := previousValueTsm.Get(current, currentSum.Attributes()) - if isReset || (found && datapointstorage.IsResetSum(adjustedPoint, previousTsi.Number)) { + if datapointstorage.IsResetSum(currentSum, previousTsi.Number) { // reset re-initialize everything and use the non adjusted points start time. - resetStartTimeStamp := pcommon.NewTimestampFromTime(currentSum.StartTimestamp().AsTime().Add(-1 * time.Millisecond)) + resetStartTimeStamp := pcommon.NewTimestampFromTime(pointStartTime.AsTime().Add(-1 * time.Millisecond)) currentSum.SetStartTimestamp(resetStartTimeStamp) referenceTsi.Number = pmetric.NewNumberDataPoint() previousTsi.Number = pmetric.NewNumberDataPoint() - referenceTsi.Number.SetStartTimestamp(currentSum.StartTimestamp()) - - currentSum.CopyTo(resSum.DataPoints().AppendEmpty()) - currentSum.CopyTo(previousTsi.Number) - continue - } else if !found { - // First point after the reference. Not a reset. - previousTsi.Number = pmetric.NewNumberDataPoint() + referenceTsi.Number.SetStartTimestamp(resetStartTimeStamp) + minimalSumCopyTo(currentSum, previousTsi.Number) + } else { + minimalSumCopyTo(currentSum, previousTsi.Number) + currentSum.SetDoubleValue(currentSum.DoubleValue() - referenceTsi.Number.DoubleValue()) } - - // Update previous values with the current point. - adjustedPoint.CopyTo(previousTsi.Number) - adjustedPoint.CopyTo(resSum.DataPoints().AppendEmpty()) - } + return false + }) } -func adjustMetricSummary(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, current pmetric.Metric, resSummary pmetric.Summary) { - currentPoints := current.Summary().DataPoints() - for i := 0; i < currentPoints.Len(); i++ { - currentSummary := currentPoints.At(i) +func adjustMetricSummary(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, metric pmetric.Metric) { + metric.Summary().DataPoints().RemoveIf(func(currentSummary pmetric.SummaryDataPoint) bool { pointStartTime := currentSummary.StartTimestamp() if pointStartTime != 0 && pointStartTime != currentSummary.Timestamp() { // Report point as is. - currentSummary.CopyTo(resSummary.DataPoints().AppendEmpty()) - continue + return false } - referenceTsi, found := referenceTsm.Get(current, currentSummary.Attributes()) + referenceTsi, found := referenceTsm.Get(metric, currentSummary.Attributes()) + // previousTsi always exists when reference Tsi is found + previousTsi, _ := previousValueTsm.Get(metric, currentSummary.Attributes()) if !found { // First time we see this point. Skip it and use as a reference point for the next points. referenceTsi.Summary = pmetric.NewSummaryDataPoint() - currentSummary.CopyTo(referenceTsi.Summary) - continue + minimalSummaryCopyTo(currentSummary, referenceTsi.Summary) + previousTsi.Summary = pmetric.NewSummaryDataPoint() + minimalSummaryCopyTo(currentSummary, previousTsi.Summary) + return true } // Adjust the datapoint based on the reference value. - adjustedPoint := pmetric.NewSummaryDataPoint() - currentSummary.CopyTo(adjustedPoint) - adjustedPoint.SetStartTimestamp(referenceTsi.Summary.StartTimestamp()) - if adjustedPoint.Flags().NoRecordedValue() { - adjustedPoint.CopyTo(resSummary.DataPoints().AppendEmpty()) - continue + currentSummary.SetStartTimestamp(referenceTsi.Summary.StartTimestamp()) + if currentSummary.Flags().NoRecordedValue() { + return false } - isReset := datapointstorage.IsResetSummary(adjustedPoint, referenceTsi.Summary) - adjustedPoint.SetCount(adjustedPoint.Count() - referenceTsi.Summary.Count()) - adjustedPoint.SetSum(adjustedPoint.Sum() - referenceTsi.Summary.Sum()) - - previousTsi, found := previousValueTsm.Get(current, currentSummary.Attributes()) - if isReset || (found && datapointstorage.IsResetSummary(adjustedPoint, previousTsi.Summary)) { + if datapointstorage.IsResetSummary(currentSummary, previousTsi.Summary) { // reset re-initialize everything and use the non adjusted points start time. - resetStartTimeStamp := pcommon.NewTimestampFromTime(currentSummary.StartTimestamp().AsTime().Add(-1 * time.Millisecond)) + resetStartTimeStamp := pcommon.NewTimestampFromTime(pointStartTime.AsTime().Add(-1 * time.Millisecond)) currentSummary.SetStartTimestamp(resetStartTimeStamp) referenceTsi.Summary = pmetric.NewSummaryDataPoint() previousTsi.Summary = pmetric.NewSummaryDataPoint() - referenceTsi.Summary.SetStartTimestamp(currentSummary.StartTimestamp()) - - currentSummary.CopyTo(resSummary.DataPoints().AppendEmpty()) - currentSummary.CopyTo(previousTsi.Summary) - continue - } else if !found { - // First point after the reference. Not a reset. - previousTsi.Summary = pmetric.NewSummaryDataPoint() + referenceTsi.Summary.SetStartTimestamp(resetStartTimeStamp) + minimalSummaryCopyTo(currentSummary, previousTsi.Summary) + } else { + currentSummary.SetStartTimestamp(referenceTsi.Summary.StartTimestamp()) + minimalSummaryCopyTo(currentSummary, previousTsi.Summary) + currentSummary.SetCount(currentSummary.Count() - referenceTsi.Summary.Count()) + currentSummary.SetSum(currentSummary.Sum() - referenceTsi.Summary.Sum()) } - - // Update previous values with the current point. - adjustedPoint.CopyTo(previousTsi.Summary) - adjustedPoint.CopyTo(resSummary.DataPoints().AppendEmpty()) - } + return false + }) } // subtractHistogramDataPoint subtracts b from a. @@ -425,3 +339,52 @@ func subtractExponentialBuckets(a, b pmetric.ExponentialHistogramDataPointBucket } return newBuckets } + +// minimalHistogramCopyTo is equivalent to a.CopyTo(b) without copying attributes or exemplars +func minimalHistogramCopyTo(a, b pmetric.HistogramDataPoint) { + // Copy attributes and exemplars to temporary structures so they are not copied to b. + tmpAttrs := pcommon.NewMap() + tmpExemplars := pmetric.NewExemplarSlice() + a.Attributes().MoveTo(tmpAttrs) + a.Exemplars().MoveAndAppendTo(tmpExemplars) + a.CopyTo(b) + // Restore attributes and exemplars. + tmpAttrs.MoveTo(a.Attributes()) + tmpExemplars.MoveAndAppendTo(a.Exemplars()) +} + +// minimalExponentialHistogramCopyTo is equivalent to a.CopyTo(b) without copying attributes or exemplars +func minimalExponentialHistogramCopyTo(a, b pmetric.ExponentialHistogramDataPoint) { + // Copy attributes and exemplars to temporary structures so they are not copied to b. + tmpAttrs := pcommon.NewMap() + tmpExemplars := pmetric.NewExemplarSlice() + a.Attributes().MoveTo(tmpAttrs) + a.Exemplars().MoveAndAppendTo(tmpExemplars) + a.CopyTo(b) + // Restore attributes and exemplars. + tmpAttrs.MoveTo(a.Attributes()) + tmpExemplars.MoveAndAppendTo(a.Exemplars()) +} + +// minimalSumCopyTo is equivalent to a.CopyTo(b) without copying attributes or exemplars +func minimalSumCopyTo(a, b pmetric.NumberDataPoint) { + // Copy attributes and exemplars to temporary structures so they are not copied to b. + tmpAttrs := pcommon.NewMap() + tmpExemplars := pmetric.NewExemplarSlice() + a.Attributes().MoveTo(tmpAttrs) + a.Exemplars().MoveAndAppendTo(tmpExemplars) + a.CopyTo(b) + // Restore attributes and exemplars. + tmpAttrs.MoveTo(a.Attributes()) + tmpExemplars.MoveAndAppendTo(a.Exemplars()) +} + +// minimalSummaryCopyTo is equivalent to a.CopyTo(b) without copying attributes or exemplars +func minimalSummaryCopyTo(a, b pmetric.SummaryDataPoint) { + // Copy attributes to a temporary map so they are not copied to b. + tmpAttrs := pcommon.NewMap() + a.Attributes().MoveTo(tmpAttrs) + a.CopyTo(b) + // Restore attributes. + tmpAttrs.MoveTo(a.Attributes()) +} diff --git a/processor/metricstarttimeprocessor/internal/testhelper/util.go b/processor/metricstarttimeprocessor/internal/testhelper/util.go index fb9c5270517eb..c667731f3548f 100644 --- a/processor/metricstarttimeprocessor/internal/testhelper/util.go +++ b/processor/metricstarttimeprocessor/internal/testhelper/util.go @@ -12,6 +12,8 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" semconv "go.opentelemetry.io/otel/semconv/v1.27.0" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) func TimestampFromMs(timeAtMs int64) pcommon.Timestamp { @@ -306,7 +308,7 @@ func RunScript(t *testing.T, ma Adjuster, tests []*MetricsAdjusterTest, addition rm.Resource().Attributes().PutStr(fmt.Sprintf("%d", i), attr) } } - assert.Equal(t, test.Adjusted, adjusted) + assert.NoError(t, pmetrictest.CompareMetrics(test.Adjusted, adjusted)) }) } }