Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make --wait=false non-blocking, --wait=true blocks on system pods #5894

Merged
merged 12 commits into from
Nov 13, 2019
20 changes: 6 additions & 14 deletions cmd/minikube/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func initMinikubeFlags() {
startCmd.Flags().String(criSocket, "", "The cri socket path to be used.")
startCmd.Flags().String(networkPlugin, "", "The name of the network plugin.")
startCmd.Flags().Bool(enableDefaultCNI, false, "Enable the default CNI plugin (/etc/cni/net.d/k8s.conf). Used in conjunction with \"--network-plugin=cni\".")
startCmd.Flags().Bool(waitUntilHealthy, false, "Wait until Kubernetes core services are healthy before exiting.")
startCmd.Flags().Bool(waitUntilHealthy, true, "Block until the apiserver is servicing API requests")
startCmd.Flags().Duration(waitTimeout, 6*time.Minute, "max time to wait per Kubernetes core services to be healthy.")
startCmd.Flags().Bool(nativeSSH, true, "Use native Golang SSH client (default true). Set to 'false' to use the command line 'ssh' command when accessing the docker machine. Useful for the machine drivers when they will not start with 'Waiting for SSH'.")
startCmd.Flags().Bool(autoUpdate, true, "If set, automatically updates drivers to the latest version. Defaults to true.")
Expand Down Expand Up @@ -365,7 +365,11 @@ func runStart(cmd *cobra.Command, args []string) {
if driverName == driver.None {
prepareNone()
}
waitCluster(bs, config)
if viper.GetBool(waitUntilHealthy) {
if err := bs.WaitForCluster(config.KubernetesConfig, viper.GetDuration(waitTimeout)); err != nil {
exit.WithError("Wait failed", err)
}
}
if err := showKubectlInfo(kubeconfig, k8sVersion, config.Name); err != nil {
glog.Errorf("kubectl info: %v", err)
}
Expand All @@ -389,18 +393,6 @@ func enableAddons() {
}
}

// waitCluster asks the bootstrapper to wait for cluster pods and terminates
// via exit.WithError("Wait failed", ...) if that wait returns an error.
//
// When the waitUntilHealthy flag is false, the wait list is restricted to
// "apiserver"; otherwise a nil list is handed to WaitForPods. The wait is
// bounded by the waitTimeout flag.
func waitCluster(bs bootstrapper.Bootstrapper, config cfg.MachineConfig) {
	var targets []string
	if healthy := viper.GetBool(waitUntilHealthy); !healthy {
		// wait=false: the apiserver is the only thing worth blocking on
		targets = []string{"apiserver"}
	}

	limit := viper.GetDuration(waitTimeout)
	err := bs.WaitForPods(config.KubernetesConfig, limit, targets)
	if err == nil {
		return
	}
	exit.WithError("Wait failed", err)
}

func displayVersion(version string) {
prefix := ""
if viper.GetString(cfg.MachineProfile) != constants.DefaultMachineName {
Expand Down
2 changes: 1 addition & 1 deletion pkg/minikube/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Bootstrapper interface {
UpdateCluster(config.KubernetesConfig) error
RestartCluster(config.KubernetesConfig) error
DeleteCluster(config.KubernetesConfig) error
WaitForPods(config.KubernetesConfig, time.Duration, []string) error
WaitForCluster(config.KubernetesConfig, time.Duration) error
// LogCommands returns a map of log type to a command which will display that log.
LogCommands(LogOptions) map[string]string
SetupCerts(cfg config.KubernetesConfig) error
Expand Down
202 changes: 75 additions & 127 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ import (
"github.com/pkg/errors"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
Expand All @@ -66,7 +65,6 @@ const (
defaultCNIConfigPath = "/etc/cni/net.d/k8s.conf"
kubeletServiceFile = "/lib/systemd/system/kubelet.service"
kubeletSystemdConfFile = "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf"
AllPods = "ALL_PODS"
)

const (
Expand Down Expand Up @@ -96,22 +94,6 @@ var KubeadmExtraArgsWhitelist = map[int][]string{
},
}

// pod describes one health-check query: a display name plus the label
// key/value pair that identifies the pods to look for.
type pod struct {
// Human friendly name
name string
// key is the label key for this query (e.g. "k8s-app" or "component")
key string
// value is the label value paired with key (e.g. "kube-proxy")
value string
}

// PodsByLayer lists the label queries run during health checking, ordered
// roughly by dependency layer.
var PodsByLayer = []pod{
	{name: "proxy", key: "k8s-app", value: "kube-proxy"},
	{name: "etcd", key: "component", value: "etcd"},
	{name: "scheduler", key: "component", value: "kube-scheduler"},
	{name: "controller", key: "component", value: "kube-controller-manager"},
	{name: "dns", key: "k8s-app", value: "kube-dns"},
}

// yamlConfigPath is the path to the kubeadm configuration
var yamlConfigPath = path.Join(vmpath.GuestEphemeralDir, "kubeadm.yaml")

Expand Down Expand Up @@ -354,7 +336,6 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {

// client returns a Kubernetes client to use to speak to a kubeadm launched apiserver
func (k *Bootstrapper) client(k8s config.KubernetesConfig) (*kubernetes.Clientset, error) {
// Catch case if WaitForPods was called with a stale ~/.kube/config
config, err := kapi.ClientConfig(k.contextName)
if err != nil {
return nil, errors.Wrap(err, "client config")
Expand All @@ -369,67 +350,78 @@ func (k *Bootstrapper) client(k8s config.KubernetesConfig) (*kubernetes.Clientse
return kubernetes.NewForConfig(config)
}

// WaitForPods blocks until pods specified in podsToWaitFor appear to be healthy.
func (k *Bootstrapper) WaitForPods(k8s config.KubernetesConfig, timeout time.Duration, podsToWaitFor []string) error {
// Do not wait for "k8s-app" pods in the case of CNI, as they are managed
// by a CNI plugin which is usually started after minikube has been brought
// up. Otherwise, minikube won't start, as "k8s-app" pods are not ready.
componentsOnly := k8s.NetworkPlugin == "cni"
out.T(out.WaitingPods, "Waiting for:")

// Wait until the apiserver can answer queries properly. We don't care if the apiserver
// pod shows up as registered, but need the webserver for all subsequent queries.

if shouldWaitForPod("apiserver", podsToWaitFor) {
out.String(" apiserver")
if err := k.waitForAPIServer(k8s); err != nil {
return errors.Wrap(err, "waiting for apiserver")
// WaitForCluster blocks until the cluster appears to be healthy
func (k *Bootstrapper) WaitForCluster(k8s config.KubernetesConfig, timeout time.Duration) error {
start := time.Now()
out.T(out.Waiting, "Waiting for cluster to come online ...")

glog.Infof("waiting for apiserver process to appear ...")
err := wait.PollImmediate(time.Second*1, time.Minute*5, func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during process check")
}
rr, ierr := k.c.RunCmd(exec.Command("sudo", "pgrep", "kube-apiserver"))
if ierr != nil {
glog.Warningf("pgrep apiserver: %v cmd: %s", ierr, rr.Command())
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("apiserver process never appeared")
}
glog.Infof("duration metric: took %s to wait for apiserver process to appear ...", time.Since(start))

glog.Infof("waiting for apiserver healthz status ...")
hStart := time.Now()
healthz := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during healthz check")
}

status, err := k.GetAPIServerStatus(net.ParseIP(k8s.NodeIP), k8s.NodePort)
if err != nil {
glog.Warningf("status: %v", err)
return false, nil
}
if status != "Running" {
return false, nil
}
return true, nil
}

if err = wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, healthz); err != nil {
return fmt.Errorf("apiserver healthz never reported healthy")
}
glog.Infof("duration metric: took %s to wait for apiserver healthz status ...", time.Since(hStart))

glog.Infof("waiting for pod list to contain data ...")
pStart := time.Now()
client, err := k.client(k8s)
if err != nil {
return errors.Wrap(err, "client")
}

for _, p := range PodsByLayer {
if componentsOnly && p.key != "component" { // skip component check if network plugin is cni
continue
podList := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("cluster wait timed out during pod check")
}
if !shouldWaitForPod(p.name, podsToWaitFor) {
continue
// Wait for any system pod, as waiting for apiserver may block until etcd
pods, err := client.CoreV1().Pods("kube-system").List(meta.ListOptions{})
if len(pods.Items) == 0 {
return true, nil
}
out.String(" %s", p.name)
selector := labels.SelectorFromSet(labels.Set(map[string]string{p.key: p.value}))
if err := kapi.WaitForPodsWithLabelRunning(client, "kube-system", selector, timeout); err != nil {
return errors.Wrap(err, fmt.Sprintf("waiting for %s=%s", p.key, p.value))
if err != nil {
return false, nil
}
glog.Infof("%d kube-system pods found", len(pods.Items))
return true, nil
}
out.Ln("")
return nil
}

// shouldWaitForPod returns true if:
// 1. podsToWaitFor is nil
// 2. name is in podsToWaitFor
// 3. ALL_PODS is in podsToWaitFor
// else, return false
func shouldWaitForPod(name string, podsToWaitFor []string) bool {
if podsToWaitFor == nil {
return true
}
if len(podsToWaitFor) == 0 {
return false
}
for _, p := range podsToWaitFor {
if p == AllPods {
return true
}
if p == name {
return true
}
if err = wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, podList); err != nil {
return fmt.Errorf("apiserver never returned a pod list")
}
return false
glog.Infof("duration metric: took %s to wait for pod list to return data ...", time.Since(pStart))
return nil
}

// RestartCluster restarts the Kubernetes cluster configured by kubeadm
Expand Down Expand Up @@ -472,76 +464,32 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
}
}

if err := k.waitForAPIServer(k8s); err != nil {
return errors.Wrap(err, "waiting for apiserver")
}

// restart the proxy and coredns
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, yamlConfigPath))); err != nil {
return errors.Wrapf(err, fmt.Sprintf("addon phase cmd:%q", rr.Command()))
}

if err := k.adjustResourceLimits(); err != nil {
glog.Warningf("unable to adjust resource limits: %v", err)
}
return nil
}

// waitForAPIServer waits for the apiserver to start up
func (k *Bootstrapper) waitForAPIServer(k8s config.KubernetesConfig) error {
start := time.Now()
defer func() {
glog.Infof("duration metric: took %s to wait for apiserver status ...", time.Since(start))
}()

glog.Infof("Waiting for apiserver process ...")
// To give a better error message, first check for process existence via ssh
// Needs minutes in case the image isn't cached (such as with v1.10.x)
err := wait.PollImmediate(time.Millisecond*300, time.Minute*3, func() (bool, error) {
rr, ierr := k.c.RunCmd(exec.Command("sudo", "pgrep", "kube-apiserver"))
if ierr != nil {
glog.Warningf("pgrep apiserver: %v cmd: %s", ierr, rr.Command())
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("apiserver process never appeared")
}

glog.Infof("Waiting for apiserver to port healthy status ...")
var client *kubernetes.Clientset
f := func() (bool, error) {
// We must ensure that the apiserver is healthy before proceeding
glog.Infof("waiting for apiserver healthz ...")
healthz := func() (bool, error) {
status, err := k.GetAPIServerStatus(net.ParseIP(k8s.NodeIP), k8s.NodePort)
glog.Infof("apiserver status: %s, err: %v", status, err)
if err != nil {
glog.Warningf("status: %v", err)
return false, nil
}
if status != "Running" {
return false, nil
}
// Make sure apiserver pod is retrievable
if client == nil {
// We only want to get the clientset once, because this line takes ~1 second to complete
client, err = k.client(k8s)
if err != nil {
glog.Warningf("get kubernetes client: %v", err)
return false, nil
}
}
return true, nil
}
if err = wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, healthz); err != nil {
return fmt.Errorf("apiserver healthz never reported healthy")
}

_, err = client.CoreV1().Pods("kube-system").Get("kube-apiserver-minikube", metav1.GetOptions{})
if err != nil {
return false, nil
}
// restart the proxy and coredns
tstromberg marked this conversation as resolved.
Show resolved Hide resolved
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, yamlConfigPath))); err != nil {
return errors.Wrapf(err, fmt.Sprintf("addon phase cmd:%q", rr.Command()))
}

return true, nil
// TODO: Check apiserver/kubelet logs for fatal errors so that users don't
// need to wait minutes to find out their flag didn't work.
if err := k.adjustResourceLimits(); err != nil {
glog.Warningf("unable to adjust resource limits: %v", err)
}
err = wait.PollImmediate(kconst.APICallRetryInterval, 2*kconst.DefaultControlPlaneTimeout, f)
return err
return nil
}

// DeleteCluster removes the components that were started earlier
Expand Down
42 changes: 0 additions & 42 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,45 +363,3 @@ func TestGenerateConfig(t *testing.T) {
}
}
}

// TestShouldWaitForPod checks shouldWaitForPod against a nil wait list, an
// empty wait list, lists that do and do not contain the pod, and the
// ALL_PODS wildcard entry.
func TestShouldWaitForPod(t *testing.T) {
	type testCase struct {
		description   string
		pod           string
		podsToWaitFor []string
		expected      bool
	}

	cases := []testCase{
		// podsToWaitFor is left unset here so it stays nil rather than empty
		{description: "pods to wait for is nil", pod: "apiserver", expected: true},
		{description: "pods to wait for is empty", pod: "apiserver", podsToWaitFor: []string{}},
		{description: "pod is in podsToWaitFor", pod: "apiserver", podsToWaitFor: []string{"etcd", "apiserver"}, expected: true},
		{description: "pod is not in podsToWaitFor", pod: "apiserver", podsToWaitFor: []string{"etcd", "gvisor"}},
		{description: "wait for all pods", pod: "apiserver", podsToWaitFor: []string{"ALL_PODS"}, expected: true},
	}

	for _, tc := range cases {
		t.Run(tc.description, func(t *testing.T) {
			if got := shouldWaitForPod(tc.pod, tc.podsToWaitFor); got != tc.expected {
				t.Fatalf("unexpected diff: got %t, expected %t", got, tc.expected)
			}
		})
	}
}
1 change: 0 additions & 1 deletion pkg/minikube/out/out_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestOutT(t *testing.T) {
{Option, "Option", nil, " ▪ Option\n", " - Option\n"},
{WarningType, "Warning", nil, "⚠️ Warning\n", "! Warning\n"},
{FatalType, "Fatal: {{.error}}", V{"error": "ugh"}, "💣 Fatal: ugh\n", "X Fatal: ugh\n"},
{WaitingPods, "wait", nil, "⌛ wait", "* wait"},
{Issue, "http://i/{{.number}}", V{"number": 10000}, " ▪ http://i/10000\n", " - http://i/10000\n"},
{Usage, "raw: {{.one}} {{.two}}", V{"one": "'%'", "two": "%d"}, "💡 raw: '%' %d\n", "* raw: '%' %d\n"},
{Running, "Installing Kubernetes version {{.version}} ...", V{"version": "v1.13"}, "🏃 ... v1.13 تثبيت Kubernetes الإصدار\n", "* ... v1.13 تثبيت Kubernetes الإصدار\n"},
Expand Down
1 change: 0 additions & 1 deletion pkg/minikube/out/style.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ var styles = map[StyleEnum]style{
Stopped: {Prefix: "🛑 "},
WarningType: {Prefix: "⚠️ ", LowPrefix: lowWarning},
Waiting: {Prefix: "⌛ "},
WaitingPods: {Prefix: "⌛ ", OmitNewline: true},
Usage: {Prefix: "💡 "},
Launch: {Prefix: "🚀 "},
Sad: {Prefix: "😿 "},
Expand Down
5 changes: 2 additions & 3 deletions test/integration/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,8 @@ func validateKubectlGetPods(ctx context.Context, t *testing.T, profile string) {
if err != nil {
t.Errorf("%s failed: %v", rr.Args, err)
}
podName := "kube-apiserver-minikube"
if !strings.Contains(rr.Stdout.String(), podName) {
t.Errorf("%s is not up in running, got: %s\n", podName, rr.Stdout.String())
if !strings.Contains(rr.Stdout.String(), "kube-system") {
t.Errorf("%s = %q, want *kube-system*", rr.Command(), rr.Stdout.String())
}
}

Expand Down