Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17460.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
hcp: Add new metrics sink to collect, aggregate and export server metrics to HCP in OTEL format.
```
3 changes: 2 additions & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
agentgrpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
"github.com/hashicorp/consul/agent/hcp"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
logdrop "github.com/hashicorp/consul/agent/log-drop"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
Expand Down Expand Up @@ -1882,7 +1883,7 @@ func (s *Server) trackLeaderChanges() {
// hcpServerStatus is the callback used by the HCP manager to emit status updates to the HashiCorp Cloud Platform when
// enabled.
func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback {
return func(ctx context.Context) (status hcp.ServerStatus, err error) {
return func(ctx context.Context) (status hcpclient.ServerStatus, err error) {
status.Name = s.config.NodeName
status.ID = string(s.config.NodeID)
status.Version = cslversion.GetHumanVersion()
Expand Down
9 changes: 4 additions & 5 deletions agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ import (
"golang.org/x/time/rate"
"google.golang.org/grpc"

"github.com/hashicorp/consul/agent/hcp"

"github.com/hashicorp/consul-net-rpc/net/rpc"

"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/multilimiter"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
external "github.com/hashicorp/consul/agent/grpc-external"
grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -2075,10 +2074,10 @@ func TestServer_hcpManager(t *testing.T) {
_, conf1 := testServerConfig(t)
conf1.BootstrapExpect = 1
conf1.RPCAdvertise = &net.TCPAddr{IP: []byte{127, 0, 0, 2}, Port: conf1.RPCAddr.Port}
hcp1 := hcp.NewMockClient(t)
hcp1.EXPECT().PushServerStatus(mock.Anything, mock.MatchedBy(func(status *hcp.ServerStatus) bool {
hcp1 := hcpclient.NewMockClient(t)
hcp1.EXPECT().PushServerStatus(mock.Anything, mock.MatchedBy(func(status *hcpclient.ServerStatus) bool {
return status.ID == string(conf1.NodeID)
})).Run(func(ctx context.Context, status *hcp.ServerStatus) {
})).Run(func(ctx context.Context, status *hcpclient.ServerStatus) {
require.Equal(t, status.LanAddress, "127.0.0.2")
}).Call.Return(nil)

Expand Down
10 changes: 5 additions & 5 deletions agent/hcp/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/hcp"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-uuid"
Expand Down Expand Up @@ -62,7 +62,7 @@ type RawBootstrapConfig struct {
// fetch from HCP servers if the local data is incomplete.
// It must be passed a (CLI) UI implementation so it can deliver progress
// updates to the user, for example if it is waiting to retry for a long period.
func LoadConfig(ctx context.Context, client hcp.Client, dataDir string, loader ConfigLoader, ui UI) (ConfigLoader, error) {
func LoadConfig(ctx context.Context, client hcpclient.Client, dataDir string, loader ConfigLoader, ui UI) (ConfigLoader, error) {
ui.Output("Loading configuration from HCP")

// See if we have existing config on disk
Expand Down Expand Up @@ -178,14 +178,14 @@ func finalizeRuntimeConfig(rc *config.RuntimeConfig, cfg *RawBootstrapConfig) {

// fetchBootstrapConfig will fetch boostrap configuration from remote servers and persist it to disk.
// It will retry until successful or a terminal error condition is found (e.g. permission denied).
func fetchBootstrapConfig(ctx context.Context, client hcp.Client, dataDir string, ui UI) (*RawBootstrapConfig, error) {
func fetchBootstrapConfig(ctx context.Context, client hcpclient.Client, dataDir string, ui UI) (*RawBootstrapConfig, error) {
w := retry.Waiter{
MinWait: 1 * time.Second,
MaxWait: 5 * time.Minute,
Jitter: retry.NewJitter(50),
}

var bsCfg *hcp.BootstrapConfig
var bsCfg *hcpclient.BootstrapConfig
for {
// Note we don't want to shadow `ctx` here since we need that for the Wait
// below.
Expand Down Expand Up @@ -222,7 +222,7 @@ func fetchBootstrapConfig(ctx context.Context, client hcp.Client, dataDir string
// persistAndProcessConfig is called when we receive data from CCM.
// We validate and persist everything that was received, then also update
// the JSON config as needed.
func persistAndProcessConfig(dataDir string, devMode bool, bsCfg *hcp.BootstrapConfig) (string, error) {
func persistAndProcessConfig(dataDir string, devMode bool, bsCfg *hcpclient.BootstrapConfig) (string, error) {
if devMode {
// Agent in dev mode, we still need somewhere to persist the certs
// temporarily though to be able to start up at all since we don't support
Expand Down
3 changes: 2 additions & 1 deletion agent/hcp/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/hcp"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-uuid"
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestLoadConfig_Persistence(t *testing.T) {

// Override the client TLS config so that the test server can be trusted.
initial.RuntimeConfig.Cloud.WithTLSConfig(clientTLS)
client, err := hcp.NewClient(initial.RuntimeConfig.Cloud)
client, err := hcpclient.NewClient(initial.RuntimeConfig.Cloud)
require.NoError(t, err)

loader, err := LoadConfig(context.Background(), client, initial.RuntimeConfig.DataDir, baseLoader, ui)
Expand Down
81 changes: 80 additions & 1 deletion agent/hcp/client.go → agent/hcp/client/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package hcp
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package client

import (
"context"
Expand All @@ -8,6 +11,8 @@ import (

httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"

hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
hcpgnm "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/client/global_network_manager_service"
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
"github.com/hashicorp/hcp-sdk-go/httpclient"
Expand All @@ -17,15 +22,34 @@ import (
"github.com/hashicorp/consul/version"
)

// metricsGatewayPath is the default path for metrics export request on the Telemetry Gateway.
const metricsGatewayPath = "/v1/metrics"

// Client interface exposes HCP operations that can be invoked by Consul
//
//go:generate mockery --name Client --with-expecter --inpackage
type Client interface {
FetchBootstrap(ctx context.Context) (*BootstrapConfig, error)
FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error)
PushServerStatus(ctx context.Context, status *ServerStatus) error
DiscoverServers(ctx context.Context) ([]string, error)
}

// MetricsConfig holds metrics specific configuration for the TelemetryConfig.
// The endpoint field overrides the TelemetryConfig endpoint.
type MetricsConfig struct {
Filters []string
Endpoint string
}

// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
// to the HCP Telemetry gateway.
type TelemetryConfig struct {
Endpoint string
Labels map[string]string
MetricsConfig *MetricsConfig
}

type BootstrapConfig struct {
Name string
BootstrapExpect int
Expand All @@ -41,6 +65,7 @@ type hcpClient struct {
hc *httptransport.Runtime
cfg config.CloudConfig
gnm hcpgnm.ClientService
tgw hcptelemetry.ClientService
resource resource.Resource
}

Expand All @@ -61,6 +86,8 @@ func NewClient(cfg config.CloudConfig) (Client, error) {
}

client.gnm = hcpgnm.New(client.hc, nil)
client.tgw = hcptelemetry.New(client.hc, nil)

return client, nil
}

Expand All @@ -76,6 +103,29 @@ func httpClient(c config.CloudConfig) (*httptransport.Runtime, error) {
})
}

// FetchTelemetryConfig obtains telemetry configuration from the Telemetry Gateway.
func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) {
params := hcptelemetry.NewAgentTelemetryConfigParamsWithContext(ctx).
WithLocationOrganizationID(c.resource.Organization).
WithLocationProjectID(c.resource.Project).
WithClusterID(c.resource.ID)

resp, err := c.tgw.AgentTelemetryConfig(params, nil)
if err != nil {
return nil, err
}

payloadConfig := resp.Payload.TelemetryConfig
return &TelemetryConfig{
Endpoint: payloadConfig.Endpoint,
Labels: payloadConfig.Labels,
MetricsConfig: &MetricsConfig{
Filters: payloadConfig.Metrics.IncludeList,
Endpoint: payloadConfig.Metrics.Endpoint,
},
}, nil
}

func (c *hcpClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) {
version := version.GetHumanVersion()
params := hcpgnm.NewAgentBootstrapConfigParamsWithContext(ctx).
Expand Down Expand Up @@ -230,3 +280,32 @@ func (c *hcpClient) DiscoverServers(ctx context.Context) ([]string, error) {

return servers, nil
}

// Enabled verifies if telemetry is enabled by ensuring a valid endpoint has been retrieved.
// It returns full metrics endpoint and true if a valid endpoint was obtained.
func (t *TelemetryConfig) Enabled() (string, bool) {
endpoint := t.Endpoint
if override := t.MetricsConfig.Endpoint; override != "" {
endpoint = override
}

if endpoint == "" {
return "", false
}

// The endpoint from Telemetry Gateway is a domain without scheme, and without the metrics path, so they must be added.
return endpoint + metricsGatewayPath, true
}

// DefaultLabels returns a set of <key, value> string pairs that must be added as attributes to all exported telemetry data.
func (t *TelemetryConfig) DefaultLabels(nodeID string) map[string]string {
labels := map[string]string{
"node_id": nodeID, // used to delineate Consul nodes in graphs
}

for k, v := range t.Labels {
labels[k] = v
}

return labels
}
75 changes: 75 additions & 0 deletions agent/hcp/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package client

import (
"context"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestFetchTelemetryConfig(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
metricsEndpoint string
expect func(*MockClient)
disabled bool
}{
"success": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"overrideMetricsEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "https://test.com",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"disabledWithEmptyEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
disabled: true,
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()

mock := NewMockClient(t)
test.expect(mock)

telemetryCfg, err := mock.FetchTelemetryConfig(context.Background())
require.NoError(t, err)

if test.disabled {
endpoint, ok := telemetryCfg.Enabled()
require.False(t, ok)
require.Empty(t, endpoint)
return
}

endpoint, ok := telemetryCfg.Enabled()

require.True(t, ok)
require.Equal(t, test.metricsEndpoint, endpoint)
})
}
}
Loading