Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// MetricFamilyPdata is the unit that corresponds to the metric items which share the same TYPE/UNIT/... metadata
// from a single scrape.
type MetricFamilyPdata interface {
// Add feeds one sample (value v at timestamp t, with label set ls) observed under metricName into the family.
Add(metricName string, ls labels.Labels, t int64, v float64) error
// IsSameFamily is consulted by the metric builder to decide whether a sample named metricName
// still belongs to this family or whether a new family has to be started.
IsSameFamily(metricName string) bool
// ToMetricPdata converts the family and appends the result to metrics. In the metricFamilyPdata
// implementation the first result is the total number of points (including dropped ones) and the
// second is the number of dropped timeseries.
ToMetricPdata(metrics *pdata.MetricSlice) (int, int)
}

type metricFamilyPdata struct {
// We are composing the already present metricFamily to
// make for a scalable migration, so that we only edit target
Expand All @@ -44,7 +52,7 @@ type metricGroupPdata struct {
family *metricFamilyPdata
}

func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamily {
func newMetricFamilyPdata(metricName string, mc MetadataCache, intervalStartTimeMs int64) MetricFamilyPdata {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand Down Expand Up @@ -254,3 +262,68 @@ func (mf *metricFamilyPdata) Add(metricName string, ls labels.Labels, t int64, v

return nil
}

// getGroups returns the family's metric groups in insertion order.
// groupOrders maps each group key to that group's index in the returned slice.
func (mf *metricFamilyPdata) getGroups() []*metricGroupPdata {
	ordered := make([]*metricGroupPdata, len(mf.groupOrders))
	for key, position := range mf.groupOrders {
		group := mf.groups[key]
		ordered[position] = group
	}
	return ordered
}

// ToMetricPdata converts every group of the family into data points of a single pdata metric and,
// when at least one point was produced, appends a copy of that metric to metrics.
//
// The data points are built according to mf.mtype: histogram, summary and sum have dedicated cases,
// and every other type is emitted as a gauge. A group whose conversion reports failure increments
// mf.droppedTimeseries.
//
// It returns (total, dropped): dropped is mf.droppedTimeseries, and total is the number of produced
// points plus dropped. When no point was produced nothing is appended and both results are
// mf.droppedTimeseries.
//
// NOTE(review): the metric's name and data type are not set in this function before
// metric.Histogram()/Summary()/Sum()/Gauge() are called — confirm whether pdata requires that.
func (mf *metricFamilyPdata) ToMetricPdata(metrics *pdata.MetricSlice) (int, int) {
metric := pdata.NewMetric()
pointCount := 0

switch mf.mtype {
case pdata.MetricDataTypeHistogram:
histogram := metric.Histogram()
hdpL := histogram.DataPoints()
for _, mg := range mf.getGroups() {
if !mg.toDistributionPoint(mf.labelKeysOrdered, &hdpL) {
mf.droppedTimeseries++
}
}
pointCount = hdpL.Len()

case pdata.MetricDataTypeSummary:
summary := metric.Summary()
sdpL := summary.DataPoints()
for _, mg := range mf.getGroups() {
if !mg.toSummaryPoint(mf.labelKeysOrdered, &sdpL) {
mf.droppedTimeseries++
}
}
pointCount = sdpL.Len()

case pdata.MetricDataTypeSum:
sum := metric.Sum()
sdpL := sum.DataPoints()
for _, mg := range mf.getGroups() {
if !mg.toNumberDataPoint(mf.labelKeysOrdered, &sdpL) {
mf.droppedTimeseries++
}
}
pointCount = sdpL.Len()

// Any type without a dedicated case above is emitted as a gauge.
default:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is missing a case for counters, but that seems to be covered in #3741.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I added that case in the other PR but I've brought it in, please take a look again. Thank you @Aneurysm9!

gauge := metric.Gauge()
gdpL := gauge.DataPoints()
for _, mg := range mf.getGroups() {
if !mg.toNumberDataPoint(mf.labelKeysOrdered, &gdpL) {
mf.droppedTimeseries++
}
}
pointCount = gdpL.Len()
}

// Nothing was converted: do not append an empty metric, only report the drops.
if pointCount == 0 {
return mf.droppedTimeseries, mf.droppedTimeseries
}

metric.CopyTo(metrics.AppendEmpty())

// note: the total number of points is the number of points+droppedTimeseries.
return pointCount + mf.droppedTimeseries, mf.droppedTimeseries
}
105 changes: 105 additions & 0 deletions receiver/prometheusreceiver/internal/otlp_metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package internal

import (
"fmt"
"regexp"
"sort"
"strconv"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"go.uber.org/zap"

"go.opentelemetry.io/collector/model/pdata"
)
Expand Down Expand Up @@ -76,3 +81,103 @@ func convToPdataMetricType(metricType textparse.MetricType) pdata.MetricDataType
return pdata.MetricDataTypeNone
}
}

// metricBuilderPdata builds pdata metrics from the data points of a scrape. It embeds
// *metricBuilder and reuses its state (counters, logger, metadata cache, staleness store, ...).
type metricBuilderPdata struct {
*metricBuilder
// metrics collects the metrics produced each time a metric family is converted.
metrics pdata.MetricSlice
// currentMf is the metric family currently receiving data points; nil until the first
// data point has been added.
currentMf MetricFamilyPdata
}

// newMetricBuilderPdata creates a metricBuilderPdata which is fed all the data points from a single
// Prometheus scraped page through its AddDataPoint method.
//
// startTimeMetricRegex is optional: when it is empty no regex is compiled. The function has no
// error result, so an invalid pattern cannot be reported to the caller; instead it is logged
// (when a logger is supplied) and the builder is created without a start-time regex.
func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, stalenessStore *stalenessStore) *metricBuilderPdata {
	var regex *regexp.Regexp
	if startTimeMetricRegex != "" {
		var err error
		// regexp.Compile returns a nil *Regexp on failure, so regex stays nil in that case.
		regex, err = regexp.Compile(startTimeMetricRegex)
		if err != nil && logger != nil {
			logger.Warn("Invalid start time metric regex, ignoring it",
				zap.String("regex", startTimeMetricRegex),
				zap.Error(err))
		}
	}
	return &metricBuilderPdata{
		metrics: pdata.NewMetricSlice(),
		metricBuilder: &metricBuilder{
			mc:                   mc,
			logger:               logger,
			numTimeseries:        0,
			droppedTimeseries:    0,
			useStartTimeMetric:   useStartTimeMetric,
			startTimeMetricRegex: regex,
			stalenessStore:       stalenessStore,
		},
	}
}

// These identifiers are only used by follow-up changes; referencing them here
// keeps golangci-lint from reporting them as unused in the meantime.
var (
	_ = newMetricBuilderPdata
	_ = (*metricBuilderPdata)(nil).AddDataPoint
)

// AddDataPoint feeds one Prometheus sample (label set ls, timestamp t, value v) to the builder.
// Samples are expected in their processing order: a sample whose metric name does not belong to
// the current metric family makes the builder convert the current family into b.metrics and
// start a new one.
//
// It returns an error when the label set contains duplicate label names, when the metric name
// label is missing (errMetricNameNotFound), or when the current family rejects the sample.
func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) (rerr error) {
	// Any datapoint with duplicate labels MUST be rejected per:
	// * https://github.com/open-telemetry/wg-prometheus/issues/44
	// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
	// as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13.
	seen := make(map[string]struct{}, len(ls))
	// Duplicates are the exceptional case, so the slice is only allocated once one is found.
	var dupLabels []string
	for _, label := range ls {
		if _, ok := seen[label.Name]; ok {
			dupLabels = append(dupLabels, label.Name)
			continue
		}
		seen[label.Name] = struct{}{}
	}
	if len(dupLabels) != 0 {
		sort.Strings(dupLabels)
		return fmt.Errorf("invalid sample: non-unique label names: %q", dupLabels)
	}

	defer func() {
		// Only mark this data point as in the current scrape
		// iff it isn't a stale metric.
		if rerr == nil && !value.IsStaleNaN(v) {
			b.stalenessStore.markAsCurrentlySeen(ls, t)
		}
	}()

	metricName := ls.Get(model.MetricNameLabel)
	switch {
	case metricName == "":
		b.numTimeseries++
		b.droppedTimeseries++
		return errMetricNameNotFound
	case isInternalMetric(metricName):
		b.hasInternalMetric = true
		// See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
		// up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
		if metricName == scrapeUpMetricName && v != 1.0 {
			// The label map is only needed for the log entry, so build it only when logging.
			targetLabels := zap.String("target_labels", fmt.Sprintf("%v", ls.Map()))
			if v == 0.0 {
				b.logger.Warn("Failed to scrape Prometheus endpoint",
					zap.Int64("scrape_timestamp", t),
					targetLabels)
			} else {
				b.logger.Warn("The 'up' metric contains invalid value",
					zap.Float64("value", v),
					zap.Int64("scrape_timestamp", t),
					targetLabels)
			}
		}
	case b.useStartTimeMetric && b.matchStartTimeMetric(metricName):
		b.startTime = v
	}

	b.hasData = true

	if b.currentMf != nil && !b.currentMf.IsSameFamily(metricName) {
		// The sample starts a new family: flush the finished one and account for its points.
		ts, dts := b.currentMf.ToMetricPdata(&b.metrics)
		b.numTimeseries += ts
		b.droppedTimeseries += dts
		b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs)
	} else if b.currentMf == nil {
		// First sample seen by this builder.
		b.currentMf = newMetricFamilyPdata(metricName, b.mc, b.intervalStartTimeMs)
	}

	return b.currentMf.Add(metricName, ls, t, v)
}