Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package internal

import (
"fmt"
"sort"
"strings"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
)

// MetricFamilyPdata is unit which is corresponding to the metrics items which shared the same TYPE/UNIT/... metadata from
Expand Down Expand Up @@ -51,7 +53,7 @@ type metricGroupPdata struct {
family *metricFamilyPdata
}

func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamilyPdata {
func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamilyPdata {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand All @@ -67,6 +69,13 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTime
metadata.Metric = familyName
metadata.Type = textparse.MetricTypeUnknown
}
} else if !ok && isInternalMetric(metricName) {
metadata = defineInternalMetric(metricName, metadata, logger)
}

mtype := convToPdataMetricType(metadata.Type)
if mtype == pdata.MetricDataTypeNone {
logger.Debug(fmt.Sprintf("Invalid metric : %s %+v", metricName, metadata))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetricName and metadata should fields not a dynamic string to allow for easier searching in logging systems.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also question the value of having the log statement here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in the original code

ocaMetricType := convToOCAMetricType(metadata.Type)
if ocaMetricType == metricspb.MetricDescriptor_UNSPECIFIED {
logger.Debug(fmt.Sprintf("Invalid metric : %s %+v", metricName, metadata))
}

this code is a port. @MovieStoreGuy I appreciate the enthusiasm but the context here is that I am porting over code, which requires careful parity changes; you can file bugs for improvements but these PRs am sending are for porting over code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened a followup issue here #5720, can this comment be resolved @MovieStoreGuy?

}

return &metricFamilyPdata{
Expand All @@ -85,6 +94,12 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTime
}
}

func (mf *metricFamilyPdata) IsSameFamily(metricName string) bool {
// trim known suffix if necessary
familyName := normalizeMetricName(metricName)
return mf.name == familyName || familyName != metricName && mf.name == metricName
}

// updateLabelKeys is used to store all the label keys of a same metric family in observed order. since prometheus
// receiver removes any label with empty value before feeding it to an appender, in order to figure out all the labels
// from the same metric family we will need to keep track of what labels have ever been observed.
Expand All @@ -109,6 +124,12 @@ func (mf *metricFamilyPdata) getGroupKey(ls labels.Labels) string {
return dpgSignature(mf.labelKeysOrdered, ls)
}

func (mg *metricGroupPdata) sortPoints() {
sort.Slice(mg.complexValue, func(i, j int) bool {
return mg.complexValue[i].boundary < mg.complexValue[j].boundary
})
}

func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest *pdata.HistogramDataPointSlice) bool {
if !mg.hasCount || len(mg.complexValue) == 0 {
return false
Expand Down Expand Up @@ -273,6 +294,9 @@ func (mf *metricFamilyPdata) getGroups() []*metricGroupPdata {

func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int) {
metric := pdata.NewMetric()
metric.SetDataType(mf.mtype)
metric.SetName(mf.name)

pointCount := 0

switch mf.mtype {
Expand Down Expand Up @@ -307,6 +331,8 @@ func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int
pointCount = sdpL.Len()

default:
// Everything else should be a gauge.
metric.SetDataType(pdata.MetricDataTypeGauge)
gauge := metric.Gauge()
gdpL := gauge.DataPoints()
for _, mg := range mf.getGroups() {
Expand Down
14 changes: 7 additions & 7 deletions receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestIsCumulativeEquivalence(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily)
mfp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata)
mfp := newMetricFamilyPdata(tt.name, mc, testLogger, 1).(*metricFamilyPdata)
assert.Equal(t, mf.isCumulativeType(), mfp.isCumulativeTypePdata(), "mismatch in isCumulative")
assert.Equal(t, mf.isCumulativeType(), tt.want, "isCumulative does not match for regular metricFamily")
assert.Equal(t, mfp.isCumulativeTypePdata(), tt.want, "isCumulative does not match for pdata metricFamily")
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.metricName, mc, tt.intervalStartTimeMs).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.metricName, mc, testLogger, tt.intervalStartTimeMs).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestMetricGroupData_toDistributionPointEquivalence(t *testing.T) {
intervalStartTimeMs := int64(i + 1)
t.Run(tt.name, func(t *testing.T) {
mf := newMetricFamily(tt.name, mc, zap.NewNop(), intervalStartTimeMs).(*metricFamily)
mp := newMetricFamilyPdata(tt.name, mc, intervalStartTimeMs).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.name, mc, testLogger, intervalStartTimeMs).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.name, mc, testLogger, 1).(*metricFamilyPdata)
for _, lbs := range tt.labelsScrapes {
for _, scrape := range lbs.scrapes {
require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value))
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestMetricGroupData_toSummaryPointEquivalence(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mf := newMetricFamily(tt.name, mc, zap.NewNop(), 1).(*metricFamily)
mp := newMetricFamilyPdata(tt.name, mc, 1).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.name, mc, testLogger, 1).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.metricKind, mc, tt.intervalStartTimestampMs).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.metricKind, mc, testLogger, tt.intervalStartTimestampMs).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
Expand Down Expand Up @@ -553,7 +553,7 @@ func TestMetricGroupData_toNumberDataPointEquivalence(t *testing.T) {
intervalStartTimeMs := int64(11 + i)
t.Run(tt.name, func(t *testing.T) {
mf := newMetricFamily(tt.name, mc, zap.NewNop(), intervalStartTimeMs).(*metricFamily)
mp := newMetricFamilyPdata(tt.name, mc, intervalStartTimeMs).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.name, mc, testLogger, intervalStartTimeMs).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
require.NoError(t, mf.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
Expand Down
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/internal/otlp_metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64)
ts, dts := b.currentMf.ToMetricPdata(&b.metrics)
b.numTimeseries += ts
b.droppedTimeseries += dts
b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs)
b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.logger, b.intervalStartTimeMs)
} else if b.currentMf == nil {
b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs)
b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.logger, b.intervalStartTimeMs)
}

return b.currentMf.Add(metricName, ls, t, v)
Expand Down