From d7f5f5827e3cf89c608dff69111d7fa2890ab10a Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Tue, 11 Feb 2025 09:58:24 -0700 Subject: [PATCH] Support leader election --- internal/mode/static/manager.go | 5 +- internal/mode/static/provisioner/handler.go | 8 ++- .../mode/static/provisioner/provisioner.go | 71 ++++++++++++++++--- internal/mode/static/provisioner/store.go | 4 ++ 4 files changed, 77 insertions(+), 11 deletions(-) diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index 3c7fde6b74..119c49a8c3 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -199,7 +199,7 @@ func StartManager(cfg config.Config) error { return fmt.Errorf("cannot register grpc server: %w", err) } - prov, provLoop, err := provisioner.NewNginxProvisioner( + nginxProvisioner, provLoop, err := provisioner.NewNginxProvisioner( ctx, mgr, provisioner.Config{ @@ -223,7 +223,7 @@ func StartManager(cfg config.Config) error { eventHandler := newEventHandlerImpl(eventHandlerConfig{ ctx: ctx, nginxUpdater: nginxUpdater, - nginxProvisioner: prov, + nginxProvisioner: nginxProvisioner, metricsCollector: handlerCollector, statusUpdater: groupStatusUpdater, processor: processor, @@ -266,6 +266,7 @@ func StartManager(cfg config.Config) error { if err = mgr.Add(runnables.NewCallFunctionsAfterBecameLeader([]func(context.Context){ groupStatusUpdater.Enable, + nginxProvisioner.Enable, healthChecker.setAsLeader, eventHandler.eventHandlerEnable, })); err != nil { diff --git a/internal/mode/static/provisioner/handler.go b/internal/mode/static/provisioner/handler.go index fd71ad9ffa..405b670c18 100644 --- a/internal/mode/static/provisioner/handler.go +++ b/internal/mode/static/provisioner/handler.go @@ -106,7 +106,7 @@ func (h *eventHandler) HandleEventBatch(ctx context.Context, logger logr.Logger, } } -// updateOrDeletResources ensures that nginx resources are either: +// updateOrDeleteResources ensures that nginx resources are either: // - deleted if the Gateway no longer exists (this is for when the controller first starts up) // - are updated to the proper state in case a user makes a change directly to the resource. func (h *eventHandler) updateOrDeleteResources( @@ -115,6 +115,12 @@ func (h *eventHandler) updateOrDeleteResources( gatewayNSName types.NamespacedName, ) error { if gw := h.store.getGateway(gatewayNSName); gw == nil { + if !h.provisioner.isLeader() { + h.provisioner.setResourceToDelete(gatewayNSName) + + return nil + } + if err := h.provisioner.deprovisionNginx(ctx, gatewayNSName); err != nil { return fmt.Errorf("error deprovisioning nginx resources: %w", err) } diff --git a/internal/mode/static/provisioner/provisioner.go b/internal/mode/static/provisioner/provisioner.go index e329f82f9f..a505cf90ab 100644 --- a/internal/mode/static/provisioner/provisioner.go +++ b/internal/mode/static/provisioner/provisioner.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/go-logr/logr" @@ -51,10 +52,16 @@ type Config struct { // NginxProvisioner handles provisioning nginx kubernetes resources. type NginxProvisioner struct { - store *store - k8sClient client.Client - baseLabelSelector metav1.LabelSelector - cfg Config + store *store + k8sClient client.Client + // resourcesToDeleteOnStartup contains a list of Gateway names that no longer exist + // but have nginx resources tied to them that need to be deleted. + resourcesToDeleteOnStartup []types.NamespacedName + baseLabelSelector metav1.LabelSelector + cfg Config + leader bool + + lock sync.RWMutex } // NewNginxProvisioner returns a new instance of a Provisioner that will deploy nginx resources. @@ -76,10 +83,11 @@ func NewNginxProvisioner( } provisioner := &NginxProvisioner{ - k8sClient: mgr.GetClient(), - store: store, - baseLabelSelector: selector, - cfg: cfg, + k8sClient: mgr.GetClient(), + store: store, + baseLabelSelector: selector, + resourcesToDeleteOnStartup: []types.NamespacedName{}, + cfg: cfg, } handler, err := newEventHandler(store, provisioner, selector, cfg.GCName) @@ -95,6 +103,42 @@ func NewNginxProvisioner( return provisioner, eventLoop, nil } +// Enable is called when the Pod becomes leader and allows the provisioner to manage resources. +func (p *NginxProvisioner) Enable(ctx context.Context) { + p.lock.Lock() + p.leader = true + p.lock.Unlock() + + p.lock.RLock() + for _, gatewayNSName := range p.resourcesToDeleteOnStartup { + if err := p.deprovisionNginx(ctx, gatewayNSName); err != nil { + p.cfg.Logger.Error(err, "error deprovisioning nginx resources on startup") + } + } + p.lock.RUnlock() + + p.lock.Lock() + p.resourcesToDeleteOnStartup = []types.NamespacedName{} + p.lock.Unlock() +} + +// isLeader returns whether or not this provisioner is the leader. +func (p *NginxProvisioner) isLeader() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.leader +} + +// setResourceToDelete is called when there are resources to delete, but this pod is not leader. +// Once it becomes leader, it will delete those resources. +func (p *NginxProvisioner) setResourceToDelete(gatewayNSName types.NamespacedName) { + p.lock.Lock() + defer p.lock.Unlock() + + p.resourcesToDeleteOnStartup = append(p.resourcesToDeleteOnStartup, gatewayNSName) +} + //nolint:gocyclo // will refactor at some point func (p *NginxProvisioner) provisionNginx( ctx context.Context, @@ -102,6 +146,10 @@ func (p *NginxProvisioner) provisionNginx( gateway *gatewayv1.Gateway, nProxyCfg *graph.EffectiveNginxProxy, ) error { + if !p.isLeader() { + return nil + } + objects := p.buildNginxResourceObjects(resourceName, gateway, nProxyCfg) p.cfg.Logger.Info( @@ -208,6 +256,9 @@ func (p *NginxProvisioner) reprovisionNginx( gateway *gatewayv1.Gateway, nProxyCfg *graph.EffectiveNginxProxy, ) error { + if !p.isLeader() { + return nil + } objects := p.buildNginxResourceObjects(resourceName, gateway, nProxyCfg) p.cfg.Logger.Info( @@ -236,6 +287,10 @@ func (p *NginxProvisioner) reprovisionNginx( } func (p *NginxProvisioner) deprovisionNginx(ctx context.Context, gatewayNSName types.NamespacedName) error { + if !p.isLeader() { + return nil + } + p.cfg.Logger.Info( "Removing nginx resources for Gateway", "name", gatewayNSName.Name, diff --git a/internal/mode/static/provisioner/store.go b/internal/mode/static/provisioner/store.go index 5be8417c63..bf78ee21c0 100644 --- a/internal/mode/static/provisioner/store.go +++ b/internal/mode/static/provisioner/store.go @@ -130,6 +130,10 @@ func (s *store) registerResourceInGatewayConfig(gatewayNSName types.NamespacedNa } func gatewayChanged(original, updated *graph.Gateway) bool { + if original == nil { + return true + } + if original.Valid != updated.Valid { return true }