Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
e4f0761
Client configured with TLS using HCP config and retry/throttle
Achooo Apr 11, 2023
8bbc289
run go mod tidy
Achooo Apr 25, 2023
f6182b4
Remove one abstraction to use the config from deps
Achooo Apr 26, 2023
009f08e
Address PR feedback
Achooo Apr 28, 2023
41ba7ee
Client configured with TLS using HCP config and retry/throttle
Achooo Apr 11, 2023
8fd34e4
run go mod tidy
Achooo Apr 25, 2023
894cef4
Create new OTELExporter which uses the MetricsClient
Achooo Apr 24, 2023
b50057c
Fix lint error
Achooo Apr 25, 2023
0c01542
early return when there are no metrics
Achooo Apr 26, 2023
da20fe3
Add NewOTELExporter() function
Achooo Apr 28, 2023
749b1c8
Downgrade to metrics SDK version: v1.15.0-rc.1
Achooo May 1, 2023
383b366
Fix imports
Achooo May 1, 2023
1d222b1
fix small nits with comments and url.URL
Achooo May 9, 2023
5564bce
Fix tests by asserting actual error for context cancellation, fix par…
Achooo May 10, 2023
424a065
Cleanup error handling and clarify empty metrics case
Achooo May 10, 2023
470a11d
Fix input/expected naming in otel_transform_test.go
Achooo May 10, 2023
be0b01b
add comment for metric tracking
Achooo May 10, 2023
325bb4d
Add a general isEmpty method
Achooo May 10, 2023
a0352ac
Add clear error types
Achooo May 10, 2023
2d356f6
update to latest version 1.15.0 of OTEL
Achooo May 11, 2023
22bb2ee
Client configured with TLS using HCP config and retry/throttle
Achooo Apr 11, 2023
22be78f
run go mod tidy
Achooo Apr 25, 2023
478181a
Remove one abstraction to use the config from deps
Achooo Apr 26, 2023
c2ffaab
Address PR feedback
Achooo Apr 28, 2023
864e6d7
Initialize OTELSink with sync.Map for all the instrument stores.
Achooo Apr 24, 2023
05c418b
Moved PeriodicReader init to NewOtelReader function. This allows us t…
Achooo Apr 26, 2023
72ae205
Switch to mutex instead of sync.Map to avoid type assertion
Achooo Apr 26, 2023
83fba0a
Add gauge store
Achooo Apr 26, 2023
520ba9f
Clarify comments
Achooo Apr 26, 2023
190ef2a
return concrete sink type
Achooo Apr 26, 2023
659a7dd
Fix lint errors
Achooo Apr 27, 2023
9659a87
Move gauge store to be within sink
Achooo Apr 27, 2023
80e01c7
Use context.TODO,rebase and clenaup opts handling
Achooo Apr 28, 2023
7cbed58
Rebase onto otl exporter to downgrade metrics API to v1.15.0-rc.1
Achooo May 1, 2023
91fcfc7
Fix imports
Achooo May 1, 2023
48d69e3
Update to latest stable version by rebasing on cc-4933, fix import, r…
Achooo May 11, 2023
563330e
Add lots of documentation to the OTELSink
Achooo May 11, 2023
b98481d
Fix gauge store comment and check ok
Achooo May 11, 2023
0162bb6
Add select and ctx.Done() check to gauge callback
Achooo May 11, 2023
899dbba
use require.Equal for attributes
Achooo May 11, 2023
542d23a
Fixed import naming
Achooo May 11, 2023
2d8a18a
Remove float64 calls and add a NewGaugeStore method
Achooo May 11, 2023
5defe6a
Change name Store to Set in gaugeStore, add concurrency tests in both…
Achooo May 11, 2023
a893c32
Generate 100 gauge operations
Achooo May 11, 2023
9d5f5ef
Seperate the labels into goroutines in sink test
Achooo May 11, 2023
80a534b
Generate kv store for the test case keys to avoid using uuid
Achooo May 11, 2023
aa2a971
Added a race test with 300 samples for OTELSink
Achooo May 12, 2023
e3e8d3f
Merge and fix conflicts
Achooo May 15, 2023
d13849b
Do not pass in waitgroup and use error channel instead.
Achooo May 15, 2023
713c5fa
Using SHA 7dea2225a218872e86d2f580e82c089b321617b0 to avoid build fai…
Achooo May 15, 2023
506e867
Fix nits
Achooo May 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions agent/hcp/telemetry/gauge_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package telemetry

import (
"context"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// gaugeStore holds last seen Gauge values for a particular metric (<name,last_value>) in the store.
// OTEL does not currently have a synchronous Gauge instrument. Instead, it allows the registration of callbacks.
// The callbacks are called during export, where the Gauge value must be returned.
// This store is a workaround, which holds last seen Gauge values until the callback is called.
type gaugeStore struct {
store map[string]*gaugeValue
mutex sync.Mutex
}

// gaugeValues are the last seen measurement for a Gauge metric, which contains a float64 value and labels.
type gaugeValue struct {
Value float64
Attributes []attribute.KeyValue
}

// NewGaugeStore returns an initialized empty gaugeStore.
func NewGaugeStore() *gaugeStore {
return &gaugeStore{
store: make(map[string]*gaugeValue, 0),
}
}

// LoadAndDelete will read a Gauge value and delete it.
// Once registered for a metric name, a Gauge callback will continue to execute every collection cycel.
// We must delete the value once we have read it, to avoid repeat values being sent.
func (g *gaugeStore) LoadAndDelete(key string) (*gaugeValue, bool) {
g.mutex.Lock()
defer g.mutex.Unlock()

gauge, ok := g.store[key]
if !ok {
return nil, ok
}

delete(g.store, key)

return gauge, ok
}

// Set adds a gaugeValue to the global gauge store.
func (g *gaugeStore) Set(key string, value float64, labels []attribute.KeyValue) {
g.mutex.Lock()
defer g.mutex.Unlock()

gv := &gaugeValue{
Value: value,
Attributes: labels,
}

g.store[key] = gv
}

// gaugeCallback returns a callback which gets called when metrics are collected for export.
func (g *gaugeStore) gaugeCallback(key string) metric.Float64Callback {
// Closures keep a reference to the key string, that get garbage collected when code completes.
return func(ctx context.Context, obs metric.Float64Observer) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
if gauge, ok := g.LoadAndDelete(key); ok {
obs.Observe(gauge.Value, metric.WithAttributes(gauge.Attributes...))
}
return nil
}
}
}
89 changes: 89 additions & 0 deletions agent/hcp/telemetry/gauge_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package telemetry

import (
"context"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
)

func TestGaugeStore(t *testing.T) {
t.Parallel()

gaugeStore := NewGaugeStore()

attributes := []attribute.KeyValue{
{
Key: attribute.Key("test_key"),
Value: attribute.StringValue("test_value"),
},
}

gaugeStore.Set("test", 1.23, attributes)

// Should store a new gauge.
val, ok := gaugeStore.LoadAndDelete("test")
require.True(t, ok)
require.Equal(t, val.Value, 1.23)
require.Equal(t, val.Attributes, attributes)

// Gauge with key "test" have been deleted.
val, ok = gaugeStore.LoadAndDelete("test")
require.False(t, ok)
require.Nil(t, val)

gaugeStore.Set("duplicate", 1.5, nil)
gaugeStore.Set("duplicate", 6.7, nil)

// Gauge with key "duplicate" should hold the latest (last seen) value.
val, ok = gaugeStore.LoadAndDelete("duplicate")
require.True(t, ok)
require.Equal(t, val.Value, 6.7)
}

func TestGaugeCallback_Failure(t *testing.T) {
t.Parallel()

k := "consul.raft.apply"
gaugeStore := NewGaugeStore()
gaugeStore.Set(k, 1.23, nil)

cb := gaugeStore.gaugeCallback(k)
ctx, cancel := context.WithCancel(context.Background())

cancel()
err := cb(ctx, nil)
require.ErrorIs(t, err, context.Canceled)
}

// TestGaugeStore_Race induces a race condition. When run with go test -race,
// this test should pass if implementation is concurrency safe.
func TestGaugeStore_Race(t *testing.T) {
t.Parallel()

gaugeStore := NewGaugeStore()

wg := &sync.WaitGroup{}
samples := 100
errCh := make(chan error, samples)
for i := 0; i < samples; i++ {
wg.Add(1)
key := fmt.Sprintf("consul.test.%d", i)
value := 12.34
go func() {
defer wg.Done()
gaugeStore.Set(key, value, nil)
gv, _ := gaugeStore.LoadAndDelete(key)
if gv.Value != value {
errCh <- fmt.Errorf("expected value: '%f', but got: '%f' for key: '%s'", value, gv.Value, key)
}
}()
}

wg.Wait()

require.Empty(t, errCh)
}
209 changes: 209 additions & 0 deletions agent/hcp/telemetry/otel_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package telemetry

import (
"bytes"
"context"
"fmt"
"net/url"
"strings"
"sync"
"time"

gometrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
otelsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"

"github.com/hashicorp/consul/agent/hcp/client"
)

type OTELSinkOpts struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this struct as we have one last follow up PR that adds default filters and labels to the sink (so we'll have 4 parameters)

Reader otelsdk.Reader
Ctx context.Context
}

// OTELSink captures and aggregates telemetry data as per the OpenTelemetry (OTEL) specification.
// Metric data is exported in OpenTelemetry Protocol (OTLP) wire format.
// This should be used as a Go Metrics backend, as it implements the MetricsSink interface.
type OTELSink struct {
// spaceReplacer cleans the flattened key by removing any spaces.
spaceReplacer *strings.Replacer
logger hclog.Logger

// meterProvider is an OTEL MeterProvider, the entrypoint to the OTEL Metrics SDK.
// It handles reading/export of aggregated metric data.
// It enables creation and usage of an OTEL Meter.
meterProvider *otelsdk.MeterProvider

// meter is an OTEL Meter, which enables the creation of OTEL instruments.
meter *otelmetric.Meter

// Instrument stores contain an OTEL Instrument per metric name (<name, instrument>)
// for each gauge, counter and histogram types.
// An instrument allows us to record a measurement for a particular metric, and continuously aggregates metrics.
// We lazy load the creation of these intruments until a metric is seen, and use them repeatedly to record measurements.
gaugeInstruments map[string]otelmetric.Float64ObservableGauge
counterInstruments map[string]otelmetric.Float64Counter
histogramInstruments map[string]otelmetric.Float64Histogram

// gaugeStore is required to hold last-seen values of gauges
// This is a workaround, as OTEL currently does not have synchronous gauge instruments.
// It only allows the registration of "callbacks", which obtain values when the callback is called.
// We must hold gauge values until the callback is called, when the measurement is exported, and can be removed.
gaugeStore *gaugeStore

mutex sync.Mutex
}

// NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds.
// It configures the reader with a custom OTELExporter with a MetricsClient to transform and export
// metrics in OTLP format to an external url.
func NewOTELReader(client client.MetricsClient, url url.URL, exportInterval time.Duration) otelsdk.Reader {
exporter := NewOTELExporter(client, url)
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval))
}

// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface.
// It sets up a MeterProvider and Meter, key pieces of the OTEL Metrics SDK which
// enable us to create OTEL Instruments to record measurements.
func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) {
if opts.Reader == nil {
return nil, fmt.Errorf("ferror: provide valid reader")
}

if opts.Ctx == nil {
return nil, fmt.Errorf("ferror: provide valid context")
}

// Setup OTEL Metrics SDK to aggregate, convert and export metrics.
res := resource.NewSchemaless()
meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader))
meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry")

return &OTELSink{
spaceReplacer: strings.NewReplacer(" ", "_"),
logger: hclog.FromContext(opts.Ctx).Named("otel_sink"),
meterProvider: meterProvider,
meter: &meter,
gaugeStore: NewGaugeStore(),
gaugeInstruments: make(map[string]otelmetric.Float64ObservableGauge, 0),
counterInstruments: make(map[string]otelmetric.Float64Counter, 0),
histogramInstruments: make(map[string]otelmetric.Float64Histogram, 0),
}, nil
}

// SetGauge emits a Consul gauge metric.
func (o *OTELSink) SetGauge(key []string, val float32) {
o.SetGaugeWithLabels(key, val, nil)
}

// AddSample emits a Consul histogram metric.
func (o *OTELSink) AddSample(key []string, val float32) {
o.AddSampleWithLabels(key, val, nil)
}

// IncrCounter emits a Consul counter metric.
func (o *OTELSink) IncrCounter(key []string, val float32) {
o.IncrCounterWithLabels(key, val, nil)
}

// AddSampleWithLabels emits a Consul gauge metric that gets
// registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

// Set value in global Gauge store.
o.gaugeStore.Set(k, float64(val), toAttributes(labels))

o.mutex.Lock()
defer o.mutex.Unlock()

// If instrument does not exist, create it and register callback to emit last value in global Gauge store.
if _, ok := o.gaugeInstruments[k]; !ok {
// The registration of a callback only needs to happen once, when the instrument is created.
// The callback will be triggered every export cycle for that metric.
// It must be explicitly de-registered to be removed (which we do not do), to ensure new gauge values are exported every cycle.
inst, err := (*o.meter).Float64ObservableGauge(k, otelmetric.WithFloat64Callback(o.gaugeStore.gaugeCallback(k)))
if err != nil {
o.logger.Error("Failed to emit gauge: %w", err)
return
}
o.gaugeInstruments[k] = inst
}
}

// AddSampleWithLabels emits a Consul sample metric that gets registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

o.mutex.Lock()
defer o.mutex.Unlock()

inst, ok := o.histogramInstruments[k]
if !ok {
histogram, err := (*o.meter).Float64Histogram(k)
if err != nil {
o.logger.Error("Failed to emit gauge: %w", err)
return
}
inst = histogram
o.histogramInstruments[k] = inst
}

attrs := toAttributes(labels)
inst.Record(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...))
}

// IncrCounterWithLabels emits a Consul counter metric that gets registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

o.mutex.Lock()
defer o.mutex.Unlock()

inst, ok := o.counterInstruments[k]
if !ok {
counter, err := (*o.meter).Float64Counter(k)
if err != nil {
o.logger.Error("Failed to emit gauge: %w", err)
return
}

inst = counter
o.counterInstruments[k] = inst
}

attrs := toAttributes(labels)
inst.Add(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...))
}

// EmitKey unsupported.
func (o *OTELSink) EmitKey(key []string, val float32) {}

// flattenKey key along with its labels.
func (o *OTELSink) flattenKey(parts []string) string {
buf := &bytes.Buffer{}
joined := strings.Join(parts, ".")

o.spaceReplacer.WriteString(buf, joined)

return buf.String()
}

// toAttributes converts go metrics Labels into OTEL format []attributes.KeyValue
func toAttributes(labels []gometrics.Label) []attribute.KeyValue {
if len(labels) == 0 {
return nil
}
attrs := make([]attribute.KeyValue, len(labels))
for i, label := range labels {
attrs[i] = attribute.KeyValue{
Key: attribute.Key(label.Name),
Value: attribute.StringValue(label.Value),
}
}

return attrs
}
Loading