Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip kubeadm if cluster is running & properly configured #7124

Merged
merged 3 commits into from
Mar 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/minikube/bootstrapper/bsutil/binaries.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func TransferBinaries(cfg config.KubernetesConfig, c command.Runner) error {
return err
}

// stop kubelet to avoid "Text File Busy" error
if _, err := c.RunCmd(exec.Command("/bin/bash", "-c", "pgrep kubelet && sudo systemctl stop kubelet")); err != nil {
glog.Warningf("unable to stop kubelet: %s", err)
}

var g errgroup.Group
for _, name := range constants.KubernetesReleaseBinaries {
name := name
Expand Down
6 changes: 3 additions & 3 deletions pkg/minikube/bootstrapper/bsutil/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ const (
// ConfigFileAssets returns configuration file assets
func ConfigFileAssets(cfg config.KubernetesConfig, kubeadm []byte, kubelet []byte, kubeletSvc []byte, defaultCNIConfig []byte) []assets.CopyableFile {
fs := []assets.CopyableFile{
assets.NewMemoryAssetTarget(kubeadm, KubeadmYamlPath, "0640"),
assets.NewMemoryAssetTarget(kubelet, KubeletSystemdConfFile, "0644"),
assets.NewMemoryAssetTarget(kubeletSvc, KubeletServiceFile, "0644"),
assets.NewMemoryAssetTarget(kubeadm, KubeadmYamlPath+".new", "0640"),
assets.NewMemoryAssetTarget(kubelet, KubeletSystemdConfFile+".new", "0644"),
assets.NewMemoryAssetTarget(kubeletSvc, KubeletServiceFile+".new", "0644"),
}
// Copy the default CNI config (k8s.conf), so that kubelet can successfully
// start a Pod in the case a user hasn't manually installed any CNI plugin
Expand Down
70 changes: 69 additions & 1 deletion pkg/minikube/bootstrapper/bsutil/kverify/kverify.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -78,6 +79,68 @@ func apiServerPID(cr command.Runner) (int, error) {
return strconv.Atoi(s)
}

// ExpectedComponentsRunning returns whether or not all expected components are running
func ExpectedComponentsRunning(cs *kubernetes.Clientset) error {
expected := []string{
"kube-dns", // coredns
"etcd",
"kube-apiserver",
"kube-controller-manager",
"kube-proxy",
"kube-scheduler",
}

found := map[string]bool{}

pods, err := cs.CoreV1().Pods("kube-system").List(meta.ListOptions{})
if err != nil {
return err
}

for _, pod := range pods.Items {
glog.Infof("found pod: %s", podStatusMsg(pod))
if pod.Status.Phase != core.PodRunning {
continue
}
for k, v := range pod.ObjectMeta.Labels {
if k == "component" || k == "k8s-app" {
found[v] = true
}
}
}

missing := []string{}
for _, e := range expected {
if !found[e] {
missing = append(missing, e)
}
}
if len(missing) > 0 {
return fmt.Errorf("missing components: %v", strings.Join(missing, ", "))
}
return nil
}

// podStatusMsg returns a human-readable pod status, for generating debug status
func podStatusMsg(pod core.Pod) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%q [%s] %s", pod.ObjectMeta.GetName(), pod.ObjectMeta.GetUID(), pod.Status.Phase))
for i, c := range pod.Status.Conditions {
if c.Reason != "" {
if i == 0 {
sb.WriteString(": ")
} else {
sb.WriteString(" / ")
}
sb.WriteString(fmt.Sprintf("%s:%s", c.Type, c.Reason))
}
if c.Message != "" {
sb.WriteString(fmt.Sprintf(" (%s)", c.Message))
}
}
return sb.String()
}

// WaitForSystemPods verifies essential pods for running kurnetes is running
func WaitForSystemPods(r cruntime.Manager, bs bootstrapper.Bootstrapper, cr command.Runner, client *kubernetes.Clientset, start time.Time, timeout time.Duration) error {
glog.Info("waiting for kube-system pods to appear ...")
Expand All @@ -99,6 +162,10 @@ func WaitForSystemPods(r cruntime.Manager, bs bootstrapper.Bootstrapper, cr comm
return false, nil
}
glog.Infof("%d kube-system pods found", len(pods.Items))
for _, pod := range pods.Items {
glog.Infof(podStatusMsg(pod))
}

if len(pods.Items) < 2 {
return false, nil
}
Expand Down Expand Up @@ -159,7 +226,7 @@ func APIServerStatus(cr command.Runner, ip net.IP, port int) (state.State, error

pid, err := apiServerPID(cr)
if err != nil {
glog.Warningf("unable to get apiserver pid: %v", err)
glog.Warningf("stopped: unable to get apiserver pid: %v", err)
return state.Stopped, nil
}

Expand Down Expand Up @@ -205,6 +272,7 @@ func apiServerHealthz(ip net.IP, port int) (state.State, error) {
resp, err := client.Get(url)
// Connection refused, usually.
if err != nil {
glog.Infof("stopped: %s: %v", url, err)
return state.Stopped, nil
}
if resp.StatusCode == http.StatusUnauthorized {
Expand Down
155 changes: 95 additions & 60 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {

}

c = exec.Command("/bin/bash", "-c", fmt.Sprintf("%s init --config %s %s --ignore-preflight-errors=%s", bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), bsutil.KubeadmYamlPath, extraFlags, strings.Join(ignore, ",")))
conf := bsutil.KubeadmYamlPath
c = exec.Command("/bin/bash", "-c", fmt.Sprintf("sudo mv %s.new %s && %s init --config %s %s --ignore-preflight-errors=%s", conf, conf, bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), conf, extraFlags, strings.Join(ignore, ",")))
rr, err := k.c.RunCmd(c)
if err != nil {
return errors.Wrapf(err, "init failed. output: %q", rr.Output())
Expand All @@ -236,6 +237,20 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {
return nil
}

func (k *Bootstrapper) controlPlaneEndpoint(cfg config.ClusterConfig) (string, int, error) {
cp, err := config.PrimaryControlPlane(&cfg)
if err != nil {
return "", 0, err
}

if driver.IsKIC(cfg.Driver) {
ip := oci.DefaultBindIPV4
port, err := oci.ForwardedPort(cfg.Driver, cfg.Name, cp.Port)
return ip, port, err
}
return cp.IP, cp.Port, nil
}

// client sets and returns a Kubernetes client to use to speak to a kubeadm launched apiserver
func (k *Bootstrapper) client(ip string, port int) (*kubernetes.Clientset, error) {
if k.k8sClient != nil {
Expand All @@ -262,34 +277,28 @@ func (k *Bootstrapper) client(ip string, port int) (*kubernetes.Clientset, error
// WaitForNode blocks until the node appears to be healthy
func (k *Bootstrapper) WaitForNode(cfg config.ClusterConfig, n config.Node, timeout time.Duration) error {
start := time.Now()
out.T(out.Waiting, "Waiting for cluster to come online ...")

if !n.ControlPlane {
glog.Infof("%s is not a control plane, nothing to wait for", n.Name)
return nil
}

cr, err := cruntime.New(cruntime.Config{Type: cfg.KubernetesConfig.ContainerRuntime, Runner: k.c})
if err != nil {
return err
}

if n.ControlPlane {
if err := kverify.WaitForAPIServerProcess(cr, k, k.c, start, timeout); err != nil {
return err
}
if err := kverify.WaitForAPIServerProcess(cr, k, k.c, start, timeout); err != nil {
return err
}

ip := n.IP
port := n.Port
if driver.IsKIC(cfg.Driver) {
ip = oci.DefaultBindIPV4
p, err := oci.ForwardedPort(cfg.Driver, driver.MachineName(cfg, n), port)
if err != nil {
return errors.Wrapf(err, "get host-bind port %d for container %s", port, cfg.Name)
}
port = p
ip, port, err := k.controlPlaneEndpoint(cfg)
if err != nil {
return err
}

if n.ControlPlane {
if err := kverify.WaitForHealthyAPIServer(cr, k, k.c, start, ip, port, timeout); err != nil {
return err
}
if err := kverify.WaitForHealthyAPIServer(cr, k, k.c, start, ip, port, timeout); err != nil {
return err
}

c, err := k.client(ip, port)
Expand All @@ -303,6 +312,31 @@ func (k *Bootstrapper) WaitForNode(cfg config.ClusterConfig, n config.Node, time
return nil
}

// needsReset returns whether or not the cluster needs to be reconfigured
func (k *Bootstrapper) needsReset(conf string, ip string, port int, client *kubernetes.Clientset) bool {
if _, err := k.c.RunCmd(exec.Command("sudo", "diff", "-u", conf, conf+".new")); err != nil {
glog.Infof("needs reset: configs differ")
return true
}

st, err := kverify.APIServerStatus(k.c, net.ParseIP(ip), port)
if err != nil {
glog.Infof("needs reset: apiserver error: %v", err)
return true
}

if st != state.Running {
glog.Infof("needs reset: apiserver in state %s", st)
return true
}

if err := kverify.ExpectedComponentsRunning(client); err != nil {
glog.Infof("needs reset: %v", err)
return true
}
return false
}

// restartCluster restarts the Kubernetes cluster configured by kubeadm
func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
glog.Infof("restartCluster start")
Expand All @@ -328,14 +362,36 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
glog.Errorf("failed to create compat symlinks: %v", err)
}

ip, port, err := k.controlPlaneEndpoint(cfg)
if err != nil {
return errors.Wrap(err, "control plane")
}

client, err := k.client(ip, port)
if err != nil {
return errors.Wrap(err, "getting k8s client")
}

// If the cluster is running, check if we have any work to do.
conf := bsutil.KubeadmYamlPath
if !k.needsReset(conf, ip, port, client) {
glog.Infof("Taking a shortcut, as the cluster seems to be properly configured")
return nil
}

if _, err := k.c.RunCmd(exec.Command("sudo", "mv", conf+".new", conf)); err != nil {
return errors.Wrap(err, "mv")
}

baseCmd := fmt.Sprintf("%s %s", bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), phase)
cmds := []string{
fmt.Sprintf("%s phase certs all --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, bsutil.KubeadmYamlPath),
fmt.Sprintf("%s phase certs all --config %s", baseCmd, conf),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, conf),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, conf),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, conf),
}

glog.Infof("resetting cluster from %s", conf)
// Run commands one at a time so that it is easier to root cause failures.
for _, c := range cmds {
rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", c))
Expand All @@ -346,38 +402,19 @@ func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {

cr, err := cruntime.New(cruntime.Config{Type: cfg.KubernetesConfig.ContainerRuntime, Runner: k.c})
if err != nil {
return err
return errors.Wrap(err, "runtime")
}

// We must ensure that the apiserver is healthy before proceeding
if err := kverify.WaitForAPIServerProcess(cr, k, k.c, time.Now(), kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "apiserver healthz")
}

cp, err := config.PrimaryControlPlane(&cfg)
if err != nil {
return errors.Wrap(err, "getting control plane")
}
ip := cp.IP
port := cp.Port
if driver.IsKIC(cfg.Driver) {
ip = oci.DefaultBindIPV4
port, err = oci.ForwardedPort(cfg.Driver, driver.MachineName(cfg, cp), port)
if err != nil {
return errors.Wrapf(err, "get host-bind port %d for container %s", port, driver.MachineName(cfg, cp))
}
}
client, err := k.client(ip, port)
if err != nil {
return errors.Wrap(err, "getting k8s client")
}

if err := kverify.WaitForSystemPods(cr, k, k.c, client, time.Now(), kconst.DefaultControlPlaneTimeout); err != nil {
return errors.Wrap(err, "system pods")
}

// Explicitly re-enable kubeadm addons (proxy, coredns) so that they will check for IP or configuration changes.
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, bsutil.KubeadmYamlPath))); err != nil {
if rr, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s phase addon all --config %s", baseCmd, conf))); err != nil {
return errors.Wrapf(err, fmt.Sprintf("addon phase cmd:%q", rr.Command()))
}

Expand Down Expand Up @@ -495,11 +532,6 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru

glog.Infof("kubelet %s config:\n%+v", kubeletCfg, cfg.KubernetesConfig)

// stop kubelet to avoid "Text File Busy" error
if err := stopKubelet(k.c); err != nil {
glog.Warningf("unable to stop kubelet: %s", err)
}

if err := bsutil.TransferBinaries(cfg.KubernetesConfig, k.c); err != nil {
return errors.Wrap(err, "downloading binaries")
}
Expand All @@ -508,25 +540,19 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
if cfg.KubernetesConfig.EnableDefaultCNI {
cniFile = []byte(defaultCNIConfig)
}

// Install assets into temporary files
files := bsutil.ConfigFileAssets(cfg.KubernetesConfig, kubeadmCfg, kubeletCfg, kubeletService, cniFile)
if err := copyFiles(k.c, files); err != nil {
return err
}

if err := startKubelet(k.c); err != nil {
if err := reloadKubelet(k.c); err != nil {
return err
}
return nil
}

func stopKubelet(runner command.Runner) error {
stopCmd := exec.Command("/bin/bash", "-c", "pgrep kubelet && sudo systemctl stop kubelet")
if rr, err := runner.RunCmd(stopCmd); err != nil {
return errors.Wrapf(err, "command: %q output: %q", rr.Command(), rr.Output())
}
return nil
}

func copyFiles(runner command.Runner, files []assets.CopyableFile) error {
// Combine mkdir request into a single call to reduce load
dirs := []string{}
Expand All @@ -546,8 +572,17 @@ func copyFiles(runner command.Runner, files []assets.CopyableFile) error {
return nil
}

func startKubelet(runner command.Runner) error {
startCmd := exec.Command("/bin/bash", "-c", "sudo systemctl daemon-reload && sudo systemctl start kubelet")
func reloadKubelet(runner command.Runner) error {
svc := bsutil.KubeletServiceFile
conf := bsutil.KubeletSystemdConfFile

checkCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("pgrep kubelet && diff -u %s %s.new && diff -u %s %s.new", svc, svc, conf, conf))
if _, err := runner.RunCmd(checkCmd); err == nil {
glog.Infof("kubelet is already running with the right configs")
return nil
}

startCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("sudo mv %s.new %s && sudo mv %s.new %s && sudo systemctl daemon-reload && sudo systemctl restart kubelet", svc, svc, conf, conf))
if _, err := runner.RunCmd(startCmd); err != nil {
return errors.Wrap(err, "starting kubelet")
}
Expand Down