From 790fa0286504d47423f2024d9ae30057159a4cc1 Mon Sep 17 00:00:00 2001 From: Poonam Jadhav Date: Tue, 18 Apr 2023 11:03:05 -0400 Subject: [PATCH] feat: set up reporting agent (#16991) --- agent/consul/acl_server.go | 2 +- agent/consul/leader.go | 8 +++++- agent/consul/leader_connect.go | 8 +++--- agent/consul/leader_intentions.go | 4 +-- agent/consul/leader_intentions_test.go | 2 +- agent/consul/leader_test.go | 6 ++-- agent/consul/reporting/reporting.go | 38 +++++++++++++++++++++++++ agent/consul/reporting/reporting_oss.go | 21 ++++++++++++++ agent/consul/server.go | 6 ++++ agent/consul/server_oss.go | 6 ++++ agent/consul/system_metadata.go | 4 +-- agent/consul/system_metadata_test.go | 8 +++--- 12 files changed, 95 insertions(+), 18 deletions(-) create mode 100644 agent/consul/reporting/reporting.go create mode 100644 agent/consul/reporting/reporting_oss.go diff --git a/agent/consul/acl_server.go b/agent/consul/acl_server.go index 48e80191e38..5ea8f915cfd 100644 --- a/agent/consul/acl_server.go +++ b/agent/consul/acl_server.go @@ -110,7 +110,7 @@ type serverACLResolverBackend struct { } func (s *serverACLResolverBackend) IsServerManagementToken(token string) bool { - mgmt, err := s.getSystemMetadata(structs.ServerManagementTokenAccessorID) + mgmt, err := s.GetSystemMetadata(structs.ServerManagementTokenAccessorID) if err != nil { s.logger.Debug("failed to fetch server management token: %w", err) return false diff --git a/agent/consul/leader.go b/agent/consul/leader.go index d5eb00fbbfe..76c3380700e 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -338,6 +338,10 @@ func (s *Server) establishLeadership(ctx context.Context) error { s.startLogVerification(ctx) } + if s.config.Reporting.License.Enabled && s.reportingManager != nil { + s.reportingManager.StartReportingAgent() + } + s.logger.Debug("successfully established leadership", "duration", time.Since(start)) return nil } @@ -374,6 +378,8 @@ func (s *Server) revokeLeadership() { s.resetConsistentReadReady() s.autopilot.DisableReconciliation() + + s.reportingManager.StopReportingAgent() } // initializeACLs is used to setup the ACLs if we are the leader @@ -522,7 +528,7 @@ func (s *Server) initializeACLs(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to generate the secret ID for the server management token: %w", err) } - if err := s.setSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil { + if err := s.SetSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil { return fmt.Errorf("failed to persist server management token: %w", err) } diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index 0c57bc13a7f..7166ad75cd6 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -204,7 +204,7 @@ func (s *Server) setVirtualIPFlags() (bool, error) { } func (s *Server) setVirtualIPVersionFlag() (bool, error) { - val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled) + val, err := s.GetSystemMetadata(structs.SystemMetadataVirtualIPsEnabled) if err != nil { return false, err } @@ -217,7 +217,7 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) { minVirtualIPVersion.String()) } - if err := s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil { + if err := s.SetSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil { return false, nil } @@ -225,7 +225,7 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) { } func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) { - val, err := s.getSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled) + val, err := s.GetSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled) if err != nil { return false, err } @@ -238,7 +238,7 @@ func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) { minVirtualIPTerminatingGatewayVersion.String()) } - if err := s.setSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil { + if err := s.SetSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil { return false, nil } diff --git a/agent/consul/leader_intentions.go b/agent/consul/leader_intentions.go index 9adc26795de..2894acd5846 100644 --- a/agent/consul/leader_intentions.go +++ b/agent/consul/leader_intentions.go @@ -22,7 +22,7 @@ func (s *Server) startIntentionConfigEntryMigration(ctx context.Context) error { // Check for the system metadata first, as that's the most trustworthy in // both the primary and secondaries. - intentionFormat, err := s.getSystemMetadata(structs.SystemMetadataIntentionFormatKey) + intentionFormat, err := s.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey) if err != nil { return err } @@ -240,7 +240,7 @@ func (s *Server) legacyIntentionMigrationInSecondaryDC(ctx context.Context) erro // error. for { // Check for the system metadata first, as that's the most trustworthy. - intentionFormat, err := s.getSystemMetadata(structs.SystemMetadataIntentionFormatKey) + intentionFormat, err := s.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey) if err != nil { return err } diff --git a/agent/consul/leader_intentions_test.go b/agent/consul/leader_intentions_test.go index e0dcb8b3d10..89dbc9d6256 100644 --- a/agent/consul/leader_intentions_test.go +++ b/agent/consul/leader_intentions_test.go @@ -520,7 +520,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) { // Wait until the migration routine is complete. retry.Run(t, func(r *retry.R) { - intentionFormat, err := s1.getSystemMetadata(structs.SystemMetadataIntentionFormatKey) + intentionFormat, err := s1.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey) require.NoError(r, err) if intentionFormat != structs.SystemMetadataIntentionFormatConfigValue { r.Fatal("intention migration is not yet complete") diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index e8bcb39a653..fe8e1b678a7 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -1300,7 +1300,7 @@ func TestLeader_ACL_Initialization(t *testing.T) { require.NoError(t, err) require.NotNil(t, policy) - serverToken, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID) + serverToken, err := s1.GetSystemMetadata(structs.ServerManagementTokenAccessorID) require.NoError(t, err) require.NotEmpty(t, serverToken) @@ -1338,14 +1338,14 @@ func TestLeader_ACL_Initialization_SecondaryDC(t *testing.T) { testrpc.WaitForTestAgent(t, s2.RPC, "dc2") // Check dc1's management token - serverToken1, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID) + serverToken1, err := s1.GetSystemMetadata(structs.ServerManagementTokenAccessorID) require.NoError(t, err) require.NotEmpty(t, serverToken1) _, err = uuid.ParseUUID(serverToken1) require.NoError(t, err) // Check dc2's management token - serverToken2, err := s2.getSystemMetadata(structs.ServerManagementTokenAccessorID) + serverToken2, err := s2.GetSystemMetadata(structs.ServerManagementTokenAccessorID) require.NoError(t, err) require.NotEmpty(t, serverToken2) _, err = uuid.ParseUUID(serverToken2) diff --git a/agent/consul/reporting/reporting.go b/agent/consul/reporting/reporting.go new file mode 100644 index 00000000000..982ac0225d5 --- /dev/null +++ b/agent/consul/reporting/reporting.go @@ -0,0 +1,38 @@ +package reporting + +import ( + "sync" + + "github.com/hashicorp/go-hclog" +) + +type ReportingManager struct { + logger hclog.Logger + server ServerDelegate + EntDeps + sync.RWMutex +} + +const ( + SystemMetadataReportingProcessID = "reporting-process-id" +) + +//go:generate mockery --name ServerDelegate --inpackage +type ServerDelegate interface { + GetSystemMetadata(key string) (string, error) + SetSystemMetadataKey(key, val string) error + IsLeader() bool +} + +func NewReportingManager(logger hclog.Logger, deps EntDeps, server ServerDelegate) *ReportingManager { + rm := &ReportingManager{ + logger: logger.Named("reporting"), + server: server, + } + err := rm.initEnterpriseReporting(deps) + if err != nil { + rm.logger.Error("Error initializing reporting manager", "error", err) + return nil + } + return rm +} diff --git a/agent/consul/reporting/reporting_oss.go b/agent/consul/reporting/reporting_oss.go new file mode 100644 index 00000000000..67355990250 --- /dev/null +++ b/agent/consul/reporting/reporting_oss.go @@ -0,0 +1,21 @@ +//go:build !consulent +// +build !consulent + +package reporting + +type EntDeps struct{} + +func (rm *ReportingManager) initEnterpriseReporting(entDeps EntDeps) error { + // no op + return nil +} + +func (rm *ReportingManager) StartReportingAgent() error { + // no op + return nil +} + +func (rm *ReportingManager) StopReportingAgent() error { + // no op + return nil +} diff --git a/agent/consul/server.go b/agent/consul/server.go index d2c8c3cfe33..b3187ba1b0e 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -39,6 +39,7 @@ import ( "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/multilimiter" rpcRate "github.com/hashicorp/consul/agent/consul/rate" + "github.com/hashicorp/consul/agent/consul/reporting" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/usagemetrics" @@ -409,6 +410,9 @@ type Server struct { // embedded struct to hold all the enterprise specific data EnterpriseServer operatorServer *operator.Server + + // handles metrics reporting to HashiCorp + reportingManager *reporting.ReportingManager } type connHandler interface { @@ -728,6 +732,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval) go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + s.reportingManager = reporting.NewReportingManager(s.logger, getEnterpriseReportingDeps(flat), s) + // Initialize external gRPC server - register services on external gRPC server. s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{ ACLsEnabled: s.config.ACLsEnabled, diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index 67731e45019..5e285c96353 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -15,6 +15,7 @@ import ( "google.golang.org/grpc" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/reporting" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" ) @@ -178,3 +179,8 @@ func addSerfMetricsLabels(conf *serf.Config, wan bool, segment string, partition func (s *Server) updateReportingConfig(config ReloadableConfig) { // no-op } + +func getEnterpriseReportingDeps(deps Deps) reporting.EntDeps { + // no-op + return reporting.EntDeps{} +} diff --git a/agent/consul/system_metadata.go b/agent/consul/system_metadata.go index f1603225738..5ceb5fc5e71 100644 --- a/agent/consul/system_metadata.go +++ b/agent/consul/system_metadata.go @@ -4,7 +4,7 @@ import ( "github.com/hashicorp/consul/agent/structs" ) -func (s *Server) getSystemMetadata(key string) (string, error) { +func (s *Server) GetSystemMetadata(key string) (string, error) { _, entry, err := s.fsm.State().SystemMetadataGet(nil, key) if err != nil { return "", err @@ -16,7 +16,7 @@ func (s *Server) getSystemMetadata(key string) (string, error) { return entry.Value, nil } -func (s *Server) setSystemMetadataKey(key, val string) error { +func (s *Server) SetSystemMetadataKey(key, val string) error { args := &structs.SystemMetadataRequest{ Op: structs.SystemMetadataUpsert, Entry: &structs.SystemMetadataEntry{Key: key, Value: val}, diff --git a/agent/consul/system_metadata_test.go b/agent/consul/system_metadata_test.go index 633aa6bc407..b17790d2246 100644 --- a/agent/consul/system_metadata_test.go +++ b/agent/consul/system_metadata_test.go @@ -39,9 +39,9 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) { require.Len(t, entries, 0) // Create 3 - require.NoError(t, srv.setSystemMetadataKey("key1", "val1")) - require.NoError(t, srv.setSystemMetadataKey("key2", "val2")) - require.NoError(t, srv.setSystemMetadataKey("key3", "")) + require.NoError(t, srv.SetSystemMetadataKey("key1", "val1")) + require.NoError(t, srv.SetSystemMetadataKey("key2", "val2")) + require.NoError(t, srv.SetSystemMetadataKey("key3", "")) mapify := func(entries []*structs.SystemMetadataEntry) map[string]string { m := make(map[string]string) @@ -62,7 +62,7 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) { }, mapify(entries)) // Update one and delete one. - require.NoError(t, srv.setSystemMetadataKey("key3", "val3")) + require.NoError(t, srv.SetSystemMetadataKey("key3", "val3")) require.NoError(t, srv.deleteSystemMetadataKey("key1")) _, entries, err = state.SystemMetadataList(nil)