Skip to content

Commit

Permalink
kubeadm: drop concurrency when waiting for kubelet /healthz
Browse files Browse the repository at this point in the history
The function wait.go#WaitForKubeletAndFunc() has been used in
a number of places in kubeadm. It starts a goroutine to wait for
the kubelet /healthz and in parallel starts another goroutine
to wait for a custom function.

This logic is problematic. If kubeadm is waiting for the kubelet
in parallel with something that requires the kubelet, the right
solution would be to first wait for the kubelet in serial and only
then proceed with the other action. The parallelism here, particularly
during "init", required an unwanted "initial timeout" of 40s before
the kubelet waiting even starts. In most cases, this causes the kubelet
waiter to not even start, while the main point of waiting becomes
the "other action".

- Remove the function WaitForKubeletAndFunc() from the Waiter interface.
- Rename the function WaitForHealthyKubelet() to just WaitForKubelet()
to be consistent with the naming WaitForAPI().
- Update WaitForKubelet() to not use TryRunCommand() and instead
use PollUntilContextTimeout().
- Remove the "initial timeout" of 40s in WaitForKubelet().
- Make both WaitForKubelet() and WaitForAPI() use similar error
handling and output.
- Update all usage of WaitForKubelet() to be a serial call before
any other action, such as another wait* call.
- Make the default wait timeout for the kubelet
/healthz to be 4 minutes (kubeadmconstants.DefaultKubeletTimeout).
- Apply updates to all implementations of the Waiter interface.
  • Loading branch information
neolit123 committed Dec 20, 2023
1 parent 1f07da7 commit 5571188
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 77 deletions.
27 changes: 18 additions & 9 deletions cmd/kubeadm/app/cmd/phases/init/waitcontrolplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/pkg/errors"

clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
)
Expand Down Expand Up @@ -79,24 +79,23 @@ func runWaitControlPlanePhase(c workflow.RunData) error {
}
}

// waiter holds the apiclient.Waiter implementation of choice, responsible for querying the API server in various ways and waiting for conditions to be fulfilled
klog.V(1).Infoln("[wait-control-plane] Waiting for the API server to be healthy")

// WaitForAPI uses the /healthz endpoint, thus a client without permissions works fine
// Both Wait* calls below use a /healthz endpoint, thus a client without permissions works fine
client, err := data.ClientWithoutBootstrap()
if err != nil {
return errors.Wrap(err, "cannot obtain client without bootstrap")
}

timeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration
waiter, err := newControlPlaneWaiter(data.DryRun(), timeout, client, data.OutputWriter())
waiter, err := newControlPlaneWaiter(data.DryRun(), 0, client, data.OutputWriter())
if err != nil {
return errors.Wrap(err, "error creating waiter")
}

fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods from directory %q. This can take up to %v\n", data.ManifestDir(), timeout)
controlPlaneTimeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration
fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods"+
" from directory %q\n",
data.ManifestDir())

if err := waiter.WaitForKubeletAndFunc(waiter.WaitForAPI); err != nil {
handleError := func(err error) error {
context := struct {
Error string
Socket string
Expand All @@ -109,6 +108,16 @@ func runWaitControlPlanePhase(c workflow.RunData) error {
return errors.New("couldn't initialize a Kubernetes cluster")
}

waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout)
if err := waiter.WaitForKubelet(); err != nil {
return handleError(err)
}

waiter.SetTimeout(controlPlaneTimeout)
if err := waiter.WaitForAPI(); err != nil {
return handleError(err)
}

return nil
}

Expand Down
12 changes: 9 additions & 3 deletions cmd/kubeadm/app/cmd/phases/join/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,14 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) {
// Now the kubelet will perform the TLS Bootstrap, transforming /etc/kubernetes/bootstrap-kubelet.conf to /etc/kubernetes/kubelet.conf
// Wait for the kubelet to create the /etc/kubernetes/kubelet.conf kubeconfig file. If this process
// times out, display a somewhat user-friendly message.
waiter := apiclient.NewKubeWaiter(nil, kubeadmconstants.TLSBootstrapTimeout, os.Stdout)
if err := waiter.WaitForKubeletAndFunc(waitForTLSBootstrappedClient); err != nil {
waiter := apiclient.NewKubeWaiter(nil, 0, os.Stdout)
waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout)
if err := waiter.WaitForKubelet(); err != nil {
fmt.Printf(kubeadmJoinFailMsg, err)
return err
}

if err := waitForTLSBootstrappedClient(); err != nil {
fmt.Printf(kubeadmJoinFailMsg, err)
return err
}
Expand All @@ -227,7 +233,7 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) {

// waitForTLSBootstrappedClient waits for the /etc/kubernetes/kubelet.conf file to be available
func waitForTLSBootstrappedClient() error {
fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap...")
fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap")

// Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned.
return wait.PollImmediate(kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout, func() (bool, error) {
Expand Down
2 changes: 2 additions & 0 deletions cmd/kubeadm/app/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ const (

// DefaultControlPlaneTimeout specifies the default control plane (actually API Server) timeout for use by kubeadm
DefaultControlPlaneTimeout = 4 * time.Minute
// DefaultKubeletTimeout specifies the default kubelet timeout
DefaultKubeletTimeout = 4 * time.Minute

// MinimumAddressesInServiceSubnet defines minimum amount of nodes the Service subnet should allow.
// We need at least ten, because the DNS service is always at the tenth cluster clusterIP
Expand Down
9 changes: 2 additions & 7 deletions cmd/kubeadm/app/phases/upgrade/staticpods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,8 @@ func (w *fakeWaiter) WaitForStaticPodHashChange(_, _, _ string) error {
return w.errsToReturn[waitForHashChange]
}

// WaitForHealthyKubelet returns a dummy nil just to implement the interface
func (w *fakeWaiter) WaitForHealthyKubelet(_ time.Duration, _ string) error {
return nil
}

// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
func (w *fakeWaiter) WaitForKubeletAndFunc(f func() error) error {
// WaitForKubelet returns a dummy nil just to implement the interface
func (w *fakeWaiter) WaitForKubelet() error {
return nil
}

Expand Down
116 changes: 66 additions & 50 deletions cmd/kubeadm/app/util/apiclient/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ type Waiter interface {
WaitForStaticPodHashChange(nodeName, component, previousHash string) error
// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error
// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
WaitForKubeletAndFunc(f func() error) error
// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'
WaitForKubelet() error
// SetTimeout adjusts the timeout to the specified duration
SetTimeout(timeout time.Duration)
}
Expand All @@ -76,17 +74,28 @@ func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io.

// WaitForAPI waits for the API Server's /healthz endpoint to report "ok"
func (w *KubeWaiter) WaitForAPI() error {
fmt.Printf("[api-check] Waiting for a healthy API server. This can take up to %v\n", w.timeout)

start := time.Now()
return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) {
healthStatus := 0
w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
err := wait.PollUntilContextTimeout(
context.Background(),
kubeadmconstants.APICallRetryInterval,
w.timeout,
true, func(ctx context.Context) (bool, error) {
healthStatus := 0
w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus)
if healthStatus != http.StatusOK {
return false, nil
}
return true, nil
})
if err != nil {
fmt.Printf("[api-check] The API server is not healthy after %v\n", time.Since(start))
return err
}

fmt.Printf("[apiclient] All control plane components are healthy after %f seconds\n", time.Since(start).Seconds())
return true, nil
})
fmt.Printf("[api-check] The API server is healthy after %v\n", time.Since(start))
return nil
}

// WaitForPodsWithLabel will lookup pods with the given label and wait until they are all
Expand Down Expand Up @@ -133,47 +142,54 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
})
}

// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
func (w *KubeWaiter) WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error {
time.Sleep(initialTimeout)
fmt.Printf("[kubelet-check] Initial timeout of %v passed.\n", initialTimeout)
return TryRunCommand(func() error {
client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
resp, err := client.Get(healthzEndpoint)
if err != nil {
fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.")
fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' failed with error: %v.\n", healthzEndpoint, err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.")
fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' returned HTTP code %d\n", healthzEndpoint, resp.StatusCode)
return errors.New("the kubelet healthz endpoint is unhealthy")
}
return nil
}, 5) // a failureThreshold of five means waiting for a total of 155 seconds
}
// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'.
func (w *KubeWaiter) WaitForKubelet() error {
var (
lastError error
start = time.Now()
healthzEndpoint = fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)
)

// WaitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time, and the kubelet
// /healthz continuously are unhealthy, kubeadm will error out after a period of exponential backoff
func (w *KubeWaiter) WaitForKubeletAndFunc(f func() error) error {
errorChan := make(chan error, 1)
fmt.Printf("[kubelet-check] Waiting for a healthy kubelet. This can take up to %v\n", w.timeout)

go func(errC chan error, waiter Waiter) {
if err := waiter.WaitForHealthyKubelet(40*time.Second, fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)); err != nil {
errC <- err
}
}(errorChan, w)
formatError := func(cause string) error {
return errors.Errorf("The HTTP call equal to 'curl -sSL %s' returned %s\n",
healthzEndpoint, cause)
}

go func(errC chan error) {
// This main goroutine sends whatever the f function returns (error or not) to the channel
// This in order to continue on success (nil error), or just fail if the function returns an error
errC <- f()
}(errorChan)
err := wait.PollUntilContextTimeout(
context.Background(),
kubeadmconstants.APICallRetryInterval,
w.timeout,
true, func(ctx context.Context) (bool, error) {
client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthzEndpoint, nil)
if err != nil {
lastError = formatError(fmt.Sprintf("error: %v", err))
return false, err
}
resp, err := client.Do(req)
if err != nil {
lastError = formatError(fmt.Sprintf("error: %v", err))
return false, nil
}
defer func() {
_ = resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK {
lastError = formatError(fmt.Sprintf("status code: %d", resp.StatusCode))
return false, nil
}

return true, nil
})
if err != nil {
fmt.Printf("[kubelet-check] The kubelet is not healthy after %v\n", time.Since(start))
return lastError
}

// This call is blocking until one of the goroutines sends to errorChan
return <-errorChan
fmt.Printf("[kubelet-check] The kubelet is healthy after %v\n", time.Since(start))
return nil
}

// SetTimeout adjusts the timeout to the specified duration
Expand Down
11 changes: 3 additions & 8 deletions cmd/kubeadm/app/util/dryrun/dryrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,9 @@ func (w *Waiter) WaitForPodToDisappear(podName string) error {
return nil
}

// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok'
func (w *Waiter) WaitForHealthyKubelet(_ time.Duration, healthzEndpoint string) error {
fmt.Printf("[dryrun] Would make sure the kubelet %q endpoint is healthy\n", healthzEndpoint)
return nil
}

// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function
func (w *Waiter) WaitForKubeletAndFunc(f func() error) error {
// WaitForKubelet does not query the kubelet: it only prints a "[dryrun]" message
// stating that the kubelet's /healthz endpoint would be checked, and returns nil.
func (w *Waiter) WaitForKubelet() error {
fmt.Println("[dryrun] Would make sure the kubelet's /healthz endpoint is healthy")
return nil
}

Expand Down

0 comments on commit 5571188

Please sign in to comment.