diff --git a/bootstrap/bootstrap-pod.yaml b/bootstrap/bootstrap-pod.yaml
index bc93746282..ca42facd11 100644
--- a/bootstrap/bootstrap-pod.yaml
+++ b/bootstrap/bootstrap-pod.yaml
@@ -37,6 +37,7 @@ spec:
         fieldRef:
           fieldPath: spec.nodeName
   hostNetwork: true
+  terminationGracePeriodSeconds: 130
   volumes:
   - name: kubeconfig
     hostPath:
diff --git a/cmd/start.go b/cmd/start.go
index 20fae2301a..f38462c1e7 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -1,6 +1,8 @@
 package main

 import (
+	"context"
+
 	"github.com/spf13/cobra"
 	"k8s.io/klog"

@@ -16,11 +18,12 @@ func init() {
 		Long:  "",
 		Run: func(cmd *cobra.Command, args []string) {
 			// To help debugging, immediately log version
-			klog.Infof("%s", version.String)
+			klog.Info(version.String)

-			if err := opts.Run(); err != nil {
+			if err := opts.Run(context.Background()); err != nil {
 				klog.Fatalf("error: %v", err)
 			}
+			klog.Infof("Graceful shutdown complete for %s.", version.String)
 		},
 	}

diff --git a/install/0000_00_cluster-version-operator_03_deployment.yaml b/install/0000_00_cluster-version-operator_03_deployment.yaml
index bc0ba32404..76ae8a980d 100644
--- a/install/0000_00_cluster-version-operator_03_deployment.yaml
+++ b/install/0000_00_cluster-version-operator_03_deployment.yaml
@@ -52,6 +52,7 @@ spec:
       nodeSelector:
         node-role.kubernetes.io/master: ""
      priorityClassName: "system-cluster-critical"
+      terminationGracePeriodSeconds: 130
      tolerations:
      - key: "node-role.kubernetes.io/master"
        operator: Exists
diff --git a/lib/resourceread/apiext_test.go b/lib/resourceread/apiext_test.go
index 912faf89a6..17ee98ea3f 100644
--- a/lib/resourceread/apiext_test.go
+++ b/lib/resourceread/apiext_test.go
@@ -13,7 +13,7 @@ func TestReadCustomResourceDefinitionOrDie(t *testing.T) {
 		args args
 	}{
 		{
-			name:"v1",
+			name: "v1",
 			args: args{
 				objBytes: []byte(`
 apiVersion: apiextensions.k8s.io/v1
@@ -42,7 +42,7 @@ spec:
 			},
 		},
 		{
-			name:"v1beta1",
+			name: "v1beta1",
 			args: args{
 				objBytes: []byte(`
 apiVersion: apiextensions.k8s.io/v1beta1
@@ -82,4 +82,4 @@ spec:
 			_ = ReadCustomResourceDefinitionOrDie(tt.args.objBytes)
 		})
 	}
-}
\ No newline at end of file
+}
diff --git a/pkg/autoupdate/autoupdate.go b/pkg/autoupdate/autoupdate.go
index 2b69378021..fe836c50ff 100644
--- a/pkg/autoupdate/autoupdate.go
+++ b/pkg/autoupdate/autoupdate.go
@@ -7,7 +7,6 @@ import (

 	"github.com/blang/semver"

-	"k8s.io/klog"
 	v1 "github.com/openshift/api/config/v1"
 	clientset "github.com/openshift/client-go/config/clientset/versioned"
 	"github.com/openshift/client-go/config/clientset/versioned/scheme"
@@ -23,6 +22,7 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog"
 )

 const (
@@ -87,7 +87,7 @@ func New(
 }

 // Run runs the autoupdate controller.
-func (ctrl *Controller) Run(workers int, stopCh <-chan struct{}) {
+func (ctrl *Controller) Run(workers int, stopCh <-chan struct{}) error {
 	defer utilruntime.HandleCrash()
 	defer ctrl.queue.ShutDown()

@@ -95,15 +95,16 @@ func (ctrl *Controller) Run(workers int, stopCh <-chan struct{}) {
 	defer klog.Info("Shutting down AutoUpdateController")

 	if !cache.WaitForCacheSync(stopCh, ctrl.cacheSynced...) {
-		klog.Info("Caches never synchronized")
-		return
+		return fmt.Errorf("caches never synchronized")
 	}

 	for i := 0; i < workers; i++ {
+		// FIXME: actually wait until these complete if the Context is canceled. And possibly add utilruntime.HandleCrash.
 		go wait.Until(ctrl.worker, time.Second, stopCh)
 	}

 	<-stopCh
+	return nil
 }

 func (ctrl *Controller) eventHandler() cache.ResourceEventHandler {
diff --git a/pkg/cvo/availableupdates.go b/pkg/cvo/availableupdates.go
index 66dd6d6bb8..72aaa8ea5a 100644
--- a/pkg/cvo/availableupdates.go
+++ b/pkg/cvo/availableupdates.go
@@ -2,7 +2,6 @@ package cvo

 import (
 	"crypto/tls"
-	"crypto/x509"
 	"fmt"
 	"net/url"
 	"runtime"
@@ -11,7 +10,6 @@ import (
 	"github.com/blang/semver"
 	"github.com/google/uuid"
 	"k8s.io/apimachinery/pkg/api/equality"
-	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/klog"

@@ -197,54 +195,3 @@ func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsCon
 		LastTransitionTime: metav1.Now(),
 	}
 }
-
-// getHTTPSProxyURL returns a url.URL object for the configured
-// https proxy only. It can be nil if does not exist or there is an error.
-func (optr *Operator) getHTTPSProxyURL() (*url.URL, string, error) {
-	proxy, err := optr.proxyLister.Get("cluster")
-
-	if errors.IsNotFound(err) {
-		return nil, "", nil
-	}
-	if err != nil {
-		return nil, "", err
-	}
-
-	if &proxy.Spec != nil {
-		if proxy.Spec.HTTPSProxy != "" {
-			proxyURL, err := url.Parse(proxy.Spec.HTTPSProxy)
-			if err != nil {
-				return nil, "", err
-			}
-			return proxyURL, proxy.Spec.TrustedCA.Name, nil
-		}
-	}
-	return nil, "", nil
-}
-
-func (optr *Operator) getTLSConfig(cmNameRef string) (*tls.Config, error) {
-	cm, err := optr.cmConfigLister.Get(cmNameRef)
-
-	if err != nil {
-		return nil, err
-	}
-
-	certPool, _ := x509.SystemCertPool()
-	if certPool == nil {
-		certPool = x509.NewCertPool()
-	}
-
-	if cm.Data["ca-bundle.crt"] != "" {
-		if ok := certPool.AppendCertsFromPEM([]byte(cm.Data["ca-bundle.crt"])); !ok {
-			return nil, fmt.Errorf("unable to add ca-bundle.crt certificates")
-		}
-	} else {
-		return nil, nil
-	}
-
-	config := &tls.Config{
-		RootCAs: certPool,
-	}
-
-	return config, nil
-}
diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index d5bc3d4ce0..4bafcf6da7 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -169,7 +169,6 @@ func New(
 	proxyInformer configinformersv1.ProxyInformer,
 	client clientset.Interface,
 	kubeClient kubernetes.Interface,
-	enableMetrics bool,
 	exclude string,
 ) *Operator {
 	eventBroadcaster := record.NewBroadcaster()
@@ -214,11 +213,6 @@ func New(
 	// make sure this is initialized after all the listers are initialized
 	optr.upgradeableChecks = optr.defaultUpgradeableChecks()

-	if enableMetrics {
-		if err := optr.registerMetrics(coInformer.Informer()); err != nil {
-			panic(err)
-		}
-	}
 	return optr
 }

@@ -321,8 +315,7 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder v
 }

 // Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(ctx context.Context, workers int) {
-	defer utilruntime.HandleCrash()
+func (optr *Operator) Run(ctx context.Context, workers int) error {
 	defer optr.queue.ShutDown()
 	stopCh := ctx.Done()
 	workerStopCh := make(chan struct{})
@@ -331,8 +324,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
 	defer klog.Info("Shutting down ClusterVersionOperator")

 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
-		klog.Info("Caches never synchronized")
-		return
+		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
 	}

 	// trigger the first cluster version reconcile always
@@ -361,6 +353,8 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
 	// stop the queue, then wait for the worker to exit
 	optr.queue.ShutDown()
 	<-workerStopCh
+
+	return nil
 }

 func (optr *Operator) queueKey() string {
diff --git a/pkg/cvo/cvo_test.go b/pkg/cvo/cvo_test.go
index 97ceeb852d..7fdb3964e2 100644
--- a/pkg/cvo/cvo_test.go
+++ b/pkg/cvo/cvo_test.go
@@ -27,8 +27,8 @@ import (
 	"k8s.io/apimachinery/pkg/util/diff"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/discovery"
-	"k8s.io/client-go/rest"
 	kfake "k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/rest"
 	ktesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
diff --git a/pkg/cvo/egress.go b/pkg/cvo/egress.go
new file mode 100644
index 0000000000..75cfa607c1
--- /dev/null
+++ b/pkg/cvo/egress.go
@@ -0,0 +1,61 @@
+package cvo
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"net/url"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+)
+
+// getHTTPSProxyURL returns a url.URL object for the configured
+// https proxy only. It can be nil if does not exist or there is an error.
+func (optr *Operator) getHTTPSProxyURL() (*url.URL, string, error) {
+	proxy, err := optr.proxyLister.Get("cluster")
+
+	if errors.IsNotFound(err) {
+		return nil, "", nil
+	}
+	if err != nil {
+		return nil, "", err
+	}
+
+	if &proxy.Spec != nil {
+		if proxy.Spec.HTTPSProxy != "" {
+			proxyURL, err := url.Parse(proxy.Spec.HTTPSProxy)
+			if err != nil {
+				return nil, "", err
+			}
+			return proxyURL, proxy.Spec.TrustedCA.Name, nil
+		}
+	}
+	return nil, "", nil
+}
+
+func (optr *Operator) getTLSConfig(cmNameRef string) (*tls.Config, error) {
+	cm, err := optr.cmConfigLister.Get(cmNameRef)
+
+	if err != nil {
+		return nil, err
+	}
+
+	certPool, _ := x509.SystemCertPool()
+	if certPool == nil {
+		certPool = x509.NewCertPool()
+	}
+
+	if cm.Data["ca-bundle.crt"] != "" {
+		if ok := certPool.AppendCertsFromPEM([]byte(cm.Data["ca-bundle.crt"])); !ok {
+			return nil, fmt.Errorf("unable to add ca-bundle.crt certificates")
+		}
+	} else {
+		return nil, nil
+	}
+
+	config := &tls.Config{
+		RootCAs: certPool,
+	}
+
+	return config, nil
+}
diff --git a/pkg/cvo/metrics.go b/pkg/cvo/metrics.go
index db03328694..34f92203fa 100644
--- a/pkg/cvo/metrics.go
+++ b/pkg/cvo/metrics.go
@@ -1,21 +1,28 @@
 package cvo

 import (
+	"context"
+	"net"
+	"net/http"
 	"time"

 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog"

 	configv1 "github.com/openshift/api/config/v1"
 	"github.com/openshift/cluster-version-operator/lib/resourcemerge"
 	"github.com/openshift/cluster-version-operator/pkg/internal"
 )

-func (optr *Operator) registerMetrics(coInformer cache.SharedInformer) error {
+// RegisterMetrics initializes metrics and registers them with the
+// Prometheus implementation.
+func (optr *Operator) RegisterMetrics(coInformer cache.SharedInformer) error {
 	m := newOperatorMetrics(optr)
 	coInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		UpdateFunc: m.clusterOperatorChanged,
@@ -86,6 +93,67 @@ version for 'cluster', or empty for 'initial'.
 	}
 }

+// RunMetrics launches a server bound to listenAddress serving
+// Prometheus metrics at /metrics over HTTP. Continues serving until
+// runContext.Done() and then attempts a clean shutdown limited by
+// shutdownContext.Done(). Assumes runContext.Done() occurs before or
+// simultaneously with shutdownContext.Done().
+func RunMetrics(runContext context.Context, shutdownContext context.Context, listenAddress string) error {
+	handler := http.NewServeMux()
+	handler.Handle("/metrics", promhttp.Handler())
+	server := &http.Server{
+		Handler: handler,
+	}
+
+	errorChannel := make(chan error, 1)
+	errorChannelCount := 1
+	go func() {
+		tcpListener, err := net.Listen("tcp", listenAddress)
+		if err != nil {
+			errorChannel <- err
+			return
+		}
+
+		klog.Infof("Metrics port listening for HTTP on %v", listenAddress)
+
+		errorChannel <- server.Serve(tcpListener)
+	}()
+
+	shutdown := false
+	var loopError error
+	for errorChannelCount > 0 {
+		if shutdown {
+			err := <-errorChannel
+			errorChannelCount--
+			if err != nil && err != http.ErrServerClosed {
+				if loopError == nil {
+					loopError = err
+				} else if err != nil { // log the error we are discarding
+					klog.Errorf("Failed to gracefully shut down metrics server: %s", err)
+				}
+			}
+		} else {
+			select {
+			case <-runContext.Done(): // clean shutdown
+			case err := <-errorChannel: // crashed before a shutdown was requested
+				errorChannelCount--
+				if err != nil && err != http.ErrServerClosed {
+					loopError = err
+				}
+			}
+			shutdown = true
+			shutdownError := server.Shutdown(shutdownContext)
+			if loopError == nil {
+				loopError = shutdownError
+			} else if shutdownError != nil { // log the error we are discarding
+				klog.Errorf("Failed to gracefully shut down metrics server: %s", shutdownError)
+			}
+		}
+	}
+
+	return loopError
+}
+
 type conditionKey struct {
 	Name string
 	Type string
diff --git a/pkg/start/start.go b/pkg/start/start.go
index c56436f23b..97befa3b32 100644
--- a/pkg/start/start.go
+++ b/pkg/start/start.go
@@ -6,17 +6,15 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
-	"net/http"
 	"os"
 	"os/signal"
-	"sync"
 	"syscall"
 	"time"

 	"github.com/google/uuid"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -67,10 +65,14 @@ type Options struct {
 	Name            string
 	Namespace       string
 	PayloadOverride string
-	EnableMetrics   bool
 	ResyncInterval  time.Duration
 }

+type asyncResult struct {
+	name  string
+	error error
+}
+
 func defaultEnv(name, defaultValue string) string {
 	env, ok := os.LookupEnv(name)
 	if !ok {
@@ -91,12 +93,11 @@ func NewOptions() *Options {
 		Name:            defaultEnv("CVO_NAME", defaultComponentName),
 		PayloadOverride: os.Getenv("PAYLOAD_OVERRIDE"),
 		ResyncInterval:  minResyncPeriod,
-		EnableMetrics:   true,
 		Exclude:         os.Getenv("EXCLUDE_MANIFESTS"),
 	}
 }

-func (o *Options) Run() error {
+func (o *Options) Run(ctx context.Context) error {
 	if o.NodeName == "" {
 		return fmt.Errorf("node-name is required")
 	}
@@ -126,90 +127,127 @@ func (o *Options) Run() error {
 		return err
 	}

-	// TODO: Kube 1.14 will contain a ReleaseOnCancel boolean on
-	// LeaderElectionConfig that allows us to have the lock code
-	// release the lease when this context is cancelled. At that
-	// time we can remove our changes to OnStartedLeading.
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	o.run(ctx, controllerCtx, lock)
+	return nil
+}
+
+// run launches a number of goroutines to handle manifest application,
+// metrics serving, etc. It continues operating until ctx.Done(),
+// and then attempts a clean shutdown limited by an internal context
+// with a two-minute cap. It returns after it successfully collects all
+// launched goroutines.
+func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
+	runContext, runCancel := context.WithCancel(ctx) // so we can cancel internally on errors or TERM
+	defer runCancel()
+	shutdownContext, shutdownCancel := context.WithCancel(context.Background()) // extends beyond ctx
+	defer shutdownCancel()
+	postMainContext, postMainCancel := context.WithCancel(context.Background()) // extends beyond ctx
+	defer postMainCancel()
+
 	ch := make(chan os.Signal, 1)
 	defer func() { signal.Stop(ch) }()
 	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
 	go func() {
+		defer utilruntime.HandleCrash()
 		sig := <-ch
 		klog.Infof("Shutting down due to %s", sig)
-		cancel()
-
-		// exit after 2s no matter what
-		select {
-		case <-time.After(5 * time.Second):
-			klog.Fatalf("Exiting")
-		case <-ch:
-			klog.Fatalf("Received shutdown signal twice, exiting")
-		}
+		runCancel()
+		sig = <-ch
+		klog.Fatalf("Received shutdown signal twice, exiting: %s", sig)
 	}()

-	o.run(ctx, controllerCtx, lock)
-	return nil
-}
-
-func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
-	// listen on metrics
-	if len(o.ListenAddr) > 0 {
-		mux := http.NewServeMux()
-		mux.Handle("/metrics", promhttp.Handler())
+	resultChannel := make(chan asyncResult, 1)
+	resultChannelCount := 0
+	if o.ListenAddr != "" {
+		resultChannelCount++
 		go func() {
-			if err := http.ListenAndServe(o.ListenAddr, mux); err != nil {
-				klog.Fatalf("Unable to start metrics server: %v", err)
-			}
+			defer utilruntime.HandleCrash()
+			err := cvo.RunMetrics(postMainContext, shutdownContext, o.ListenAddr)
+			resultChannel <- asyncResult{name: "metrics server", error: err}
 		}()
 	}

-	exit := make(chan struct{})
-	exitClose := sync.Once{}
-
-	// TODO: when we switch to graceful lock shutdown, this can be
-	// moved back inside RunOrDie
-	// TODO: properly wire ctx here
-	go leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
-		Lock:          lock,
-		LeaseDuration: leaseDuration,
-		RenewDeadline: renewDeadline,
-		RetryPeriod:   retryPeriod,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(localCtx context.Context) {
-				controllerCtx.Start(ctx)
-				select {
-				case <-ctx.Done():
-					// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
-					// and client-go ContextCancelable, which allows us to block new API requests before
-					// we step down. However, the CVO isn't that sensitive to races and can tolerate
-					// brief overlap.
-					klog.Infof("Stepping down as leader")
-					// give the controllers some time to shut down
-					time.Sleep(100 * time.Millisecond)
-					// if we still hold the leader lease, clear the owner identity (other lease watchers
-					// still have to wait for expiration) like the new ReleaseOnCancel code will do.
-					if err := lock.Update(resourcelock.LeaderElectionRecord{}); err == nil {
-						// if we successfully clear the owner identity, we can safely delete the record
-						if err := lock.Client.ConfigMaps(lock.ConfigMapMeta.Namespace).Delete(lock.ConfigMapMeta.Name, nil); err != nil {
-							klog.Warningf("Unable to step down cleanly: %v", err)
-						}
+	informersDone := postMainContext.Done()
+	// FIXME: would be nice if there was a way to collect these.
+	controllerCtx.CVInformerFactory.Start(informersDone)
+	controllerCtx.OpenshiftConfigInformerFactory.Start(informersDone)
+	controllerCtx.InformerFactory.Start(informersDone)
+
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		leaderelection.RunOrDie(postMainContext, leaderelection.LeaderElectionConfig{
+			Lock:            lock,
+			ReleaseOnCancel: true,
+			LeaseDuration:   leaseDuration,
+			RenewDeadline:   renewDeadline,
+			RetryPeriod:     retryPeriod,
+			Callbacks: leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(_ context.Context) { // no need for this passed-through postMainContext, because goroutines we launch inside will use runContext
+					resultChannelCount++
+					go func() {
+						defer utilruntime.HandleCrash()
+						err := controllerCtx.CVO.Run(runContext, 2)
+						resultChannel <- asyncResult{name: "main operator", error: err}
+					}()
+
+					if controllerCtx.AutoUpdate != nil {
+						resultChannelCount++
+						go func() {
+							defer utilruntime.HandleCrash()
+							err := controllerCtx.AutoUpdate.Run(2, runContext.Done())
+							resultChannel <- asyncResult{name: "auto-update controller", error: err}
+						}()
 					}
-					klog.Infof("Finished shutdown")
-					exitClose.Do(func() { close(exit) })
-				case <-localCtx.Done():
-					// we will exit in OnStoppedLeading
-				}
+				},
+				OnStoppedLeading: func() {
+					klog.Info("Stopped leading; shutting down.")
+					runCancel()
+				},
 			},
-			OnStoppedLeading: func() {
-				klog.Warning("leaderelection lost")
-				exitClose.Do(func() { close(exit) })
-			},
-		},
-	})
+		})
+		resultChannel <- asyncResult{name: "leader controller", error: nil}
+	}()

-	<-exit
+	var shutdownTimer *time.Timer
+	for resultChannelCount > 0 {
+		klog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
+		if shutdownTimer == nil { // running
+			select {
+			case <-runContext.Done():
+				klog.Info("Run context completed; beginning two-minute graceful shutdown period.")
+				shutdownTimer = time.NewTimer(2 * time.Minute)
+			case result := <-resultChannel:
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+					runCancel() // this will cause shutdownTimer initialization in the next loop
+				}
+				if result.name == "main operator" {
+					postMainCancel()
+				}
+			}
+		} else { // shutting down
+			select {
+			case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing.
+				shutdownCancel()
+				shutdownTimer.Stop()
+			case result := <-resultChannel:
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+				}
+				if result.name == "main operator" {
+					postMainCancel()
+				}
+			}
+		}
+	}
+	klog.Info("Finished collecting operator goroutines.")
 }

 // createResourceLock initializes the lock.
@@ -327,6 +365,7 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 	sharedInformers := externalversions.NewSharedInformerFactory(client, resyncPeriod(o.ResyncInterval)())
+	coInformer := sharedInformers.Config().V1().ClusterOperators()

 	ctx := &Context{
 		CVInformerFactory:              cvInformer,
 		OpenshiftConfigInformerFactory: openshiftConfigInformer,
@@ -340,12 +379,11 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 			o.PayloadOverride,
 			resyncPeriod(o.ResyncInterval)(),
 			cvInformer.Config().V1().ClusterVersions(),
-			sharedInformers.Config().V1().ClusterOperators(),
+			coInformer,
 			openshiftConfigInformer.Core().V1().ConfigMaps(),
 			sharedInformers.Config().V1().Proxies(),
 			cb.ClientOrDie(o.Namespace),
 			cb.KubeClientOrDie(o.Namespace, useProtobuf),
-			o.EnableMetrics,
 			o.Exclude,
 		),
 	}
@@ -358,18 +396,10 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 			cb.KubeClientOrDie(o.Namespace),
 		)
 	}
-	return ctx
-}
-
-// Start launches the controllers in the provided context and any supporting
-// infrastructure. When ch is closed the controllers will be shut down.
-func (c *Context) Start(ctx context.Context) {
-	ch := ctx.Done()
-	go c.CVO.Run(ctx, 2)
-	if c.AutoUpdate != nil {
-		go c.AutoUpdate.Run(2, ch)
+	if o.ListenAddr != "" {
+		if err := ctx.CVO.RegisterMetrics(coInformer.Informer()); err != nil {
+			panic(err)
+		}
 	}
-	c.CVInformerFactory.Start(ch)
-	c.OpenshiftConfigInformerFactory.Start(ch)
-	c.InformerFactory.Start(ch)
+	return ctx
 }
diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go
index 446fba9d4d..de8e1d7ca3 100644
--- a/pkg/start/start_integration_test.go
+++ b/pkg/start/start_integration_test.go
@@ -238,15 +238,19 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 	options.NodeName = "test-node"
 	options.ReleaseImage = payloadImage1
 	options.PayloadOverride = filepath.Join(dir, "ignored")
-	options.EnableMetrics = false
 	controllers := options.NewControllerContext(cb)

 	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
 	controllers.CVO.SetSyncWorkerForTesting(worker)

+	lock, err := createResourceLock(cb, options.Namespace, options.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	controllers.Start(ctx)
+	go options.run(ctx, controllers, lock)

 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(t, client, ns, false, "0.0.1")
@@ -390,16 +394,20 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
 	options.NodeName = "test-node"
 	options.ReleaseImage = payloadImage1
 	options.PayloadOverride = filepath.Join(dir, "ignored")
-	options.EnableMetrics = false
 	options.ResyncInterval = 3 * time.Second
 	controllers := options.NewControllerContext(cb)

 	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "")
 	controllers.CVO.SetSyncWorkerForTesting(worker)

+	lock, err := createResourceLock(cb, options.Namespace, options.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	controllers.Start(ctx)
+	go options.run(ctx, controllers, lock)

 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(t, client, ns, false, "0.0.1")
@@ -497,13 +505,12 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
 	options.Name = ns
 	options.ListenAddr = ""
 	options.NodeName = "test-node"
-	options.EnableMetrics = false
 	controllers := options.NewControllerContext(cb)

 	worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
 	controllers.CVO.SetSyncWorkerForTesting(worker)

-	lock, err := createResourceLock(cb, ns, ns)
+	lock, err := createResourceLock(cb, options.Namespace, options.Name)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -519,7 +526,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
 	// wait until the lock record exists
 	err = wait.PollImmediate(200*time.Millisecond, 60*time.Second, func() (bool, error) {
-		_, err := kc.CoreV1().ConfigMaps(ns).Get(ns, metav1.GetOptions{})
+		_, _, err := lock.Get()
 		if err != nil {
 			if errors.IsNotFound(err) {
 				return false, nil
 			}
@@ -541,26 +548,26 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
 		t.Fatalf("no leader election events found in\n%#v", events.Items)
 	}

-	t.Logf("after the context is closed, the lock record should be deleted quickly")
+	t.Logf("after the context is closed, the lock should be released quickly")
 	cancel()
 	startTime := time.Now()
 	var endTime time.Time
 	// the lock should be deleted immediately
 	err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
-		_, err := kc.CoreV1().ConfigMaps(ns).Get(ns, metav1.GetOptions{})
-		if errors.IsNotFound(err) {
-			endTime = time.Now()
-			return true, nil
-		}
+		electionRecord, _, err := lock.Get()
 		if err != nil {
+			if errors.IsNotFound(err) {
+				return false, nil
+			}
 			return false, err
 		}
-		return false, nil
+		endTime = time.Now()
+		return electionRecord.HolderIdentity == "", nil
 	})
 	if err != nil {
 		t.Fatal(err)
 	}
-	t.Logf("lock deleted in %s", endTime.Sub(startTime))
+	t.Logf("lock released in %s", endTime.Sub(startTime))

 	select {
 	case <-time.After(time.Second):
@@ -667,7 +674,6 @@ metadata:
 	options.NodeName = "test-node"
 	options.ReleaseImage = payloadImage1
 	options.PayloadOverride = payloadDir
-	options.EnableMetrics = false
 	controllers := options.NewControllerContext(cb)
 	if err := controllers.CVO.InitializeFromPayload(cb.RestConfig(defaultQPS), cb.RestConfig(highQPS)); err != nil {
 		t.Fatal(err)
 	}
@@ -676,9 +682,14 @@ metadata:
 	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
 	controllers.CVO.SetSyncWorkerForTesting(worker)

+	lock, err := createResourceLock(cb, options.Namespace, options.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	controllers.Start(ctx)
+	go options.run(ctx, controllers, lock)

 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(t, client, ns, false, "0.0.1")