From 71f1af82548047ce5ba8913afeca608d851b2377 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 24 Sep 2021 11:55:27 +0200 Subject: [PATCH 1/4] [exporter/datadogexporter] Copy main branch contents --- .../model/translator/metrics_translator.go | 445 +++++++ .../translator/metrics_translator_test.go | 1038 +++++++++++++++++ pkg/otlp/model/translator/sketches_test.go | 190 +++ pkg/otlp/model/translator/ttlcache.go | 89 ++ pkg/otlp/model/translator/ttlcache_test.go | 75 ++ 5 files changed, 1837 insertions(+) create mode 100644 pkg/otlp/model/translator/metrics_translator.go create mode 100644 pkg/otlp/model/translator/metrics_translator_test.go create mode 100644 pkg/otlp/model/translator/sketches_test.go create mode 100644 pkg/otlp/model/translator/ttlcache.go create mode 100644 pkg/otlp/model/translator/ttlcache_test.go diff --git a/pkg/otlp/model/translator/metrics_translator.go b/pkg/otlp/model/translator/metrics_translator.go new file mode 100644 index 000000000000..538ac2250d3e --- /dev/null +++ b/pkg/otlp/model/translator/metrics_translator.go @@ -0,0 +1,445 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "context" + "fmt" + "math" + "strconv" + "time" + + "github.com/DataDog/datadog-agent/pkg/quantile" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" + "gopkg.in/zorkian/go-datadog-api.v2" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/sketches" +) + +const metricName string = "metric name" + +const ( + histogramModeNoBuckets = "nobuckets" + histogramModeCounters = "counters" + histogramModeDistributions = "distributions" +) + +// HostnameProvider gets a hostname +type HostnameProvider interface { + // Hostname gets the hostname from the machine. + Hostname(ctx context.Context) (string, error) +} + +type Translator struct { + prevPts *TTLCache + logger *zap.Logger + cfg config.MetricsConfig + buildInfo component.BuildInfo + fallbackHostnameProvider HostnameProvider +} + +func New(cache *TTLCache, params component.ExporterCreateSettings, cfg config.MetricsConfig, fallbackHostProvider HostnameProvider) *Translator { + return &Translator{cache, params.Logger, cfg, params.BuildInfo, fallbackHostProvider} +} + +// getTags maps an attributeMap into a slice of Datadog tags +func getTags(labels pdata.AttributeMap) []string { + tags := make([]string, 0, labels.Len()) + labels.Range(func(key string, value pdata.AttributeValue) bool { + v := value.AsString() + if v == "" { + // Tags can't end with ":" so we replace empty values with "n/a" + v = "n/a" + } + tags = append(tags, fmt.Sprintf("%s:%s", key, v)) + return true + }) + return tags +} + +// isCumulativeMonotonic checks if a metric is a cumulative monotonic metric +func isCumulativeMonotonic(md pdata.Metric) bool { + switch md.DataType() { + case pdata.MetricDataTypeSum: + return md.Sum().AggregationTemporality() == pdata.AggregationTemporalityCumulative && + md.Sum().IsMonotonic() + } + return false +} + +// isSkippable checks if a value can be skipped (because it is not supported by the backend). +// It logs that the value is unsupported for debugging since this sometimes means there is a bug. +func (t *Translator) isSkippable(name string, v float64) bool { + skippable := math.IsInf(v, 0) || math.IsNaN(v) + if skippable { + t.logger.Debug("Unsupported metric value", zap.String(metricName, name), zap.Float64("value", v)) + } + return skippable +} + +// mapNumberMetrics maps double datapoints into Datadog metrics +func (t *Translator) mapNumberMetrics(name string, dt metrics.MetricDataType, slice pdata.NumberDataPointSlice, attrTags []string) []datadog.Metric { + ms := make([]datadog.Metric, 0, slice.Len()) + for i := 0; i < slice.Len(); i++ { + p := slice.At(i) + tags := getTags(p.Attributes()) + tags = append(tags, attrTags...) + var val float64 + switch p.Type() { + case pdata.MetricValueTypeDouble: + val = p.DoubleVal() + case pdata.MetricValueTypeInt: + val = float64(p.IntVal()) + } + + if t.isSkippable(name, val) { + continue + } + + ms = append(ms, + metrics.NewMetric(name, dt, uint64(p.Timestamp()), val, tags), + ) + } + return ms +} + +// mapNumberMonotonicMetrics maps monotonic datapoints into Datadog metrics +func (t *Translator) mapNumberMonotonicMetrics(name string, slice pdata.NumberDataPointSlice, attrTags []string) []datadog.Metric { + ms := make([]datadog.Metric, 0, slice.Len()) + for i := 0; i < slice.Len(); i++ { + p := slice.At(i) + ts := uint64(p.Timestamp()) + tags := getTags(p.Attributes()) + tags = append(tags, attrTags...) + + var val float64 + switch p.Type() { + case pdata.MetricValueTypeDouble: + val = p.DoubleVal() + case pdata.MetricValueTypeInt: + val = float64(p.IntVal()) + } + + if t.isSkippable(name, val) { + continue + } + + if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, val); ok { + ms = append(ms, metrics.NewCount(name, ts, dx, tags)) + } + } + return ms +} + +func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBound float64) { + // See https://github.com/open-telemetry/opentelemetry-proto/blob/v0.10.0/opentelemetry/proto/metrics/v1/metrics.proto#L427-L439 + lowerBound = math.Inf(-1) + upperBound = math.Inf(1) + if idx > 0 { + lowerBound = p.ExplicitBounds()[idx-1] + } + if idx < len(p.ExplicitBounds()) { + upperBound = p.ExplicitBounds()[idx] + } + return +} + +func (t *Translator) getSketchBuckets(name string, ts uint64, p pdata.HistogramDataPoint, delta bool, tags []string) sketches.SketchSeries { + as := &quantile.Agent{} + for j := range p.BucketCounts() { + lowerBound, upperBound := getBounds(p, j) + // InsertInterpolate doesn't work with an infinite bound; insert in to the bucket that contains the non-infinite bound + // https://github.com/DataDog/datadog-agent/blob/7.31.0/pkg/aggregator/check_sampler.go#L107-L111 + if math.IsInf(upperBound, 1) { + upperBound = lowerBound + } else if math.IsInf(lowerBound, -1) { + lowerBound = upperBound + } + + count := p.BucketCounts()[j] + if delta { + as.InsertInterpolate(lowerBound, upperBound, uint(count)) + } else if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, float64(count)); ok { + as.InsertInterpolate(lowerBound, upperBound, uint(dx)) + } + + } + return sketches.SketchSeries{ + Name: name, + Tags: tags, + Interval: 1, + Points: []sketches.SketchPoint{{ + Ts: int64(p.Timestamp() / 1e9), + Sketch: as.Finish(), + }}, + } +} + +func (t *Translator) getLegacyBuckets(name string, p pdata.HistogramDataPoint, delta bool, tags []string) []datadog.Metric { + // We have a single metric, 'bucket', which is tagged with the bucket bounds. See: + // https://github.com/DataDog/integrations-core/blob/7.30.1/datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/transformers/histogram.py + ms := make([]datadog.Metric, 0, len(p.BucketCounts())) + fullName := fmt.Sprintf("%s.bucket", name) + for idx, val := range p.BucketCounts() { + lowerBound, upperBound := getBounds(p, idx) + bucketTags := []string{ + fmt.Sprintf("lower_bound:%s", formatFloat(lowerBound)), + fmt.Sprintf("upper_bound:%s", formatFloat(upperBound)), + } + bucketTags = append(bucketTags, tags...) + + count := float64(val) + ts := uint64(p.Timestamp()) + if delta { + ms = append(ms, metrics.NewCount(fullName, ts, count, bucketTags)) + } else if dx, ok := t.prevPts.putAndGetDiff(fullName, bucketTags, ts, count); ok { + ms = append(ms, metrics.NewCount(fullName, ts, dx, bucketTags)) + } + } + return ms +} + +// mapHistogramMetrics maps double histogram metrics slices to Datadog metrics +// +// A Histogram metric has: +// - The count of values in the population +// - The sum of values in the population +// - A number of buckets, each of them having +// - the bounds that define the bucket +// - the count of the number of items in that bucket +// - a sample value from each bucket +// +// We follow a similar approach to our OpenMetrics check: +// we report sum and count by default; buckets count can also +// be reported (opt-in) tagged by lower bound. +func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataPointSlice, delta bool, attrTags []string) (ms []datadog.Metric, sl sketches.SketchSeriesList) { + // Allocate assuming none are nil and no buckets + ms = make([]datadog.Metric, 0, 2*slice.Len()) + if t.cfg.HistConfig.Mode == histogramModeDistributions { + sl = make(sketches.SketchSeriesList, 0, slice.Len()) + } + for i := 0; i < slice.Len(); i++ { + p := slice.At(i) + ts := uint64(p.Timestamp()) + tags := getTags(p.Attributes()) + tags = append(tags, attrTags...) + + if t.cfg.HistConfig.SendCountSum { + count := float64(p.Count()) + countName := fmt.Sprintf("%s.count", name) + if delta { + ms = append(ms, metrics.NewCount(countName, ts, count, tags)) + } else if dx, ok := t.prevPts.putAndGetDiff(countName, tags, ts, count); ok { + ms = append(ms, metrics.NewCount(countName, ts, dx, tags)) + } + } + + if t.cfg.HistConfig.SendCountSum { + sum := p.Sum() + sumName := fmt.Sprintf("%s.sum", name) + if !t.isSkippable(sumName, p.Sum()) { + if delta { + ms = append(ms, metrics.NewCount(sumName, ts, sum, tags)) + } else if dx, ok := t.prevPts.putAndGetDiff(sumName, tags, ts, sum); ok { + ms = append(ms, metrics.NewCount(sumName, ts, dx, tags)) + } + } + } + + switch t.cfg.HistConfig.Mode { + case histogramModeCounters: + ms = append(ms, t.getLegacyBuckets(name, p, delta, tags)...) + case histogramModeDistributions: + sl = append(sl, t.getSketchBuckets(name, ts, p, true, tags)) + } + } + return +} + +// formatFloat formats a float number as close as possible to what +// we do on the Datadog Agent Python OpenMetrics check, which, in turn, tries to +// follow https://github.com/OpenObservability/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#considerations-canonical-numbers +func formatFloat(f float64) string { + if math.IsInf(f, 1) { + return "inf" + } else if math.IsInf(f, -1) { + return "-inf" + } else if math.IsNaN(f) { + return "nan" + } else if f == 0 { + return "0" + } + + // Add .0 to whole numbers + s := strconv.FormatFloat(f, 'g', -1, 64) + if f == math.Floor(f) { + s = s + ".0" + } + return s +} + +// getQuantileTag returns the quantile tag for summary types. +func getQuantileTag(quantile float64) string { + return fmt.Sprintf("quantile:%s", formatFloat(quantile)) +} + +// mapSummaryMetrics maps summary datapoints into Datadog metrics +func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPointSlice, attrTags []string) []datadog.Metric { + // Allocate assuming none are nil and no quantiles + ms := make([]datadog.Metric, 0, 2*slice.Len()) + for i := 0; i < slice.Len(); i++ { + p := slice.At(i) + ts := uint64(p.Timestamp()) + tags := getTags(p.Attributes()) + tags = append(tags, attrTags...) + + // count and sum are increasing; we treat them as cumulative monotonic sums. + { + countName := fmt.Sprintf("%s.count", name) + if dx, ok := t.prevPts.putAndGetDiff(countName, tags, ts, float64(p.Count())); ok && !t.isSkippable(countName, dx) { + ms = append(ms, metrics.NewCount(countName, ts, dx, tags)) + } + } + + { + sumName := fmt.Sprintf("%s.sum", name) + if !t.isSkippable(sumName, p.Sum()) { + if dx, ok := t.prevPts.putAndGetDiff(sumName, tags, ts, p.Sum()); ok { + ms = append(ms, metrics.NewCount(sumName, ts, dx, tags)) + } + } + } + + if t.cfg.Quantiles { + fullName := fmt.Sprintf("%s.quantile", name) + quantiles := p.QuantileValues() + for i := 0; i < quantiles.Len(); i++ { + q := quantiles.At(i) + + if t.isSkippable(fullName, q.Value()) { + continue + } + + quantileTags := []string{getQuantileTag(q.Quantile())} + quantileTags = append(quantileTags, tags...) + ms = append(ms, + metrics.NewGauge(fullName, ts, q.Value(), quantileTags), + ) + } + } + } + return ms +} + +// MapMetrics maps OTLP metrics into the DataDog format +func (t *Translator) MapMetrics(md pdata.Metrics) (series []datadog.Metric, sl sketches.SketchSeriesList) { + pushTime := uint64(time.Now().UTC().UnixNano()) + rms := md.ResourceMetrics() + seenHosts := make(map[string]struct{}) + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + + var attributeTags []string + + // Only fetch attribute tags if they're not already converted into labels. + // Otherwise some tags would be present twice in a metric's tag list. + if !t.cfg.ExporterConfig.ResourceAttributesAsTags { + attributeTags = attributes.TagsFromAttributes(rm.Resource().Attributes()) + } + + host, ok := attributes.HostnameFromAttributes(rm.Resource().Attributes()) + if !ok { + fallbackHost, err := t.fallbackHostnameProvider.Hostname(context.Background()) + host = "" + if err == nil { + host = fallbackHost + } + } + seenHosts[host] = struct{}{} + + ilms := rm.InstrumentationLibraryMetrics() + for j := 0; j < ilms.Len(); j++ { + ilm := ilms.At(j) + metricsArray := ilm.Metrics() + for k := 0; k < metricsArray.Len(); k++ { + md := metricsArray.At(k) + var datapoints []datadog.Metric + var sketchesPoints sketches.SketchSeriesList + switch md.DataType() { + case pdata.MetricDataTypeGauge: + datapoints = t.mapNumberMetrics(md.Name(), metrics.Gauge, md.Gauge().DataPoints(), attributeTags) + case pdata.MetricDataTypeSum: + switch md.Sum().AggregationTemporality() { + case pdata.AggregationTemporalityCumulative: + if t.cfg.SendMonotonic && isCumulativeMonotonic(md) { + datapoints = t.mapNumberMonotonicMetrics(md.Name(), md.Sum().DataPoints(), attributeTags) + } else { + datapoints = t.mapNumberMetrics(md.Name(), metrics.Gauge, md.Sum().DataPoints(), attributeTags) + } + case pdata.AggregationTemporalityDelta: + datapoints = t.mapNumberMetrics(md.Name(), metrics.Count, md.Sum().DataPoints(), attributeTags) + default: // pdata.AggregationTemporalityUnspecified or any other not supported type + t.logger.Debug("Unknown or unsupported aggregation temporality", + zap.String(metricName, md.Name()), + zap.Any("aggregation temporality", md.Sum().AggregationTemporality()), + ) + continue + } + case pdata.MetricDataTypeHistogram: + switch md.Histogram().AggregationTemporality() { + case pdata.AggregationTemporalityCumulative, pdata.AggregationTemporalityDelta: + delta := md.Histogram().AggregationTemporality() == pdata.AggregationTemporalityDelta + datapoints, sketchesPoints = t.mapHistogramMetrics(md.Name(), md.Histogram().DataPoints(), delta, attributeTags) + default: // pdata.AggregationTemporalityUnspecified or any other not supported type + t.logger.Debug("Unknown or unsupported aggregation temporality", + zap.String("metric name", md.Name()), + zap.Any("aggregation temporality", md.Histogram().AggregationTemporality()), + ) + continue + } + case pdata.MetricDataTypeSummary: + datapoints = t.mapSummaryMetrics(md.Name(), md.Summary().DataPoints(), attributeTags) + default: // pdata.MetricDataTypeNone or any other not supported type + t.logger.Debug("Unknown or unsupported metric type", zap.String(metricName, md.Name()), zap.Any("data type", md.DataType())) + continue + } + + for i := range datapoints { + datapoints[i].SetHost(host) + } + + for i := range sl { + sl[i].Host = host + } + + series = append(series, datapoints...) + sl = append(sl, sketchesPoints...) + } + } + } + + for host := range seenHosts { + // Report the host as running + runningMetric := metrics.DefaultMetrics("metrics", host, pushTime, t.buildInfo) + series = append(series, runningMetric...) + } + + return +} diff --git a/pkg/otlp/model/translator/metrics_translator_test.go b/pkg/otlp/model/translator/metrics_translator_test.go new file mode 100644 index 000000000000..d2e798fb37a7 --- /dev/null +++ b/pkg/otlp/model/translator/metrics_translator_test.go @@ -0,0 +1,1038 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "context" + "math" + "testing" + "time" + + gocache "github.com/patrickmn/go-cache" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + "gopkg.in/zorkian/go-datadog-api.v2" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" +) + +var defaultCfg = config.MetricsConfig{ + SendMonotonic: true, + HistConfig: config.HistogramConfig{ + Mode: histogramModeNoBuckets, + SendCountSum: true, + }, +} + +func TestMetricValue(t *testing.T) { + var ( + name = "name" + value = math.Pi + ts = uint64(time.Now().UnixNano()) + tags = []string{"tool:opentelemetry", "version:0.1.0"} + ) + + metric := metrics.NewGauge(name, ts, value, tags) + assert.Equal(t, string(metrics.Gauge), metric.GetType()) + assert.Equal(t, tags, metric.Tags) +} + +func TestGetTags(t *testing.T) { + attributes := pdata.NewAttributeMapFromMap(map[string]pdata.AttributeValue{ + "key1": pdata.NewAttributeValueString("val1"), + "key2": pdata.NewAttributeValueString("val2"), + "key3": pdata.NewAttributeValueString(""), + }) + + assert.ElementsMatch(t, + getTags(attributes), + [...]string{"key1:val1", "key2:val2", "key3:n/a"}, + ) +} + +func TestIsCumulativeMonotonic(t *testing.T) { + // Some of these examples are from the hostmetrics receiver + // and reflect the semantic meaning of the metrics there. + // + // If the receiver changes these examples should be added here too + + { // Sum: Cumulative but not monotonic + metric := pdata.NewMetric() + metric.SetName("system.filesystem.usage") + metric.SetDescription("Filesystem bytes used.") + metric.SetUnit("bytes") + metric.SetDataType(pdata.MetricDataTypeSum) + sum := metric.Sum() + sum.SetIsMonotonic(false) + sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + + assert.False(t, isCumulativeMonotonic(metric)) + } + + { // Sum: Cumulative and monotonic + metric := pdata.NewMetric() + metric.SetName("system.network.packets") + metric.SetDescription("The number of packets transferred.") + metric.SetUnit("1") + metric.SetDataType(pdata.MetricDataTypeSum) + sum := metric.Sum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + + assert.True(t, isCumulativeMonotonic(metric)) + } + + { // DoubleSumL Cumulative and monotonic + metric := pdata.NewMetric() + metric.SetName("metric.example") + metric.SetDataType(pdata.MetricDataTypeSum) + sum := metric.Sum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + + assert.True(t, isCumulativeMonotonic(metric)) + } + + { // Not IntSum + metric := pdata.NewMetric() + metric.SetName("system.cpu.load_average.1m") + metric.SetDescription("Average CPU Load over 1 minute.") + metric.SetUnit("1") + metric.SetDataType(pdata.MetricDataTypeGauge) + + assert.False(t, isCumulativeMonotonic(metric)) + } +} + +type testProvider string + +func (t testProvider) Hostname(context.Context) (string, error) { + return string(t), nil +} + +func newTranslator(logger *zap.Logger, cfg config.MetricsConfig) *Translator { + params := component.ExporterCreateSettings{ + BuildInfo: component.BuildInfo{ + Version: "1.0", + }, + TelemetrySettings: component.TelemetrySettings{ + Logger: logger, + }, + } + return New(newTestCache(), params, cfg, testProvider("fallbackHostname")) +} + +func TestMapIntMetrics(t *testing.T) { + ts := pdata.NewTimestampFromTime(time.Now()) + slice := pdata.NewNumberDataPointSlice() + point := slice.AppendEmpty() + point.SetIntVal(17) + point.SetTimestamp(ts) + tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + + assert.ElementsMatch(t, + tr.mapNumberMetrics("int64.test", metrics.Gauge, slice, []string{}), + []datadog.Metric{metrics.NewGauge("int64.test", uint64(ts), 17, []string{})}, + ) + + assert.ElementsMatch(t, + tr.mapNumberMetrics("int64.delta.test", metrics.Count, slice, []string{}), + []datadog.Metric{metrics.NewCount("int64.delta.test", uint64(ts), 17, []string{})}, + ) + + // With attribute tags + assert.ElementsMatch(t, + tr.mapNumberMetrics("int64.test", metrics.Gauge, slice, []string{"attribute_tag:attribute_value"}), + []datadog.Metric{metrics.NewGauge("int64.test", uint64(ts), 17, []string{"attribute_tag:attribute_value"})}, + ) +} + +func TestMapDoubleMetrics(t *testing.T) { + ts := pdata.NewTimestampFromTime(time.Now()) + slice := pdata.NewNumberDataPointSlice() + point := slice.AppendEmpty() + point.SetDoubleVal(math.Pi) + point.SetTimestamp(ts) + tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + + assert.ElementsMatch(t, + tr.mapNumberMetrics("float64.test", metrics.Gauge, slice, []string{}), + []datadog.Metric{metrics.NewGauge("float64.test", uint64(ts), math.Pi, []string{})}, + ) + + assert.ElementsMatch(t, + tr.mapNumberMetrics("float64.delta.test", metrics.Count, slice, []string{}), + []datadog.Metric{metrics.NewCount("float64.delta.test", uint64(ts), math.Pi, []string{})}, + ) + + // With attribute tags + assert.ElementsMatch(t, + tr.mapNumberMetrics("float64.test", metrics.Gauge, slice, []string{"attribute_tag:attribute_value"}), + []datadog.Metric{metrics.NewGauge("float64.test", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"})}, + ) +} + +func seconds(i int) pdata.Timestamp { + return pdata.NewTimestampFromTime(time.Unix(int64(i), 0)) +} + +func TestMapIntMonotonicMetrics(t *testing.T) { + // Create list of values + deltas := []int64{1, 2, 200, 3, 7, 0} + cumulative := make([]int64, len(deltas)+1) + cumulative[0] = 0 + for i := 1; i < len(cumulative); i++ { + cumulative[i] = cumulative[i-1] + deltas[i-1] + } + + //Map to OpenTelemetry format + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(cumulative)) + for i, val := range cumulative { + point := slice.AppendEmpty() + point.SetIntVal(val) + point.SetTimestamp(seconds(i)) + } + + // Map to Datadog format + metricName := "metric.example" + expected := make([]datadog.Metric, len(deltas)) + for i, val := range deltas { + expected[i] = metrics.NewCount(metricName, uint64(seconds(i+1)), float64(val), []string{}) + } + + tr := newTranslator(zap.NewNop(), defaultCfg) + output := tr.mapNumberMonotonicMetrics(metricName, slice, []string{}) + + assert.ElementsMatch(t, output, expected) +} + +func TestMapIntMonotonicDifferentDimensions(t *testing.T) { + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + + // No tags + point := slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + + point = slice.AppendEmpty() + point.SetIntVal(20) + point.SetTimestamp(seconds(1)) + + // One tag: valA + point = slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + point.Attributes().InsertString("key1", "valA") + + point = slice.AppendEmpty() + point.SetIntVal(30) + point.SetTimestamp(seconds(1)) + point.Attributes().InsertString("key1", "valA") + + // same tag: valB + point = slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + point.Attributes().InsertString("key1", "valB") + + point = slice.AppendEmpty() + point.SetIntVal(40) + point.SetTimestamp(seconds(1)) + point.Attributes().InsertString("key1", "valB") + + tr := newTranslator(zap.NewNop(), defaultCfg) + + assert.ElementsMatch(t, + tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), + []datadog.Metric{ + metrics.NewCount(metricName, uint64(seconds(1)), 20, []string{}), + metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), + metrics.NewCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), + }, + ) +} + +func TestMapIntMonotonicWithReboot(t *testing.T) { + values := []int64{0, 30, 0, 20} + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(values)) + + for i, val := range values { + point := slice.AppendEmpty() + point.SetTimestamp(seconds(i)) + point.SetIntVal(val) + } + + tr := newTranslator(zap.NewNop(), defaultCfg) + assert.ElementsMatch(t, + tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), + []datadog.Metric{ + metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{}), + metrics.NewCount(metricName, uint64(seconds(3)), 20, []string{}), + }, + ) +} + +func TestMapIntMonotonicOutOfOrder(t *testing.T) { + stamps := []int{1, 0, 2, 3} + values := []int64{0, 1, 2, 3} + + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(values)) + + for i, val := range values { + point := slice.AppendEmpty() + point.SetTimestamp(seconds(stamps[i])) + point.SetIntVal(val) + } + + tr := newTranslator(zap.NewNop(), defaultCfg) + assert.ElementsMatch(t, + tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), + []datadog.Metric{ + metrics.NewCount(metricName, uint64(seconds(2)), 2, []string{}), + metrics.NewCount(metricName, uint64(seconds(3)), 1, []string{}), + }, + ) +} + +func TestMapDoubleMonotonicMetrics(t *testing.T) { + deltas := []float64{1, 2, 200, 3, 7, 0} + cumulative := make([]float64, len(deltas)+1) + cumulative[0] = 0 + for i := 1; i < len(cumulative); i++ { + cumulative[i] = cumulative[i-1] + deltas[i-1] + } + + //Map to OpenTelemetry format + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(cumulative)) + for i, val := range cumulative { + point := slice.AppendEmpty() + point.SetDoubleVal(val) + point.SetTimestamp(seconds(i)) + } + + // Map to Datadog format + metricName := "metric.example" + expected := make([]datadog.Metric, len(deltas)) + for i, val := range deltas { + expected[i] = metrics.NewCount(metricName, uint64(seconds(i+1)), val, []string{}) + } + + tr := newTranslator(zap.NewNop(), defaultCfg) + output := tr.mapNumberMonotonicMetrics(metricName, slice, []string{}) + + assert.ElementsMatch(t, expected, output) +} + +func TestMapDoubleMonotonicDifferentDimensions(t *testing.T) { + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + + // No tags + point := slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + + point = slice.AppendEmpty() + point.SetDoubleVal(20) + point.SetTimestamp(seconds(1)) + + // One tag: valA + point = slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + point.Attributes().InsertString("key1", "valA") + + point = slice.AppendEmpty() + point.SetDoubleVal(30) + point.SetTimestamp(seconds(1)) + point.Attributes().InsertString("key1", "valA") + + // one tag: valB + point = slice.AppendEmpty() + point.SetTimestamp(seconds(0)) + point.Attributes().InsertString("key1", "valB") + + point = slice.AppendEmpty() + point.SetDoubleVal(40) + point.SetTimestamp(seconds(1)) + point.Attributes().InsertString("key1", "valB") + + tr := newTranslator(zap.NewNop(), defaultCfg) + + assert.ElementsMatch(t, + tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), + []datadog.Metric{ + metrics.NewCount(metricName, uint64(seconds(1)), 20, []string{}), + metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), + metrics.NewCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), + }, + ) +} + +func TestMapDoubleMonotonicWithReboot(t *testing.T) { + values := []float64{0, 30, 0, 20} + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(values)) + + for i, val := range values { + point := slice.AppendEmpty() + point.SetTimestamp(seconds(2 * i)) + point.SetDoubleVal(val) + } + + tr := newTranslator(zap.NewNop(), defaultCfg) + assert.ElementsMatch(t, + tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), + []datadog.Metric{ + metrics.NewCount(metricName, uint64(seconds(2)), 30, []string{}), + metrics.NewCount(metricName, uint64(seconds(6)), 20, []string{}), + }, + ) +} + +func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { + stamps := []int{1, 0, 2, 3} + values := []float64{0, 1, 2, 3} + + metricName := "metric.example" + slice := pdata.NewNumberDataPointSlice() + slice.EnsureCapacity(len(values)) + + for i, val := range values { + point := slice.AppendEmpty() + point.SetTimestamp(seconds(stamps[i])) + point.SetDoubleVal(val) + } + + tr := newTranslator(zap.NewNop(), defaultCfg) + assert.ElementsMatch(t, + tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), + []datadog.Metric{ + metrics.NewCount(metricName, uint64(seconds(2)), 2, []string{}), + metrics.NewCount(metricName, uint64(seconds(3)), 1, []string{}), + }, + ) +} + +func TestMapDeltaHistogramMetrics(t *testing.T) { + ts := pdata.NewTimestampFromTime(time.Now()) + slice := pdata.NewHistogramDataPointSlice() + point := slice.AppendEmpty() + point.SetCount(20) + point.SetSum(math.Pi) + point.SetBucketCounts([]uint64{2, 18}) + point.SetExplicitBounds([]float64{0}) + point.SetTimestamp(ts) + + noBuckets := []datadog.Metric{ + metrics.NewCount("doubleHist.test.count", uint64(ts), 20, []string{}), + metrics.NewCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{}), + } + + buckets := []datadog.Metric{ + metrics.NewCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0"}), + metrics.NewCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf"}), + } + + tr := newTranslator(zap.NewNop(), defaultCfg) + delta := true + + tr.cfg.HistConfig.Mode = histogramModeNoBuckets + res, sl := tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) + require.Empty(t, sl) + assert.ElementsMatch(t, + res, // No buckets + noBuckets, + ) + + tr.cfg.HistConfig.Mode = histogramModeCounters + res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) + require.Empty(t, sl) + assert.ElementsMatch(t, + res, // buckets + append(noBuckets, buckets...), + ) + + // With attribute tags + noBucketsAttributeTags := []datadog.Metric{ + metrics.NewCount("doubleHist.test.count", uint64(ts), 20, []string{"attribute_tag:attribute_value"}), + metrics.NewCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"}), + } + + bucketsAttributeTags := []datadog.Metric{ + metrics.NewCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0", "attribute_tag:attribute_value"}), + metrics.NewCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf", "attribute_tag:attribute_value"}), + } + + tr.cfg.HistConfig.Mode = histogramModeNoBuckets + res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}) + require.Empty(t, sl) + assert.ElementsMatch(t, + res, // No buckets + noBucketsAttributeTags, + ) + + tr.cfg.HistConfig.Mode = histogramModeCounters + res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}) + require.Empty(t, sl) + assert.ElementsMatch(t, + res, // buckets + append(noBucketsAttributeTags, bucketsAttributeTags...), + ) +} + +func TestMapCumulativeHistogramMetrics(t *testing.T) { + slice := pdata.NewHistogramDataPointSlice() + point := slice.AppendEmpty() + point.SetCount(20) + point.SetSum(math.Pi) + point.SetBucketCounts([]uint64{2, 18}) + point.SetExplicitBounds([]float64{0}) + point.SetTimestamp(seconds(0)) + + point = slice.AppendEmpty() + point.SetCount(20 + 30) + point.SetSum(math.Pi + 20) + point.SetBucketCounts([]uint64{2 + 11, 18 + 2}) + point.SetExplicitBounds([]float64{0}) + point.SetTimestamp(seconds(2)) + + expected := []datadog.Metric{ + metrics.NewCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), + metrics.NewCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), + metrics.NewCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), + metrics.NewCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), + } + + tr := newTranslator(zap.NewNop(), defaultCfg) + delta := false + + tr.cfg.HistConfig.Mode = histogramModeCounters + res, sl := tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) + require.Empty(t, sl) + assert.ElementsMatch(t, + res, + expected, + ) +} + +func TestLegacyBucketsTags(t *testing.T) { + // Test that passing the same tags slice doesn't reuse the slice. + cfg := config.MetricsConfig{} + tr := newTranslator(zap.NewNop(), cfg) + + tags := make([]string, 0, 10) + + pointOne := pdata.NewHistogramDataPoint() + pointOne.SetBucketCounts([]uint64{2, 18}) + pointOne.SetExplicitBounds([]float64{0}) + pointOne.SetTimestamp(seconds(0)) + seriesOne := tr.getLegacyBuckets("test.histogram.one", pointOne, true, tags) + + pointTwo := pdata.NewHistogramDataPoint() + pointTwo.SetBucketCounts([]uint64{2, 18}) + pointTwo.SetExplicitBounds([]float64{1}) + pointTwo.SetTimestamp(seconds(0)) + seriesTwo := tr.getLegacyBuckets("test.histogram.two", pointTwo, true, tags) + + assert.ElementsMatch(t, seriesOne[0].Tags, []string{"lower_bound:-inf", "upper_bound:0"}) + assert.ElementsMatch(t, seriesTwo[0].Tags, []string{"lower_bound:-inf", "upper_bound:1.0"}) +} + +func TestFormatFloat(t *testing.T) { + tests := []struct { + f float64 + s string + }{ + {f: 0, s: "0"}, + {f: 0.001, s: "0.001"}, + {f: 0.9, s: "0.9"}, + {f: 0.95, s: "0.95"}, + {f: 0.99, s: "0.99"}, + {f: 0.999, s: "0.999"}, + {f: 1, s: "1.0"}, + {f: 2, s: "2.0"}, + {f: math.Inf(1), s: "inf"}, + {f: math.Inf(-1), s: "-inf"}, + {f: math.NaN(), s: "nan"}, + {f: 1e-10, s: "1e-10"}, + } + + for _, test := range tests { + assert.Equal(t, test.s, formatFloat(test.f)) + } +} + +func exampleSummaryDataPointSlice(ts pdata.Timestamp, sum float64, count uint64) pdata.SummaryDataPointSlice { + slice := pdata.NewSummaryDataPointSlice() + point := slice.AppendEmpty() + point.SetCount(count) + point.SetSum(sum) + qSlice := point.QuantileValues() + + qMin := qSlice.AppendEmpty() + qMin.SetQuantile(0.0) + qMin.SetValue(0) + + qMedian := qSlice.AppendEmpty() + qMedian.SetQuantile(0.5) + qMedian.SetValue(100) + + q999 := qSlice.AppendEmpty() + q999.SetQuantile(0.999) + q999.SetValue(500) + + qMax := qSlice.AppendEmpty() + qMax.SetQuantile(1) + qMax.SetValue(600) + point.SetTimestamp(ts) + return slice +} + +func TestMapSummaryMetrics(t *testing.T) { + ts := pdata.NewTimestampFromTime(time.Now()) + slice := exampleSummaryDataPointSlice(ts, 10_001, 101) + + newTranslator := func(tags []string, quantiles bool) *Translator { + c := newTestCache() + c.cache.Set(c.metricDimensionsToMapKey("summary.example.count", tags), numberCounter{0, 1}, gocache.NoExpiration) + c.cache.Set(c.metricDimensionsToMapKey("summary.example.sum", tags), numberCounter{0, 1}, gocache.NoExpiration) + return New(c, componenttest.NewNopExporterCreateSettings(), config.MetricsConfig{Quantiles: quantiles}, testProvider("fallbackHostname")) + } + + noQuantiles := []datadog.Metric{ + metrics.NewCount("summary.example.count", uint64(ts), 100, []string{}), + metrics.NewCount("summary.example.sum", uint64(ts), 10_000, []string{}), + } + quantiles := []datadog.Metric{ + metrics.NewGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0"}), + metrics.NewGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5"}), + metrics.NewGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999"}), + metrics.NewGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0"}), + } + tr := newTranslator([]string{}, false) + assert.ElementsMatch(t, + tr.mapSummaryMetrics("summary.example", slice, []string{}), + noQuantiles, + ) + tr = newTranslator([]string{}, true) + assert.ElementsMatch(t, + tr.mapSummaryMetrics("summary.example", slice, []string{}), + append(noQuantiles, quantiles...), + ) + + noQuantilesAttr := []datadog.Metric{ + metrics.NewCount("summary.example.count", uint64(ts), 100, []string{"attribute_tag:attribute_value"}), + metrics.NewCount("summary.example.sum", uint64(ts), 10_000, []string{"attribute_tag:attribute_value"}), + } + quantilesAttr := []datadog.Metric{ + metrics.NewGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0", "attribute_tag:attribute_value"}), + metrics.NewGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5", "attribute_tag:attribute_value"}), + metrics.NewGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999", "attribute_tag:attribute_value"}), + metrics.NewGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0", "attribute_tag:attribute_value"}), + } + tr = newTranslator([]string{"attribute_tag:attribute_value"}, false) + assert.ElementsMatch(t, + tr.mapSummaryMetrics("summary.example", slice, []string{"attribute_tag:attribute_value"}), + noQuantilesAttr, + ) + tr = newTranslator([]string{"attribute_tag:attribute_value"}, true) + assert.ElementsMatch(t, + tr.mapSummaryMetrics("summary.example", slice, []string{"attribute_tag:attribute_value"}), + append(noQuantilesAttr, quantilesAttr...), + ) +} + +func TestRunningMetrics(t *testing.T) { + ms := pdata.NewMetrics() + rms := ms.ResourceMetrics() + + rm := rms.AppendEmpty() + resAttrs := rm.Resource().Attributes() + resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1")) + + rm = rms.AppendEmpty() + resAttrs = rm.Resource().Attributes() + resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1")) + + rm = rms.AppendEmpty() + resAttrs = rm.Resource().Attributes() + resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-2")) + + rms.AppendEmpty() + + cfg := config.MetricsConfig{} + tr := newTranslator(zap.NewNop(), cfg) + + series, sl := tr.MapMetrics(ms) + require.Empty(t, sl) + + runningHostnames := []string{} + + for _, metric := range series { + if *metric.Metric == "otel.datadog_exporter.metrics.running" { + if metric.Host != nil { + runningHostnames = append(runningHostnames, *metric.Host) + } + } + } + + assert.ElementsMatch(t, + runningHostnames, + []string{"fallbackHostname", "resource-hostname-1", "resource-hostname-2"}, + ) + +} + +const ( + testHostname = "res-hostname" +) + +func createTestMetrics() pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + + attrs := rm.Resource().Attributes() + attrs.InsertString(attributes.AttributeDatadogHostname, testHostname) + ilms := rm.InstrumentationLibraryMetrics() + + metricsArray := ilms.AppendEmpty().Metrics() + metricsArray.AppendEmpty() // first one is TypeNone to test that it's ignored + + // IntGauge + met := metricsArray.AppendEmpty() + met.SetName("int.gauge") + met.SetDataType(pdata.MetricDataTypeGauge) + dpsInt := met.Gauge().DataPoints() + dpInt := dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(1) + + // DoubleGauge + met = metricsArray.AppendEmpty() + met.SetName("double.gauge") + met.SetDataType(pdata.MetricDataTypeGauge) + dpsDouble := met.Gauge().DataPoints() + dpDouble := dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.Pi) + + // aggregation unspecified sum + met = metricsArray.AppendEmpty() + met.SetName("unspecified.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityUnspecified) + + // Int Sum (delta) + met = metricsArray.AppendEmpty() + met.SetName("int.delta.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsInt = met.Sum().DataPoints() + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(2) + + // Double Sum (delta) + met = metricsArray.AppendEmpty() + met.SetName("double.delta.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDouble = met.Sum().DataPoints() + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.E) + + // Int Sum (delta monotonic) + met = metricsArray.AppendEmpty() + met.SetName("int.delta.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsInt = met.Sum().DataPoints() + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(2) + + // Double Sum (delta monotonic) + met = metricsArray.AppendEmpty() + met.SetName("double.delta.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDouble = met.Sum().DataPoints() + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.E) + + // aggregation unspecified histogram + met = metricsArray.AppendEmpty() + met.SetName("unspecified.histogram") + met.SetDataType(pdata.MetricDataTypeHistogram) + met.Histogram().SetAggregationTemporality(pdata.AggregationTemporalityUnspecified) + + // Histogram (delta) + met = metricsArray.AppendEmpty() + met.SetName("double.histogram") + met.SetDataType(pdata.MetricDataTypeHistogram) + met.Histogram().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDoubleHist := met.Histogram().DataPoints() + dpDoubleHist := dpsDoubleHist.AppendEmpty() + dpDoubleHist.SetCount(20) + dpDoubleHist.SetSum(math.Phi) + dpDoubleHist.SetBucketCounts([]uint64{2, 18}) + dpDoubleHist.SetExplicitBounds([]float64{0}) + dpDoubleHist.SetTimestamp(seconds(0)) + + // Int Sum (cumulative) + met = metricsArray.AppendEmpty() + met.SetName("int.cumulative.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dpsInt = met.Sum().DataPoints() + dpsInt.EnsureCapacity(2) + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(4) + + // Double Sum (cumulative) + met = metricsArray.AppendEmpty() + met.SetName("double.cumulative.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dpsDouble = met.Sum().DataPoints() + dpsDouble.EnsureCapacity(2) + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(4) + + // Int Sum (cumulative monotonic) + met = metricsArray.AppendEmpty() + met.SetName("int.cumulative.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + met.Sum().SetIsMonotonic(true) + dpsInt = met.Sum().DataPoints() + dpsInt.EnsureCapacity(2) + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(0)) + dpInt.SetIntVal(4) + dpInt = dpsInt.AppendEmpty() + dpInt.SetTimestamp(seconds(2)) + dpInt.SetIntVal(7) + + // Double Sum (cumulative monotonic) + met = metricsArray.AppendEmpty() + met.SetName("double.cumulative.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + met.Sum().SetIsMonotonic(true) + dpsDouble = met.Sum().DataPoints() + dpsDouble.EnsureCapacity(2) + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(4) + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(2)) + dpDouble.SetDoubleVal(4 + math.Pi) + + // Summary + met = metricsArray.AppendEmpty() + met.SetName("summary") + met.SetDataType(pdata.MetricDataTypeSummary) + slice := exampleSummaryDataPointSlice(seconds(0), 1, 1) + slice.CopyTo(met.Summary().DataPoints()) + + met = metricsArray.AppendEmpty() + met.SetName("summary") + met.SetDataType(pdata.MetricDataTypeSummary) + slice = exampleSummaryDataPointSlice(seconds(2), 10_001, 101) + slice.CopyTo(met.Summary().DataPoints()) + return md +} + +func removeRunningMetrics(series []datadog.Metric) []datadog.Metric { + filtered := []datadog.Metric{} + for _, m := range series { + if m.GetMetric() != "otel.datadog_exporter.metrics.running" { + filtered = append(filtered, m) + } + } + return filtered +} + +func testGauge(name string, val float64) datadog.Metric { + m := metrics.NewGauge(name, 0, val, []string{}) + m.SetHost(testHostname) + return m +} + +func testCount(name string, val float64, seconds uint64) datadog.Metric { + m := metrics.NewCount(name, seconds*1e9, val, []string{}) + m.SetHost(testHostname) + return m +} + +func TestMapMetrics(t *testing.T) { + md := createTestMetrics() + + core, observed := observer.New(zapcore.DebugLevel) + testLogger := zap.New(core) + tr := newTranslator(testLogger, defaultCfg) + series, sl := tr.MapMetrics(md) + require.Empty(t, sl) + + filtered := removeRunningMetrics(series) + assert.ElementsMatch(t, filtered, []datadog.Metric{ + testGauge("int.gauge", 1), + testGauge("double.gauge", math.Pi), + testCount("int.delta.sum", 2, 0), + testCount("double.delta.sum", math.E, 0), + testCount("int.delta.monotonic.sum", 2, 0), + testCount("double.delta.monotonic.sum", math.E, 0), + testCount("double.histogram.sum", math.Phi, 0), + testCount("double.histogram.count", 20, 0), + testCount("summary.sum", 10_000, 2), + testCount("summary.count", 100, 2), + testGauge("int.cumulative.sum", 4), + testGauge("double.cumulative.sum", 4), + testCount("int.cumulative.monotonic.sum", 3, 2), + testCount("double.cumulative.monotonic.sum", math.Pi, 2), + }) + + // One metric type was unknown or unsupported + assert.Equal(t, observed.FilterMessage("Unknown or unsupported metric type").Len(), 1) + // Two metric aggregation temporality was unknown or unsupported + assert.Equal(t, observed.FilterMessage("Unknown or unsupported aggregation temporality").Len(), 2) +} + +func createNaNMetrics() pdata.Metrics { + md := pdata.NewMetrics() + rms := md.ResourceMetrics() + rm := rms.AppendEmpty() + + attrs := rm.Resource().Attributes() + attrs.InsertString(attributes.AttributeDatadogHostname, testHostname) + ilms := rm.InstrumentationLibraryMetrics() + + metricsArray := ilms.AppendEmpty().Metrics() + + // DoubleGauge + met := metricsArray.AppendEmpty() + met.SetName("nan.gauge") + met.SetDataType(pdata.MetricDataTypeGauge) + dpsDouble := met.Gauge().DataPoints() + dpDouble := dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.NaN()) + + // Double Sum (delta) + met = metricsArray.AppendEmpty() + met.SetName("nan.delta.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDouble = met.Sum().DataPoints() + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.NaN()) + + // Double Sum (delta monotonic) + met = metricsArray.AppendEmpty() + met.SetName("nan.delta.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDouble = met.Sum().DataPoints() + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.NaN()) + + // Histogram + met = metricsArray.AppendEmpty() + met.SetName("nan.histogram") + met.SetDataType(pdata.MetricDataTypeHistogram) + met.Histogram().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + dpsDoubleHist := met.Histogram().DataPoints() + dpDoubleHist := dpsDoubleHist.AppendEmpty() + dpDoubleHist.SetCount(20) + dpDoubleHist.SetSum(math.NaN()) + dpDoubleHist.SetBucketCounts([]uint64{2, 18}) + dpDoubleHist.SetTimestamp(seconds(0)) + + // Double Sum (cumulative) + met = metricsArray.AppendEmpty() + met.SetName("nan.cumulative.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dpsDouble = met.Sum().DataPoints() + dpsDouble.EnsureCapacity(2) + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.NaN()) + + // Double Sum (cumulative monotonic) + met = metricsArray.AppendEmpty() + met.SetName("nan.cumulative.monotonic.sum") + met.SetDataType(pdata.MetricDataTypeSum) + met.Sum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + met.Sum().SetIsMonotonic(true) + dpsDouble = met.Sum().DataPoints() + dpsDouble.EnsureCapacity(2) + dpDouble = dpsDouble.AppendEmpty() + dpDouble.SetTimestamp(seconds(0)) + dpDouble.SetDoubleVal(math.NaN()) + + // Summary + met = metricsArray.AppendEmpty() + met.SetName("nan.summary") + met.SetDataType(pdata.MetricDataTypeSummary) + slice := exampleSummaryDataPointSlice(seconds(0), math.NaN(), 1) + slice.CopyTo(met.Summary().DataPoints()) + + met = metricsArray.AppendEmpty() + met.SetName("nan.summary") + met.SetDataType(pdata.MetricDataTypeSummary) + slice = exampleSummaryDataPointSlice(seconds(2), 10_001, 101) + slice.CopyTo(met.Summary().DataPoints()) + return md +} + +func TestNaNMetrics(t *testing.T) { + md := createNaNMetrics() + + core, observed := observer.New(zapcore.DebugLevel) + testLogger := zap.New(core) + tr := newTranslator(testLogger, defaultCfg) + series, sl := tr.MapMetrics(md) + require.Empty(t, sl) + + filtered := removeRunningMetrics(series) + assert.ElementsMatch(t, filtered, []datadog.Metric{ + testCount("nan.histogram.count", 20, 0), + testCount("nan.summary.count", 100, 2), + }) + + // One metric type was unknown or unsupported + assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7) +} diff --git a/pkg/otlp/model/translator/sketches_test.go b/pkg/otlp/model/translator/sketches_test.go new file mode 100644 index 000000000000..9a83b4131a70 --- /dev/null +++ b/pkg/otlp/model/translator/sketches_test.go @@ -0,0 +1,190 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "fmt" + "math" + "testing" + + "github.com/DataDog/datadog-agent/pkg/quantile" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" +) + +func TestHistogramSketches(t *testing.T) { + N := 1_000 + M := 50_000.0 + + // Given a cumulative distribution function for a distribution + // with support [0, N], generate an OTLP Histogram data point with N buckets, + // (-inf, 0], (0, 1], ..., (N-1, N], (N, inf) + // which contains N*M uniform samples of the distribution. + fromCDF := func(cdf func(x float64) float64) pdata.HistogramDataPoint { + p := pdata.NewHistogramDataPoint() + bounds := make([]float64, N+1) + buckets := make([]uint64, N+2) + buckets[0] = 0 + count := uint64(0) + for i := 0; i < N; i++ { + bounds[i] = float64(i) + // the bucket with bounds (i, i+1) has the + // cdf delta between the bounds as a value. + buckets[i+1] = uint64((cdf(float64(i+1)) - cdf(float64(i))) * M) + count += buckets[i+1] + } + bounds[N] = float64(N) + buckets[N+1] = 0 + p.SetExplicitBounds(bounds) + p.SetBucketCounts(buckets) + p.SetCount(count) + return p + } + + tests := []struct { + // distribution name + name string + // the cumulative distribution function (within [0,N]) + cdf func(x float64) float64 + // error tolerance for testing cdf(quantile(q)) ≈ q + epsilon float64 + }{ + { + // https://en.wikipedia.org/wiki/Continuous_uniform_distribution + name: "Uniform distribution (a=0,b=N)", + cdf: func(x float64) float64 { return x / float64(N) }, + epsilon: 0.01, + }, + { + // https://en.wikipedia.org/wiki/U-quadratic_distribution + name: "U-quadratic distribution (a=0,b=N)", + cdf: func(x float64) float64 { + a := 0.0 + b := float64(N) + alpha := 12.0 / math.Pow(b-a, 3) + beta := (b + a) / 2.0 + return alpha / 3 * (math.Pow(x-beta, 3) + math.Pow(beta-alpha, 3)) + }, + epsilon: 0.025, + }, + } + + defaultEps := 1.0 / 128.0 + tol := 1e-8 + cfg := quantile.Default() + tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + p := fromCDF(test.cdf) + sk := tr.getSketchBuckets("test", 0, p, true, []string{}).Points[0].Sketch + + // Check the minimum is 0.0 + assert.Equal(t, 0.0, sk.Quantile(cfg, 0)) + // Check the quantiles are approximately correct + for i := 1; i <= 99; i++ { + q := (float64(i)) / 100.0 + assert.InEpsilon(t, + // test that the CDF is the (approximate) inverse of the quantile function + test.cdf(sk.Quantile(cfg, q)), + q, + test.epsilon, + fmt.Sprintf("error too high for p%d", i), + ) + } + + cumulSum := uint64(0) + for i := 0; i < len(p.BucketCounts())-3; i++ { + { + q := float64(cumulSum) / float64(p.Count()) * (1 - tol) + quantileValue := sk.Quantile(cfg, q) + // quantileValue, if computed from the explicit buckets, would have to be <= bounds[i]. + // Because of remapping, it is <= bounds[i+1]. + // Because of DDSketch accuracy guarantees, it is <= bounds[i+1] * (1 + defaultEps) + maxExpectedQuantileValue := p.ExplicitBounds()[i+1] * (1 + defaultEps) + assert.LessOrEqual(t, quantileValue, maxExpectedQuantileValue) + } + + cumulSum += p.BucketCounts()[i+1] + + { + q := float64(cumulSum) / float64(p.Count()) * (1 + tol) + quantileValue := sk.Quantile(cfg, q) + // quantileValue, if computed from the explicit buckets, would have to be >= bounds[i+1]. + // Because of remapping, it is >= bounds[i]. + // Because of DDSketch accuracy guarantees, it is >= bounds[i] * (1 - defaultEps) + minExpectedQuantileValue := p.ExplicitBounds()[i] * (1 - defaultEps) + assert.GreaterOrEqual(t, quantileValue, minExpectedQuantileValue) + } + } + }) + } +} + +func TestInfiniteBounds(t *testing.T) { + + tests := []struct { + name string + getHist func() pdata.HistogramDataPoint + }{ + { + name: "(-inf, inf): 100", + getHist: func() pdata.HistogramDataPoint { + p := pdata.NewHistogramDataPoint() + p.SetExplicitBounds([]float64{}) + p.SetBucketCounts([]uint64{100}) + p.SetCount(100) + p.SetSum(0) + return p + }, + }, + { + name: "(-inf, 0]: 100, (0, +inf]: 100", + getHist: func() pdata.HistogramDataPoint { + p := pdata.NewHistogramDataPoint() + p.SetExplicitBounds([]float64{0}) + p.SetBucketCounts([]uint64{100, 100}) + p.SetCount(200) + p.SetSum(0) + return p + }, + }, + { + name: "(-inf, -1]: 100, (-1, 1]: 10, (1, +inf]: 100", + getHist: func() pdata.HistogramDataPoint { + p := pdata.NewHistogramDataPoint() + p.SetExplicitBounds([]float64{-1, 1}) + p.SetBucketCounts([]uint64{100, 10, 100}) + p.SetCount(210) + p.SetSum(0) + return p + }, + }, + } + + tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + for _, testInstance := range tests { + t.Run(testInstance.name, func(t *testing.T) { + p := testInstance.getHist() + sk := tr.getSketchBuckets("test", 0, p, true, []string{}).Points[0].Sketch + assert.InDelta(t, sk.Basic.Sum, p.Sum(), 1) + assert.Equal(t, uint64(sk.Basic.Cnt), p.Count()) + }) + } + +} diff --git a/pkg/otlp/model/translator/ttlcache.go b/pkg/otlp/model/translator/ttlcache.go new file mode 100644 index 000000000000..e093f2892a8f --- /dev/null +++ b/pkg/otlp/model/translator/ttlcache.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "sort" + "strings" + "time" + + gocache "github.com/patrickmn/go-cache" +) + +const ( + metricKeySeparator = string(byte(0)) +) + +type TTLCache struct { + cache *gocache.Cache +} + +// numberCounter keeps the value of a number +// monotonic counter at a given point in time +type numberCounter struct { + ts uint64 + value float64 +} + +func NewTTLCache(sweepInterval int64, deltaTTL int64) *TTLCache { + cache := gocache.New(time.Duration(deltaTTL)*time.Second, time.Duration(sweepInterval)*time.Second) + return &TTLCache{cache} +} + +// Uses a logic similar to what is done in the span processor to build metric keys: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b2327211df976e0a57ef0425493448988772a16b/processor/spanmetricsprocessor/processor.go#L353-L387 +// TODO: make this a public util function? +func concatDimensionValue(metricKeyBuilder *strings.Builder, value string) { + metricKeyBuilder.WriteString(value) + metricKeyBuilder.WriteString(metricKeySeparator) +} + +// metricDimensionsToMapKey maps name and tags to a string to use as an identifier +// The tags order does not matter +func (*TTLCache) metricDimensionsToMapKey(name string, tags []string) string { + var metricKeyBuilder strings.Builder + + dimensions := make([]string, len(tags)) + copy(dimensions, tags) + + dimensions = append(dimensions, name) + sort.Strings(dimensions) + + for _, dim := range dimensions { + concatDimensionValue(&metricKeyBuilder, dim) + } + return metricKeyBuilder.String() +} + +// putAndGetDiff submits a new value for a given metric and returns the difference with the +// last submitted value (ordered by timestamp). The diff value is only valid if `ok` is true. +func (t *TTLCache) putAndGetDiff(name string, tags []string, ts uint64, val float64) (dx float64, ok bool) { + key := t.metricDimensionsToMapKey(name, tags) + if c, found := t.cache.Get(key); found { + cnt := c.(numberCounter) + if cnt.ts > ts { + // We were given a point older than the one in memory so we drop it + // We keep the existing point in memory since it is the most recent + return 0, false + } + // if dx < 0, we assume there was a reset, thus we save the point + // but don't export it (it's the first one so we can't do a delta) + dx = val - cnt.value + ok = dx >= 0 + } + + t.cache.Set(key, numberCounter{ts, val}, gocache.DefaultExpiration) + return +} diff --git a/pkg/otlp/model/translator/ttlcache_test.go b/pkg/otlp/model/translator/ttlcache_test.go new file mode 100644 index 000000000000..1e768ae8d94c --- /dev/null +++ b/pkg/otlp/model/translator/ttlcache_test.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func newTestCache() *TTLCache { + cache := NewTTLCache(1800, 3600) + return cache +} +func TestPutAndGetDiff(t *testing.T) { + prevPts := newTestCache() + _, ok := prevPts.putAndGetDiff("test", []string{}, 1, 5) + // no diff since it is the first point + assert.False(t, ok) + _, ok = prevPts.putAndGetDiff("test", []string{}, 0, 0) + // no diff since ts is lower than the stored point + assert.False(t, ok) + _, ok = prevPts.putAndGetDiff("test", []string{}, 2, 2) + // no diff since the value is lower than the stored value + assert.False(t, ok) + dx, ok := prevPts.putAndGetDiff("test", []string{}, 3, 4) + // diff with the most recent point (2,2) + assert.True(t, ok) + assert.Equal(t, 2.0, dx) +} + +func TestMetricDimensionsToMapKey(t *testing.T) { + metricName := "metric.name" + c := newTestCache() + noTags := c.metricDimensionsToMapKey(metricName, []string{}) + someTags := c.metricDimensionsToMapKey(metricName, []string{"key1:val1", "key2:val2"}) + sameTags := c.metricDimensionsToMapKey(metricName, []string{"key2:val2", "key1:val1"}) + diffTags := c.metricDimensionsToMapKey(metricName, []string{"key3:val3"}) + + assert.NotEqual(t, noTags, someTags) + assert.NotEqual(t, someTags, diffTags) + assert.Equal(t, someTags, sameTags) +} + +func TestMetricDimensionsToMapKeyNoTagsChange(t *testing.T) { + // The original metricDimensionsToMapKey had an issue where: + // - if the capacity of the tags array passed to it was higher than its length + // - and the metric name is earlier (in alphabetical order) than one of the tags + // then the original tag array would be modified (without a reallocation, since there is enough capacity), + // and would contain a tag labeled as the metric name, while the final tag (in alphabetical order) + // would get left out. + // This test checks that this doesn't happen anymore. + + metricName := "a.metric.name" + c := newTestCache() + + originalTags := make([]string, 2, 3) + originalTags[0] = "key1:val1" + originalTags[1] = "key2:val2" + c.metricDimensionsToMapKey(metricName, originalTags) + assert.Equal(t, []string{"key1:val1", "key2:val2"}, originalTags) + +} From 50ace63d35a16e473bc2da7bd7773d2a1de9c45b Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 24 Sep 2021 10:56:43 +0200 Subject: [PATCH 2/4] Decouple translator configuration from Collector configuration (open-telemetry/opentelemetry-collector-contrib#5270) --- pkg/otlp/model/translator/config.go | 141 ++++++++++++++++++ .../model/translator/hostname_provider.go | 31 ++++ .../model/translator/metrics_translator.go | 68 +++++---- .../translator/metrics_translator_test.go | 90 ++++++----- pkg/otlp/model/translator/sketches_test.go | 6 +- pkg/otlp/model/translator/ttlcache.go | 10 +- pkg/otlp/model/translator/ttlcache_test.go | 4 +- 7 files changed, 267 insertions(+), 83 deletions(-) create mode 100644 pkg/otlp/model/translator/config.go create mode 100644 pkg/otlp/model/translator/hostname_provider.go diff --git a/pkg/otlp/model/translator/config.go b/pkg/otlp/model/translator/config.go new file mode 100644 index 000000000000..c980337f446b --- /dev/null +++ b/pkg/otlp/model/translator/config.go @@ -0,0 +1,141 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import "fmt" + +type translatorConfig struct { + // metrics export behavior + HistMode HistogramMode + SendCountSum bool + Quantiles bool + SendMonotonic bool + ResourceAttributesAsTags bool + + // cache configuration + sweepInterval int64 + deltaTTL int64 + + // hostname provider configuration + fallbackHostnameProvider HostnameProvider +} + +// Option is a translator creation option. +type Option func(*translatorConfig) error + +// WithDeltaTTL sets the delta TTL for cumulative metrics datapoints. +// By default, 3600 seconds are used. +func WithDeltaTTL(deltaTTL int64) Option { + return func(t *translatorConfig) error { + if deltaTTL <= 0 { + return fmt.Errorf("time to live must be positive: %d", deltaTTL) + } + t.deltaTTL = deltaTTL + t.sweepInterval = 1 + if t.deltaTTL > 1 { + t.sweepInterval = t.deltaTTL / 2 + } + return nil + } +} + +// WithFallbackHostnameProvider sets the fallback hostname provider. +// By default, an empty hostname is used as a fallback. +func WithFallbackHostnameProvider(provider HostnameProvider) Option { + return func(t *translatorConfig) error { + t.fallbackHostnameProvider = provider + return nil + } +} + +// WithQuantiles enables quantiles exporting for summary metrics. +func WithQuantiles() Option { + return func(t *translatorConfig) error { + t.Quantiles = true + return nil + } +} + +// WithResourceAttributesAsTags sets resource attributes as tags. +func WithResourceAttributesAsTags() Option { + return func(t *translatorConfig) error { + t.ResourceAttributesAsTags = true + return nil + } +} + +// HistogramMode is an export mode for OTLP Histogram metrics. +type HistogramMode string + +const ( + // HistogramModeOff disables bucket export. + HistogramModeNoBuckets HistogramMode = "nobuckets" + // HistogramModeCounters exports buckets as Datadog counts. + HistogramModeCounters HistogramMode = "counters" + // HistogramModeDistributions exports buckets as Datadog distributions. + HistogramModeDistributions HistogramMode = "distributions" +) + +// WithHistogramMode sets the histograms mode. +// The default mode is HistogramModeOff. +func WithHistogramMode(mode HistogramMode) Option { + return func(t *translatorConfig) error { + + switch mode { + case HistogramModeNoBuckets, HistogramModeCounters: + t.HistMode = mode + default: + return fmt.Errorf("unknown histogram mode: %q", mode) + } + return nil + } +} + +func WithCountSumMetrics() Option { + return func(t *translatorConfig) error { + t.SendCountSum = true + return nil + } +} + +// NumberMode is an export mode for OTLP Number metrics. +type NumberMode string + +const ( + // NumberModeCumulativeToDelta calculates delta for + // cumulative monotonic metrics in the client side and reports + // them as Datadog counts. + NumberModeCumulativeToDelta NumberMode = "cumulative_to_delta" + + // NumberModeRawValue reports the raw value for cumulative monotonic + // metrics as a Datadog gauge. + NumberModeRawValue NumberMode = "raw_value" +) + +// WithNumberMode sets the number mode. +// The default mode is NumberModeCumulativeToDelta. +func WithNumberMode(mode NumberMode) Option { + return func(t *translatorConfig) error { + switch mode { + case NumberModeCumulativeToDelta: + t.SendMonotonic = true + case NumberModeRawValue: + t.SendMonotonic = false + default: + return fmt.Errorf("unknown number mode: %q", mode) + } + return nil + } +} diff --git a/pkg/otlp/model/translator/hostname_provider.go b/pkg/otlp/model/translator/hostname_provider.go new file mode 100644 index 000000000000..829d0654add2 --- /dev/null +++ b/pkg/otlp/model/translator/hostname_provider.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import "context" + +// HostnameProvider gets a hostname +type HostnameProvider interface { + // Hostname gets the hostname from the machine. + Hostname(ctx context.Context) (string, error) +} + +var _ HostnameProvider = (*noHostProvider)(nil) + +type noHostProvider struct{} + +func (*noHostProvider) Hostname(context.Context) (string, error) { + return "", nil +} diff --git a/pkg/otlp/model/translator/metrics_translator.go b/pkg/otlp/model/translator/metrics_translator.go index 538ac2250d3e..89c32955fbd3 100644 --- a/pkg/otlp/model/translator/metrics_translator.go +++ b/pkg/otlp/model/translator/metrics_translator.go @@ -27,7 +27,6 @@ import ( "go.uber.org/zap" "gopkg.in/zorkian/go-datadog-api.v2" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/sketches" @@ -35,28 +34,34 @@ import ( const metricName string = "metric name" -const ( - histogramModeNoBuckets = "nobuckets" - histogramModeCounters = "counters" - histogramModeDistributions = "distributions" -) - -// HostnameProvider gets a hostname -type HostnameProvider interface { - // Hostname gets the hostname from the machine. - Hostname(ctx context.Context) (string, error) -} - type Translator struct { - prevPts *TTLCache - logger *zap.Logger - cfg config.MetricsConfig - buildInfo component.BuildInfo - fallbackHostnameProvider HostnameProvider + prevPts *ttlCache + logger *zap.Logger + cfg translatorConfig + buildInfo component.BuildInfo } -func New(cache *TTLCache, params component.ExporterCreateSettings, cfg config.MetricsConfig, fallbackHostProvider HostnameProvider) *Translator { - return &Translator{cache, params.Logger, cfg, params.BuildInfo, fallbackHostProvider} +func New(params component.ExporterCreateSettings, options ...Option) (*Translator, error) { + cfg := translatorConfig{ + HistMode: HistogramModeNoBuckets, + SendCountSum: true, + Quantiles: false, + SendMonotonic: true, + ResourceAttributesAsTags: false, + sweepInterval: 1800, + deltaTTL: 3600, + fallbackHostnameProvider: &noHostProvider{}, + } + + for _, opt := range options { + err := opt(&cfg) + if err != nil { + return nil, err + } + } + + cache := newTTLCache(cfg.sweepInterval, cfg.deltaTTL) + return &Translator{cache, params.Logger, cfg, params.BuildInfo}, nil } // getTags maps an attributeMap into a slice of Datadog tags @@ -232,7 +237,7 @@ func (t *Translator) getLegacyBuckets(name string, p pdata.HistogramDataPoint, d func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataPointSlice, delta bool, attrTags []string) (ms []datadog.Metric, sl sketches.SketchSeriesList) { // Allocate assuming none are nil and no buckets ms = make([]datadog.Metric, 0, 2*slice.Len()) - if t.cfg.HistConfig.Mode == histogramModeDistributions { + if t.cfg.HistMode == HistogramModeDistributions { sl = make(sketches.SketchSeriesList, 0, slice.Len()) } for i := 0; i < slice.Len(); i++ { @@ -241,7 +246,7 @@ func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataP tags := getTags(p.Attributes()) tags = append(tags, attrTags...) - if t.cfg.HistConfig.SendCountSum { + if t.cfg.SendCountSum { count := float64(p.Count()) countName := fmt.Sprintf("%s.count", name) if delta { @@ -251,7 +256,7 @@ func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataP } } - if t.cfg.HistConfig.SendCountSum { + if t.cfg.SendCountSum { sum := p.Sum() sumName := fmt.Sprintf("%s.sum", name) if !t.isSkippable(sumName, p.Sum()) { @@ -263,10 +268,10 @@ func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataP } } - switch t.cfg.HistConfig.Mode { - case histogramModeCounters: + switch t.cfg.HistMode { + case HistogramModeCounters: ms = append(ms, t.getLegacyBuckets(name, p, delta, tags)...) - case histogramModeDistributions: + case HistogramModeDistributions: sl = append(sl, t.getSketchBuckets(name, ts, p, true, tags)) } } @@ -349,7 +354,7 @@ func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPoint } // MapMetrics maps OTLP metrics into the DataDog format -func (t *Translator) MapMetrics(md pdata.Metrics) (series []datadog.Metric, sl sketches.SketchSeriesList) { +func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics) (series []datadog.Metric, sl sketches.SketchSeriesList, err error) { pushTime := uint64(time.Now().UTC().UnixNano()) rms := md.ResourceMetrics() seenHosts := make(map[string]struct{}) @@ -360,16 +365,15 @@ func (t *Translator) MapMetrics(md pdata.Metrics) (series []datadog.Metric, sl s // Only fetch attribute tags if they're not already converted into labels. // Otherwise some tags would be present twice in a metric's tag list. - if !t.cfg.ExporterConfig.ResourceAttributesAsTags { + if !t.cfg.ResourceAttributesAsTags { attributeTags = attributes.TagsFromAttributes(rm.Resource().Attributes()) } host, ok := attributes.HostnameFromAttributes(rm.Resource().Attributes()) if !ok { - fallbackHost, err := t.fallbackHostnameProvider.Hostname(context.Background()) - host = "" - if err == nil { - host = fallbackHost + host, err = t.cfg.fallbackHostnameProvider.Hostname(context.Background()) + if err != nil { + return nil, nil, fmt.Errorf("failed to get fallback host: %w", err) } } seenHosts[host] = struct{}{} diff --git a/pkg/otlp/model/translator/metrics_translator_test.go b/pkg/otlp/model/translator/metrics_translator_test.go index d2e798fb37a7..504f8825646c 100644 --- a/pkg/otlp/model/translator/metrics_translator_test.go +++ b/pkg/otlp/model/translator/metrics_translator_test.go @@ -31,19 +31,10 @@ import ( "go.uber.org/zap/zaptest/observer" "gopkg.in/zorkian/go-datadog-api.v2" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" ) -var defaultCfg = config.MetricsConfig{ - SendMonotonic: true, - HistConfig: config.HistogramConfig{ - Mode: histogramModeNoBuckets, - SendCountSum: true, - }, -} - func TestMetricValue(t *testing.T) { var ( name = "name" @@ -130,7 +121,7 @@ func (t testProvider) Hostname(context.Context) (string, error) { return string(t), nil } -func newTranslator(logger *zap.Logger, cfg config.MetricsConfig) *Translator { +func newTranslator(t *testing.T, logger *zap.Logger) *Translator { params := component.ExporterCreateSettings{ BuildInfo: component.BuildInfo{ Version: "1.0", @@ -139,7 +130,17 @@ func newTranslator(logger *zap.Logger, cfg config.MetricsConfig) *Translator { Logger: logger, }, } - return New(newTestCache(), params, cfg, testProvider("fallbackHostname")) + + tr, err := New( + params, + WithFallbackHostnameProvider(testProvider("fallbackHostname")), + WithCountSumMetrics(), + WithHistogramMode(HistogramModeNoBuckets), + WithNumberMode(NumberModeCumulativeToDelta), + ) + + require.NoError(t, err) + return tr } func TestMapIntMetrics(t *testing.T) { @@ -148,7 +149,7 @@ func TestMapIntMetrics(t *testing.T) { point := slice.AppendEmpty() point.SetIntVal(17) point.SetTimestamp(ts) - tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + tr := newTranslator(t, zap.NewNop()) assert.ElementsMatch(t, tr.mapNumberMetrics("int64.test", metrics.Gauge, slice, []string{}), @@ -173,7 +174,7 @@ func TestMapDoubleMetrics(t *testing.T) { point := slice.AppendEmpty() point.SetDoubleVal(math.Pi) point.SetTimestamp(ts) - tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + tr := newTranslator(t, zap.NewNop()) assert.ElementsMatch(t, tr.mapNumberMetrics("float64.test", metrics.Gauge, slice, []string{}), @@ -221,7 +222,7 @@ func TestMapIntMonotonicMetrics(t *testing.T) { expected[i] = metrics.NewCount(metricName, uint64(seconds(i+1)), float64(val), []string{}) } - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) output := tr.mapNumberMonotonicMetrics(metricName, slice, []string{}) assert.ElementsMatch(t, output, expected) @@ -259,7 +260,7 @@ func TestMapIntMonotonicDifferentDimensions(t *testing.T) { point.SetTimestamp(seconds(1)) point.Attributes().InsertString("key1", "valB") - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) assert.ElementsMatch(t, tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), @@ -283,7 +284,7 @@ func TestMapIntMonotonicWithReboot(t *testing.T) { point.SetIntVal(val) } - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) assert.ElementsMatch(t, tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), []datadog.Metric{ @@ -307,7 +308,7 @@ func TestMapIntMonotonicOutOfOrder(t *testing.T) { point.SetIntVal(val) } - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) assert.ElementsMatch(t, tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), []datadog.Metric{ @@ -341,7 +342,7 @@ func TestMapDoubleMonotonicMetrics(t *testing.T) { expected[i] = metrics.NewCount(metricName, uint64(seconds(i+1)), val, []string{}) } - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) output := tr.mapNumberMonotonicMetrics(metricName, slice, []string{}) assert.ElementsMatch(t, expected, output) @@ -379,8 +380,7 @@ func TestMapDoubleMonotonicDifferentDimensions(t *testing.T) { point.SetTimestamp(seconds(1)) point.Attributes().InsertString("key1", "valB") - tr := newTranslator(zap.NewNop(), defaultCfg) - + tr := newTranslator(t, zap.NewNop()) assert.ElementsMatch(t, tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), []datadog.Metric{ @@ -403,7 +403,7 @@ func TestMapDoubleMonotonicWithReboot(t *testing.T) { point.SetDoubleVal(val) } - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) assert.ElementsMatch(t, tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), []datadog.Metric{ @@ -427,7 +427,7 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { point.SetDoubleVal(val) } - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) assert.ElementsMatch(t, tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), []datadog.Metric{ @@ -457,10 +457,10 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { metrics.NewCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf"}), } - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) delta := true - tr.cfg.HistConfig.Mode = histogramModeNoBuckets + tr.cfg.HistMode = HistogramModeNoBuckets res, sl := tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) require.Empty(t, sl) assert.ElementsMatch(t, @@ -468,7 +468,7 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { noBuckets, ) - tr.cfg.HistConfig.Mode = histogramModeCounters + tr.cfg.HistMode = HistogramModeCounters res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) require.Empty(t, sl) assert.ElementsMatch(t, @@ -487,7 +487,7 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { metrics.NewCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf", "attribute_tag:attribute_value"}), } - tr.cfg.HistConfig.Mode = histogramModeNoBuckets + tr.cfg.HistMode = HistogramModeNoBuckets res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}) require.Empty(t, sl) assert.ElementsMatch(t, @@ -495,7 +495,7 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { noBucketsAttributeTags, ) - tr.cfg.HistConfig.Mode = histogramModeCounters + tr.cfg.HistMode = HistogramModeCounters res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}) require.Empty(t, sl) assert.ElementsMatch(t, @@ -527,10 +527,10 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { metrics.NewCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), } - tr := newTranslator(zap.NewNop(), defaultCfg) + tr := newTranslator(t, zap.NewNop()) delta := false - tr.cfg.HistConfig.Mode = histogramModeCounters + tr.cfg.HistMode = HistogramModeCounters res, sl := tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) require.Empty(t, sl) assert.ElementsMatch(t, @@ -541,8 +541,7 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { func TestLegacyBucketsTags(t *testing.T) { // Test that passing the same tags slice doesn't reuse the slice. - cfg := config.MetricsConfig{} - tr := newTranslator(zap.NewNop(), cfg) + tr := newTranslator(t, zap.NewNop()) tags := make([]string, 0, 10) @@ -620,7 +619,14 @@ func TestMapSummaryMetrics(t *testing.T) { c := newTestCache() c.cache.Set(c.metricDimensionsToMapKey("summary.example.count", tags), numberCounter{0, 1}, gocache.NoExpiration) c.cache.Set(c.metricDimensionsToMapKey("summary.example.sum", tags), numberCounter{0, 1}, gocache.NoExpiration) - return New(c, componenttest.NewNopExporterCreateSettings(), config.MetricsConfig{Quantiles: quantiles}, testProvider("fallbackHostname")) + options := []Option{WithFallbackHostnameProvider(testProvider("fallbackHostname"))} + if quantiles { + options = append(options, WithQuantiles()) + } + tr, err := New(componenttest.NewNopExporterCreateSettings(), options...) + require.NoError(t, err) + tr.prevPts = c + return tr } noQuantiles := []datadog.Metric{ @@ -683,12 +689,12 @@ func TestRunningMetrics(t *testing.T) { resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-2")) rms.AppendEmpty() + ctx := context.Background() + tr := newTranslator(t, zap.NewNop()) - cfg := config.MetricsConfig{} - tr := newTranslator(zap.NewNop(), cfg) - - series, sl := tr.MapMetrics(ms) + series, sl, err := tr.MapMetrics(ctx, ms) require.Empty(t, sl) + require.NoError(t, err) runningHostnames := []string{} @@ -900,9 +906,11 @@ func TestMapMetrics(t *testing.T) { core, observed := observer.New(zapcore.DebugLevel) testLogger := zap.New(core) - tr := newTranslator(testLogger, defaultCfg) - series, sl := tr.MapMetrics(md) + ctx := context.Background() + tr := newTranslator(t, testLogger) + series, sl, err := tr.MapMetrics(ctx, md) require.Empty(t, sl) + require.NoError(t, err) filtered := removeRunningMetrics(series) assert.ElementsMatch(t, filtered, []datadog.Metric{ @@ -1023,9 +1031,11 @@ func TestNaNMetrics(t *testing.T) { core, observed := observer.New(zapcore.DebugLevel) testLogger := zap.New(core) - tr := newTranslator(testLogger, defaultCfg) - series, sl := tr.MapMetrics(md) + tr := newTranslator(t, testLogger) + ctx := context.Background() + series, sl, err := tr.MapMetrics(ctx, md) require.Empty(t, sl) + require.NoError(t, err) filtered := removeRunningMetrics(series) assert.ElementsMatch(t, filtered, []datadog.Metric{ diff --git a/pkg/otlp/model/translator/sketches_test.go b/pkg/otlp/model/translator/sketches_test.go index 9a83b4131a70..af274026e426 100644 --- a/pkg/otlp/model/translator/sketches_test.go +++ b/pkg/otlp/model/translator/sketches_test.go @@ -23,8 +23,6 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" ) func TestHistogramSketches(t *testing.T) { @@ -87,7 +85,7 @@ func TestHistogramSketches(t *testing.T) { defaultEps := 1.0 / 128.0 tol := 1e-8 cfg := quantile.Default() - tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + tr := newTranslator(t, zap.NewNop()) for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -177,7 +175,7 @@ func TestInfiniteBounds(t *testing.T) { }, } - tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + tr := newTranslator(t, zap.NewNop()) for _, testInstance := range tests { t.Run(testInstance.name, func(t *testing.T) { p := testInstance.getHist() diff --git a/pkg/otlp/model/translator/ttlcache.go b/pkg/otlp/model/translator/ttlcache.go index e093f2892a8f..1d164aa6cbf1 100644 --- a/pkg/otlp/model/translator/ttlcache.go +++ b/pkg/otlp/model/translator/ttlcache.go @@ -26,7 +26,7 @@ const ( metricKeySeparator = string(byte(0)) ) -type TTLCache struct { +type ttlCache struct { cache *gocache.Cache } @@ -37,9 +37,9 @@ type numberCounter struct { value float64 } -func NewTTLCache(sweepInterval int64, deltaTTL int64) *TTLCache { +func newTTLCache(sweepInterval int64, deltaTTL int64) *ttlCache { cache := gocache.New(time.Duration(deltaTTL)*time.Second, time.Duration(sweepInterval)*time.Second) - return &TTLCache{cache} + return &ttlCache{cache} } // Uses a logic similar to what is done in the span processor to build metric keys: @@ -52,7 +52,7 @@ func concatDimensionValue(metricKeyBuilder *strings.Builder, value string) { // metricDimensionsToMapKey maps name and tags to a string to use as an identifier // The tags order does not matter -func (*TTLCache) metricDimensionsToMapKey(name string, tags []string) string { +func (*ttlCache) metricDimensionsToMapKey(name string, tags []string) string { var metricKeyBuilder strings.Builder dimensions := make([]string, len(tags)) @@ -69,7 +69,7 @@ func (*TTLCache) metricDimensionsToMapKey(name string, tags []string) string { // putAndGetDiff submits a new value for a given metric and returns the difference with the // last submitted value (ordered by timestamp). The diff value is only valid if `ok` is true. -func (t *TTLCache) putAndGetDiff(name string, tags []string, ts uint64, val float64) (dx float64, ok bool) { +func (t *ttlCache) putAndGetDiff(name string, tags []string, ts uint64, val float64) (dx float64, ok bool) { key := t.metricDimensionsToMapKey(name, tags) if c, found := t.cache.Get(key); found { cnt := c.(numberCounter) diff --git a/pkg/otlp/model/translator/ttlcache_test.go b/pkg/otlp/model/translator/ttlcache_test.go index 1e768ae8d94c..c9124fb88d53 100644 --- a/pkg/otlp/model/translator/ttlcache_test.go +++ b/pkg/otlp/model/translator/ttlcache_test.go @@ -20,8 +20,8 @@ import ( "github.com/stretchr/testify/assert" ) -func newTestCache() *TTLCache { - cache := NewTTLCache(1800, 3600) +func newTestCache() *ttlCache { + cache := newTTLCache(1800, 3600) return cache } func TestPutAndGetDiff(t *testing.T) { From de520f4d2809e2baee6c651c127555b8fbf2d4e0 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 24 Sep 2021 11:36:06 +0200 Subject: [PATCH 3/4] Use a `Consumer` interface for decoupling from zorkian's package (open-telemetry/opentelemetry-collector-contrib#5315) --- pkg/otlp/model/translator/consumer.go | 70 +++ .../model/translator/metrics_translator.go | 179 +++---- .../translator/metrics_translator_test.go | 440 +++++++++--------- pkg/otlp/model/translator/sketches_test.go | 29 +- 4 files changed, 419 insertions(+), 299 deletions(-) create mode 100644 pkg/otlp/model/translator/consumer.go diff --git a/pkg/otlp/model/translator/consumer.go b/pkg/otlp/model/translator/consumer.go new file mode 100644 index 000000000000..5cc6fb544a94 --- /dev/null +++ b/pkg/otlp/model/translator/consumer.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "context" + + "github.com/DataDog/datadog-agent/pkg/quantile" +) + +type MetricDataType int + +const ( + // Gauge is the Datadog Gauge metric type. + Gauge MetricDataType = iota + // Count is the Datadog Count metric type. + Count +) + +// TimeSeriesConsumer is timeseries consumer. +type TimeSeriesConsumer interface { + // ConsumeTimeSeries consumes a timeseries-style metric. + ConsumeTimeSeries( + ctx context.Context, + name string, + typ MetricDataType, + timestamp uint64, + value float64, + tags []string, + host string, + ) +} + +// SketchConsumer is a pkg/quantile sketch consumer. +type SketchConsumer interface { + // ConsumeSketch consumes a pkg/quantile-style sketch. + ConsumeSketch( + ctx context.Context, + name string, + timestamp uint64, + sketch *quantile.Sketch, + tags []string, + host string, + ) +} + +// Consumer is a metrics consumer. +type Consumer interface { + TimeSeriesConsumer + SketchConsumer +} + +// HostConsumer is a hostname consumer. +// It is an optional interface that can be implemented by a Consumer. +type HostConsumer interface { + // ConsumeHost consumes a hostname. + ConsumeHost(host string) +} diff --git a/pkg/otlp/model/translator/metrics_translator.go b/pkg/otlp/model/translator/metrics_translator.go index 89c32955fbd3..7a23e7e75dd3 100644 --- a/pkg/otlp/model/translator/metrics_translator.go +++ b/pkg/otlp/model/translator/metrics_translator.go @@ -19,29 +19,23 @@ import ( "fmt" "math" "strconv" - "time" "github.com/DataDog/datadog-agent/pkg/quantile" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" - "gopkg.in/zorkian/go-datadog-api.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/sketches" ) const metricName string = "metric name" type Translator struct { - prevPts *ttlCache - logger *zap.Logger - cfg translatorConfig - buildInfo component.BuildInfo + prevPts *ttlCache + logger *zap.Logger + cfg translatorConfig } -func New(params component.ExporterCreateSettings, options ...Option) (*Translator, error) { +func New(logger *zap.Logger, options ...Option) (*Translator, error) { cfg := translatorConfig{ HistMode: HistogramModeNoBuckets, SendCountSum: true, @@ -61,7 +55,7 @@ func New(params component.ExporterCreateSettings, options ...Option) (*Translato } cache := newTTLCache(cfg.sweepInterval, cfg.deltaTTL) - return &Translator{cache, params.Logger, cfg, params.BuildInfo}, nil + return &Translator{cache, logger, cfg}, nil } // getTags maps an attributeMap into a slice of Datadog tags @@ -100,8 +94,16 @@ func (t *Translator) isSkippable(name string, v float64) bool { } // mapNumberMetrics maps double datapoints into Datadog metrics -func (t *Translator) mapNumberMetrics(name string, dt metrics.MetricDataType, slice pdata.NumberDataPointSlice, attrTags []string) []datadog.Metric { - ms := make([]datadog.Metric, 0, slice.Len()) +func (t *Translator) mapNumberMetrics( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + dt MetricDataType, + slice pdata.NumberDataPointSlice, + attrTags []string, + host string, +) { + for i := 0; i < slice.Len(); i++ { p := slice.At(i) tags := getTags(p.Attributes()) @@ -118,16 +120,19 @@ func (t *Translator) mapNumberMetrics(name string, dt metrics.MetricDataType, sl continue } - ms = append(ms, - metrics.NewMetric(name, dt, uint64(p.Timestamp()), val, tags), - ) + consumer.ConsumeTimeSeries(ctx, name, dt, uint64(p.Timestamp()), val, tags, host) } - return ms } // mapNumberMonotonicMetrics maps monotonic datapoints into Datadog metrics -func (t *Translator) mapNumberMonotonicMetrics(name string, slice pdata.NumberDataPointSlice, attrTags []string) []datadog.Metric { - ms := make([]datadog.Metric, 0, slice.Len()) +func (t *Translator) mapNumberMonotonicMetrics( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + slice pdata.NumberDataPointSlice, + attrTags []string, + host string, +) { for i := 0; i < slice.Len(); i++ { p := slice.At(i) ts := uint64(p.Timestamp()) @@ -147,10 +152,9 @@ func (t *Translator) mapNumberMonotonicMetrics(name string, slice pdata.NumberDa } if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, val); ok { - ms = append(ms, metrics.NewCount(name, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, name, Count, ts, dx, tags, host) } } - return ms } func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBound float64) { @@ -166,7 +170,16 @@ func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBo return } -func (t *Translator) getSketchBuckets(name string, ts uint64, p pdata.HistogramDataPoint, delta bool, tags []string) sketches.SketchSeries { +func (t *Translator) getSketchBuckets( + ctx context.Context, + consumer SketchConsumer, + name string, + ts uint64, + p pdata.HistogramDataPoint, + delta bool, + tags []string, + host string, +) { as := &quantile.Agent{} for j := range p.BucketCounts() { lowerBound, upperBound := getBounds(p, j) @@ -186,21 +199,21 @@ func (t *Translator) getSketchBuckets(name string, ts uint64, p pdata.HistogramD } } - return sketches.SketchSeries{ - Name: name, - Tags: tags, - Interval: 1, - Points: []sketches.SketchPoint{{ - Ts: int64(p.Timestamp() / 1e9), - Sketch: as.Finish(), - }}, - } + + consumer.ConsumeSketch(ctx, name, ts, as.Finish(), tags, host) } -func (t *Translator) getLegacyBuckets(name string, p pdata.HistogramDataPoint, delta bool, tags []string) []datadog.Metric { +func (t *Translator) getLegacyBuckets( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + p pdata.HistogramDataPoint, + delta bool, + tags []string, + host string, +) { // We have a single metric, 'bucket', which is tagged with the bucket bounds. See: // https://github.com/DataDog/integrations-core/blob/7.30.1/datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/transformers/histogram.py - ms := make([]datadog.Metric, 0, len(p.BucketCounts())) fullName := fmt.Sprintf("%s.bucket", name) for idx, val := range p.BucketCounts() { lowerBound, upperBound := getBounds(p, idx) @@ -213,12 +226,11 @@ func (t *Translator) getLegacyBuckets(name string, p pdata.HistogramDataPoint, d count := float64(val) ts := uint64(p.Timestamp()) if delta { - ms = append(ms, metrics.NewCount(fullName, ts, count, bucketTags)) + consumer.ConsumeTimeSeries(ctx, fullName, Count, ts, count, bucketTags, host) } else if dx, ok := t.prevPts.putAndGetDiff(fullName, bucketTags, ts, count); ok { - ms = append(ms, metrics.NewCount(fullName, ts, dx, bucketTags)) + consumer.ConsumeTimeSeries(ctx, fullName, Count, ts, dx, bucketTags, host) } } - return ms } // mapHistogramMetrics maps double histogram metrics slices to Datadog metrics @@ -234,12 +246,15 @@ func (t *Translator) getLegacyBuckets(name string, p pdata.HistogramDataPoint, d // We follow a similar approach to our OpenMetrics check: // we report sum and count by default; buckets count can also // be reported (opt-in) tagged by lower bound. -func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataPointSlice, delta bool, attrTags []string) (ms []datadog.Metric, sl sketches.SketchSeriesList) { - // Allocate assuming none are nil and no buckets - ms = make([]datadog.Metric, 0, 2*slice.Len()) - if t.cfg.HistMode == HistogramModeDistributions { - sl = make(sketches.SketchSeriesList, 0, slice.Len()) - } +func (t *Translator) mapHistogramMetrics( + ctx context.Context, + consumer Consumer, + name string, + slice pdata.HistogramDataPointSlice, + delta bool, + attrTags []string, + host string, +) { for i := 0; i < slice.Len(); i++ { p := slice.At(i) ts := uint64(p.Timestamp()) @@ -250,9 +265,9 @@ func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataP count := float64(p.Count()) countName := fmt.Sprintf("%s.count", name) if delta { - ms = append(ms, metrics.NewCount(countName, ts, count, tags)) + consumer.ConsumeTimeSeries(ctx, countName, Count, ts, count, tags, host) } else if dx, ok := t.prevPts.putAndGetDiff(countName, tags, ts, count); ok { - ms = append(ms, metrics.NewCount(countName, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, countName, Count, ts, dx, tags, host) } } @@ -261,21 +276,20 @@ func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataP sumName := fmt.Sprintf("%s.sum", name) if !t.isSkippable(sumName, p.Sum()) { if delta { - ms = append(ms, metrics.NewCount(sumName, ts, sum, tags)) + consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, sum, tags, host) } else if dx, ok := t.prevPts.putAndGetDiff(sumName, tags, ts, sum); ok { - ms = append(ms, metrics.NewCount(sumName, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, dx, tags, host) } } } switch t.cfg.HistMode { case HistogramModeCounters: - ms = append(ms, t.getLegacyBuckets(name, p, delta, tags)...) + t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host) case HistogramModeDistributions: - sl = append(sl, t.getSketchBuckets(name, ts, p, true, tags)) + t.getSketchBuckets(ctx, consumer, name, ts, p, true, tags, host) } } - return } // formatFloat formats a float number as close as possible to what @@ -306,9 +320,15 @@ func getQuantileTag(quantile float64) string { } // mapSummaryMetrics maps summary datapoints into Datadog metrics -func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPointSlice, attrTags []string) []datadog.Metric { - // Allocate assuming none are nil and no quantiles - ms := make([]datadog.Metric, 0, 2*slice.Len()) +func (t *Translator) mapSummaryMetrics( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + slice pdata.SummaryDataPointSlice, + attrTags []string, + host string, +) { + for i := 0; i < slice.Len(); i++ { p := slice.At(i) ts := uint64(p.Timestamp()) @@ -319,7 +339,7 @@ func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPoint { countName := fmt.Sprintf("%s.count", name) if dx, ok := t.prevPts.putAndGetDiff(countName, tags, ts, float64(p.Count())); ok && !t.isSkippable(countName, dx) { - ms = append(ms, metrics.NewCount(countName, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, countName, Count, ts, dx, tags, host) } } @@ -327,7 +347,7 @@ func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPoint sumName := fmt.Sprintf("%s.sum", name) if !t.isSkippable(sumName, p.Sum()) { if dx, ok := t.prevPts.putAndGetDiff(sumName, tags, ts, p.Sum()); ok { - ms = append(ms, metrics.NewCount(sumName, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, dx, tags, host) } } } @@ -344,20 +364,15 @@ func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPoint quantileTags := []string{getQuantileTag(q.Quantile())} quantileTags = append(quantileTags, tags...) - ms = append(ms, - metrics.NewGauge(fullName, ts, q.Value(), quantileTags), - ) + consumer.ConsumeTimeSeries(ctx, fullName, Gauge, ts, q.Value(), quantileTags, host) } } } - return ms } // MapMetrics maps OTLP metrics into the DataDog format -func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics) (series []datadog.Metric, sl sketches.SketchSeriesList, err error) { - pushTime := uint64(time.Now().UTC().UnixNano()) +func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics, consumer Consumer) error { rms := md.ResourceMetrics() - seenHosts := make(map[string]struct{}) for i := 0; i < rms.Len(); i++ { rm := rms.At(i) @@ -371,12 +386,17 @@ func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics) (series [ host, ok := attributes.HostnameFromAttributes(rm.Resource().Attributes()) if !ok { + var err error host, err = t.cfg.fallbackHostnameProvider.Hostname(context.Background()) if err != nil { - return nil, nil, fmt.Errorf("failed to get fallback host: %w", err) + return fmt.Errorf("failed to get fallback host: %w", err) } } - seenHosts[host] = struct{}{} + + // Track hosts if the consumer is a HostConsumer. + if c, ok := consumer.(HostConsumer); ok { + c.ConsumeHost(host) + } ilms := rm.InstrumentationLibraryMetrics() for j := 0; j < ilms.Len(); j++ { @@ -384,21 +404,19 @@ func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics) (series [ metricsArray := ilm.Metrics() for k := 0; k < metricsArray.Len(); k++ { md := metricsArray.At(k) - var datapoints []datadog.Metric - var sketchesPoints sketches.SketchSeriesList switch md.DataType() { case pdata.MetricDataTypeGauge: - datapoints = t.mapNumberMetrics(md.Name(), metrics.Gauge, md.Gauge().DataPoints(), attributeTags) + t.mapNumberMetrics(ctx, consumer, md.Name(), Gauge, md.Gauge().DataPoints(), attributeTags, host) case pdata.MetricDataTypeSum: switch md.Sum().AggregationTemporality() { case pdata.AggregationTemporalityCumulative: if t.cfg.SendMonotonic && isCumulativeMonotonic(md) { - datapoints = t.mapNumberMonotonicMetrics(md.Name(), md.Sum().DataPoints(), attributeTags) + t.mapNumberMonotonicMetrics(ctx, consumer, md.Name(), md.Sum().DataPoints(), attributeTags, host) } else { - datapoints = t.mapNumberMetrics(md.Name(), metrics.Gauge, md.Sum().DataPoints(), attributeTags) + t.mapNumberMetrics(ctx, consumer, md.Name(), Gauge, md.Sum().DataPoints(), attributeTags, host) } case pdata.AggregationTemporalityDelta: - datapoints = t.mapNumberMetrics(md.Name(), metrics.Count, md.Sum().DataPoints(), attributeTags) + t.mapNumberMetrics(ctx, consumer, md.Name(), Count, md.Sum().DataPoints(), attributeTags, host) default: // pdata.AggregationTemporalityUnspecified or any other not supported type t.logger.Debug("Unknown or unsupported aggregation temporality", zap.String(metricName, md.Name()), @@ -410,7 +428,7 @@ func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics) (series [ switch md.Histogram().AggregationTemporality() { case pdata.AggregationTemporalityCumulative, pdata.AggregationTemporalityDelta: delta := md.Histogram().AggregationTemporality() == pdata.AggregationTemporalityDelta - datapoints, sketchesPoints = t.mapHistogramMetrics(md.Name(), md.Histogram().DataPoints(), delta, attributeTags) + t.mapHistogramMetrics(ctx, consumer, md.Name(), md.Histogram().DataPoints(), delta, attributeTags, host) default: // pdata.AggregationTemporalityUnspecified or any other not supported type t.logger.Debug("Unknown or unsupported aggregation temporality", zap.String("metric name", md.Name()), @@ -419,31 +437,14 @@ func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics) (series [ continue } case pdata.MetricDataTypeSummary: - datapoints = t.mapSummaryMetrics(md.Name(), md.Summary().DataPoints(), attributeTags) + t.mapSummaryMetrics(ctx, consumer, md.Name(), md.Summary().DataPoints(), attributeTags, host) default: // pdata.MetricDataTypeNone or any other not supported type t.logger.Debug("Unknown or unsupported metric type", zap.String(metricName, md.Name()), zap.Any("data type", md.DataType())) continue } - - for i := range datapoints { - datapoints[i].SetHost(host) - } - - for i := range sl { - sl[i].Host = host - } - - series = append(series, datapoints...) - sl = append(sl, sketchesPoints...) } } } - for host := range seenHosts { - // Report the host as running - runningMetric := metrics.DefaultMetrics("metrics", host, pushTime, t.buildInfo) - series = append(series, runningMetric...) - } - - return + return nil } diff --git a/pkg/otlp/model/translator/metrics_translator_test.go b/pkg/otlp/model/translator/metrics_translator_test.go index 504f8825646c..d1e41847db7b 100644 --- a/pkg/otlp/model/translator/metrics_translator_test.go +++ b/pkg/otlp/model/translator/metrics_translator_test.go @@ -20,34 +20,18 @@ import ( "testing" "time" + "github.com/DataDog/datadog-agent/pkg/quantile" gocache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" - "gopkg.in/zorkian/go-datadog-api.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" ) -func TestMetricValue(t *testing.T) { - var ( - name = "name" - value = math.Pi - ts = uint64(time.Now().UnixNano()) - tags = []string{"tool:opentelemetry", "version:0.1.0"} - ) - - metric := metrics.NewGauge(name, ts, value, tags) - assert.Equal(t, string(metrics.Gauge), metric.GetType()) - assert.Equal(t, tags, metric.Tags) -} - func TestGetTags(t *testing.T) { attributes := pdata.NewAttributeMapFromMap(map[string]pdata.AttributeValue{ "key1": pdata.NewAttributeValueString("val1"), @@ -122,17 +106,8 @@ func (t testProvider) Hostname(context.Context) (string, error) { } func newTranslator(t *testing.T, logger *zap.Logger) *Translator { - params := component.ExporterCreateSettings{ - BuildInfo: component.BuildInfo{ - Version: "1.0", - }, - TelemetrySettings: component.TelemetrySettings{ - Logger: logger, - }, - } - tr, err := New( - params, + logger, WithFallbackHostnameProvider(testProvider("fallbackHostname")), WithCountSumMetrics(), WithHistogramMode(HistogramModeNoBuckets), @@ -143,28 +118,79 @@ func newTranslator(t *testing.T, logger *zap.Logger) *Translator { return tr } +type metric struct { + name string + typ MetricDataType + timestamp uint64 + value float64 + tags []string + host string +} + +var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil) + +type mockTimeSeriesConsumer struct { + metrics []metric +} + +func (m *mockTimeSeriesConsumer) ConsumeTimeSeries( + _ context.Context, + name string, + typ MetricDataType, + ts uint64, + val float64, + tags []string, + host string, +) { + m.metrics = append(m.metrics, + metric{ + name: name, + typ: typ, + timestamp: ts, + value: val, + tags: tags, + host: host, + }, + ) +} + +func newGauge(name string, ts uint64, val float64, tags []string) metric { + return metric{name: name, typ: Gauge, timestamp: ts, value: val, tags: tags} +} + +func newCount(name string, ts uint64, val float64, tags []string) metric { + return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags} +} + func TestMapIntMetrics(t *testing.T) { ts := pdata.NewTimestampFromTime(time.Now()) slice := pdata.NewNumberDataPointSlice() point := slice.AppendEmpty() point.SetIntVal(17) point.SetTimestamp(ts) + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.test", Gauge, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("int64.test", metrics.Gauge, slice, []string{}), - []datadog.Metric{metrics.NewGauge("int64.test", uint64(ts), 17, []string{})}, + consumer.metrics, + []metric{newGauge("int64.test", uint64(ts), 17, []string{})}, ) + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.delta.test", Count, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("int64.delta.test", metrics.Count, slice, []string{}), - []datadog.Metric{metrics.NewCount("int64.delta.test", uint64(ts), 17, []string{})}, + consumer.metrics, + []metric{newCount("int64.delta.test", uint64(ts), 17, []string{})}, ) // With attribute tags + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.test", Gauge, slice, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("int64.test", metrics.Gauge, slice, []string{"attribute_tag:attribute_value"}), - []datadog.Metric{metrics.NewGauge("int64.test", uint64(ts), 17, []string{"attribute_tag:attribute_value"})}, + consumer.metrics, + []metric{newGauge("int64.test", uint64(ts), 17, []string{"attribute_tag:attribute_value"})}, ) } @@ -174,22 +200,29 @@ func TestMapDoubleMetrics(t *testing.T) { point := slice.AppendEmpty() point.SetDoubleVal(math.Pi) point.SetTimestamp(ts) + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.test", Gauge, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("float64.test", metrics.Gauge, slice, []string{}), - []datadog.Metric{metrics.NewGauge("float64.test", uint64(ts), math.Pi, []string{})}, + consumer.metrics, + []metric{newGauge("float64.test", uint64(ts), math.Pi, []string{})}, ) + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.delta.test", Count, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("float64.delta.test", metrics.Count, slice, []string{}), - []datadog.Metric{metrics.NewCount("float64.delta.test", uint64(ts), math.Pi, []string{})}, + consumer.metrics, + []metric{newCount("float64.delta.test", uint64(ts), math.Pi, []string{})}, ) // With attribute tags + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.test", Gauge, slice, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("float64.test", metrics.Gauge, slice, []string{"attribute_tag:attribute_value"}), - []datadog.Metric{metrics.NewGauge("float64.test", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"})}, + consumer.metrics, + []metric{newGauge("float64.test", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"})}, ) } @@ -217,15 +250,17 @@ func TestMapIntMonotonicMetrics(t *testing.T) { // Map to Datadog format metricName := "metric.example" - expected := make([]datadog.Metric, len(deltas)) + expected := make([]metric, len(deltas)) for i, val := range deltas { - expected[i] = metrics.NewCount(metricName, uint64(seconds(i+1)), float64(val), []string{}) + expected[i] = newCount(metricName, uint64(seconds(i+1)), float64(val), []string{}) } + ctx := context.Background() + consumer := &mockTimeSeriesConsumer{} tr := newTranslator(t, zap.NewNop()) - output := tr.mapNumberMonotonicMetrics(metricName, slice, []string{}) + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") - assert.ElementsMatch(t, output, expected) + assert.ElementsMatch(t, expected, consumer.metrics) } func TestMapIntMonotonicDifferentDimensions(t *testing.T) { @@ -260,14 +295,17 @@ func TestMapIntMonotonicDifferentDimensions(t *testing.T) { point.SetTimestamp(seconds(1)) point.Attributes().InsertString("key1", "valB") + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(1)), 20, []string{}), - metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), - metrics.NewCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 20, []string{}), + newCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), + newCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), }, ) } @@ -284,12 +322,15 @@ func TestMapIntMonotonicWithReboot(t *testing.T) { point.SetIntVal(val) } + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{}), - metrics.NewCount(metricName, uint64(seconds(3)), 20, []string{}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 30, []string{}), + newCount(metricName, uint64(seconds(3)), 20, []string{}), }, ) } @@ -308,12 +349,15 @@ func TestMapIntMonotonicOutOfOrder(t *testing.T) { point.SetIntVal(val) } + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(2)), 2, []string{}), - metrics.NewCount(metricName, uint64(seconds(3)), 1, []string{}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 2, []string{}), + newCount(metricName, uint64(seconds(3)), 1, []string{}), }, ) } @@ -337,15 +381,17 @@ func TestMapDoubleMonotonicMetrics(t *testing.T) { // Map to Datadog format metricName := "metric.example" - expected := make([]datadog.Metric, len(deltas)) + expected := make([]metric, len(deltas)) for i, val := range deltas { - expected[i] = metrics.NewCount(metricName, uint64(seconds(i+1)), val, []string{}) + expected[i] = newCount(metricName, uint64(seconds(i+1)), val, []string{}) } + ctx := context.Background() + consumer := &mockTimeSeriesConsumer{} tr := newTranslator(t, zap.NewNop()) - output := tr.mapNumberMonotonicMetrics(metricName, slice, []string{}) + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") - assert.ElementsMatch(t, expected, output) + assert.ElementsMatch(t, expected, consumer.metrics) } func TestMapDoubleMonotonicDifferentDimensions(t *testing.T) { @@ -380,13 +426,17 @@ func TestMapDoubleMonotonicDifferentDimensions(t *testing.T) { point.SetTimestamp(seconds(1)) point.Attributes().InsertString("key1", "valB") + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) + + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(1)), 20, []string{}), - metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), - metrics.NewCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 20, []string{}), + newCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), + newCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), }, ) } @@ -403,12 +453,15 @@ func TestMapDoubleMonotonicWithReboot(t *testing.T) { point.SetDoubleVal(val) } + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(2)), 30, []string{}), - metrics.NewCount(metricName, uint64(seconds(6)), 20, []string{}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 30, []string{}), + newCount(metricName, uint64(seconds(6)), 20, []string{}), }, ) } @@ -427,16 +480,28 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { point.SetDoubleVal(val) } + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(2)), 2, []string{}), - metrics.NewCount(metricName, uint64(seconds(3)), 1, []string{}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 2, []string{}), + newCount(metricName, uint64(seconds(3)), 1, []string{}), }, ) } +type mockFullConsumer struct { + mockTimeSeriesConsumer + anySketch bool +} + +func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) { + c.anySketch = true +} + func TestMapDeltaHistogramMetrics(t *testing.T) { ts := pdata.NewTimestampFromTime(time.Now()) slice := pdata.NewHistogramDataPointSlice() @@ -447,61 +512,54 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { point.SetExplicitBounds([]float64{0}) point.SetTimestamp(ts) - noBuckets := []datadog.Metric{ - metrics.NewCount("doubleHist.test.count", uint64(ts), 20, []string{}), - metrics.NewCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{}), + noBuckets := []metric{ + newCount("doubleHist.test.count", uint64(ts), 20, []string{}), + newCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{}), } - buckets := []datadog.Metric{ - metrics.NewCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0"}), - metrics.NewCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf"}), + buckets := []metric{ + newCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0"}), + newCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf"}), } + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) delta := true tr.cfg.HistMode = HistogramModeNoBuckets - res, sl := tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) - require.Empty(t, sl) - assert.ElementsMatch(t, - res, // No buckets - noBuckets, - ) + consumer := &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, noBuckets, consumer.metrics) + assert.False(t, consumer.anySketch) tr.cfg.HistMode = HistogramModeCounters - res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) - require.Empty(t, sl) - assert.ElementsMatch(t, - res, // buckets - append(noBuckets, buckets...), - ) + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics) + assert.False(t, consumer.anySketch) // With attribute tags - noBucketsAttributeTags := []datadog.Metric{ - metrics.NewCount("doubleHist.test.count", uint64(ts), 20, []string{"attribute_tag:attribute_value"}), - metrics.NewCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"}), + noBucketsAttributeTags := []metric{ + newCount("doubleHist.test.count", uint64(ts), 20, []string{"attribute_tag:attribute_value"}), + newCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"}), } - bucketsAttributeTags := []datadog.Metric{ - metrics.NewCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0", "attribute_tag:attribute_value"}), - metrics.NewCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf", "attribute_tag:attribute_value"}), + bucketsAttributeTags := []metric{ + newCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0", "attribute_tag:attribute_value"}), + newCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf", "attribute_tag:attribute_value"}), } tr.cfg.HistMode = HistogramModeNoBuckets - res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}) - require.Empty(t, sl) - assert.ElementsMatch(t, - res, // No buckets - noBucketsAttributeTags, - ) + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) + assert.False(t, consumer.anySketch) tr.cfg.HistMode = HistogramModeCounters - res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}) - require.Empty(t, sl) - assert.ElementsMatch(t, - res, // buckets - append(noBucketsAttributeTags, bucketsAttributeTags...), - ) + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics) + assert.False(t, consumer.anySketch) } func TestMapCumulativeHistogramMetrics(t *testing.T) { @@ -520,27 +578,30 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { point.SetExplicitBounds([]float64{0}) point.SetTimestamp(seconds(2)) - expected := []datadog.Metric{ - metrics.NewCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), - metrics.NewCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), - metrics.NewCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), - metrics.NewCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), + expected := []metric{ + newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), + newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), + newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), + newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), } + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) delta := false tr.cfg.HistMode = HistogramModeCounters - res, sl := tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) - require.Empty(t, sl) + consumer := &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.False(t, consumer.anySketch) assert.ElementsMatch(t, - res, + consumer.metrics, expected, ) } func TestLegacyBucketsTags(t *testing.T) { // Test that passing the same tags slice doesn't reuse the slice. + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) tags := make([]string, 0, 10) @@ -549,16 +610,20 @@ func TestLegacyBucketsTags(t *testing.T) { pointOne.SetBucketCounts([]uint64{2, 18}) pointOne.SetExplicitBounds([]float64{0}) pointOne.SetTimestamp(seconds(0)) - seriesOne := tr.getLegacyBuckets("test.histogram.one", pointOne, true, tags) + consumer := &mockTimeSeriesConsumer{} + tr.getLegacyBuckets(ctx, consumer, "test.histogram.one", pointOne, true, tags, "") + seriesOne := consumer.metrics pointTwo := pdata.NewHistogramDataPoint() pointTwo.SetBucketCounts([]uint64{2, 18}) pointTwo.SetExplicitBounds([]float64{1}) pointTwo.SetTimestamp(seconds(0)) - seriesTwo := tr.getLegacyBuckets("test.histogram.two", pointTwo, true, tags) + consumer = &mockTimeSeriesConsumer{} + tr.getLegacyBuckets(ctx, consumer, "test.histogram.two", pointTwo, true, tags, "") + seriesTwo := consumer.metrics - assert.ElementsMatch(t, seriesOne[0].Tags, []string{"lower_bound:-inf", "upper_bound:0"}) - assert.ElementsMatch(t, seriesTwo[0].Tags, []string{"lower_bound:-inf", "upper_bound:1.0"}) + assert.ElementsMatch(t, seriesOne[0].tags, []string{"lower_bound:-inf", "upper_bound:0"}) + assert.ElementsMatch(t, seriesTwo[0].tags, []string{"lower_bound:-inf", "upper_bound:1.0"}) } func TestFormatFloat(t *testing.T) { @@ -623,96 +688,65 @@ func TestMapSummaryMetrics(t *testing.T) { if quantiles { options = append(options, WithQuantiles()) } - tr, err := New(componenttest.NewNopExporterCreateSettings(), options...) + tr, err := New(zap.NewNop(), options...) require.NoError(t, err) tr.prevPts = c return tr } - noQuantiles := []datadog.Metric{ - metrics.NewCount("summary.example.count", uint64(ts), 100, []string{}), - metrics.NewCount("summary.example.sum", uint64(ts), 10_000, []string{}), + noQuantiles := []metric{ + newCount("summary.example.count", uint64(ts), 100, []string{}), + newCount("summary.example.sum", uint64(ts), 10_000, []string{}), } - quantiles := []datadog.Metric{ - metrics.NewGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0"}), + quantiles := []metric{ + newGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0"}), + newGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5"}), + newGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999"}), + newGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0"}), } + ctx := context.Background() tr := newTranslator([]string{}, false) + consumer := &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{}, "") assert.ElementsMatch(t, - tr.mapSummaryMetrics("summary.example", slice, []string{}), + consumer.metrics, noQuantiles, ) tr = newTranslator([]string{}, true) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{}, "") assert.ElementsMatch(t, - tr.mapSummaryMetrics("summary.example", slice, []string{}), + consumer.metrics, append(noQuantiles, quantiles...), ) - noQuantilesAttr := []datadog.Metric{ - metrics.NewCount("summary.example.count", uint64(ts), 100, []string{"attribute_tag:attribute_value"}), - metrics.NewCount("summary.example.sum", uint64(ts), 10_000, []string{"attribute_tag:attribute_value"}), + noQuantilesAttr := []metric{ + newCount("summary.example.count", uint64(ts), 100, []string{"attribute_tag:attribute_value"}), + newCount("summary.example.sum", uint64(ts), 10_000, []string{"attribute_tag:attribute_value"}), } - quantilesAttr := []datadog.Metric{ - metrics.NewGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0", "attribute_tag:attribute_value"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5", "attribute_tag:attribute_value"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999", "attribute_tag:attribute_value"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0", "attribute_tag:attribute_value"}), + + quantilesAttr := []metric{ + newGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0", "attribute_tag:attribute_value"}), + newGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5", "attribute_tag:attribute_value"}), + newGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999", "attribute_tag:attribute_value"}), + newGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0", "attribute_tag:attribute_value"}), } tr = newTranslator([]string{"attribute_tag:attribute_value"}, false) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, - tr.mapSummaryMetrics("summary.example", slice, []string{"attribute_tag:attribute_value"}), + consumer.metrics, noQuantilesAttr, ) tr = newTranslator([]string{"attribute_tag:attribute_value"}, true) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, - tr.mapSummaryMetrics("summary.example", slice, []string{"attribute_tag:attribute_value"}), + consumer.metrics, append(noQuantilesAttr, quantilesAttr...), ) } -func TestRunningMetrics(t *testing.T) { - ms := pdata.NewMetrics() - rms := ms.ResourceMetrics() - - rm := rms.AppendEmpty() - resAttrs := rm.Resource().Attributes() - resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1")) - - rm = rms.AppendEmpty() - resAttrs = rm.Resource().Attributes() - resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1")) - - rm = rms.AppendEmpty() - resAttrs = rm.Resource().Attributes() - resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-2")) - - rms.AppendEmpty() - ctx := context.Background() - tr := newTranslator(t, zap.NewNop()) - - series, sl, err := tr.MapMetrics(ctx, ms) - require.Empty(t, sl) - require.NoError(t, err) - - runningHostnames := []string{} - - for _, metric := range series { - if *metric.Metric == "otel.datadog_exporter.metrics.running" { - if metric.Host != nil { - runningHostnames = append(runningHostnames, *metric.Host) - } - } - } - - assert.ElementsMatch(t, - runningHostnames, - []string{"fallbackHostname", "resource-hostname-1", "resource-hostname-2"}, - ) - -} - const ( testHostname = "res-hostname" ) @@ -879,25 +913,15 @@ func createTestMetrics() pdata.Metrics { return md } -func removeRunningMetrics(series []datadog.Metric) []datadog.Metric { - filtered := []datadog.Metric{} - for _, m := range series { - if m.GetMetric() != "otel.datadog_exporter.metrics.running" { - filtered = append(filtered, m) - } - } - return filtered -} - -func testGauge(name string, val float64) datadog.Metric { - m := metrics.NewGauge(name, 0, val, []string{}) - m.SetHost(testHostname) +func testGauge(name string, val float64) metric { + m := newGauge(name, 0, val, []string{}) + m.host = testHostname return m } -func testCount(name string, val float64, seconds uint64) datadog.Metric { - m := metrics.NewCount(name, seconds*1e9, val, []string{}) - m.SetHost(testHostname) +func testCount(name string, val float64, seconds uint64) metric { + m := newCount(name, seconds*1e9, val, []string{}) + m.host = testHostname return m } @@ -907,13 +931,13 @@ func TestMapMetrics(t *testing.T) { core, observed := observer.New(zapcore.DebugLevel) testLogger := zap.New(core) ctx := context.Background() + consumer := &mockFullConsumer{} tr := newTranslator(t, testLogger) - series, sl, err := tr.MapMetrics(ctx, md) - require.Empty(t, sl) + err := tr.MapMetrics(ctx, md, consumer) require.NoError(t, err) + assert.False(t, consumer.anySketch) - filtered := removeRunningMetrics(series) - assert.ElementsMatch(t, filtered, []datadog.Metric{ + assert.ElementsMatch(t, consumer.metrics, []metric{ testGauge("int.gauge", 1), testGauge("double.gauge", math.Pi), testCount("int.delta.sum", 2, 0), @@ -1031,14 +1055,14 @@ func TestNaNMetrics(t *testing.T) { core, observed := observer.New(zapcore.DebugLevel) testLogger := zap.New(core) - tr := newTranslator(t, testLogger) ctx := context.Background() - series, sl, err := tr.MapMetrics(ctx, md) - require.Empty(t, sl) + tr := newTranslator(t, testLogger) + consumer := &mockFullConsumer{} + err := tr.MapMetrics(ctx, md, consumer) + assert.False(t, consumer.anySketch) require.NoError(t, err) - filtered := removeRunningMetrics(series) - assert.ElementsMatch(t, filtered, []datadog.Metric{ + assert.ElementsMatch(t, consumer.metrics, []metric{ testCount("nan.histogram.count", 20, 0), testCount("nan.summary.count", 100, 2), }) diff --git a/pkg/otlp/model/translator/sketches_test.go b/pkg/otlp/model/translator/sketches_test.go index af274026e426..05c1eb7365f8 100644 --- a/pkg/otlp/model/translator/sketches_test.go +++ b/pkg/otlp/model/translator/sketches_test.go @@ -15,6 +15,7 @@ package translator import ( + "context" "fmt" "math" "testing" @@ -25,6 +26,24 @@ import ( "go.uber.org/zap" ) +var _ SketchConsumer = (*sketchConsumer)(nil) + +type sketchConsumer struct { + sk *quantile.Sketch +} + +// ConsumeSketch implements the translator.Consumer interface. +func (c *sketchConsumer) ConsumeSketch( + _ context.Context, + _ string, + _ uint64, + sketch *quantile.Sketch, + _ []string, + _ string, +) { + c.sk = sketch +} + func TestHistogramSketches(t *testing.T) { N := 1_000 M := 50_000.0 @@ -85,12 +104,15 @@ func TestHistogramSketches(t *testing.T) { defaultEps := 1.0 / 128.0 tol := 1e-8 cfg := quantile.Default() + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) for _, test := range tests { t.Run(test.name, func(t *testing.T) { p := fromCDF(test.cdf) - sk := tr.getSketchBuckets("test", 0, p, true, []string{}).Points[0].Sketch + consumer := &sketchConsumer{} + tr.getSketchBuckets(ctx, consumer, "test", 0, p, true, []string{}, "") + sk := consumer.sk // Check the minimum is 0.0 assert.Equal(t, 0.0, sk.Quantile(cfg, 0)) @@ -175,11 +197,14 @@ func TestInfiniteBounds(t *testing.T) { }, } + ctx := context.Background() tr := newTranslator(t, zap.NewNop()) for _, testInstance := range tests { t.Run(testInstance.name, func(t *testing.T) { p := testInstance.getHist() - sk := tr.getSketchBuckets("test", 0, p, true, []string{}).Points[0].Sketch + consumer := &sketchConsumer{} + tr.getSketchBuckets(ctx, consumer, "test", 0, p, true, []string{}, "") + sk := consumer.sk assert.InDelta(t, sk.Basic.Sum, p.Sum(), 1) assert.Equal(t, uint64(sk.Basic.Cnt), p.Count()) }) From a258ba7ee11fa2a7704244744dc9d6921855c3f9 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 24 Sep 2021 12:15:58 +0200 Subject: [PATCH 4/4] Make tests pass --- LICENSE-3rdparty.csv | 2 +- pkg/otlp/model/translator/config.go | 3 ++- pkg/otlp/model/translator/consumer.go | 1 + pkg/otlp/model/translator/metrics_translator.go | 4 +++- pkg/otlp/model/translator/metrics_translator_test.go | 2 +- 5 files changed, 8 insertions(+), 4 deletions(-) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index fcba24688590..0ab92eb03805 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -63,8 +63,8 @@ core,github.com/DataDog/sketches-go/ddsketch,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/sketches-go/ddsketch/encoding,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/sketches-go/ddsketch/mapping,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/sketches-go/ddsketch/pb/sketchpb,Apache-2.0,"Datadog, Inc." -core,github.com/DataDog/sketches-go/ddsketch/store,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/sketches-go/ddsketch/stat,Apache-2.0,"Datadog, Inc." +core,github.com/DataDog/sketches-go/ddsketch/store,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/viper,MIT,"Datadog, Inc." core,github.com/DataDog/watermarkpodautoscaler/api/v1alpha1,Apache-2.0,"Datadog, Inc." core,github.com/DataDog/zstd,BSD-3-Clause,"Datadog, Inc." diff --git a/pkg/otlp/model/translator/config.go b/pkg/otlp/model/translator/config.go index c980337f446b..0cb09daa162b 100644 --- a/pkg/otlp/model/translator/config.go +++ b/pkg/otlp/model/translator/config.go @@ -80,7 +80,7 @@ func WithResourceAttributesAsTags() Option { type HistogramMode string const ( - // HistogramModeOff disables bucket export. + // HistogramModeNoBuckets disables bucket export. HistogramModeNoBuckets HistogramMode = "nobuckets" // HistogramModeCounters exports buckets as Datadog counts. HistogramModeCounters HistogramMode = "counters" @@ -103,6 +103,7 @@ func WithHistogramMode(mode HistogramMode) Option { } } +// WithCountSumMetrics exports .count and .sum histogram metrics. func WithCountSumMetrics() Option { return func(t *translatorConfig) error { t.SendCountSum = true diff --git a/pkg/otlp/model/translator/consumer.go b/pkg/otlp/model/translator/consumer.go index 5cc6fb544a94..87653209eb28 100644 --- a/pkg/otlp/model/translator/consumer.go +++ b/pkg/otlp/model/translator/consumer.go @@ -20,6 +20,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/quantile" ) +// MetricDataType is a timeseries-style metric type. type MetricDataType int const ( diff --git a/pkg/otlp/model/translator/metrics_translator.go b/pkg/otlp/model/translator/metrics_translator.go index 7a23e7e75dd3..2e35c20f08fc 100644 --- a/pkg/otlp/model/translator/metrics_translator.go +++ b/pkg/otlp/model/translator/metrics_translator.go @@ -24,17 +24,19 @@ import ( "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" + "github.com/DataDog/datadog-agent/pkg/otlp/model/attributes" ) const metricName string = "metric name" +// Translator is a metrics translator. type Translator struct { prevPts *ttlCache logger *zap.Logger cfg translatorConfig } +// New creates a new translator with given options. func New(logger *zap.Logger, options ...Option) (*Translator, error) { cfg := translatorConfig{ HistMode: HistogramModeNoBuckets, diff --git a/pkg/otlp/model/translator/metrics_translator_test.go b/pkg/otlp/model/translator/metrics_translator_test.go index d1e41847db7b..8b59d8d3aa9a 100644 --- a/pkg/otlp/model/translator/metrics_translator_test.go +++ b/pkg/otlp/model/translator/metrics_translator_test.go @@ -29,7 +29,7 @@ import ( "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" + "github.com/DataDog/datadog-agent/pkg/otlp/model/attributes" ) func TestGetTags(t *testing.T) {