Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions agent/hcp/client/telemetry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

var (
// defaultMetricFilters is a regex that matches all metric names.
defaultMetricFilters = regexp.MustCompile(".+")
DefaultMetricFilters = regexp.MustCompile(".+")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Export this to use as default in https://github.com/hashicorp/consul/pull/18318/files#diff-9fd2535b13a1b9701661bdd379418cb004a41b5b747ff9911e8cbbffea54b9afR60.

If the first fetch is not available, this default, which matches all metrics, is used instead.


// Validation errors for AgentTelemetryConfigOK response.
errMissingPayload = errors.New("missing payload")
Expand All @@ -29,6 +29,7 @@ var (
errMissingMetricsConfig = errors.New("missing metrics config")
errInvalidRefreshInterval = errors.New("invalid refresh interval")
errInvalidEndpoint = errors.New("invalid metrics endpoint")
errEmptyEndpoint = errors.New("empty metrics endpoint")
)

// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
Expand All @@ -43,18 +44,14 @@ type MetricsConfig struct {
Labels map[string]string
Filters *regexp.Regexp
Endpoint *url.URL
Disabled bool
}

// RefreshConfig contains configuration for the periodic fetch of configuration from HCP.
type RefreshConfig struct {
RefreshInterval time.Duration
}

// MetricsEnabled returns true if metrics export is enabled, i.e. a valid metrics endpoint exists.
Copy link
Contributor Author

@Achooo Achooo Aug 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for this func or the empty endpoint logic, since we now have a disabled flag.

func (t *TelemetryConfig) MetricsEnabled() bool {
return t.MetricsConfig.Endpoint != nil
}

// validateAgentTelemetryConfigPayload ensures the returned payload from HCP is valid.
func validateAgentTelemetryConfigPayload(resp *hcptelemetry.AgentTelemetryConfigOK) error {
if resp.Payload == nil {
Expand Down Expand Up @@ -86,7 +83,7 @@ func convertAgentTelemetryResponse(ctx context.Context, resp *hcptelemetry.Agent
telemetryConfig := resp.Payload.TelemetryConfig
metricsEndpoint, err := convertMetricEndpoint(telemetryConfig.Endpoint, telemetryConfig.Metrics.Endpoint)
if err != nil {
return nil, errInvalidEndpoint
return nil, err
}

metricsFilters := convertMetricFilters(ctx, telemetryConfig.Metrics.IncludeList)
Expand All @@ -97,6 +94,7 @@ func convertAgentTelemetryResponse(ctx context.Context, resp *hcptelemetry.Agent
Endpoint: metricsEndpoint,
Labels: metricLabels,
Filters: metricsFilters,
Disabled: telemetryConfig.Metrics.Disabled,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: refreshInterval,
Expand All @@ -114,9 +112,8 @@ func convertMetricEndpoint(telemetryEndpoint string, metricsEndpoint string) (*u
endpoint = metricsEndpoint
}

// If endpoint is empty, server not registered with CCM, no error returned.
if endpoint == "" {
return nil, nil
return nil, errEmptyEndpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like known errors. Good stuff.

}

// Endpoint from CTW has no metrics path, so it must be added.
Expand Down Expand Up @@ -145,15 +142,15 @@ func convertMetricFilters(ctx context.Context, payloadFilters []string) *regexp.

if len(validFilters) == 0 {
logger.Error("no valid filters")
return defaultMetricFilters
return DefaultMetricFilters
}

// Combine the valid regex strings with OR.
finalRegex := strings.Join(validFilters, "|")
composedRegex, err := regexp.Compile(finalRegex)
if err != nil {
logger.Error("failed to compile final regex", "error", err)
return defaultMetricFilters
return DefaultMetricFilters
}

return composedRegex
Expand Down
47 changes: 5 additions & 42 deletions agent/hcp/client/telemetry_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
resp *consul_telemetry_service.AgentTelemetryConfigOK
expectedTelemetryCfg *TelemetryConfig
wantErr error
expectedEnabled bool
}{
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand All @@ -115,34 +114,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"successNoEndpoint": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "",
Labels: map[string]string{"test": "test"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
IncludeList: []string{"test", "consul"},
},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "2s",
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: nil,
Labels: map[string]string{"test": "test"},
Filters: validTestFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: false,
},
"successBadFilters": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand All @@ -163,13 +134,12 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
MetricsConfig: &MetricsConfig{
Endpoint: validTestURL,
Labels: map[string]string{"test": "test"},
Filters: defaultMetricFilters,
Filters: DefaultMetricFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"errorsWithInvalidRefreshInterval": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand Down Expand Up @@ -209,7 +179,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
}
require.NoError(t, err)
require.Equal(t, tc.expectedTelemetryCfg, telemetryCfg)
require.Equal(t, tc.expectedEnabled, telemetryCfg.MetricsEnabled())
})
}
}
Expand All @@ -231,10 +200,10 @@ func TestConvertMetricEndpoint(t *testing.T) {
override: "https://override.com",
expected: "https://override.com/v1/metrics",
},
"noErrorWithEmptyEndpoints": {
"errorWithEmptyEndpoints": {
endpoint: "",
override: "",
expected: "",
wantErr: errEmptyEndpoint,
},
"errorWithInvalidURL": {
endpoint: " ",
Expand All @@ -252,12 +221,6 @@ func TestConvertMetricEndpoint(t *testing.T) {
return
}

if tc.expected == "" {
require.Nil(t, u)
require.NoError(t, err)
return
}

require.NotNil(t, u)
require.NoError(t, err)
require.Equal(t, tc.expected, u.String())
Expand All @@ -277,13 +240,13 @@ func TestConvertMetricFilters(t *testing.T) {
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"emptyRegex": {
filters: []string{},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
Expand Down
39 changes: 11 additions & 28 deletions agent/hcp/deps.go
Copy link
Contributor Author

@Achooo Achooo Jul 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire first fetchTelemetryConfig call is removed, simplifying this file quite a lot.
The TelemetryConfigProvider will now try to perform this first fetch.

Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ package hcp
import (
"context"
"fmt"
"time"

"github.com/armon/go-metrics"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/go-hclog"
)

// Deps contains the interfaces that the rest of Consul core depends on for HCP integration.
type Deps struct {
Client hcpclient.Client
Client client.Client
Provider scada.Provider
Sink metrics.MetricSink
}
Expand All @@ -27,7 +27,7 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
ctx := context.Background()
ctx = hclog.WithContext(ctx, logger)

client, err := hcpclient.NewClient(cfg)
hcpClient, err := client.NewClient(cfg)
if err != nil {
return Deps{}, fmt.Errorf("failed to init client: %w", err)
}
Expand All @@ -37,50 +37,33 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
}

metricsClient, err := hcpclient.NewMetricsClient(ctx, &cfg)
metricsClient, err := client.NewMetricsClient(ctx, &cfg)
if err != nil {
logger.Error("failed to init metrics client", "error", err)
return Deps{}, fmt.Errorf("failed to init metrics client: %w", err)
}

sink, err := sink(ctx, client, metricsClient)
sink, err := sink(ctx, metricsClient, NewHCPProvider(ctx, hcpClient))
if err != nil {
// Do not prevent server start if sink init fails, only log error.
logger.Error("failed to init sink", "error", err)
}

return Deps{
Client: client,
Client: hcpClient,
Provider: provider,
Sink: sink,
}, nil
}

// sink initializes an OTELSink which forwards Consul metrics to HCP.
// The sink is only initialized if the server is registered with the management plane (CCM).
// This step should not block server initialization, errors are returned, only to be logged.
func sink(
ctx context.Context,
hcpClient hcpclient.Client,
metricsClient telemetry.MetricsClient,
cfgProvider *hcpProviderImpl,
) (metrics.MetricSink, error) {
logger := hclog.FromContext(ctx).Named("sink")
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx)
if err != nil {
return nil, fmt.Errorf("failed to fetch telemetry config: %w", err)
}

if !telemetryCfg.MetricsEnabled() {
return nil, nil
}

cfgProvider, err := NewHCPProvider(ctx, hcpClient, telemetryCfg)
if err != nil {
return nil, fmt.Errorf("failed to init config provider: %w", err)
}
logger := hclog.FromContext(ctx)

reader := telemetry.NewOTELReader(metricsClient, cfgProvider)
sinkOpts := &telemetry.OTELSinkOpts{
Expand All @@ -90,7 +73,7 @@ func sink(

sink, err := telemetry.NewOTELSink(ctx, sinkOpts)
if err != nil {
return nil, fmt.Errorf("failed create OTELSink: %w", err)
return nil, fmt.Errorf("failed to create OTELSink: %w", err)
}

logger.Debug("initialized HCP metrics sink")
Expand Down
84 changes: 5 additions & 79 deletions agent/hcp/deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@ package hcp

import (
"context"
"fmt"
"net/url"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/telemetry"
)

Expand All @@ -24,79 +18,11 @@ type mockMetricsClient struct {

func TestSink(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
expect func(*client.MockClient)
wantErr string
expectedSink bool
}{
"success": {
expect: func(mockClient *client.MockClient) {
u, _ := url.Parse("https://test.com/v1/metrics")
filters, _ := regexp.Compile("test")
mt := mockTelemetryConfig(1*time.Second, u, filters)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
expectedSink: true,
},
"noSinkWhenFetchTelemetryConfigFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
},
wantErr: "failed to fetch telemetry config",
},
"noSinkWhenServerNotRegisteredWithCCM": {
expect: func(mockClient *client.MockClient) {
mt := mockTelemetryConfig(1*time.Second, nil, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
},
"noSinkWhenTelemetryConfigProviderInitFails": {
expect: func(mockClient *client.MockClient) {
u, _ := url.Parse("https://test.com/v1/metrics")
// Bad refresh interval forces ConfigProvider creation failure.
mt := mockTelemetryConfig(0*time.Second, u, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
wantErr: "failed to init config provider",
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
c := client.NewMockClient(t)
mc := mockMetricsClient{}

test.expect(c)
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s, err := sink(ctx, mockMetricsClient{}, &hcpProviderImpl{})

s, err := sink(ctx, c, mc)

if test.wantErr != "" {
require.NotNil(t, err)
require.Contains(t, err.Error(), test.wantErr)
require.Nil(t, s)
return
}

if !test.expectedSink {
require.Nil(t, s)
require.Nil(t, err)
return
}

require.NotNil(t, s)
})
}
}

func mockTelemetryConfig(refreshInterval time.Duration, metricsEndpoint *url.URL, filters *regexp.Regexp) *client.TelemetryConfig {
return &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Endpoint: metricsEndpoint,
Filters: filters,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: refreshInterval,
},
}
require.NotNil(t, s)
require.NoError(t, err)
}
Loading