diff --git a/agent/agent.go b/agent/agent.go index e3af2f1c982..29a4b868638 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -653,46 +653,11 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("failed to start Consul enterprise component: %v", err) } - // Create proxy config manager now because it is a dependency of creating the proxyWatcher - // which will be passed to consul.NewServer so that it is then passed to the - // controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode. - var intentionDefaultAllow bool - switch a.config.ACLResolverSettings.ACLDefaultPolicy { - case "allow": - intentionDefaultAllow = true - case "deny": - intentionDefaultAllow = false - default: - return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy) - } + // proxyTracker will be used in the creation of the XDS server and also + // in the registration of the v2 xds controller + var proxyTracker *proxytracker.ProxyTracker - go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) - - // Start the proxy config manager. - a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ - DataSources: a.proxyDataSources(), - Logger: a.logger.Named(logging.ProxyConfig), - Source: &structs.QuerySource{ - Datacenter: a.config.Datacenter, - Segment: a.config.SegmentName, - Node: a.config.NodeName, - NodePartition: a.config.PartitionOrEmpty(), - }, - DNSConfig: proxycfg.DNSConfig{ - Domain: a.config.DNSDomain, - AltDomain: a.config.DNSAltDomain, - }, - TLSConfigurator: a.tlsConfigurator, - IntentionDefaultAllow: intentionDefaultAllow, - UpdateRateLimit: a.config.XDSUpdateRateLimit, - }) - if err != nil { - return err - } - - // proxyWatcher will be used in the creation of the XDS server and also - // in the registration of the xds controller. - proxyWatcher := a.getProxyWatcher() + var consulServer *consul.Server // Setup either the client or the server. if c.ServerMode { @@ -727,16 +692,18 @@ func (a *Agent) Start(ctx context.Context) error { incomingRPCLimiter, ) - var pt *proxytracker.ProxyTracker if a.useV2Resources() { - pt = proxyWatcher.(*proxytracker.ProxyTracker) + proxyTracker = proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{ + Logger: a.logger.Named("proxy-tracker"), + SessionLimiter: a.baseDeps.XDSStreamLimiter, + }) } - server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt) + consulServer, err = consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, proxyTracker) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } - incomingRPCLimiter.Register(server) - a.delegate = server + incomingRPCLimiter.Register(consulServer) + a.delegate = consulServer if a.config.PeeringEnabled && a.config.ConnectEnabled { d := servercert.Deps{ @@ -746,7 +713,7 @@ func (a *Agent) Start(ctx context.Context) error { ACLsEnabled: a.config.ACLsEnabled, }, LeafCertManager: a.leafCertManager, - GetStore: func() servercert.Store { return server.FSM().State() }, + GetStore: func() servercert.Store { return consulServer.FSM().State() }, TLSConfigurator: a.tlsConfigurator, } a.certManager = servercert.NewCertManager(d) @@ -798,6 +765,40 @@ func (a *Agent) Start(ctx context.Context) error { return err } + var intentionDefaultAllow bool + switch a.config.ACLResolverSettings.ACLDefaultPolicy { + case "allow": + intentionDefaultAllow = true + case "deny": + intentionDefaultAllow = false + default: + return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy) + } + + go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) + + // Start the proxy config manager. + a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ + DataSources: a.proxyDataSources(consulServer), + Logger: a.logger.Named(logging.ProxyConfig), + Source: &structs.QuerySource{ + Datacenter: a.config.Datacenter, + Segment: a.config.SegmentName, + Node: a.config.NodeName, + NodePartition: a.config.PartitionOrEmpty(), + }, + DNSConfig: proxycfg.DNSConfig{ + Domain: a.config.DNSDomain, + AltDomain: a.config.DNSAltDomain, + }, + TLSConfigurator: a.tlsConfigurator, + IntentionDefaultAllow: intentionDefaultAllow, + UpdateRateLimit: a.config.XDSUpdateRateLimit, + }) + if err != nil { + return err + } + go localproxycfg.Sync( &lib.StopChannelContext{StopCh: a.shutdownCh}, localproxycfg.SyncConfig{ @@ -850,7 +851,7 @@ func (a *Agent) Start(ctx context.Context) error { } // Start grpc and grpc_tls servers. - if err := a.listenAndServeGRPC(proxyWatcher); err != nil { + if err := a.listenAndServeGRPC(proxyTracker, consulServer); err != nil { return err } @@ -930,43 +931,35 @@ func (a *Agent) useV2Resources() bool { return false } -// getProxyWatcher returns the proper implementation of the ProxyWatcher interface. -// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise, -// it will return a ConfigSource. -func (a *Agent) getProxyWatcher() xds.ProxyWatcher { - if a.useV2Resources() { - a.logger.Trace("returning proxyTracker for getProxyWatcher") - return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{ - Logger: a.logger.Named("proxy-tracker"), - SessionLimiter: a.baseDeps.XDSStreamLimiter, - }) - } else { - a.logger.Trace("returning configSource for getProxyWatcher") - return localproxycfg.NewConfigSource(a.proxyConfig) - } -} - // configureXDSServer configures an XDS server with the proper implementation of // the PRoxyWatcher interface and registers the XDS server with Consul's // external facing GRPC server. -func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) { +func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher, server *consul.Server) { // TODO(agentless): rather than asserting the concrete type of delegate, we // should add a method to the Delegate interface to build a ConfigSource. - if server, ok := a.delegate.(*consul.Server); ok { - catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{ - NodeName: a.config.NodeName, - LocalState: a.State, - LocalConfigSource: proxyWatcher, - Manager: a.proxyConfig, - GetStore: func() catalogproxycfg.Store { return server.FSM().State() }, - Logger: a.proxyConfig.Logger.Named("server-catalog"), - SessionLimiter: a.baseDeps.XDSStreamLimiter, - }) - go func() { - <-a.shutdownCh - catalogCfg.Shutdown() - }() - proxyWatcher = catalogCfg + if server != nil { + switch proxyWatcher.(type) { + case *proxytracker.ProxyTracker: + go func() { + <-a.shutdownCh + proxyWatcher.(*proxytracker.ProxyTracker).Shutdown() + }() + default: + catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{ + NodeName: a.config.NodeName, + LocalState: a.State, + LocalConfigSource: proxyWatcher, + Manager: a.proxyConfig, + GetStore: func() catalogproxycfg.Store { return server.FSM().State() }, + Logger: a.proxyConfig.Logger.Named("server-catalog"), + SessionLimiter: a.baseDeps.XDSStreamLimiter, + }) + go func() { + <-a.shutdownCh + catalogCfg.Shutdown() + }() + proxyWatcher = catalogCfg + } } a.xdsServer = xds.NewServer( a.config.NodeName, @@ -980,12 +973,19 @@ func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) { a.xdsServer.Register(a.externalGRPCServer) } -func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error { +func (a *Agent) listenAndServeGRPC(proxyTracker *proxytracker.ProxyTracker, server *consul.Server) error { if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 { return nil } - a.configureXDSServer(proxyWatcher) + var proxyWatcher xds.ProxyWatcher + if a.useV2Resources() { + proxyWatcher = proxyTracker + } else { + proxyWatcher = localproxycfg.NewConfigSource(a.proxyConfig) + } + + a.configureXDSServer(proxyWatcher, server) // Attempt to spawn listeners var listeners []net.Listener @@ -4571,7 +4571,7 @@ func (a *Agent) listenerPortLocked(svcID structs.ServiceID, checkID structs.Chec return port, nil } -func (a *Agent) proxyDataSources() proxycfg.DataSources { +func (a *Agent) proxyDataSources(server *consul.Server) proxycfg.DataSources { sources := proxycfg.DataSources{ CARoots: proxycfgglue.CacheCARoots(a.cache), CompiledDiscoveryChain: proxycfgglue.CacheCompiledDiscoveryChain(a.cache), @@ -4598,7 +4598,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache), } - if server, ok := a.delegate.(*consul.Server); ok { + if server != nil { deps := proxycfgglue.ServerDataSourceDeps{ Datacenter: a.config.Datacenter, EventPublisher: a.baseDeps.EventPublisher, diff --git a/agent/agent_test.go b/agent/agent_test.go index c597f1d5d57..96dd1de1255 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -14,11 +14,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/hashicorp/consul/agent/grpc-external/limiter" - "github.com/hashicorp/consul/agent/proxycfg" - "github.com/hashicorp/consul/agent/proxycfg-sources/local" - "github.com/hashicorp/consul/agent/xds" - proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" mathrand "math/rand" "net" "net/http" @@ -27,7 +22,6 @@ import ( "os" "path" "path/filepath" - "reflect" "strconv" "strings" "sync" @@ -6364,73 +6358,6 @@ func TestAgent_checkServerLastSeen(t *testing.T) { }) } -func TestAgent_getProxyWatcher(t *testing.T) { - type testcase struct { - description string - getExperiments func() []string - expectedType xds.ProxyWatcher - } - testscases := []testcase{ - { - description: "config source is returned when api-resources experiment is not configured", - expectedType: &local.ConfigSource{}, - getExperiments: func() []string { - return []string{} - }, - }, - { - description: "proxy tracker is returned when api-resources experiment is configured", - expectedType: &proxytracker.ProxyTracker{}, - getExperiments: func() []string { - return []string{consul.CatalogResourceExperimentName} - }, - }, - } - for _, tc := range testscases { - caConfig := tlsutil.Config{} - tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil)) - require.NoError(t, err) - - bd := BaseDeps{ - Deps: consul.Deps{ - Logger: hclog.NewInterceptLogger(nil), - Tokens: new(token.Store), - TLSConfigurator: tlsConf, - GRPCConnPool: &fakeGRPCConnPool{}, - Registry: resource.NewRegistry(), - }, - RuntimeConfig: &config.RuntimeConfig{ - HTTPAddrs: []net.Addr{ - &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)}, - }, - }, - Cache: cache.New(cache.Options{}), - NetRPC: &LazyNetRPC{}, - } - - bd.XDSStreamLimiter = limiter.NewSessionLimiter() - bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{ - CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC), - RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"), - Config: leafcert.Config{}, - }) - - cfg := config.RuntimeConfig{ - BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), - } - bd, err = initEnterpriseBaseDeps(bd, &cfg) - require.NoError(t, err) - - bd.Experiments = tc.getExperiments() - - agent, err := New(bd) - require.NoError(t, err) - agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}}) - require.NoError(t, err) - require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType))) - } - -} func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool { pool := x509.NewCertPool() data, err := os.ReadFile("../test/ca/root.cer")