Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1c4423a
receiver/prometheus: roundtrip Prometheus->Pdata direct conversion wi…
odeke-em Sep 3, 2021
5cf4213
Retrofit top level package for pdata comparisons
odeke-em Sep 13, 2021
959a3ed
Reuse for pdata metrics creators from PR #5231
odeke-em Sep 13, 2021
de159f6
Shed all in-code direct references to OpenCensus
odeke-em Sep 14, 2021
a90ea33
Merge remote-tracking branch 'origin/main' into receiver-prometheus-w…
Aneurysm9 Nov 23, 2021
20bf876
receiver/prometheus: fix tests
Aneurysm9 Nov 23, 2021
0874f0f
receiver/prometheus: buffer metrics in builder to avoid issues with o…
Aneurysm9 Nov 23, 2021
4739bcf
Merge remote-tracking branch 'origin/main' into receiver-prometheus-w…
Aneurysm9 Nov 23, 2021
1c85958
receiver/prometheus: restore prom->OC->pdata pipeline
Aneurysm9 Nov 23, 2021
65fd778
receiver/prometheus: add feature gate to control whether to use OpenC…
Aneurysm9 Nov 23, 2021
6770a1c
receiver/prometheus: localize metrics test helpers
Aneurysm9 Nov 23, 2021
9208e41
receiver/prometheus: store metrics in a map in metricbuilder to avoid…
Aneurysm9 Nov 23, 2021
8845e18
receiver/prometheus: run component tests against both OC and pdata ex…
Aneurysm9 Nov 23, 2021
e710275
receiver/prometheus: ensure OC metrics builder tests are deterministic
Aneurysm9 Nov 23, 2021
5a56ab7
receiver/prometheus: restore replace directive to ensure use of local…
Aneurysm9 Nov 23, 2021
f862b94
receiver/prometheus: fix lint issues
Aneurysm9 Nov 23, 2021
598f4a2
receiver/prometheus: fix lint issues
Aneurysm9 Nov 23, 2021
2112e47
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Nov 24, 2021
838f7a5
make porto
Aneurysm9 Nov 24, 2021
7e04b44
receiver/prometheus: run e2e tests in parallel to avoid doubling test…
Aneurysm9 Nov 24, 2021
d3ff42a
Revert "receiver/prometheus: run e2e tests in parallel to avoid doubl…
Aneurysm9 Nov 24, 2021
4fb78ab
receiver/prometheus: address PR feedback
Aneurysm9 Nov 24, 2021
f293579
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Nov 24, 2021
514e284
receiver/prometheus: ensure test helper can extract timestamp from me…
Aneurysm9 Nov 25, 2021
ac4dbd9
receiver/prometheus: tests should not be expecting empty label values
Aneurysm9 Nov 25, 2021
29dfb74
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Nov 29, 2021
ae0559b
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Nov 29, 2021
d0bd35d
make porto
Aneurysm9 Nov 29, 2021
4922d5c
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Nov 29, 2021
5f52842
receiver/prometheus: remove gauge histogram pdata adjustment tests
Aneurysm9 Nov 30, 2021
589a21d
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Nov 30, 2021
ad029fa
Merge remote-tracking branch 'a9/receiver-prometheus-wireUp-roundTrip…
Aneurysm9 Nov 30, 2021
24060cf
receiver/prometheus: remove unused gauge histogram data generator helper
Aneurysm9 Nov 30, 2021
4fce276
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Dec 6, 2021
9344131
Fix lint errors
Aneurysm9 Dec 6, 2021
5c67cc5
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Dec 7, 2021
9ed90fe
receiver/prometheus: Add comments detailing use of locks in metrics a…
Aneurysm9 Dec 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions receiver/prometheusreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/discovery/targetgroup"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/featuregate"
"gopkg.in/yaml.v2"
)

Expand All @@ -39,6 +40,16 @@ const (
prometheusConfigKey = "config"
)

var pdataPipelineGate = featuregate.Gate{
ID: "receiver.prometheus.OTLPDirect",
Enabled: false,
Description: "Controls whether to use a new translation directly from Prometheus timeseries to pdata, without an intermediate representation as OpenCensus data.",
}

func init() {
featuregate.Register(pdataPipelineGate)
}

// Config defines configuration for Prometheus receiver.
type Config struct {
config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
Expand All @@ -47,6 +58,7 @@ type Config struct {
BufferCount int `mapstructure:"buffer_count"`
UseStartTimeMetric bool `mapstructure:"use_start_time_metric"`
StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"`
pdataDirect bool
Copy link
Copy Markdown
Contributor

@dashpole dashpole Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is probably simpler just to use featuregate.IsEnabled(pdataPipelineGate.ID) where it is needed, rather than store it here. Is there a way to set feature gates during a test?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only a global feature gate registry, so it can't necessarily be used in tests. The feature gate registry also has a lock, so we recommend that the feature status is checked once and stored if it is needed repeatedly. There is no mechanism for operators to change feature gate statuses during program runtime, so it should be safe to read this once and re-use it.


// ConfigPlaceholder is just an entry to make the configuration pass a check
// that requires that all keys present in the config actually exist on the
Expand Down
2 changes: 2 additions & 0 deletions receiver/prometheusreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/service/featuregate"
)

// This file implements config for Prometheus receiver.
Expand All @@ -44,6 +45,7 @@ func NewFactory() component.ReceiverFactory {
func createDefaultConfig() config.Receiver {
return &Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
pdataDirect: featuregate.IsEnabled(pdataPipelineGate.ID),
}
}

Expand Down
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ require (
github.com/aws/aws-sdk-go v1.42.14 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/containerd/containerd v1.5.8 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/containerd/containerd v1.4.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/digitalocean/godo v1.62.0 // indirect
github.com/docker/distribution v2.7.1+incompatible // indirect
Expand Down
361 changes: 2 additions & 359 deletions receiver/prometheusreceiver/go.sum

Large diffs are not rendered by default.

7 changes: 0 additions & 7 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
// a single scrape.
type MetricFamily interface {
Add(metricName string, ls labels.Labels, t int64, v float64) error
IsSameFamily(metricName string) bool
ToMetric() (*metricspb.Metric, int, int)
}

Expand Down Expand Up @@ -125,12 +124,6 @@ func defineInternalMetric(metricName string, metadata scrape.MetricMetadata, log
return metadata
}

func (mf *metricFamily) IsSameFamily(metricName string) bool {
// trim known suffix if necessary
familyName := normalizeMetricName(metricName)
return mf.name == familyName || familyName != metricName && mf.name == metricName
}

// updateLabelKeys is used to store all the label keys of a same metric family in observed order. since prometheus
// receiver removes any label with empty value before feeding it to an appender, in order to figure out all the labels
// from the same metric family we will need to keep track of what labels have ever been observed.
Expand Down
23 changes: 5 additions & 18 deletions receiver/prometheusreceiver/internal/metrics_adjuster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,24 +408,11 @@ func Test_jobGC(t *testing.T) {
}

var (
g1 = "gauge1"
gd1 = "gaugedist1"
c1 = "cumulative1"
cd1 = "cumulativedist1"
s1 = "summary1"
k1 = []string{"k1"}
k1k2 = []string{"k1", "k2"}
k1k2k3 = []string{"k1", "k2", "k3"}
v1v2 = []string{"v1", "v2"}
v10v20 = []string{"v10", "v20"}
v100v200 = []string{"v100", "v200"}
bounds0 = []float64{1, 2, 4}
percent0 = []float64{10, 50, 90}
t1Ms = time.Unix(0, 1000000)
t2Ms = time.Unix(0, 2000000)
t3Ms = time.Unix(0, 3000000)
t4Ms = time.Unix(0, 5000000)
t5Ms = time.Unix(0, 5000000)
t1Ms = time.Unix(0, 1000000)
t2Ms = time.Unix(0, 2000000)
t3Ms = time.Unix(0, 3000000)
t4Ms = time.Unix(0, 5000000)
t5Ms = time.Unix(0, 5000000)
)

type metricsAdjusterTest struct {
Expand Down
27 changes: 13 additions & 14 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type metricBuilder struct {
startTime float64
intervalStartTimeMs int64
logger *zap.Logger
currentMf MetricFamily
families map[string]MetricFamily
}

// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
Expand All @@ -72,6 +72,7 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
return &metricBuilder{
mc: mc,
metrics: make([]*metricspb.Metric, 0),
families: map[string]MetricFamily{},
logger: logger,
numTimeseries: 0,
droppedTimeseries: 0,
Expand Down Expand Up @@ -137,19 +138,18 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error

b.hasData = true

if b.currentMf != nil && !b.currentMf.IsSameFamily(metricName) {
m, ts, dts := b.currentMf.ToMetric()
b.numTimeseries += ts
b.droppedTimeseries += dts
if m != nil {
b.metrics = append(b.metrics, m)
familyName := normalizeMetricName(metricName)
curMF, ok := b.families[familyName]
if !ok {
if mf, ok := b.families[metricName]; ok {
curMF = mf
} else {
curMF = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
b.families[familyName] = curMF
}
b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
} else if b.currentMf == nil {
b.currentMf = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
}

return b.currentMf.Add(metricName, ls, t, v)
return curMF.Add(metricName, ls, t, v)
}

// Build an opencensus data.MetricsData based on all added data complexValue.
Expand All @@ -162,14 +162,13 @@ func (b *metricBuilder) Build() ([]*metricspb.Metric, int, int, error) {
return nil, 0, 0, errNoDataToBuild
}

if b.currentMf != nil {
m, ts, dts := b.currentMf.ToMetric()
for _, mf := range b.families {
m, ts, dts := mf.ToMetric()
b.numTimeseries += ts
b.droppedTimeseries += dts
if m != nil {
b.metrics = append(b.metrics, m)
}
b.currentMf = nil
}

return b.metrics, b.numTimeseries, b.droppedTimeseries, nil
Expand Down
23 changes: 20 additions & 3 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,30 @@ func runBuilderTests(t *testing.T, tests []buildTestData) {
}
metrics, _, _, err := b.Build()
assert.NoError(t, err)
assert.EqualValues(t, tt.wants[i], metrics)
assertEquivalentMetrics(t, tt.wants[i], metrics)
st += interval
}
})
}
}

func assertEquivalentMetrics(t *testing.T, want, got []*metricspb.Metric) {
if !assert.Equal(t, len(want), len(got)) {
return
}
wmap := map[string]*metricspb.Metric{}
gmap := map[string]*metricspb.Metric{}

for i := 0; i < len(want); i++ {
wi := want[i]
wmap[wi.GetMetricDescriptor().GetName()] = wi
gi := got[i]
gmap[gi.GetMetricDescriptor().GetName()] = gi
}

assert.EqualValues(t, wmap, gmap)
}

func runBuilderStartTimeTests(t *testing.T, tests []buildTestData,
startTimeMetricRegex string, expectedBuilderStartTime float64) {
for _, tt := range tests {
Expand Down Expand Up @@ -1298,10 +1315,10 @@ func Test_isUsefulLabel(t *testing.T) {

func Benchmark_dpgSignature(b *testing.B) {
knownLabelKeys := []string{"a", "b"}
labels := labels.FromStrings("a", "va", "b", "vb", "x", "xa")
ls := labels.FromStrings("a", "va", "b", "vb", "x", "xa")
b.ReportAllocs()
for i := 0; i < b.N; i++ {
runtime.KeepAlive(dpgSignature(knownLabelKeys, labels))
runtime.KeepAlive(dpgSignature(knownLabelKeys, ls))
}
}

Expand Down
140 changes: 140 additions & 0 deletions receiver/prometheusreceiver/internal/metricsutil_pdata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"

import "go.opentelemetry.io/collector/model/pdata"

type kv struct {
Key, Value string
}

func distPointPdata(ts pdata.Timestamp, bounds []float64, counts []uint64) *pdata.HistogramDataPoint {
hdp := pdata.NewHistogramDataPoint()
hdp.SetExplicitBounds(bounds)
hdp.SetBucketCounts(counts)
hdp.SetTimestamp(ts)
var sum float64
var count uint64
for i, bcount := range counts {
count += bcount
if i > 0 {
sum += float64(bcount) * bounds[i-1]
}
}
hdp.SetCount(count)
hdp.SetSum(sum)

return &hdp
}

func cumulativeDistMetricPdata(name string, kvp []*kv, startTs pdata.Timestamp, points ...*pdata.HistogramDataPoint) *pdata.Metric {
metric := pdata.NewMetric()
metric.SetName(name)
metric.SetDataType(pdata.MetricDataTypeHistogram)
histogram := metric.Histogram()
histogram.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)

destPointL := histogram.DataPoints()
// By default the AggregationTemporality is Cumulative until it'll be changed by the caller.
for _, point := range points {
destPoint := destPointL.AppendEmpty()
point.CopyTo(destPoint)
point.SetStartTimestamp(startTs)
attrs := destPoint.Attributes()
for _, kv := range kvp {
attrs.InsertString(kv.Key, kv.Value)
}
}
return &metric
}

func doublePointPdata(ts pdata.Timestamp, value float64) *pdata.NumberDataPoint {
ndp := pdata.NewNumberDataPoint()
ndp.SetTimestamp(ts)
ndp.SetDoubleVal(value)

return &ndp
}

func gaugeMetricPdata(name string, kvp []*kv, startTs pdata.Timestamp, points ...*pdata.NumberDataPoint) *pdata.Metric {
metric := pdata.NewMetric()
metric.SetName(name)
metric.SetDataType(pdata.MetricDataTypeGauge)

destPointL := metric.Gauge().DataPoints()
for _, point := range points {
destPoint := destPointL.AppendEmpty()
point.CopyTo(destPoint)
point.SetStartTimestamp(startTs)
attrs := destPoint.Attributes()
for _, kv := range kvp {
attrs.InsertString(kv.Key, kv.Value)
}
}
return &metric
}

func summaryPointPdata(ts pdata.Timestamp, count uint64, sum float64, quantiles, values []float64) *pdata.SummaryDataPoint {
sdp := pdata.NewSummaryDataPoint()
sdp.SetTimestamp(ts)
sdp.SetCount(count)
sdp.SetSum(sum)
qvL := sdp.QuantileValues()
for i := 0; i < len(quantiles); i++ {
qvi := qvL.AppendEmpty()
qvi.SetQuantile(quantiles[i])
qvi.SetValue(values[i])
}
return &sdp
}

func summaryMetricPdata(name string, kvp []*kv, startTs pdata.Timestamp, points ...*pdata.SummaryDataPoint) *pdata.Metric {
metric := pdata.NewMetric()
metric.SetName(name)
metric.SetDataType(pdata.MetricDataTypeSummary)

destPointL := metric.Summary().DataPoints()
for _, point := range points {
destPoint := destPointL.AppendEmpty()
point.CopyTo(destPoint)
point.SetStartTimestamp(startTs)
attrs := destPoint.Attributes()
for _, kv := range kvp {
attrs.InsertString(kv.Key, kv.Value)
}
}
return &metric
}

func sumMetricPdata(name string, kvp []*kv, startTs pdata.Timestamp, points ...*pdata.NumberDataPoint) *pdata.Metric {
metric := pdata.NewMetric()
metric.SetName(name)
metric.SetDataType(pdata.MetricDataTypeSum)
sum := metric.Sum()
sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
sum.SetIsMonotonic(true)

destPointL := sum.DataPoints()
for _, point := range points {
destPoint := destPointL.AppendEmpty()
point.CopyTo(destPoint)
point.SetStartTimestamp(startTs)
attrs := destPoint.Attributes()
for _, kv := range kvp {
attrs.InsertString(kv.Key, kv.Value)
}
}
return &metric
}
Loading