From 1fc1da40c8bd618383ad3fc1b6f3e06e1afe1846 Mon Sep 17 00:00:00 2001 From: Michal Fojtik Date: Thu, 11 Apr 2019 13:24:38 +0200 Subject: [PATCH 1/5] UPSTREAM: 76452: wire SIGTERM signal channel to controller manager and scheduler --- vendor/k8s.io/kubernetes/cmd/hyperkube/main.go | 4 ++-- .../cmd/kube-controller-manager/app/controllermanager.go | 4 ++-- vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go | 8 +++----- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/vendor/k8s.io/kubernetes/cmd/hyperkube/main.go b/vendor/k8s.io/kubernetes/cmd/hyperkube/main.go index 8cb1ec2ad423..9ed0d0d8e0b8 100644 --- a/vendor/k8s.io/kubernetes/cmd/hyperkube/main.go +++ b/vendor/k8s.io/kubernetes/cmd/hyperkube/main.go @@ -94,7 +94,7 @@ func NewHyperKubeCommand(stopCh <-chan struct{}) (*cobra.Command, []func() *cobr return ret } controller := func() *cobra.Command { - ret := kubecontrollermanager.NewControllerManagerCommand() + ret := kubecontrollermanager.NewControllerManagerCommand(stopCh) // add back some unfortunate aliases that should be removed ret.Aliases = []string{"controller-manager"} return ret @@ -106,7 +106,7 @@ func NewHyperKubeCommand(stopCh <-chan struct{}) (*cobra.Command, []func() *cobr return ret } scheduler := func() *cobra.Command { - ret := kubescheduler.NewSchedulerCommand() + ret := kubescheduler.NewSchedulerCommand(stopCh) // add back some unfortunate aliases that should be removed ret.Aliases = []string{"scheduler"} return ret diff --git a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go index 4a0cf681c103..85504438f348 100644 --- a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go +++ b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go @@ -78,7 +78,7 @@ const ( ) // NewControllerManagerCommand creates a *cobra.Command object with default parameters -func NewControllerManagerCommand() *cobra.Command { +func NewControllerManagerCommand(stopCh <-chan struct{}) *cobra.Command { s, err := options.NewKubeControllerManagerOptions() if err != nil { klog.Fatalf("unable to initialize command options: %v", err) @@ -113,7 +113,7 @@ controller, and serviceaccounts controller.`, os.Exit(1) } - if err := Run(c.Complete(), wait.NeverStop); err != nil { + if err := Run(c.Complete(), stopCh); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } diff --git a/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go b/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go index 615f9e8f18f7..749b61fedff7 100644 --- a/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go +++ b/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go @@ -67,7 +67,7 @@ import ( ) // NewSchedulerCommand creates a *cobra.Command object with default parameters -func NewSchedulerCommand() *cobra.Command { +func NewSchedulerCommand(stopCh <-chan struct{}) *cobra.Command { opts, err := options.NewOptions() if err != nil { klog.Fatalf("unable to initialize command options: %v", err) @@ -83,7 +83,7 @@ constraints, affinity and anti-affinity specifications, data locality, inter-wor interference, deadlines, and so on. Workload-specific requirements will be exposed through the API as necessary.`, Run: func(cmd *cobra.Command, args []string) { - if err := runCommand(cmd, args, opts); err != nil { + if err := runCommand(cmd, args, opts, stopCh); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } @@ -114,7 +114,7 @@ through the API as necessary.`, } // runCommand runs the scheduler. -func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error { +func runCommand(cmd *cobra.Command, args []string, opts *options.Options, stopCh <-chan struct{}) error { verflag.PrintAndExitIfRequested() utilflag.PrintFlags(cmd.Flags()) @@ -141,8 +141,6 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error os.Exit(1) } - stopCh := make(chan struct{}) - // Get the completed config cc := c.Complete() From 0113a83bc813a6ab3e532948112aef0eb177edc3 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 11 Apr 2019 14:37:22 +0200 Subject: [PATCH 2/5] UPSTREAM: 72970: apiserver: sync with http server shutdown to flush existing connections --- .../app/controllermanager.go | 3 ++- .../app/controllermanager.go | 3 ++- .../cmd/kube-scheduler/app/server.go | 3 ++- .../pkg/server/deprecated_insecure_serving.go | 4 ++- .../apiserver/pkg/server/genericapiserver.go | 9 +++++-- .../pkg/server/genericapiserver_test.go | 5 ++-- .../apiserver/pkg/server/secure_serving.go | 26 +++++++++++-------- 7 files changed, 34 insertions(+), 19 deletions(-) diff --git a/vendor/k8s.io/kubernetes/cmd/cloud-controller-manager/app/controllermanager.go b/vendor/k8s.io/kubernetes/cmd/cloud-controller-manager/app/controllermanager.go index 0df21242a9ba..1b20567db08d 100644 --- a/vendor/k8s.io/kubernetes/cmd/cloud-controller-manager/app/controllermanager.go +++ b/vendor/k8s.io/kubernetes/cmd/cloud-controller-manager/app/controllermanager.go @@ -149,7 +149,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error if c.SecureServing != nil { unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) - if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { + // TODO: handle stoppedCh returned by c.SecureServing.Serve + if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { return err } } diff --git a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go index 85504438f348..666beb899a48 100644 --- a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go +++ b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go @@ -178,7 +178,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { if c.SecureServing != nil { unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) - if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { + // TODO: handle stoppedCh returned by c.SecureServing.Serve + if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { return err } } diff --git a/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go b/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go index 749b61fedff7..f8f390cd1802 100644 --- a/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go +++ b/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go @@ -220,7 +220,8 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error } if cc.SecureServing != nil { handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) - if err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil { + // TODO: handle stoppedCh returned by c.SecureServing.Serve + if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil { // fail early for secure handlers, removing the old error loop from above return fmt.Errorf("failed to start healthz server: %v", err) } diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go index a78250edae9b..4f8d07a42dc3 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/deprecated_insecure_serving.go @@ -50,7 +50,9 @@ func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTime } else { klog.Infof("Serving insecurely on %s", s.Listener.Addr()) } - return RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh) + _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh) + // NOTE: we do not handle stoppedCh returned by RunServer for graceful termination here + return err } func (s *DeprecatedInsecureServingInfo) NewLoopbackClientConfig() (*rest.Config, error) { diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index be3c156cb4e0..c98e43a26246 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -319,9 +319,11 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { // Use an internal stop channel to allow cleanup of the listeners on error. internalStopCh := make(chan struct{}) - + var stoppedCh <-chan struct{} if s.SecureServingInfo != nil && s.Handler != nil { - if err := s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh); err != nil { + var err error + stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh) + if err != nil { close(internalStopCh) return err } @@ -334,6 +336,9 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { <-stopCh close(internalStopCh) + if stoppedCh != nil { + <-stoppedCh + } s.HandlerChainWaitGroup.Wait() close(auditStopCh) }() diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 8769b53f58bf..deb1451db4ac 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -544,9 +544,9 @@ func TestGracefulShutdown(t *testing.T) { // get port serverPort := ln.Addr().(*net.TCPAddr).Port - err = RunServer(insecureServer, ln, 10*time.Second, stopCh) + stoppedCh, err := RunServer(insecureServer, ln, 10*time.Second, stopCh) if err != nil { - t.Errorf("RunServer err: %v", err) + t.Fatalf("RunServer err: %v", err) } graceCh := make(chan struct{}) @@ -567,6 +567,7 @@ func TestGracefulShutdown(t *testing.T) { close(stopCh) // wait for wait group handler finish s.HandlerChainWaitGroup.Wait() + <-stoppedCh // check server all handlers finished. if !graceShutdown { diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go index 52519f85ae7c..45b43322b8bc 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go @@ -38,12 +38,12 @@ const ( defaultKeepAlivePeriod = 3 * time.Minute ) -// serveSecurely runs the secure http server. It fails only if certificates cannot -// be loaded or the initial listen call fails. The actual server loop (stoppable by closing -// stopCh) runs in a go routine, i.e. serveSecurely does not block. -func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) error { +// Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails. +// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block. +// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed. +func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) { if s.Listener == nil { - return fmt.Errorf("listener must not be nil") + return nil, fmt.Errorf("listener must not be nil") } secureServer := &http.Server{ @@ -119,7 +119,7 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur // apply settings to the server if err := http2.ConfigureServer(secureServer, http2Options); err != nil { - return fmt.Errorf("error configuring http2: %v", err) + return nil, fmt.Errorf("error configuring http2: %v", err) } klog.Infof("Serving securely on %s", secureServer.Addr) @@ -127,21 +127,25 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur } // RunServer listens on the given port if listener is not given, -// then spawns a go-routine continuously serving -// until the stopCh is closed. This function does not block. +// then spawns a go-routine continuously serving until the stopCh is closed. +// It returns a stoppedCh that is closed when all non-hijacked active requests +// have been processed. +// This function does not block // TODO: make private when insecure serving is gone from the kube-apiserver func RunServer( server *http.Server, ln net.Listener, shutDownTimeout time.Duration, stopCh <-chan struct{}, -) error { +) (<-chan struct{}, error) { if ln == nil { - return fmt.Errorf("listener must not be nil") + return nil, fmt.Errorf("listener must not be nil") } // Shutdown server gracefully. + stoppedCh := make(chan struct{}) go func() { + defer close(stoppedCh) <-stopCh ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout) server.Shutdown(ctx) @@ -168,7 +172,7 @@ func RunServer( } }() - return nil + return stoppedCh, nil } type NamedTLSCert struct { From d0e8322b8961c3c1f8c1842ca7a3c946130a92d6 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 11 Apr 2019 13:37:04 +0200 Subject: [PATCH 3/5] UPSTREAM: 76452: apiserver: wire stoppedCh further up the call chain --- .../apiserver/pkg/server/genericapiserver.go | 17 ++++++++++++----- .../apiserver/pkg/server/secure_serving.go | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index c98e43a26246..30d9c678eb7f 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -280,7 +280,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { s.installReadyz(stopCh) // close socket after delayed stopCh - err := s.NonBlockingRun(delayedStopCh) + serverDoneCh, err := s.NonBlockingRun(delayedStopCh) if err != nil { return err } @@ -299,12 +299,15 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // Wait for all requests to finish, which are bounded by the RequestTimeout variable. s.HandlerChainWaitGroup.Wait() + // wait for server listener to be closed + <-serverDoneCh + return nil } // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. -func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { +func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) { // Use an stop channel to allow graceful shutdown without dropping audit events // after http server shutdown. auditStopCh := make(chan struct{}) @@ -313,7 +316,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { // before http server start serving. Otherwise the Backend.ProcessEvents call might block. if s.AuditBackend != nil { if err := s.AuditBackend.Run(auditStopCh); err != nil { - return fmt.Errorf("failed to run the audit backend: %v", err) + return nil, fmt.Errorf("failed to run the audit backend: %v", err) } } @@ -325,8 +328,12 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh) if err != nil { close(internalStopCh) - return err + return nil, err } + } else { + ch := make(chan struct{}) + close(ch) + stoppedCh = ch } // Now that listener have bound successfully, it is the @@ -349,7 +356,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err) } - return nil + return stoppedCh, nil } // installAPIResources is a private method for installing the REST storage backing each api groupversionresource diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go index 45b43322b8bc..3b7aa9276406 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go @@ -86,7 +86,7 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur // need to load the certs at least once if err := loader.CheckCerts(); err != nil { - return err + return nil, err } go loader.Run(stopCh) From 12bd741c9c1fe73a76e787b7e5826526dcc8ba11 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 11 Apr 2019 14:04:55 +0200 Subject: [PATCH 4/5] UPSTREAM: 76452: Wire done channel in scheduler+{cloud,kube}-controller-manager --- .../app/controllermanager.go | 25 ++++++++-- .../app/controllermanager.go | 28 ++++++++--- .../cmd/kube-scheduler/app/server.go | 48 +++++++++---------- 3 files changed, 66 insertions(+), 35 deletions(-) diff --git a/vendor/k8s.io/kubernetes/cmd/cloud-controller-manager/app/controllermanager.go b/vendor/k8s.io/kubernetes/cmd/cloud-controller-manager/app/controllermanager.go index 1b20567db08d..de16baef585a 100644 --- a/vendor/k8s.io/kubernetes/cmd/cloud-controller-manager/app/controllermanager.go +++ b/vendor/k8s.io/kubernetes/cmd/cloud-controller-manager/app/controllermanager.go @@ -26,6 +26,7 @@ import ( "github.com/spf13/cobra" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server" @@ -111,6 +112,16 @@ the cloud specific control loops shipped with Kubernetes.`, // Run runs the ExternalCMServer. This should never exit. func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + go func() { + select { + case <-stopCh: + cancel() + case <-ctx.Done(): + } + }() + // To help debugging, immediately log version klog.Infof("Version: %+v", version.Get()) @@ -149,9 +160,13 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error if c.SecureServing != nil { unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) - // TODO: handle stoppedCh returned by c.SecureServing.Serve - if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { + if serverStoppedCh, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { return err + } else { + defer func() { + cancel() + <-serverStoppedCh + }() } } if c.InsecureServing != nil { @@ -204,13 +219,15 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { - klog.Fatalf("leaderelection lost") + cancel() + utilruntime.HandleError(fmt.Errorf("leaderelection lost")) }, }, WatchDog: electionChecker, Name: "cloud-controller-manager", }) - panic("unreachable") + + return nil } // startControllers starts the cloud specific controller loops. diff --git a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go index 666beb899a48..a4714bc343e6 100644 --- a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go +++ b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go @@ -155,6 +155,16 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration { // Run runs the KubeControllerManagerOptions. This should never exit. func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + go func() { + select { + case <-stopCh: + cancel() + case <-ctx.Done(): + } + }() + // To help debugging, immediately log version klog.Infof("Version: %+v", version.Get()) @@ -178,16 +188,20 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { if c.SecureServing != nil { unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) - // TODO: handle stoppedCh returned by c.SecureServing.Serve - if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { + if serverStoppedCh, err := c.SecureServing.Serve(handler, 0, ctx.Done()); err != nil { return err + } else { + defer func() { + cancel() + <-serverStoppedCh + }() } } if c.InsecureServing != nil { unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}} handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn) - if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil { + if err := c.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil { return err } } @@ -256,7 +270,7 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { klog.Fatalf("error creating lock: %v", err) } - leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration, RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration, @@ -264,13 +278,15 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { - klog.Fatalf("leaderelection lost") + cancel() + utilruntime.HandleError(fmt.Errorf("leaderelection lost")) }, }, WatchDog: electionChecker, Name: "kube-controller-manager", }) - panic("unreachable") + + return nil } type ControllerContext struct { diff --git a/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go b/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go index f8f390cd1802..816f886c6565 100644 --- a/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go +++ b/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go @@ -163,6 +163,16 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, stopCh // Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed. func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + go func() { + select { + case <-stopCh: + cancel() + case <-ctx.Done(): + } + }() + var storageClassInformer storageinformers.StorageClassInformer if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { storageClassInformer = cc.InformerFactory.Storage().V1().StorageClasses() @@ -220,10 +230,14 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error } if cc.SecureServing != nil { handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) - // TODO: handle stoppedCh returned by c.SecureServing.Serve - if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil { + if serverStoppedCh, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil { // fail early for secure handlers, removing the old error loop from above return fmt.Errorf("failed to start healthz server: %v", err) + } else { + defer func() { + cancel() + <-serverStoppedCh + }() } } @@ -235,27 +249,12 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error cc.InformerFactory.WaitForCacheSync(stopCh) controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced) - // Prepare a reusable runCommand function. - run := func(ctx context.Context) { - sched.Run() - <-ctx.Done() - } - - ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here - defer cancel() - - go func() { - select { - case <-stopCh: - cancel() - case <-ctx.Done(): - } - }() - // If leader election is enabled, runCommand via LeaderElector until done and exit. if cc.LeaderElection != nil { cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ - OnStartedLeading: run, + OnStartedLeading: func(context.Context) { + sched.Run() + }, OnStoppedLeading: func() { utilruntime.HandleError(fmt.Errorf("lost master")) }, @@ -266,13 +265,12 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error } leaderElector.Run(ctx) - - return fmt.Errorf("lost lease") + } else { + // Leader election is disabled, so runCommand inline until done. + sched.Run() } - // Leader election is disabled, so runCommand inline until done. - run(ctx) - return fmt.Errorf("finished without leader elect") + return nil } // buildHandlerChain wraps the given handler with the standard filters. From 1aecdea3e7907674a49ac78fe0b8cd29eba400cd Mon Sep 17 00:00:00 2001 From: Michal Fojtik Date: Thu, 11 Apr 2019 14:36:40 +0200 Subject: [PATCH 5/5] integration: pass stop channel to kube controller manager --- test/util/server/server.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/util/server/server.go b/test/util/server/server.go index 758a2f4e80fd..71ff781742aa 100644 --- a/test/util/server/server.go +++ b/test/util/server/server.go @@ -41,6 +41,7 @@ import ( authorizationv1typedclient "github.com/openshift/client-go/authorization/clientset/versioned/typed/authorization/v1" projectv1typedclient "github.com/openshift/client-go/project/clientset/versioned/typed/project/v1" "github.com/openshift/library-go/pkg/crypto" + "github.com/openshift/origin/pkg/api/legacy" "github.com/openshift/origin/pkg/cmd/openshift-apiserver" "github.com/openshift/origin/pkg/cmd/openshift-controller-manager" @@ -56,10 +57,11 @@ import ( newproject "github.com/openshift/origin/pkg/oc/cli/admin/project" "github.com/openshift/origin/test/util" - // install all APIs - _ "github.com/openshift/origin/pkg/api/install" _ "k8s.io/kubernetes/pkg/apis/core/install" _ "k8s.io/kubernetes/pkg/apis/extensions/install" + + // install all APIs + _ "github.com/openshift/origin/pkg/api/install" ) var ( @@ -667,7 +669,7 @@ func startKubernetesControllers(masterConfig *configapi.MasterConfig, adminKubeC } go func() { - cmd := kube_controller_manager.NewControllerManagerCommand() + cmd := kube_controller_manager.NewControllerManagerCommand(wait.NeverStop) if err := cmd.ParseFlags(args); err != nil { klog.Errorf("kube-controller-manager failed to parse flags: %v", err) return