Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewritereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds support to accept METRIC_TYPE_UNSPECIFIED as gauge type

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [41840]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
5 changes: 5 additions & 0 deletions receiver/prometheusremotewritereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.135.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.135.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.132.0
github.com/prometheus/prometheus v0.304.3-0.20250703114031-419d436a447a
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.41.1-0.20250911090542-8b3a08ca2a38
Expand Down Expand Up @@ -153,3 +154,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden =>
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor => ../../processor/deltatocumulativeprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus => ../../pkg/translator/prometheus

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common
11 changes: 9 additions & 2 deletions receiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.uber.org/zap/zapcore"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
Expand Down Expand Up @@ -347,14 +348,20 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
metric, exists := metricCache[metricKey]
if !exists {
switch ts.Metadata.Type {
case writev2.Metadata_METRIC_TYPE_GAUGE:
case writev2.Metadata_METRIC_TYPE_GAUGE, writev2.Metadata_METRIC_TYPE_UNSPECIFIED:
metric = setMetric(scope, metricName, unit, description)
metric.SetEmptyGauge()
if ts.Metadata.Type == writev2.Metadata_METRIC_TYPE_UNSPECIFIED {
metric.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "unknown")
} else {
metric.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "gauge")
}
case writev2.Metadata_METRIC_TYPE_COUNTER:
metric = setMetric(scope, metricName, unit, description)
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
metric.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "counter")
case writev2.Metadata_METRIC_TYPE_SUMMARY:
// Drop summary series as we will not handle them.
continue
Expand All @@ -370,7 +377,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
}

switch ts.Metadata.Type {
case writev2.Metadata_METRIC_TYPE_GAUGE:
case writev2.Metadata_METRIC_TYPE_GAUGE, writev2.Metadata_METRIC_TYPE_UNSPECIFIED:
addNumberDatapoints(metric.Gauge().DataPoints(), ls, ts, &stats)
case writev2.Metadata_METRIC_TYPE_COUNTER:
addNumberDatapoints(metric.Sum().DataPoints(), ls, ts, &stats)
Expand Down
80 changes: 72 additions & 8 deletions receiver/prometheusremotewritereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver/internal/metadata"
)

Expand Down Expand Up @@ -256,21 +258,43 @@ func TestTranslateV2(t *testing.T) {
expectError: "help ref 3 is out of bounds of symbolsTable",
},
{
name: "unsupported metric type UNSPECIFIED",
name: "accept unspecified metric type as gauge",
request: &writev2.Request{
Symbols: []string{"", "__name__", "test_metric", "job", "test_job", "instance", "test_instance"},
Timeseries: []writev2.TimeSeries{
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
CreatedTimestamp: 1,
},
},
},
expectError: `unsupported metric type "METRIC_TYPE_UNSPECIFIED" for metric "test_metric"`,
expectedMetrics: func() pmetric.Metrics {
expected := pmetric.NewMetrics()
rm := expected.ResourceMetrics().AppendEmpty()
attrs := rm.Resource().Attributes()
attrs.PutStr("service.name", "test_job")
attrs.PutStr("service.instance.id", "test_instance")

sm := rm.ScopeMetrics().AppendEmpty()
sm.Scope().SetName("OpenTelemetry Collector")
sm.Scope().SetVersion("latest")
metric := sm.Metrics().AppendEmpty()
metric.SetName("test_metric")
metric.SetUnit("")
metric.SetDescription("")
metric.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "unknown")

dp := metric.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
dp.SetDoubleValue(1.0)
dp.SetStartTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
return expected
}(),
expectedStats: remote.WriteResponseStats{
Confirmed: false,
Samples: 0,
Confirmed: true,
Samples: 1,
Histograms: 0,
Exemplars: 0,
},
Expand All @@ -294,6 +318,7 @@ func TestTranslateV2(t *testing.T) {
metrics1.SetName("test_metric1")
metrics1.SetUnit("")
metrics1.SetDescription("")
metrics1.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "gauge")

dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty()
dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
Expand Down Expand Up @@ -321,6 +346,7 @@ func TestTranslateV2(t *testing.T) {
metrics2.SetName("test_metric1")
metrics2.SetUnit("")
metrics2.SetDescription("")
metrics2.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "gauge")

dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty()
dp3.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond)))
Expand Down Expand Up @@ -422,6 +448,7 @@ func TestTranslateV2(t *testing.T) {
metrics1.SetName("test_metric")
metrics1.SetUnit("")
metrics1.SetDescription("")
metrics1.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "gauge")

dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty()
dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
Expand All @@ -438,6 +465,7 @@ func TestTranslateV2(t *testing.T) {
cbneMetric.SetName("test_metric")
cbneMetric.SetUnit("")
cbneMetric.SetDescription("")
// cbneMetric.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")
hist := cbneMetric.SetEmptyHistogram()
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

Expand All @@ -455,7 +483,8 @@ func TestTranslateV2(t *testing.T) {
metrics2 := sm2.Metrics().AppendEmpty()
metrics2.SetName("test_metric")
metrics2.SetUnit("")
metrics1.SetDescription("")
metrics2.SetDescription("")
metrics2.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "gauge")

dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty()
dp3.SetTimestamp(pcommon.Timestamp(3 * int64(time.Millisecond)))
Expand All @@ -467,6 +496,7 @@ func TestTranslateV2(t *testing.T) {
expMetric.SetName("test_metric")
expMetric.SetUnit("")
expMetric.SetDescription("")
// expMetric.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")
expHist := expMetric.SetEmptyExponentialHistogram()
expHist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

Expand Down Expand Up @@ -550,6 +580,7 @@ func TestTranslateV2(t *testing.T) {
metrics1.SetName("test_metric")
metrics1.SetUnit("seconds")
metrics1.SetDescription("longer description")
metrics1.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "gauge")

dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty()
dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
Expand All @@ -565,6 +596,7 @@ func TestTranslateV2(t *testing.T) {
metrics2.SetName("test_metric")
metrics2.SetUnit("milliseconds")
metrics2.SetDescription("small desc")
metrics2.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "gauge")

dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty()
dp3.SetTimestamp(pcommon.Timestamp(3 * int64(time.Millisecond)))
Expand Down Expand Up @@ -633,6 +665,7 @@ func TestTranslateV2(t *testing.T) {
m.SetName("normal_metric")
m.SetUnit("")
m.SetDescription("")
m.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "gauge")

dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetDoubleValue(1.0)
Expand Down Expand Up @@ -710,6 +743,7 @@ func TestTranslateV2(t *testing.T) {
m.SetName("test_metric")
m.SetUnit("")
m.SetDescription("")
// m.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")

hist := m.SetEmptyExponentialHistogram()
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
Expand Down Expand Up @@ -793,6 +827,7 @@ func TestTranslateV2(t *testing.T) {
m.SetName("test_metric")
m.SetUnit("")
m.SetDescription("")
// m.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")

hist := m.SetEmptyExponentialHistogram()
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
Expand Down Expand Up @@ -855,6 +890,7 @@ func TestTranslateV2(t *testing.T) {
m := sm.Metrics().AppendEmpty()
m.SetName("test_metric")
m.SetUnit("")
// m.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")

hist := m.SetEmptyExponentialHistogram()
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
Expand Down Expand Up @@ -944,6 +980,7 @@ func TestTranslateV2(t *testing.T) {
m := sm.Metrics().AppendEmpty()
m.SetName("test_metric")
m.SetUnit("")
// m.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")

hist := m.SetEmptyExponentialHistogram()
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
Expand Down Expand Up @@ -1357,6 +1394,7 @@ func TestTranslateV2(t *testing.T) {
m2.SetName("test_mixed_histogram")
m2.SetUnit("")
m2.SetDescription("")
// m2.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")
hist2 := m2.SetEmptyExponentialHistogram()
hist2.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

Expand Down Expand Up @@ -1387,6 +1425,7 @@ func TestTranslateV2(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics))
assert.Equal(t, tc.expectedStats, stats)
assert.Equal(t, buildMetaDataMapByID(tc.expectedMetrics), buildMetaDataMapByID(metrics))
})
}
}
Expand Down Expand Up @@ -1755,3 +1794,28 @@ func TestLRUCacheResourceMetrics(t *testing.T) {
// As just have 1 slot in the cache, but the cache for metric1 was evicted, this metric1_1 should generate a new resource metric, even having the same job/instance than the metric1.
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1_1, mockConsumer.metrics[3]))
}

func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any {
Comment on lines +1797 to +1798

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for building this, that looks good as an initial implementation!

In the long run, we want this method to be moved to pkg/pdatatest/pmetrictest so this is re-usable by other components that might need to compare metadata. And also because this piece of code makes more sense over there, comparing OTLP metadata isn't really in scope of prometheusremotewritereceiver.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, will move this once it’s a bit more refined.

result := make(map[string]map[string]any)
for i := 0; i < ms.ResourceMetrics().Len(); i++ {
rm := ms.ResourceMetrics().At(i)
resourceID := identity.OfResource(rm.Resource()).String()
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
sm := rm.ScopeMetrics().At(j)
scopeName := sm.Scope().Name()
scopeVersion := sm.Scope().Version()
for k := 0; k < sm.Metrics().Len(); k++ {
m := sm.Metrics().At(k)
metricID := fmt.Sprintf("%s:%s:%s:%s:%s",
resourceID,
scopeName,
scopeVersion,
m.Name(),
m.Unit(),
)
result[metricID] = m.Metadata().AsRaw()
}
}
}
return result
}
Loading