Skip to content
Merged
117 changes: 117 additions & 0 deletions exporter/datadogexporter/internal/metrics/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"context"

"github.com/DataDog/datadog-agent/pkg/quantile"
"go.opentelemetry.io/collector/component"
"gopkg.in/zorkian/go-datadog-api.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/sketches"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/translator"
)

var _ translator.Consumer = (*Consumer)(nil)
var _ translator.HostConsumer = (*Consumer)(nil)

// Consumer is the metrics Consumer.
type Consumer struct {
ms []datadog.Metric
sl sketches.SketchSeriesList
seenHosts map[string]struct{}
}

// NewConsumer creates a new zorkian consumer.
func NewConsumer() *Consumer {
return &Consumer{
seenHosts: make(map[string]struct{}),
}
}

// toDataType maps translator datatypes to zorkian's datatypes.
func (c *Consumer) toDataType(dt translator.MetricDataType) (out MetricDataType) {
out = MetricDataType("unknown")

switch dt {
case translator.Count:
out = Count
case translator.Gauge:
out = Gauge
}

return
}

// runningMetrics gets the running metrics for the exporter.
func (c *Consumer) runningMetrics(timestamp uint64, buildInfo component.BuildInfo) (series []datadog.Metric) {
for host := range c.seenHosts {
// Report the host as running
runningMetric := DefaultMetrics("metrics", host, timestamp, buildInfo)
series = append(series, runningMetric...)
}

return
}

// All gets all metrics (consumed metrics and running metrics).
func (c *Consumer) All(timestamp uint64, buildInfo component.BuildInfo) ([]datadog.Metric, sketches.SketchSeriesList) {
series := c.ms
series = append(series, c.runningMetrics(timestamp, buildInfo)...)
return series, c.sl
}

// ConsumeTimeSeries implements the translator.Consumer interface.
func (c *Consumer) ConsumeTimeSeries(
_ context.Context,
name string,
typ translator.MetricDataType,
timestamp uint64,
value float64,
tags []string,
host string,
) {
dt := c.toDataType(typ)
met := NewMetric(name, dt, timestamp, value, tags)
met.SetHost(host)
c.ms = append(c.ms, met)
}

// ConsumeSketch implements the translator.Consumer interface.
func (c *Consumer) ConsumeSketch(
_ context.Context,
name string,
timestamp uint64,
sketch *quantile.Sketch,
tags []string,
host string,
) {
c.sl = append(c.sl, sketches.SketchSeries{
Name: name,
Tags: tags,
Host: host,
Interval: 1,
Points: []sketches.SketchPoint{{
Ts: int64(timestamp / 1e9),
Sketch: sketch,
}},
})
}

// ConsumeHost implements the translator.HostConsumer interface.
func (c *Consumer) ConsumeHost(host string) {
c.seenHosts[host] = struct{}{}
}
85 changes: 85 additions & 0 deletions exporter/datadogexporter/internal/metrics/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/translator"
)

type testProvider string

func (t testProvider) Hostname(context.Context) (string, error) {
return string(t), nil
}

func newTranslator(t *testing.T, logger *zap.Logger) *translator.Translator {
tr, err := translator.New(logger,
translator.WithCountSumMetrics(),
translator.WithHistogramMode(translator.HistogramModeNoBuckets),
translator.WithNumberMode(translator.NumberModeCumulativeToDelta),
translator.WithFallbackHostnameProvider(testProvider("fallbackHostname")),
)
require.NoError(t, err)
return tr
}

func TestRunningMetrics(t *testing.T) {
ms := pdata.NewMetrics()
rms := ms.ResourceMetrics()

rm := rms.AppendEmpty()
resAttrs := rm.Resource().Attributes()
resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1"))

rm = rms.AppendEmpty()
resAttrs = rm.Resource().Attributes()
resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1"))

rm = rms.AppendEmpty()
resAttrs = rm.Resource().Attributes()
resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-2"))

rms.AppendEmpty()

logger, _ := zap.NewProduction()
tr := newTranslator(t, logger)

ctx := context.Background()
consumer := NewConsumer()
tr.MapMetrics(ctx, ms, consumer)

runningHostnames := []string{}
for _, metric := range consumer.runningMetrics(0, component.BuildInfo{}) {
if metric.Host != nil {
runningHostnames = append(runningHostnames, *metric.Host)
}
}

assert.ElementsMatch(t,
runningHostnames,
[]string{"fallbackHostname", "resource-hostname-1", "resource-hostname-2"},
)

}
71 changes: 71 additions & 0 deletions exporter/datadogexporter/internal/translator/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package translator

import (
"context"

"github.com/DataDog/datadog-agent/pkg/quantile"
)

// MetricDataType is a timeseries-style metric type.
type MetricDataType int

const (
// Gauge is the Datadog Gauge metric type.
Gauge MetricDataType = iota
// Count is the Datadog Count metric type.
Count
)

// TimeSeriesConsumer is timeseries consumer.
type TimeSeriesConsumer interface {
// ConsumeTimeSeries consumes a timeseries-style metric.
ConsumeTimeSeries(
ctx context.Context,
name string,
typ MetricDataType,
timestamp uint64,
value float64,
tags []string,
host string,
)
}

// SketchConsumer is a pkg/quantile sketch consumer.
type SketchConsumer interface {
// ConsumeSketch consumes a pkg/quantile-style sketch.
ConsumeSketch(
ctx context.Context,
name string,
timestamp uint64,
sketch *quantile.Sketch,
tags []string,
host string,
)
}

// Consumer is a metrics consumer.
type Consumer interface {
TimeSeriesConsumer
SketchConsumer
}

// HostConsumer is a hostname consumer.
// It is an optional interface that can be implemented by a Consumer.
type HostConsumer interface {
// ConsumeHost consumes a hostname.
ConsumeHost(host string)
}
Loading