Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart kube-proxy using kubeadm & add bootstrapper.WaitCluster #4276

Merged
merged 5 commits into from
May 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 3 additions & 32 deletions cmd/minikube/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/blang/semver"
"github.com/docker/machine/libmachine"
"github.com/docker/machine/libmachine/host"
"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
Expand Down Expand Up @@ -244,9 +243,6 @@ func runStart(cmd *cobra.Command, args []string) {
// The kube config update must come before bootstrapping, otherwise health checks may use a stale IP
kubeconfig := updateKubeConfig(host, &config)
bootstrapCluster(bs, cr, runner, config.KubernetesConfig, preexisting, isUpgrade)

apiserverPort := config.KubernetesConfig.NodePort
validateCluster(bs, cr, runner, ip, apiserverPort)
configureMounts()
if err = LoadCachedImagesInConfigFile(); err != nil {
console.Failure("Unable to load cached images from config file.")
Expand All @@ -257,6 +253,9 @@ func runStart(cmd *cobra.Command, args []string) {
prepareNone()
}

if err := bs.WaitCluster(config.KubernetesConfig); err != nil {
exit.WithError("Wait failed", err)
}
showKubectlConnectInfo(kubeconfig)

}
Expand Down Expand Up @@ -668,34 +667,6 @@ func bootstrapCluster(bs bootstrapper.Bootstrapper, r cruntime.Manager, runner b
}
}

// validateCluster checks the kubelet first and then the apiserver, retrying
// each probe through pkgutil.RetryAfter. If either probe never reports the
// running state, the failure is handed to exit.WithLogEntries together with
// the problems found in the cluster logs.
func validateCluster(bs bootstrapper.Bootstrapper, r cruntime.Manager, runner bootstrapper.CommandRunner, ip string, apiserverPort int) {
	running := state.Running.String()

	// Kubelet probe: healthy only when there is no error and the state is running.
	checkKubelet := func() error {
		st, err := bs.GetKubeletStatus()
		if err == nil && st == running {
			return nil
		}
		return &pkgutil.RetriableError{Err: fmt.Errorf("kubelet unhealthy: %v: %s", err, st)}
	}
	if err := pkgutil.RetryAfter(20, checkKubelet, 3*time.Second); err != nil {
		exit.WithLogEntries("kubelet checks failed", err, logs.FindProblems(r, bs, runner))
	}

	// Apiserver probe: same success criteria, queried at the given IP and port.
	checkAPIServer := func() error {
		st, err := bs.GetAPIServerStatus(net.ParseIP(ip), apiserverPort)
		if err == nil && st == running {
			return nil
		}
		return &pkgutil.RetriableError{Err: fmt.Errorf("apiserver status=%s err=%v", st, err)}
	}
	if err := pkgutil.RetryAfter(30, checkAPIServer, 10*time.Second); err != nil {
		exit.WithLogEntries("apiserver checks failed", err, logs.FindProblems(r, bs, runner))
	}

	console.OutLn("")
}

// configureMounts configures any requested filesystem mounts
func configureMounts() {
if !viper.GetBool(createMount) {
Expand Down
1 change: 1 addition & 0 deletions pkg/minikube/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Bootstrapper interface {
UpdateCluster(config.KubernetesConfig) error
RestartCluster(config.KubernetesConfig) error
DeleteCluster(config.KubernetesConfig) error
WaitCluster(config.KubernetesConfig) error
// LogCommands returns a map of log type to a command which will display that log.
LogCommands(LogOptions) map[string]string
SetupCerts(cfg config.KubernetesConfig) error
Expand Down
79 changes: 40 additions & 39 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/bootstrapper"
"k8s.io/minikube/pkg/minikube/config"
Expand Down Expand Up @@ -70,7 +71,6 @@ type pod struct {

// PodsByLayer are queries we run when health checking, sorted roughly by dependency layer
var PodsByLayer = []pod{
{"apiserver", "component", "kube-apiserver"},
{"proxy", "k8s-app", "kube-proxy"},
{"etcd", "component", "etcd"},
{"scheduler", "component", "kube-scheduler"},
Expand Down Expand Up @@ -214,20 +214,10 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
}
}

if err := waitForPods(k8s, false); err != nil {
return errors.Wrap(err, "wait")
}

glog.Infof("Configuring cluster permissions ...")
if err := util.RetryAfter(100, elevateKubeSystemPrivileges, time.Millisecond*500); err != nil {
return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
}

// Make sure elevating privileges didn't screw anything up
if err := waitForPods(k8s, true); err != nil {
return errors.Wrap(err, "wait")
}

return nil
}

Expand Down Expand Up @@ -260,37 +250,37 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {
return nil
}

// waitForPods waits until the important Kubernetes pods are in running state
func waitForPods(k8s config.KubernetesConfig, quiet bool) error {
// WaitCluster blocks until Kubernetes appears to be healthy.
func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig) error {
// Do not wait for "k8s-app" pods in the case of CNI, as they are managed
// by a CNI plugin which is usually started after minikube has been brought
// up. Otherwise, minikube won't start, as "k8s-app" pods are not ready.
componentsOnly := k8s.NetworkPlugin == "cni"

if !quiet {
console.OutStyle("waiting-pods", "Waiting for:")
}
console.OutStyle("waiting-pods", "Verifying:")
client, err := util.GetClient()
if err != nil {
return errors.Wrap(err, "k8s client")
}

// Wait until the apiserver can answer queries properly. We don't care if the apiserver
// pod shows up as registered, but need the webserver for all subsequent queries.
console.Out(" apiserver")
tstromberg marked this conversation as resolved.
Show resolved Hide resolved
if err := k.waitForAPIServer(k8s); err != nil {
return errors.Wrap(err, "waiting for apiserver")
}

for _, p := range PodsByLayer {
if componentsOnly && p.key != "component" {
continue
}

if !quiet {
console.Out(" %s", p.name)
}
console.Out(" %s", p.name)
selector := labels.SelectorFromSet(labels.Set(map[string]string{p.key: p.value}))
if err := util.WaitForPodsWithLabelRunning(client, "kube-system", selector); err != nil {
return errors.Wrap(err, fmt.Sprintf("waiting for %s=%s", p.key, p.value))
}
}
if !quiet {
console.OutLn("")
}
console.OutLn("")
return nil
}

Expand All @@ -308,11 +298,13 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
controlPlane = "control-plane"
}

configPath := constants.KubeadmConfigFile
baseCmd := fmt.Sprintf("sudo kubeadm %s", phase)
cmds := []string{
fmt.Sprintf("sudo kubeadm %s phase certs all --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase kubeconfig all --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase %s all --config %s", phase, controlPlane, constants.KubeadmConfigFile),
fmt.Sprintf("sudo kubeadm %s phase etcd local --config %s", phase, constants.KubeadmConfigFile),
fmt.Sprintf("%s phase certs all --config %s", baseCmd, configPath),
fmt.Sprintf("%s phase kubeconfig all --config %s", baseCmd, configPath),
fmt.Sprintf("%s phase %s all --config %s", baseCmd, controlPlane, configPath),
fmt.Sprintf("%s phase etcd local --config %s", baseCmd, configPath),
}

// Run commands one at a time so that it is easier to root cause failures.
Expand All @@ -322,23 +314,32 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
}
}

if err := waitForPods(k8s, false); err != nil {
return errors.Wrap(err, "wait")
if err := k.waitForAPIServer(k8s); err != nil {
return errors.Wrap(err, "waiting for apiserver")
}

console.OutStyle("reconfiguring", "Updating kube-proxy configuration ...")
if err = util.RetryAfter(5, func() error { return updateKubeProxyConfigMap(k8s) }, 5*time.Second); err != nil {
return errors.Wrap(err, "restarting kube-proxy")
// restart the proxy and coredns
if err := k.c.Run(fmt.Sprintf("%s phase addon all --config %s", baseCmd, configPath)); err != nil {
return errors.Wrapf(err, "addon phase")
}

// Make sure the kube-proxy restart didn't screw anything up.
if err := waitForPods(k8s, true); err != nil {
return errors.Wrap(err, "wait")
}

return nil
}

// waitForAPIServer polls the apiserver status every 200ms, for at most one
// minute, until it is reported as "Running". An error returned by the status
// query is passed back to the poller rather than being retried here.
func (k *Bootstrapper) waitForAPIServer(k8s config.KubernetesConfig) error {
	glog.Infof("Waiting for apiserver ...")

	isRunning := func() (bool, error) {
		status, err := k.GetAPIServerStatus(net.ParseIP(k8s.NodeIP), k8s.NodePort)
		glog.Infof("status: %s, err: %v", status, err)
		if err != nil {
			return false, err
		}
		return status == "Running", nil
	}
	return wait.PollImmediate(200*time.Millisecond, time.Minute, isRunning)
}

// DeleteCluster removes the components that were started earlier
func (k *Bootstrapper) DeleteCluster(k8s config.KubernetesConfig) error {
cmd := fmt.Sprintf("sudo kubeadm reset --force")
Expand Down
100 changes: 0 additions & 100 deletions pkg/minikube/bootstrapper/kubeadm/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@ limitations under the License.
package kubeadm

import (
"bytes"
"encoding/json"
"html/template"
"net"
"strings"

"github.com/golang/glog"
"github.com/pkg/errors"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1beta1"
apierr "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/service"
"k8s.io/minikube/pkg/util"
Expand Down Expand Up @@ -130,98 +125,3 @@ func elevateKubeSystemPrivileges() error {
}
return nil
}

// Constants used by updateKubeProxyConfigMap when rewriting the kube-proxy
// ConfigMap.
const (
// kubeconfigConf is the key inside the kube-proxy ConfigMap's Data map whose
// value is read, compared and replaced.
kubeconfigConf = "kubeconfig.conf"
// kubeProxyConfigmapTmpl is the template for the kubeconfig document stored
// under kubeconfigConf. It is executed with a struct providing the
// AdvertiseAddress and APIServerPort fields, which form the server URL.
kubeProxyConfigmapTmpl = `apiVersion: v1
kind: Config
clusters:
- cluster:
certificate-authority: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
server: https://{{.AdvertiseAddress}}:{{.APIServerPort}}
name: default
contexts:
- context:
cluster: default
namespace: default
user: default
name: default
current-context: default
users:
- name: default
user:
tokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
`
)

// updateKubeProxyConfigMap renders a kubeconfig pointing at the node IP and
// port from k8s, stores it in the kube-proxy ConfigMap, and deletes the
// kube-proxy pods so that replacements start with the new configuration.
//
// When the stored configuration already equals the rendered one, nothing is
// changed and no pod is deleted. Failures to fetch or update the ConfigMap,
// or to delete a pod, are returned as *util.RetriableError; all other
// failures are returned as plain wrapped errors.
func updateKubeProxyConfigMap(k8s config.KubernetesConfig) error {
	const namespace = "kube-system"

	client, err := util.GetClient()
	if err != nil {
		return errors.Wrap(err, "getting k8s client")
	}

	// kube-proxy must be up before its configuration is touched.
	proxySelector := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "kube-proxy"}))
	if err := util.WaitForPodsWithLabelRunning(client, namespace, proxySelector); err != nil {
		return errors.Wrap(err, "kube-proxy not running")
	}

	cfgMap, err := client.CoreV1().ConfigMaps(namespace).Get("kube-proxy", meta.GetOptions{})
	if err != nil {
		return &util.RetriableError{Err: errors.Wrap(err, "getting kube-proxy configmap")}
	}
	glog.Infof("kube-proxy config: %v", cfgMap.Data[kubeconfigConf])

	// Render the desired kubeconfig from the template.
	tmpl := template.Must(template.New("kubeProxyTmpl").Parse(kubeProxyConfigmapTmpl))
	params := struct {
		AdvertiseAddress string
		APIServerPort    int
	}{
		AdvertiseAddress: k8s.NodeIP,
		APIServerPort:    k8s.NodePort,
	}
	var rendered bytes.Buffer
	if err := tmpl.Execute(&rendered, params); err != nil {
		return errors.Wrap(err, "executing kube proxy configmap template")
	}

	if cfgMap.Data == nil {
		cfgMap.Data = map[string]string{}
	}

	updated := strings.TrimSuffix(rendered.String(), "\n")
	glog.Infof("updated kube-proxy config: %s", updated)

	// An optimization, but also one that's unlikely, as kubeadm writes the address as 'localhost'
	if updated == cfgMap.Data[kubeconfigConf] {
		glog.Infof("kube-proxy config appears to require no change, not restarting kube-proxy")
		return nil
	}
	cfgMap.Data[kubeconfigConf] = updated

	// Make this step retriable, as it can fail with:
	// "Operation cannot be fulfilled on configmaps "kube-proxy": the object has been modified; please apply your changes to the latest version and try again"
	_, err = client.CoreV1().ConfigMaps(namespace).Update(cfgMap)
	if err != nil {
		return &util.RetriableError{Err: errors.Wrap(err, "updating configmap")}
	}

	proxyPods, err := client.CoreV1().Pods(namespace).List(meta.ListOptions{
		LabelSelector: "k8s-app=kube-proxy",
	})
	if err != nil {
		return errors.Wrap(err, "listing kube-proxy pods")
	}
	for _, p := range proxyPods.Items {
		// Retriable, as known to fail with: pods "<name>" not found
		err := client.CoreV1().Pods(p.Namespace).Delete(p.Name, &meta.DeleteOptions{})
		if err != nil {
			return &util.RetriableError{Err: errors.Wrapf(err, "deleting pod %+v", p)}
		}
	}

	// Wait for the scheduler to restart kube-proxy
	if err := util.WaitForPodsWithLabelRunning(client, namespace, proxySelector); err != nil {
		return errors.Wrap(err, "kube-proxy not running")
	}

	return nil
}