-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[HCP Observability] Add custom metrics for OTEL sink, improve logging, upgrade modules and cleanup metrics client #17455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
55109e5
f6b3a9d
1b2500f
01338ee
64899a4
46e0918
ca89326
0305ed3
dd2f7a8
32b1525
b24ab3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,8 @@ import ( | |
| metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" | ||
| "golang.org/x/oauth2" | ||
| "google.golang.org/protobuf/proto" | ||
|
|
||
| "github.com/hashicorp/consul/version" | ||
| ) | ||
|
|
||
| const ( | ||
|
|
@@ -72,8 +74,9 @@ func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, erro | |
| } | ||
|
|
||
| header := make(http.Header) | ||
| header.Set("Content-Type", "application/x-protobuf") | ||
| header.Set("content-type", "application/x-protobuf") | ||
| header.Set("x-hcp-resource-id", r.String()) | ||
| header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion())) | ||
|
|
||
| return &otlpClient{ | ||
| client: c, | ||
|
|
@@ -124,36 +127,28 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R | |
|
|
||
| body, err := proto.Marshal(pbRequest) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| return fmt.Errorf("failed to marshal the request: %w", err) | ||
| } | ||
|
|
||
| req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body)) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| return fmt.Errorf("failed to create request: %w", err) | ||
| } | ||
| req.Header = *o.header | ||
|
|
||
| resp, err := o.client.Do(req.WithContext(ctx)) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| return fmt.Errorf("failed to post metrics: %w", err) | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| var respData bytes.Buffer | ||
| if _, err := io.Copy(&respData, resp.Body); err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| return fmt.Errorf("failed to read body: %w", err) | ||
| } | ||
|
|
||
| if respData.Len() != 0 { | ||
|
||
| var respProto colmetricpb.ExportMetricsServiceResponse | ||
| if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| } | ||
|
|
||
| if respProto.PartialSuccess != nil { | ||
| msg := respProto.PartialSuccess.GetErrorMessage() | ||
| return fmt.Errorf("failed to export metrics: partial success: %s", msg) | ||
| } | ||
| if resp.StatusCode != http.StatusOK { | ||
| return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body)) | ||
| } | ||
|
|
||
| return nil | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IMPROVEMENT: Cleanup
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| package telemetry | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Keys for custom metrics for the operations of the sink. These metrics do not need to be registered in consul as a whole (e.g. for prometheus) as they will only exist when the HCP sink is created, so only the OTELSink / Inmemsink will be receiving these metrics when the OTELSink is running. These metrics will be used to monitor the sink, and potentially also in E2E tests to ensure metrics are exported correctly. Is a separate file overkill?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A separate file is OK. I would change the naming slightly to be more specific. I would also prefix the names with a generic prefix, something like:
var (
internalMetricTransformFailure = []string{"hcp", "otel", "transform", "failure"}
internalMetricExportSuccess = []string{"hcp", "otel", "exporter", "export", "sucess"}
internalMetricExportFailure = []string{"hcp", "otel", "exporter", "export", "failure"}
internalMetricExporterShutdown = []string{"hcp", "otel", "exporter", "shutdown"}
internalMetricExporterForceFlush = []string{"hcp", "otel", "exporter", "force_flush"}
) |
||
|
|
||
| // Keys for custom Go Metrics metrics emitted only for the OTEL | ||
| // export (exporter.go) and transform (transform.go) failures and successes. | ||
| // These enable us to monitor OTEL operations. | ||
| var ( | ||
| internalMetricTransformFailure []string = []string{"hcp", "otel", "transform", "failure"} | ||
|
|
||
| internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "sucess"} | ||
| internalMetricExportFailure []string = []string{"hcp", "otel", "exporter", "export", "failure"} | ||
|
|
||
| internalMetricExporterShutdown []string = []string{"hcp", "otel", "exporter", "shutdown"} | ||
| internalMetricExporterForceFlush []string = []string{"hcp", "otel", "exporter", "force_flush"} | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,8 +4,11 @@ import ( | |
| "context" | ||
| "fmt" | ||
| "net/url" | ||
| "strings" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/armon/go-metrics" | ||
| "github.com/stretchr/testify/require" | ||
| "go.opentelemetry.io/otel/sdk/metric" | ||
| "go.opentelemetry.io/otel/sdk/metric/aggregation" | ||
|
|
@@ -16,6 +19,14 @@ import ( | |
| "github.com/hashicorp/consul/agent/hcp/client" | ||
| ) | ||
|
|
||
| type mockMetricsClient struct { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just moved this to the top... no changes. |
||
| exportErr error | ||
| } | ||
|
|
||
| func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { | ||
| return m.exportErr | ||
| } | ||
|
|
||
| func TestTemporality(t *testing.T) { | ||
| t.Parallel() | ||
| exp := &OTELExporter{} | ||
|
|
@@ -50,14 +61,6 @@ func TestAggregation(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| type mockMetricsClient struct { | ||
| exportErr error | ||
| } | ||
|
|
||
| func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { | ||
| return m.exportErr | ||
| } | ||
|
|
||
| func TestExport(t *testing.T) { | ||
| t.Parallel() | ||
| for name, test := range map[string]struct { | ||
|
|
@@ -111,6 +114,72 @@ func TestExport(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| // TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted | ||
| // for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal() | ||
| // sets a shared global sink. | ||
| func TestExport_CustomMetrics(t *testing.T) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Test for the new exporter metrics: it makes a call to export/shutdown/force_flush and checks that the corresponding metric is in the inmemsink. |
||
| for name, tc := range map[string]struct { | ||
| client client.MetricsClient | ||
| metricKey []string | ||
| operation string | ||
| }{ | ||
| "exportSuccessEmitsCustomMetric": { | ||
| client: &mockMetricsClient{}, | ||
| metricKey: internalMetricExportSuccess, | ||
| operation: "export", | ||
| }, | ||
| "exportFailureEmitsCustomMetric": { | ||
| client: &mockMetricsClient{ | ||
| exportErr: fmt.Errorf("client err"), | ||
| }, | ||
| metricKey: internalMetricExportFailure, | ||
| operation: "export", | ||
| }, | ||
| "shutdownEmitsCustomMetric": { | ||
| metricKey: internalMetricExporterShutdown, | ||
| operation: "shutdown", | ||
| }, | ||
| "forceFlushEmitsCustomMetric": { | ||
| metricKey: internalMetricExporterForceFlush, | ||
| operation: "flush", | ||
| }, | ||
| } { | ||
| t.Run(name, func(t *testing.T) { | ||
| // Init global sink. | ||
| serviceName := "test.transform" | ||
| cfg := metrics.DefaultConfig(serviceName) | ||
| cfg.EnableHostname = false | ||
|
|
||
| sink := metrics.NewInmemSink(10*time.Second, 10*time.Second) | ||
| metrics.NewGlobal(cfg, sink) | ||
|
|
||
| // Perform operation that emits metric. | ||
| exp := NewOTELExporter(tc.client, &url.URL{}) | ||
|
|
||
| ctx := context.Background() | ||
| switch tc.operation { | ||
| case "flush": | ||
| exp.ForceFlush(ctx) | ||
| case "shutdown": | ||
| exp.Shutdown(ctx) | ||
| default: | ||
| exp.Export(ctx, inputResourceMetrics) | ||
| } | ||
|
|
||
| // Collect sink metrics. | ||
| intervals := sink.Data() | ||
| require.Len(t, intervals, 1) | ||
| key := serviceName + "." + strings.Join(tc.metricKey, ".") | ||
| sv := intervals[0].Counters[key] | ||
|
|
||
| // Verify count for the expected exporter metric. | ||
| require.NotNil(t, sv) | ||
| require.NotNil(t, sv.AggregateSample) | ||
| require.Equal(t, 1, sv.AggregateSample.Count) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestForceFlush(t *testing.T) { | ||
| t.Parallel() | ||
| exp := &OTELExporter{} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lowercased to be consistent with other ones.