Skip to content

Commit

Permalink
Support leader election
Browse files Browse the repository at this point in the history
  • Loading branch information
sjberman committed Feb 11, 2025
1 parent 8265e5d commit d7f5f58
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 11 deletions.
5 changes: 3 additions & 2 deletions internal/mode/static/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func StartManager(cfg config.Config) error {
return fmt.Errorf("cannot register grpc server: %w", err)
}

prov, provLoop, err := provisioner.NewNginxProvisioner(
nginxProvisioner, provLoop, err := provisioner.NewNginxProvisioner(
ctx,
mgr,
provisioner.Config{
Expand All @@ -223,7 +223,7 @@ func StartManager(cfg config.Config) error {
eventHandler := newEventHandlerImpl(eventHandlerConfig{
ctx: ctx,
nginxUpdater: nginxUpdater,
nginxProvisioner: prov,
nginxProvisioner: nginxProvisioner,
metricsCollector: handlerCollector,
statusUpdater: groupStatusUpdater,
processor: processor,
Expand Down Expand Up @@ -266,6 +266,7 @@ func StartManager(cfg config.Config) error {

if err = mgr.Add(runnables.NewCallFunctionsAfterBecameLeader([]func(context.Context){
groupStatusUpdater.Enable,
nginxProvisioner.Enable,
healthChecker.setAsLeader,
eventHandler.eventHandlerEnable,
})); err != nil {
Expand Down
8 changes: 7 additions & 1 deletion internal/mode/static/provisioner/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (h *eventHandler) HandleEventBatch(ctx context.Context, logger logr.Logger,
}
}

// updateOrDeletResources ensures that nginx resources are either:
// updateOrDeleteResources ensures that nginx resources are either:
// - deleted if the Gateway no longer exists (this is for when the controller first starts up)
// - are updated to the proper state in case a user makes a change directly to the resource.
func (h *eventHandler) updateOrDeleteResources(
Expand All @@ -115,6 +115,12 @@ func (h *eventHandler) updateOrDeleteResources(
gatewayNSName types.NamespacedName,
) error {
if gw := h.store.getGateway(gatewayNSName); gw == nil {
if !h.provisioner.isLeader() {
h.provisioner.setResourceToDelete(gatewayNSName)

return nil
}

if err := h.provisioner.deprovisionNginx(ctx, gatewayNSName); err != nil {
return fmt.Errorf("error deprovisioning nginx resources: %w", err)
}
Expand Down
71 changes: 63 additions & 8 deletions internal/mode/static/provisioner/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -51,10 +52,16 @@ type Config struct {

// NginxProvisioner handles provisioning nginx kubernetes resources.
type NginxProvisioner struct {
store *store
k8sClient client.Client
baseLabelSelector metav1.LabelSelector
cfg Config
store *store
k8sClient client.Client
// resourcesToDeleteOnStartup contains a list of Gateway names that no longer exist
// but have nginx resources tied to them that need to be deleted.
resourcesToDeleteOnStartup []types.NamespacedName
baseLabelSelector metav1.LabelSelector
cfg Config
leader bool

lock sync.RWMutex
}

// NewNginxProvisioner returns a new instance of a Provisioner that will deploy nginx resources.
Expand All @@ -76,10 +83,11 @@ func NewNginxProvisioner(
}

provisioner := &NginxProvisioner{
k8sClient: mgr.GetClient(),
store: store,
baseLabelSelector: selector,
cfg: cfg,
k8sClient: mgr.GetClient(),
store: store,
baseLabelSelector: selector,
resourcesToDeleteOnStartup: []types.NamespacedName{},
cfg: cfg,
}

handler, err := newEventHandler(store, provisioner, selector, cfg.GCName)
Expand All @@ -95,13 +103,53 @@ func NewNginxProvisioner(
return provisioner, eventLoop, nil
}

// Enable is called when the Pod becomes leader and allows the provisioner to manage resources.
func (p *NginxProvisioner) Enable(ctx context.Context) {
p.lock.Lock()
p.leader = true
p.lock.Unlock()

p.lock.RLock()
for _, gatewayNSName := range p.resourcesToDeleteOnStartup {
if err := p.deprovisionNginx(ctx, gatewayNSName); err != nil {
p.cfg.Logger.Error(err, "error deprovisioning nginx resources on startup")
}
}
p.lock.RUnlock()

p.lock.Lock()
p.resourcesToDeleteOnStartup = []types.NamespacedName{}
p.lock.Unlock()
}

// isLeader returns whether or not this provisioner is the leader.
func (p *NginxProvisioner) isLeader() bool {
p.lock.RLock()
defer p.lock.RUnlock()

return p.leader
}

// setResourceToDelete is called when there are resources to delete, but this pod is not leader.
// Once it becomes leader, it will delete those resources.
func (p *NginxProvisioner) setResourceToDelete(gatewayNSName types.NamespacedName) {
p.lock.Lock()
defer p.lock.Unlock()

p.resourcesToDeleteOnStartup = append(p.resourcesToDeleteOnStartup, gatewayNSName)
}

//nolint:gocyclo // will refactor at some point
func (p *NginxProvisioner) provisionNginx(
ctx context.Context,
resourceName string,
gateway *gatewayv1.Gateway,
nProxyCfg *graph.EffectiveNginxProxy,
) error {
if !p.isLeader() {
return nil
}

objects := p.buildNginxResourceObjects(resourceName, gateway, nProxyCfg)

p.cfg.Logger.Info(
Expand Down Expand Up @@ -208,6 +256,9 @@ func (p *NginxProvisioner) reprovisionNginx(
gateway *gatewayv1.Gateway,
nProxyCfg *graph.EffectiveNginxProxy,
) error {
if !p.isLeader() {
return nil
}
objects := p.buildNginxResourceObjects(resourceName, gateway, nProxyCfg)

p.cfg.Logger.Info(
Expand Down Expand Up @@ -236,6 +287,10 @@ func (p *NginxProvisioner) reprovisionNginx(
}

func (p *NginxProvisioner) deprovisionNginx(ctx context.Context, gatewayNSName types.NamespacedName) error {
if !p.isLeader() {
return nil
}

p.cfg.Logger.Info(
"Removing nginx resources for Gateway",
"name", gatewayNSName.Name,
Expand Down
4 changes: 4 additions & 0 deletions internal/mode/static/provisioner/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ func (s *store) registerResourceInGatewayConfig(gatewayNSName types.NamespacedNa
}

func gatewayChanged(original, updated *graph.Gateway) bool {
if original == nil {
return true
}

if original.Valid != updated.Valid {
return true
}
Expand Down

0 comments on commit d7f5f58

Please sign in to comment.