Skip to content
12 changes: 2 additions & 10 deletions agent/hcp/client/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,8 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R
return fmt.Errorf("failed to export metrics: %v", err)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we skip reading the response body now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah wasn't sure if the response body could contain an error message 🤔 ? How it's used :

return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body))

}

if respData.Len() != 0 {

@Achooo Achooo May 24, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FIX: Check status code and remove useless logic in MetricsClient

When testing the export metrics, I hit a case where I would get a 404: not found because the endpoint wasn't hooked up. This made the code continue and try to deserialize an object that wasn't there.

So I add check to see if the status isn't 200: OK.

The colmetricpb.ExportMetricsServiceResponse returned is an empty struct on the Telemetry Gateway side, so don't bother handling it - delete all that code.

var respProto colmetricpb.ExportMetricsServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}

if respProto.PartialSuccess != nil {
msg := respProto.PartialSuccess.GetErrorMessage()
return fmt.Errorf("failed to export metrics: partial success: %s", msg)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body))
}

return nil
Expand Down
8 changes: 1 addition & 7 deletions agent/hcp/client/metrics_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestExportMetrics(t *testing.T) {
},
"failsWithNonRetryableError": {
status: http.StatusBadRequest,
wantErr: "failed to export metrics",
wantErr: "failed to export metrics: code 400",
},
} {
t.Run(name, func(t *testing.T) {
Expand All @@ -82,12 +82,6 @@ func TestExportMetrics(t *testing.T) {
require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")

body := colpb.ExportMetricsServiceResponse{}

if test.wantErr != "" {
body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{
ErrorMessage: "partial failure",
}
}
bytes, err := proto.Marshal(&body)

require.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions agent/hcp/deps.go

@Achooo Achooo May 24, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMPROVEMENT: Cleanup

  • Removed named params for code readability
  • Add a debug "Initialized HCP Metrics Sink" log to confirm the Sink has been initialized successfully
  • Fix the naming of loggers (to make logs nicer). I added a "hcp" prefix in setup.go, and these here have the related operations as names.

Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ type Deps struct {
func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (d Deps, err error) {
Comment thread
jjti marked this conversation as resolved.
Outdated
d.Client, err = hcpclient.NewClient(cfg)
if err != nil {
logger.Error("failed to init HCP deps - client:", "error", err)
Comment thread
Achooo marked this conversation as resolved.
Outdated
return
}

d.Provider, err = scada.New(cfg, logger.Named("hcp.scada"))
d.Provider, err = scada.New(cfg, logger.Named("scada"))
if err != nil {
logger.Error("failed to init HCP deps - scada:", "error", err)
Comment thread
Achooo marked this conversation as resolved.
Outdated
return
}

d.Sink = sink(d.Client, &cfg, logger, nodeID)
d.Sink = sink(d.Client, &cfg, logger.Named("sink"), nodeID)

return
}
Expand Down Expand Up @@ -86,5 +88,7 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
return nil
}

logger.Info("Initialized HCP Metrics Sink")
Comment thread
Achooo marked this conversation as resolved.
Outdated

return sink
}
14 changes: 14 additions & 0 deletions agent/hcp/telemetry/custom_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package telemetry

@Achooo Achooo May 24, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keys for custom metrics for the operations of the sink. These metrics do not need to be registered in consul as a whole (e.g. for prometheus) as they will only exist when the HCP sink is created, so only the OTELSink / Inmemsink will be receiving these metrics when the OTELSink is running.

These metrics will be used to monitor the sink, and potentially also in E2E tests to ensure metrics are exported correctly.

Is a separate file overkill?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate file is ok. I would change the naming slightly to be more specific. I would also prefix with a generic prefix (something like IntetnalMetric) and drop the type:

var (
	internalMetricTransformFailure = []string{"hcp", "otel", "transform", "failure"}

	internalMetricExportSuccess = []string{"hcp", "otel", "exporter", "export", "sucess"}
	internalMetricExportFailure = []string{"hcp", "otel", "exporter", "export", "failure"}

	internalMetricExporterShutdown = []string{"hcp", "otel", "exporter", "shutdown"}
	internalMetricExporterForceFlush = []string{"hcp", "otel", "exporter", "force_flush"}
)


// Keys for custom Go Metrics metrics emitted only for the OTEL
// export (exporter.go) and transform (transform.go) failures and successes.
// These enable us to monitor OTEL operations.
var (
transformFailureMetric []string = []string{"hcp", "otel", "transform", "failure"}

exportSuccessMetric []string = []string{"hcp", "otel", "exporter", "export", "sucess"}
exportFailureMetric []string = []string{"hcp", "otel", "exporter", "export", "failure"}

exporterShutdownMetric []string = []string{"hcp", "otel", "exporter", "shutdown"}
exporterForceFlushMetric []string = []string{"hcp", "otel", "exporter", "force_flush"}
)
14 changes: 11 additions & 3 deletions agent/hcp/telemetry/otel_exporter.go

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMPROVEMENT: Address TODO's to instrument Exporter operations in this file

Send a "hcp.otel.exporter.export.failure" for export failure Send a "hcp.otel.exporter.export.success" for export success
Send a "hcp.otel.exporter.export.force_flush" for force flush Send a "hcp.otel.exporter.export.shutdown" for shutdown

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/url"

goMetrics "github.com/armon/go-metrics"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
Expand Down Expand Up @@ -56,17 +57,24 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM
if isEmpty(otlpMetrics) {
return nil
}
return e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
if err != nil {
goMetrics.IncrCounter(exportFailureMetric, 1)
return err
}

goMetrics.IncrCounter(exportSuccessMetric, 1)
return nil
}

// ForceFlush is a no-op, as the MetricsClient client holds no state.
func (e *OTELExporter) ForceFlush(ctx context.Context) error {
// TODO: Emit metric when this operation occurs.
goMetrics.IncrCounter(exporterForceFlushMetric, 1)
return ctx.Err()
}

// Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown.
func (e *OTELExporter) Shutdown(ctx context.Context) error {
// TODO: Emit metric when this operation occurs.
goMetrics.IncrCounter(exporterShutdownMetric, 1)
return ctx.Err()
}
83 changes: 75 additions & 8 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"time"

"github.com/armon/go-metrics"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
Expand All @@ -16,6 +19,14 @@ import (
"github.com/hashicorp/consul/agent/hcp/client"
)

type mockMetricsClient struct {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just moved this to the top... no changes.

exportErr error
}

func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
return m.exportErr
}

func TestTemporality(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
Expand Down Expand Up @@ -50,14 +61,6 @@ func TestAggregation(t *testing.T) {
}
}

type mockMetricsClient struct {
exportErr error
}

func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
return m.exportErr
}

func TestExport(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
Expand Down Expand Up @@ -111,6 +114,58 @@ func TestExport(t *testing.T) {
}
}

// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted
// for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal()
// sets a shared global sink.
func TestExport_CustomMetrics(t *testing.T) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test for the new exporter metrics metric, which makes a call to export/shutdown/force_flush and checks that the metric is in the inmemsink.

for name, tc := range map[string]struct {
client client.MetricsClient
metricKey []string
}{
"exportSuccess": {
client: &mockMetricsClient{},
metricKey: exportSuccessMetric,
},
"exportFailure": {
client: &mockMetricsClient{
exportErr: fmt.Errorf("failed to export metrics"),
},
metricKey: exportFailureMetric,
},
"shutdown": {
metricKey: exporterShutdownMetric,
},
"forceFlush": {
metricKey: exporterForceFlushMetric,
},
} {
t.Run(name, func(t *testing.T) {
// Init global sink.
serviceName := "test.transform"
cfg := metrics.DefaultConfig(serviceName)
cfg.EnableHostname = false

sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
metrics.NewGlobal(cfg, sink)

// Perform operation that emits metric.
exp := NewOTELExporter(tc.client, &url.URL{})
performExporterOperation(exp, name)

// Collect sink metrics.
intervals := sink.Data()
require.Len(t, intervals, 1)
key := serviceName + "." + strings.Join(tc.metricKey, ".")
sv := intervals[0].Counters[key]

// Verify count for transform failure metric.
require.NotNil(t, sv)
require.NotNil(t, sv.AggregateSample)
require.Equal(t, 1, sv.AggregateSample.Count)
})
}
}

func TestForceFlush(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
Expand All @@ -137,3 +192,15 @@ func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics {
ScopeMetrics: m,
}
}

func performExporterOperation(exp metric.Exporter, operation string) {

@Achooo Achooo May 24, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the nicest way I could think of doing this, but lacked imagination, open to ideas 😓

The alternative was to export, force flush and shutdown in a row, but I'd have to re-init the exporter for the exportFailure with a bad client... so it wasn't looking better.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My personal opinion/approach would be to make operation a field on the test struct. It's a bit tricky where the operation here is the name of the test, means having to hop back and forth to see what's tested. If you had this switch statement within the test body I think that's cleaner (albeit a longer test)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 Josh's comment.

@Achooo Achooo May 25, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated! Looks much better, let me know if this is what you were thinking

  • Added operation as field name in test struct
  • Moved switch to within the test
  • Clarified test names (so they're not repeats of the operation)

ctx := context.Background()
switch operation {
case "forceFlush":
exp.ForceFlush(ctx)
case "shutdown":
exp.Shutdown(ctx)
default:
exp.Export(ctx, inputResourceMetrics)
}
}
3 changes: 2 additions & 1 deletion agent/hcp/telemetry/otlp_transform.go

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMPROVEMENT: Address TODO to instrument transform operations in this file

Send a `"hcp.otel.transform.failure" for transform failure

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"

goMetrics "github.com/armon/go-metrics"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
Expand Down Expand Up @@ -70,7 +71,7 @@ func metricsToPB(metrics []metricdata.Metrics) []*mpb.Metric {
for _, m := range metrics {
o, err := metricTypeToPB(m)
if err != nil {
// TODO: Emit metric when a transformation failure occurs.
goMetrics.IncrCounter(transformFailureMetric, 1)
continue
}
out = append(out, o)
Expand Down
Loading