-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[HCP Observability] New MetricsClient #17100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6057424
3a28685
d5ec1cc
8ff9c56
3f875e9
cbbf5ef
b2a1289
e9ce142
eae4e7f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| package client | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "time" | ||
|
|
||
| "github.com/hashicorp/go-cleanhttp" | ||
| "github.com/hashicorp/go-hclog" | ||
| "github.com/hashicorp/go-retryablehttp" | ||
| hcpcfg "github.com/hashicorp/hcp-sdk-go/config" | ||
| colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" | ||
| metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" | ||
| "golang.org/x/oauth2" | ||
| "google.golang.org/protobuf/proto" | ||
| ) | ||
|
|
||
| const ( | ||
| // HTTP Client config | ||
| defaultStreamTimeout = 15 * time.Second | ||
|
|
||
| // Retry config | ||
| // TODO: Evenutally, we'd like to configure these values dynamically. | ||
| defaultRetryWaitMin = 1 * time.Second | ||
| defaultRetryWaitMax = 15 * time.Second | ||
| defaultRetryMax = 4 | ||
| ) | ||
|
|
||
| // MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway. | ||
| type MetricsClient interface { | ||
| ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error | ||
| } | ||
|
|
||
| // cloudConfig represents cloud config for TLS abstracted in an interface for easy testing. | ||
| type CloudConfig interface { | ||
| HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) | ||
| } | ||
|
|
||
| // otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle. | ||
| // It also holds default HTTP headers to add to export requests. | ||
| type otlpClient struct { | ||
| client *retryablehttp.Client | ||
| header *http.Header | ||
| } | ||
|
|
||
| // NewMetricsClient returns a configured MetricsClient. | ||
| // The current implementation uses otlpClient to provide retry functionality. | ||
| func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, error) { | ||
| if cfg == nil { | ||
| return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)") | ||
| } | ||
|
|
||
| if ctx == nil { | ||
| return nil, fmt.Errorf("failed to init telemetry client: provide a valid context") | ||
| } | ||
|
|
||
| logger := hclog.FromContext(ctx) | ||
|
|
||
| c, err := newHTTPClient(cfg, logger) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to init telemetry client: %v", err) | ||
| } | ||
|
|
||
| header := make(http.Header) | ||
| header.Set("Content-Type", "application/x-protobuf") | ||
|
|
||
| return &otlpClient{ | ||
| client: c, | ||
| header: &header, | ||
| }, nil | ||
| } | ||
|
|
||
| // newHTTPClient configures the retryable HTTP client. | ||
| func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) { | ||
| hcpCfg, err := cloudCfg.HCPConfig() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| tlsTransport := cleanhttp.DefaultPooledTransport() | ||
| tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig() | ||
|
|
||
| var transport http.RoundTripper = &oauth2.Transport{ | ||
| Base: tlsTransport, | ||
| Source: hcpCfg, | ||
| } | ||
|
|
||
| client := &http.Client{ | ||
| Transport: transport, | ||
| Timeout: defaultStreamTimeout, | ||
| } | ||
|
|
||
| retryClient := &retryablehttp.Client{ | ||
| HTTPClient: client, | ||
| Logger: logger.Named("hcp_telemetry_client"), | ||
| RetryWaitMin: defaultRetryWaitMin, | ||
Achooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| RetryWaitMax: defaultRetryWaitMax, | ||
| RetryMax: defaultRetryMax, | ||
| CheckRetry: retryablehttp.DefaultRetryPolicy, | ||
| Backoff: retryablehttp.DefaultBackoff, | ||
| } | ||
|
|
||
| return retryClient, nil | ||
| } | ||
|
|
||
| // ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint. | ||
| // The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config. | ||
| // By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request. | ||
| func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { | ||
| pbRequest := &colmetricpb.ExportMetricsServiceRequest{ | ||
| ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, | ||
| } | ||
|
|
||
| body, err := proto.Marshal(pbRequest) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| } | ||
|
|
||
| req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body)) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| } | ||
| req.Header = *o.header | ||
|
|
||
| resp, err := o.client.Do(req.WithContext(ctx)) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| var respData bytes.Buffer | ||
| if _, err := io.Copy(&respData, resp.Body); err != nil { | ||
Achooo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| } | ||
|
|
||
| if respData.Len() != 0 { | ||
| var respProto colmetricpb.ExportMetricsServiceResponse | ||
| if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { | ||
| return fmt.Errorf("failed to export metrics: %v", err) | ||
| } | ||
|
|
||
| if respProto.PartialSuccess != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trying to figure out what to do about this on the CTGW side as well. Will get back to you about it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I followed suit of this exporter if it helps: https://github.com/open-telemetry/opentelemetry-go/blob/8e76ab23b495ce2411d936b497fb8f1a9b35f974/exporters/otlp/otlpmetric/otlpmetrichttp/client.go#L200
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now I am going to never return this but good to keep this here if we decide to implement partial success. |
||
| msg := respProto.PartialSuccess.GetErrorMessage() | ||
| return fmt.Errorf("failed to export metrics: partial success: %s", msg) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| package client | ||
|
|
||
| import ( | ||
| "context" | ||
| "net/http" | ||
| "net/http/httptest" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" | ||
| metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" | ||
| "google.golang.org/protobuf/proto" | ||
| ) | ||
|
|
||
| func TestNewMetricsClient(t *testing.T) { | ||
| for name, test := range map[string]struct { | ||
| wantErr string | ||
| cfg CloudConfig | ||
| ctx context.Context | ||
| }{ | ||
| "success": { | ||
| cfg: &MockCloudCfg{}, | ||
| ctx: context.Background(), | ||
| }, | ||
| "failsWithoutCloudCfg": { | ||
| wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)", | ||
| cfg: nil, | ||
| ctx: context.Background(), | ||
| }, | ||
| "failsWithoutContext": { | ||
| wantErr: "failed to init telemetry client: provide a valid context", | ||
| cfg: MockCloudCfg{}, | ||
| ctx: nil, | ||
| }, | ||
| "failsHCPConfig": { | ||
| wantErr: "failed to init telemetry client", | ||
| cfg: MockErrCloudCfg{}, | ||
| ctx: context.Background(), | ||
| }, | ||
| } { | ||
| t.Run(name, func(t *testing.T) { | ||
| client, err := NewMetricsClient(test.cfg, test.ctx) | ||
| if test.wantErr != "" { | ||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), test.wantErr) | ||
| return | ||
| } | ||
|
|
||
| require.Nil(t, err) | ||
| require.NotNil(t, client) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestExportMetrics(t *testing.T) { | ||
| for name, test := range map[string]struct { | ||
| wantErr string | ||
| status int | ||
| }{ | ||
| "success": { | ||
| status: http.StatusOK, | ||
| }, | ||
| "failsWithNonRetryableError": { | ||
| status: http.StatusBadRequest, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will partial success always return badrequest?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessarily actually, I just added it to test out that part of my code which handles the partial success (without kicking off the retry logic to avoid timing out my tests) |
||
| wantErr: "failed to export metrics", | ||
| }, | ||
| } { | ||
| t.Run(name, func(t *testing.T) { | ||
| srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf") | ||
|
|
||
| require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") | ||
|
|
||
| body := colpb.ExportMetricsServiceResponse{} | ||
|
|
||
| if test.wantErr != "" { | ||
| body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{ | ||
| ErrorMessage: "partial failure", | ||
| } | ||
| } | ||
| bytes, err := proto.Marshal(&body) | ||
|
|
||
| require.NoError(t, err) | ||
|
|
||
| w.Header().Set("Content-Type", "application/x-protobuf") | ||
| w.WriteHeader(test.status) | ||
| w.Write(bytes) | ||
| })) | ||
| defer srv.Close() | ||
|
|
||
| client, err := NewMetricsClient(MockCloudCfg{}, context.Background()) | ||
| require.NoError(t, err) | ||
|
|
||
| ctx := context.Background() | ||
| metrics := &metricpb.ResourceMetrics{} | ||
| err = client.ExportMetrics(ctx, metrics, srv.URL) | ||
|
|
||
| if test.wantErr != "" { | ||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), test.wantErr) | ||
| return | ||
| } | ||
|
|
||
| require.NoError(t, err) | ||
| }) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| package client | ||
|
|
||
| import ( | ||
| "crypto/tls" | ||
| "errors" | ||
| "net/url" | ||
|
|
||
| hcpcfg "github.com/hashicorp/hcp-sdk-go/config" | ||
| "golang.org/x/oauth2" | ||
| ) | ||
|
|
||
| type mockHCPCfg struct{} | ||
|
|
||
| func (m *mockHCPCfg) Token() (*oauth2.Token, error) { | ||
| return &oauth2.Token{ | ||
| AccessToken: "test-token", | ||
| }, nil | ||
| } | ||
|
|
||
| func (m *mockHCPCfg) APITLSConfig() *tls.Config { return nil } | ||
|
|
||
| func (m *mockHCPCfg) SCADAAddress() string { return "" } | ||
|
|
||
| func (m *mockHCPCfg) SCADATLSConfig() *tls.Config { return &tls.Config{} } | ||
|
|
||
| func (m *mockHCPCfg) APIAddress() string { return "" } | ||
|
|
||
| func (m *mockHCPCfg) PortalURL() *url.URL { return &url.URL{} } | ||
|
|
||
| type MockCloudCfg struct{} | ||
|
|
||
| func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { | ||
| return &mockHCPCfg{}, nil | ||
| } | ||
|
|
||
| type MockErrCloudCfg struct{} | ||
|
|
||
| func (m MockErrCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { | ||
| return nil, errors.New("test bad HCP config") | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.