From ff25f2d6418226541607fd3f35708ec9f209dc66 Mon Sep 17 00:00:00 2001 From: Mihalkin Grigoriy Aleksandrovich Date: Sat, 1 Jun 2019 18:32:13 +0300 Subject: [PATCH 1/2] added leader election for runnables --- pkg/builder/controller_test.go | 44 +++- pkg/cache/informer_cache.go | 6 + pkg/controller/controller.go | 21 ++ pkg/controller/controller_integration_test.go | 3 + pkg/controller/controller_test.go | 28 ++- pkg/internal/controller/controller.go | 16 ++ .../recorder/recorder_integration_test.go | 3 + pkg/manager/internal.go | 211 +++++++++++++----- pkg/manager/manager.go | 83 +++---- pkg/manager/manager_test.go | 81 ++++--- pkg/webhook/server.go | 6 + 11 files changed, 375 insertions(+), 127 deletions(-) diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 9f1d5062e5..30e5056a05 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -62,6 +62,11 @@ var _ = Describe("application", func() { instance, err := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). Owns(&appsv1.ReplicaSet{}). + WithOptions(controller.Options{ + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }). Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(instance).NotTo(BeNil()) @@ -76,6 +81,11 @@ var _ = Describe("application", func() { instance, err := ControllerManagedBy(m). For(&fakeType{}). Owns(&appsv1.ReplicaSet{}). + WithOptions(controller.Options{ + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }). Build(noop) Expect(err).To(MatchError(ContainSubstring("no kind is registered for the type builder.fakeType"))) Expect(instance).To(BeNil()) @@ -98,6 +108,11 @@ var _ = Describe("application", func() { instance, err := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). Owns(&appsv1.ReplicaSet{}). + WithOptions(controller.Options{ + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }). Build(noop) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("expected error")) @@ -121,7 +136,12 @@ var _ = Describe("application", func() { instance, err := ControllerManagedBy(m). For(&appsv1.ReplicaSet{}). Owns(&appsv1.ReplicaSet{}). - WithOptions(controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: maxConcurrentReconciles, + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }). Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(instance).NotTo(BeNil()) @@ -164,6 +184,11 @@ var _ = Describe("application", func() { ctrl1, err := ControllerManagedBy(m). For(&TestDefaultValidator{}). Owns(&appsv1.ReplicaSet{}). + WithOptions(controller.Options{ + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }). Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(ctrl1).NotTo(BeNil()) @@ -172,6 +197,11 @@ var _ = Describe("application", func() { ctrl2, err := ControllerManagedBy(m). For(&TestDefaultValidator{}). Owns(&appsv1.ReplicaSet{}). + WithOptions(controller.Options{ + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }). Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(ctrl2).NotTo(BeNil()) @@ -185,7 +215,12 @@ var _ = Describe("application", func() { bldr := ControllerManagedBy(m). For(&appsv1.Deployment{}). - Owns(&appsv1.ReplicaSet{}) + Owns(&appsv1.ReplicaSet{}). + WithOptions(controller.Options{ + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }) doReconcileTest("3", stop, bldr, m, false) close(done) }, 10) @@ -196,6 +231,11 @@ var _ = Describe("application", func() { bldr := ControllerManagedBy(m). For(&appsv1.Deployment{}). + WithOptions(controller.Options{ + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }). Watches( // Equivalent of Owns &source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true}) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 5b2d1ee1f0..8fa836c960 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -169,6 +169,12 @@ func (ip *informerCache) NeedLeaderElection() bool { return false } +// GetID implements the LeaderElectionRunnable interface. +// It's dummy method that always returns empty string as informerCache doesn't need leader election. +func (ip *informerCache) GetID() string { + return "" +} + // IndexField adds an indexer to the underlying cache, using extraction function to get // value(s) from the given field. This index can then be used by passing a field selector // to List. For one-to-one compatibility with "normal" field selectors, only return one value. diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e75f13b906..64621854db 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -41,6 +41,18 @@ type Options struct { // Defaults to MaxOfRateLimiter which has both overall and per-item rate limiting. // The overall is a token bucket and the per-item is exponential. RateLimiter ratelimiter.RateLimiter + + // Leader election is by default + LeaderElection *LeaderElectionOptions +} + +// Leader Election options +type LeaderElectionOptions struct { + // NeedLeaderElection determines whether or not to use leader election when starting the controller. + NeedLeaderElection bool + + // LeaderElectionID determines the name of the configmap that leader election will use for holding the leader lock. + LeaderElectionID string } // Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests @@ -83,6 +95,13 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) options.RateLimiter = workqueue.DefaultControllerRateLimiter() } + if options.LeaderElection == nil { + options.LeaderElection = &LeaderElectionOptions{ + NeedLeaderElection: true, + LeaderElectionID: name, + } + } + // Inject dependencies into Reconciler if err := mgr.SetFields(options.Reconciler); err != nil { return nil, err @@ -101,6 +120,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, Name: name, + LeaderElection: options.LeaderElection.NeedLeaderElection, + LeaderElectionID: options.LeaderElection.LeaderElectionID, } // Add the controller as a Manager components diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index a620d64c8c..56fa8d115a 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -65,6 +65,9 @@ var _ = Describe("controller", func() { reconciled <- request return reconcile.Result{}, nil }), + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 2194220e31..1520128954 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -48,7 +48,12 @@ var _ = Describe("controller.Controller", func() { It("should return an error if Name is not Specified", func(done Done) { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - c, err := controller.New("", m, controller.Options{Reconciler: rec}) + c, err := controller.New("", m, controller.Options{ + Reconciler: rec, + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }) Expect(c).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Name for Controller")) @@ -70,7 +75,12 @@ var _ = Describe("controller.Controller", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - c, err := controller.New("foo", m, controller.Options{Reconciler: &failRec{}}) + c, err := controller.New("foo", m, controller.Options{ + Reconciler: &failRec{}, + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }) Expect(c).To(BeNil()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("expected error")) @@ -82,11 +92,21 @@ var _ = Describe("controller.Controller", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) - c1, err := controller.New("c1", m, controller.Options{Reconciler: rec}) + c1, err := controller.New("c1", m, controller.Options{ + Reconciler: rec, + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }) Expect(err).NotTo(HaveOccurred()) Expect(c1).ToNot(BeNil()) - c2, err := controller.New("c2", m, controller.Options{Reconciler: rec}) + c2, err := controller.New("c2", m, controller.Options{ + Reconciler: rec, + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, + }) Expect(err).NotTo(HaveOccurred()) Expect(c2).ToNot(BeNil()) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 946ef82852..0d965492f5 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -50,6 +50,14 @@ type Controller struct { // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. MaxConcurrentReconciles int + // LeaderElection determines whether or not to use leader election when + // starting the controller. + LeaderElection bool + + // LeaderElectionID determines the name of the configmap that leader election + // will use for holding the leader lock. + LeaderElectionID string + // Reconciler is a function that can be called at any time with the Name / Namespace of an object and // ensures that the state of the system matches the state specified in the object. // Defaults to the DefaultReconcileFunc. @@ -296,3 +304,11 @@ func (c *Controller) InjectFunc(f inject.Func) error { func (c *Controller) updateMetrics(reconcileTime time.Duration) { ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds()) } + +func (c *Controller) NeedLeaderElection() bool { + return c.LeaderElection +} + +func (c *Controller) GetID() string { + return c.LeaderElectionID +} diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index f3134b0cb6..6f5fbd866e 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -61,6 +61,9 @@ var _ = Describe("recorder", func() { recorder.Event(dp, corev1.EventTypeNormal, "test-reason", "test-msg") return reconcile.Result{}, nil }), + LeaderElection: &controller.LeaderElectionOptions{ + NeedLeaderElection: false, + }, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 2f969fcd36..049f54a305 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -18,6 +18,7 @@ package manager import ( "context" + "errors" "fmt" "net" "net/http" @@ -36,6 +37,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" + crleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/recorder" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" @@ -62,9 +64,10 @@ type controllerManager struct { // to scheme.scheme. scheme *runtime.Scheme - // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. + // leaderElectionRunnables is the map that groups runnables that use same leader election ID. // These Runnables are managed by lead election. - leaderElectionRunnables []Runnable + leaderElectionRunnables map[string][]Runnable + // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. // These Runnables will not be blocked by lead election. nonLeaderElectionRunnables []Runnable @@ -86,8 +89,16 @@ type controllerManager struct { // (and EventHandlers, Sources and Predicates). recorderProvider recorder.Provider - // resourceLock forms the basis for leader election - resourceLock resourcelock.Interface + // defaultLeaderElection determines whether or not to use leader election by default + // for runnables that don't implement LeaderElectionRunnable interface. + defaultLeaderElection bool + + // defaultLeaderElectionID is used for runnables that don't implement LeaderElectionRunnable interface. + defaultLeaderElectionID string + + // leaderElectionNamespace determines the namespace in which the leader + // election configmaps will be created. + leaderElectionNamespace string // mapper is used to map resources to kind, and map kind and version. mapper meta.RESTMapper @@ -112,7 +123,6 @@ type controllerManager struct { mu sync.Mutex started bool - startedLeader bool healthzStarted bool // NB(directxman12): we don't just use an error channel here to avoid the situation where the @@ -152,6 +162,19 @@ type controllerManager struct { // retryPeriod is the duration the LeaderElector clients should wait // between tries of actions. retryPeriod time.Duration + + // electedLeaderElectionIDs is the map from leader election ID to it's election status + electedLeaderElectionIDs map[string]bool + + // leaderElectionGroupStopChannels is map from leader election ID + // to channel used to stop all runnables in this LE ID group + leaderElectionGroupStopChannels map[string]<-chan struct{} + + // leaderElectionIDResourceLocks is map from leader election ID to resourceLock + leaderElectionIDResourceLocks map[string]resourcelock.Interface + + // Dependency injection for testing + newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options crleaderelection.Options) (resourcelock.Interface, error) } type errSignaler struct { @@ -209,24 +232,56 @@ func (cm *controllerManager) Add(r Runnable) error { return err } - var shouldStart bool + if cm.leaderElectionRunnables == nil { + cm.leaderElectionRunnables = make(map[string][]Runnable) + } // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { - shouldStart = cm.started + if leRunnable, ok := r.(LeaderElectionRunnable); (ok && !leRunnable.NeedLeaderElection()) || (!ok && !cm.defaultLeaderElection) { cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) + + if cm.started { + // If already started, start the controller + go func() { + err := r.Start(cm.internalStop) + if err != nil { + cm.errSignal.SignalError(err) + } + }() + } } else { - shouldStart = cm.startedLeader - cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) - } + var leID string - if shouldStart { - // If already started, start the controller - go func() { - if err := r.Start(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) + if ok { + leID := leRunnable.GetID() + + // Check that leader election ID is defined + if leID == "" { + return errors.New("LeaderElectionID must be configured") } - }() + } else if cm.defaultLeaderElection { + // If runnable doesn't implement LeaderElectionRunnable interface and defaultLeaderElection is true + // it's assumed that it needs leader election. + // This is done to maintain backwards compatibility + leID = cm.defaultLeaderElectionID + } + + cm.leaderElectionRunnables[leID] = append(cm.leaderElectionRunnables[leID], r) + + if cm.started { + // If Leader Election ID is already used and elected, start the controller. + // If it's appeared first time, start leader election for it. + if cm.electedLeaderElectionIDs[leID] { + go func() { + err := r.Start(cm.leaderElectionGroupStopChannels[leID]) + if err != nil { + cm.errSignal.SignalError(err) + } + }() + } else { + go cm.startLeaderElectionRunnable(leID) + } + } } return nil @@ -417,14 +472,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { go cm.startNonLeaderElectionRunnables() - if cm.resourceLock != nil { - err := cm.startLeaderElection() - if err != nil { - return err - } - } else { - go cm.startLeaderElectionRunnables() - } + go cm.startLeaderElectionRunnables() select { case <-stop: @@ -465,21 +513,9 @@ func (cm *controllerManager) startLeaderElectionRunnables() { cm.waitForCache() // Start the leader election Runnables after the cache has synced - for _, c := range cm.leaderElectionRunnables { - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - ctrl := c - go func() { - if err := ctrl.Start(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - // we use %T here because we don't have a good stand-in for "name", - // and the full runnable might not serialize (mutexes, etc) - log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl)) - }() + for leID := range cm.leaderElectionRunnables { + go cm.startLeaderElectionRunnable(leID) } - - cm.startedLeader = true } func (cm *controllerManager) waitForCache() { @@ -503,23 +539,94 @@ func (cm *controllerManager) waitForCache() { cm.started = true } -func (cm *controllerManager) startLeaderElection() (err error) { +func (cm *controllerManager) startLeaderElectionRunnable(leaderElectionID string) { + cm.mu.Lock() + defer cm.mu.Unlock() + + // Get or create resource lock + if cm.leaderElectionIDResourceLocks == nil { + cm.leaderElectionIDResourceLocks = make(map[string]resourcelock.Interface) + } + + if _, ok := cm.leaderElectionIDResourceLocks[leaderElectionID]; !ok { + resourceLock, err := cm.newResourceLock(cm.config, cm.recorderProvider, crleaderelection.Options{ + LeaderElection: true, + LeaderElectionID: leaderElectionID, + LeaderElectionNamespace: cm.leaderElectionNamespace, + }) + + // Controllers block, but we want to return an error if any have an error starting. + // Write any Start errors to a channel so we can return them + if err != nil { + cm.errSignal.SignalError(err) + return + } + + cm.leaderElectionIDResourceLocks[leaderElectionID] = resourceLock + } + + // Channel to stop all runnables in LE group + groupStopChan := make(chan struct{}) + + err := cm.startLeaderElection(cm.leaderElectionIDResourceLocks[leaderElectionID], leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + cm.mu.Lock() + defer cm.mu.Unlock() + + if cm.electedLeaderElectionIDs == nil { + cm.electedLeaderElectionIDs = make(map[string]bool) + } + cm.electedLeaderElectionIDs[leaderElectionID] = true + + if cm.leaderElectionGroupStopChannels == nil { + cm.leaderElectionGroupStopChannels = make(map[string]<-chan struct{}) + } + cm.leaderElectionGroupStopChannels[leaderElectionID] = groupStopChan + + runnables := cm.leaderElectionRunnables[leaderElectionID] + for _, r := range runnables { + runnable := r + go func() { + err := runnable.Start(cm.leaderElectionGroupStopChannels[leaderElectionID]) + if err != nil { + cm.errSignal.SignalError(err) + } + + // we use %T here because we don't have a good stand-in for "name", + // and the full runnable might not serialize (mutexes, etc) + log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", runnable)) + }() + } + }, + OnStoppedLeading: func() { + cm.mu.Lock() + + cm.electedLeaderElectionIDs[leaderElectionID] = false + close(groupStopChan) + + cm.mu.Unlock() + + // Starting leader election for LE group if controller isn't stopped + select { + case <-cm.internalStop: + return + default: + go cm.startLeaderElectionRunnable(leaderElectionID) + } + }, + }) + if err != nil { + cm.errSignal.SignalError(err) + } +} + +func (cm *controllerManager) startLeaderElection(resourceLock resourcelock.Interface, callbacks leaderelection.LeaderCallbacks) (err error) { l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: cm.resourceLock, + Lock: resourceLock, LeaseDuration: cm.leaseDuration, RenewDeadline: cm.renewDeadline, RetryPeriod: cm.retryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(_ context.Context) { - cm.startLeaderElectionRunnables() - }, - OnStoppedLeading: func() { - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errSignal.SignalError(fmt.Errorf("leader election lost")) - }, - }, + Callbacks: callbacks, }) if err != nil { return err diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 852eb787b9..1447dd1761 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -40,6 +40,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" ) +const ( + defaultLeaderElectionID = "default-le-id" +) + // Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables. // A Manager is required to create Controllers. type Manager interface { @@ -115,18 +119,19 @@ type Options struct { // so that all controllers will not send list requests simultaneously. SyncPeriod *time.Duration - // LeaderElection determines whether or not to use leader election when - // starting the manager. - LeaderElection bool + // DefaultLeaderElection determines whether or not to use leader election by default + // for runnables that don't implement LeaderElectionRunnable interface. + DefaultLeaderElection bool + + // DefaultLeaderElectionID determines the name of the configmap that leader election + // will use for runnables that don't implement LeaderElectionRunnable interface. + // If not specified, default value will be assigned. + DefaultLeaderElectionID string // LeaderElectionNamespace determines the namespace in which the leader - // election configmap will be created. + // election configmaps will be created. LeaderElectionNamespace string - // LeaderElectionID determines the name of the configmap that leader election - // will use for holding the leader lock. - LeaderElectionID string - // LeaseDuration is the duration that non-leader candidates will // wait to force acquire leadership. This is measured against time of // last observed ack. Default is 15 seconds. @@ -223,6 +228,9 @@ type LeaderElectionRunnable interface { // NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode. // e.g. controllers need to be run in leader election mode, while webhook server doesn't. NeedLeaderElection() bool + + // GetID returns leader election ID + GetID() string } // New returns a new Manager for creating Controllers. @@ -265,16 +273,6 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } - // Create the resource lock to enable leader election) - resourceLock, err := options.newResourceLock(config, recorderProvider, leaderelection.Options{ - LeaderElection: options.LeaderElection, - LeaderElectionID: options.LeaderElectionID, - LeaderElectionNamespace: options.LeaderElectionNamespace, - }) - if err != nil { - return nil, err - } - // Create the metrics listener. This will throw an error if the metrics bind // address is invalid or already in use. metricsListener, err := options.newMetricsListener(options.MetricsBindAddress) @@ -292,27 +290,30 @@ func New(config *rest.Config, options Options) (Manager, error) { stop := make(chan struct{}) return &controllerManager{ - config: config, - scheme: options.Scheme, - cache: cache, - fieldIndexes: cache, - client: writeObj, - apiReader: apiReader, - recorderProvider: recorderProvider, - resourceLock: resourceLock, - mapper: mapper, - metricsListener: metricsListener, - internalStop: stop, - internalStopper: stop, - port: options.Port, - host: options.Host, - certDir: options.CertDir, - leaseDuration: *options.LeaseDuration, - renewDeadline: *options.RenewDeadline, - retryPeriod: *options.RetryPeriod, - healthProbeListener: healthProbeListener, - readinessEndpointName: options.ReadinessEndpointName, - livenessEndpointName: options.LivenessEndpointName, + config: config, + scheme: options.Scheme, + cache: cache, + fieldIndexes: cache, + client: writeObj, + apiReader: apiReader, + recorderProvider: recorderProvider, + mapper: mapper, + metricsListener: metricsListener, + internalStop: stop, + internalStopper: stop, + port: options.Port, + host: options.Host, + certDir: options.CertDir, + leaseDuration: *options.LeaseDuration, + renewDeadline: *options.RenewDeadline, + retryPeriod: *options.RetryPeriod, + healthProbeListener: healthProbeListener, + readinessEndpointName: options.ReadinessEndpointName, + livenessEndpointName: options.LivenessEndpointName, + defaultLeaderElection: options.DefaultLeaderElection, + defaultLeaderElectionID: options.DefaultLeaderElectionID, + leaderElectionNamespace: options.LeaderElectionNamespace, + newResourceLock: options.newResourceLock, }, nil } @@ -412,5 +413,9 @@ func setOptionsDefaults(options Options) Options { options.newHealthProbeListener = defaultHealthProbeListener } + if options.DefaultLeaderElectionID == "" { + options.DefaultLeaderElectionID = defaultLeaderElectionID + } + return options } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 9b291d80cc..c3bd5c2963 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -31,12 +31,10 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/leaderelection" fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -140,32 +138,6 @@ var _ = Describe("manger.Manager", func() { close(done) }) - Context("with leader election enabled", func() { - It("should default ID to controller-runtime if ID is not set", func() { - var rl resourcelock.Interface - m, err := New(cfg, Options{ - LeaderElection: true, - LeaderElectionNamespace: "default", - LeaderElectionID: "test-leader-election-id", - newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) { - var err error - rl, err = leaderelection.NewResourceLock(config, recorderProvider, options) - return rl, err - }, - }) - Expect(err).ToNot(HaveOccurred()) - Expect(m).ToNot(BeNil()) - Expect(rl.Describe()).To(Equal("default/test-leader-election-id")) - }) - - It("should return an error if namespace not set and not running in cluster", func() { - m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) - Expect(m).To(BeNil()) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("unable to find leader election namespace: not running in-cluster, please specify LeaderElectionNamespace")) - }) - }) - It("should create a listener for the metrics if a valid address is provided", func() { var listener net.Listener m, err := New(cfg, Options{ @@ -327,6 +299,41 @@ var _ = Describe("manger.Manager", func() { <-c3 }) + It("shouldn't allow empty LeaderElectionID for controller", func(done Done) { + m, err := New(cfg, options) + Expect(err).ToNot(HaveOccurred()) + + r := &leRunnable{ + leaderElectionID: "", + } + Expect(m.Add(r).Error()).To(ContainSubstring("LeaderElectionID must be configured")) + close(done) + }) + + It("should be able to add runnables with same LeaderElectionID", func(done Done) { + if options.LeaderElectionNamespace != "" { + uniqueLeaderElectionID := "unique" + nonUniqueLeaderElectionID := "non-unique" + m, err := New(cfg, options) + Expect(err).ToNot(HaveOccurred()) + + uniqueR := &leRunnable{ + leaderElectionID: uniqueLeaderElectionID, + } + r1 := &leRunnable{ + leaderElectionID: nonUniqueLeaderElectionID, + } + r2 := &leRunnable{ + leaderElectionID: nonUniqueLeaderElectionID, + } + Expect(m.Add(uniqueR)).To(Succeed()) + Expect(m.Add(r1)).To(Succeed()) + Expect(m.Add(r2)).To(Succeed()) + } + + close(done) + }) + It("should return an error if any non-leaderelection Components fail to Start", func() { // TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429 }) @@ -338,8 +345,6 @@ var _ = Describe("manger.Manager", func() { Context("with leaderelection enabled", func() { startSuite(Options{ - LeaderElection: true, - LeaderElectionID: "controller-runtime", LeaderElectionNamespace: "default", newResourceLock: fakeleaderelection.NewResourceLock, }) @@ -889,3 +894,19 @@ func (i *injectable) InjectStopChannel(stop <-chan struct{}) error { func (i *injectable) Start(<-chan struct{}) error { return nil } + +type leRunnable struct { + leaderElectionID string +} + +func (*leRunnable) Start(<-chan struct{}) error { + return nil +} + +func (*leRunnable) NeedLeaderElection() bool { + return true +} + +func (le *leRunnable) GetID() string { + return le.leaderElectionID +} diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index d542f2de44..2b7e1f65cd 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -107,6 +107,12 @@ func (*Server) NeedLeaderElection() bool { return false } +// GetID implements the LeaderElectionRunnable interface. +// It's dummy method that always returns empty string as webhook.Server doesn't need leader election. +func (*Server) GetID() string { + return "" +} + // Register marks the given webhook as being served at the given path. // It panics if two hooks are registered on the same path. func (s *Server) Register(path string, hook http.Handler) { From 0a1076a326d3f29542519014dfeaaf23a6a6ad7b Mon Sep 17 00:00:00 2001 From: GrigoriyMikhalkin Date: Thu, 27 Feb 2020 21:34:27 +0300 Subject: [PATCH 2/2] added leader election modes --- pkg/builder/controller_test.go | 17 +++++++------ pkg/cache/informer_cache.go | 7 +++--- pkg/cache/informer_cache_test.go | 3 ++- pkg/controller/controller.go | 10 +++++--- pkg/controller/controller_integration_test.go | 3 ++- pkg/controller/controller_test.go | 9 ++++--- pkg/internal/controller/controller.go | 9 ++++--- .../recorder/recorder_integration_test.go | 3 ++- pkg/leaderelection/leader_election.go | 15 ++++++++++- pkg/manager/internal.go | 25 ++++++++++++------- pkg/manager/manager.go | 15 +++++------ pkg/manager/manager_test.go | 5 ++-- pkg/webhook/server.go | 7 +++--- 13 files changed, 80 insertions(+), 48 deletions(-) diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 30e5056a05..a6a2b27569 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/scheme" @@ -64,7 +65,7 @@ var _ = Describe("application", func() { Owns(&appsv1.ReplicaSet{}). WithOptions(controller.Options{ LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }). Build(noop) @@ -83,7 +84,7 @@ var _ = Describe("application", func() { Owns(&appsv1.ReplicaSet{}). WithOptions(controller.Options{ LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }). Build(noop) @@ -110,7 +111,7 @@ var _ = Describe("application", func() { Owns(&appsv1.ReplicaSet{}). WithOptions(controller.Options{ LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }). Build(noop) @@ -139,7 +140,7 @@ var _ = Describe("application", func() { WithOptions(controller.Options{ MaxConcurrentReconciles: maxConcurrentReconciles, LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }). Build(noop) @@ -186,7 +187,7 @@ var _ = Describe("application", func() { Owns(&appsv1.ReplicaSet{}). WithOptions(controller.Options{ LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }). Build(noop) @@ -199,7 +200,7 @@ var _ = Describe("application", func() { Owns(&appsv1.ReplicaSet{}). WithOptions(controller.Options{ LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }). Build(noop) @@ -218,7 +219,7 @@ var _ = Describe("application", func() { Owns(&appsv1.ReplicaSet{}). WithOptions(controller.Options{ LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }) doReconcileTest("3", stop, bldr, m, false) @@ -233,7 +234,7 @@ var _ = Describe("application", func() { For(&appsv1.Deployment{}). WithOptions(controller.Options{ LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }). Watches( // Equivalent of Owns diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 8fa836c960..d50432affb 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache/internal" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" ) var ( @@ -163,10 +164,10 @@ func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) { return i.Informer, err } -// NeedLeaderElection implements the LeaderElectionRunnable interface +// GetLeaderElectionMode implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock -func (ip *informerCache) NeedLeaderElection() bool { - return false +func (ip *informerCache) GetLeaderElectionMode() leaderelection.Mode { + return leaderelection.NonLeaderElectionMode } // GetID implements the LeaderElectionRunnable interface. diff --git a/pkg/cache/informer_cache_test.go b/pkg/cache/informer_cache_test.go index 9571845191..a227ff5724 100644 --- a/pkg/cache/informer_cache_test.go +++ b/pkg/cache/informer_cache_test.go @@ -8,6 +8,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -23,6 +24,6 @@ var _ = Describe("informerCache", func() { leaderElectionRunnable, ok := c.(manager.LeaderElectionRunnable) Expect(ok).To(BeTrue()) - Expect(leaderElectionRunnable.NeedLeaderElection()).To(BeFalse()) + Expect(leaderElectionRunnable.GetLeaderElectionMode()).To(Equal(leaderelection.NonLeaderElectionMode)) }) }) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 64621854db..34deb598ac 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -22,6 +22,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/internal/controller" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/ratelimiter" @@ -48,8 +49,8 @@ type Options struct { // Leader Election options type LeaderElectionOptions struct { - // NeedLeaderElection determines whether or not to use leader election when starting the controller. - NeedLeaderElection bool + // LeaderElectionMode determines what leader election mode to use when starting the controller. + LeaderElectionMode leaderelection.Mode // LeaderElectionID determines the name of the configmap that leader election will use for holding the leader lock. LeaderElectionID string @@ -96,8 +97,9 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) } if options.LeaderElection == nil { + // Defaulting to per-manager leader election mode for backwards compatibility options.LeaderElection = &LeaderElectionOptions{ - NeedLeaderElection: true, + LeaderElectionMode: leaderelection.PerManagerLeaderElectionMode, LeaderElectionID: name, } } @@ -120,7 +122,7 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, Name: name, - LeaderElection: options.LeaderElection.NeedLeaderElection, + LeaderElectionMode: options.LeaderElection.LeaderElectionMode, LeaderElectionID: options.LeaderElection.LeaderElectionID, } diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 56fa8d115a..aa965954c1 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -66,7 +67,7 @@ var _ = Describe("controller", func() { return reconcile.Result{}, nil }), LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 1520128954..165e80b18f 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -25,6 +25,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" @@ -51,7 +52,7 @@ var _ = Describe("controller.Controller", func() { c, err := controller.New("", m, controller.Options{ Reconciler: rec, LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }) Expect(c).To(BeNil()) @@ -78,7 +79,7 @@ var _ = Describe("controller.Controller", func() { c, err := controller.New("foo", m, controller.Options{ Reconciler: &failRec{}, LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }) Expect(c).To(BeNil()) @@ -95,7 +96,7 @@ var _ = Describe("controller.Controller", func() { c1, err := controller.New("c1", m, controller.Options{ Reconciler: rec, LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }) Expect(err).NotTo(HaveOccurred()) @@ -104,7 +105,7 @@ var _ = Describe("controller.Controller", func() { c2, err := controller.New("c2", m, controller.Options{ Reconciler: rec, LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 0d965492f5..a51a33cbc0 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -32,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" @@ -50,9 +51,9 @@ type Controller struct { // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. MaxConcurrentReconciles int - // LeaderElection determines whether or not to use leader election when + // LeaderElectionMode determines which leader election mode to use when // starting the controller. - LeaderElection bool + LeaderElectionMode leaderelection.Mode // LeaderElectionID determines the name of the configmap that leader election // will use for holding the leader lock. @@ -305,8 +306,8 @@ func (c *Controller) updateMetrics(reconcileTime time.Duration) { ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds()) } -func (c *Controller) NeedLeaderElection() bool { - return c.LeaderElection +func (c *Controller) GetLeaderElectionMode() leaderelection.Mode { + return c.LeaderElectionMode } func (c *Controller) GetID() string { diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index 6f5fbd866e..c4d07f9ebe 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -25,6 +25,7 @@ import ( ref "k8s.io/client-go/tools/reference" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -62,7 +63,7 @@ var _ = Describe("recorder", func() { return reconcile.Result{}, nil }), LeaderElection: &controller.LeaderElectionOptions{ - NeedLeaderElection: false, + LeaderElectionMode: leaderelection.NonLeaderElectionMode, }, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 41b74c6074..a41edc4a18 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -29,7 +29,20 @@ import ( "sigs.k8s.io/controller-runtime/pkg/recorder" ) -const inClusterNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" +type Mode uint8 + +const ( + inClusterNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" + + // NonLeaderElectionMode mode for Runnables that don't need leader election + NonLeaderElectionMode Mode = 0 + + // PerManagerLeaderElectionMode mode for Runnables that need per-manager leader election mode + PerManagerLeaderElectionMode Mode = 1 + + // PerControllerGroupLeaderElectionMode mode for Runnables that need per-controller leader election mode + PerControllerGroupLeaderElectionMode Mode = 2 +) // Options provides the required configuration to create a new resource lock type Options struct { diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 049f54a305..63e941fabb 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -89,8 +89,9 @@ type controllerManager struct { // (and EventHandlers, Sources and Predicates). recorderProvider recorder.Provider - // defaultLeaderElection determines whether or not to use leader election by default - // for runnables that don't implement LeaderElectionRunnable interface. + // defaultLeaderElection determines whether or not to use leader election + // for runnables that need per-manager leader election or + // don't implement LeaderElectionRunnable interface. defaultLeaderElection bool // defaultLeaderElectionID is used for runnables that don't implement LeaderElectionRunnable interface. @@ -237,7 +238,16 @@ func (cm *controllerManager) Add(r Runnable) error { } // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); (ok && !leRunnable.NeedLeaderElection()) || (!ok && !cm.defaultLeaderElection) { + leRunnable, ok := r.(LeaderElectionRunnable) + + // If runnable doesn't implement LeaderElectionRunnable interface and defaultLeaderElection is true + // it's assumed that it needs per-manager leader election. + // This is done to maintain backwards compatibility + needPerManagerLE := cm.defaultLeaderElection && (!ok || (ok && (leRunnable.GetLeaderElectionMode() == crleaderelection.PerManagerLeaderElectionMode))) + + needPerControllerLE := ok && (leRunnable.GetLeaderElectionMode() == crleaderelection.PerControllerGroupLeaderElectionMode) + + if !needPerManagerLE && !needPerControllerLE { cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) if cm.started { @@ -252,18 +262,15 @@ func (cm *controllerManager) Add(r Runnable) error { } else { var leID string - if ok { + if needPerManagerLE { + leID = cm.defaultLeaderElectionID + } else { leID := leRunnable.GetID() // Check that leader election ID is defined if leID == "" { return errors.New("LeaderElectionID must be configured") } - } else if cm.defaultLeaderElection { - // If runnable doesn't implement LeaderElectionRunnable interface and defaultLeaderElection is true - // it's assumed that it needs leader election. - // This is done to maintain backwards compatibility - leID = cm.defaultLeaderElectionID } cm.leaderElectionRunnables[leID] = append(cm.leaderElectionRunnables[leID], r) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 1447dd1761..faa0f60207 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -119,12 +119,14 @@ type Options struct { // so that all controllers will not send list requests simultaneously. SyncPeriod *time.Duration - // DefaultLeaderElection determines whether or not to use leader election by default - // for runnables that don't implement LeaderElectionRunnable interface. + // DefaultLeaderElection determines whether or not to use leader election + // for runnables that need per-manager leader election or + // don't implement LeaderElectionRunnable interface. DefaultLeaderElection bool // DefaultLeaderElectionID determines the name of the configmap that leader election - // will use for runnables that don't implement LeaderElectionRunnable interface. + // will use for runnables that need per-manager leader election or + // don't implement LeaderElectionRunnable interface. // If not specified, default value will be assigned. DefaultLeaderElectionID string @@ -225,11 +227,10 @@ func (r RunnableFunc) Start(s <-chan struct{}) error { // LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode. type LeaderElectionRunnable interface { - // NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode. - // e.g. controllers need to be run in leader election mode, while webhook server doesn't. - NeedLeaderElection() bool + // GetLeaderElectionMode returns leader election mode in which Runnable needs to be run. + GetLeaderElectionMode() leaderelection.Mode - // GetID returns leader election ID + // GetID returns leader election ID for per-controller leader election mode. GetID() string } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index c3bd5c2963..8865df42f8 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -903,8 +904,8 @@ func (*leRunnable) Start(<-chan struct{}) error { return nil } -func (*leRunnable) NeedLeaderElection() bool { - return true +func (*leRunnable) GetLeaderElectionMode() leaderelection.Mode { + return leaderelection.PerControllerGroupLeaderElectionMode } func (le *leRunnable) GetID() string { diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 2b7e1f65cd..dbbdf7e0b8 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -30,6 +30,7 @@ import ( "sync" "time" + "sigs.k8s.io/controller-runtime/pkg/leaderelection" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/webhook/internal/certwatcher" "sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics" @@ -101,10 +102,10 @@ func (s *Server) setDefaults() { } } -// NeedLeaderElection implements the LeaderElectionRunnable interface, which indicates +// GetLeaderElectionMode implements the LeaderElectionRunnable interface, which indicates // the webhook server doesn't need leader election. -func (*Server) NeedLeaderElection() bool { - return false +func (*Server) GetLeaderElectionMode() leaderelection.Mode { + return leaderelection.NonLeaderElectionMode } // GetID implements the LeaderElectionRunnable interface.