Merge pull request #7973 from sharifelgamal/restart
Make sure multinode clusters can survive restarts
sharifelgamal committed May 28, 2020
2 parents a2c8823 + 94ce379 commit 4f06139
Showing 10 changed files with 161 additions and 41 deletions.
45 changes: 31 additions & 14 deletions cmd/minikube/cmd/start.go
@@ -286,27 +286,44 @@ func startWithDriver(starter node.Starter, existing *config.ClusterConfig) (*kub
     }
 
     numNodes := viper.GetInt(nodes)
-    if numNodes == 1 && existing != nil {
+    if existing != nil {
+        if numNodes > 1 {
+            // We ignore the --nodes parameter if we're restarting an existing cluster
+            out.WarningT(`The cluster {{.cluster}} already exists which means the --nodes parameter will be ignored. Use "minikube node add" to add nodes to an existing cluster.`, out.V{"cluster": existing.Name})
+        }
         numNodes = len(existing.Nodes)
     }
     if numNodes > 1 {
         if driver.BareMetal(starter.Cfg.Driver) {
             exit.WithCodeT(exit.Config, "The none driver is not compatible with multi-node clusters.")
         } else {
-            out.Ln("")
-            warnAboutMultiNode()
-            for i := 1; i < numNodes; i++ {
-                nodeName := node.Name(i + 1)
-                n := config.Node{
-                    Name:              nodeName,
-                    Worker:            true,
-                    ControlPlane:      false,
-                    KubernetesVersion: starter.Cfg.KubernetesConfig.KubernetesVersion,
+            // Only warn users on first start.
+            if existing == nil {
+                out.Ln("")
+                warnAboutMultiNode()
+
+                for i := 1; i < numNodes; i++ {
+                    nodeName := node.Name(i + 1)
+                    n := config.Node{
+                        Name:              nodeName,
+                        Worker:            true,
+                        ControlPlane:      false,
+                        KubernetesVersion: starter.Cfg.KubernetesConfig.KubernetesVersion,
+                    }
+                    out.Ln("") // extra newline for clarity on the command line
+                    err := node.Add(starter.Cfg, n)
+                    if err != nil {
+                        return nil, errors.Wrap(err, "adding node")
+                    }
                 }
-                out.Ln("") // extra newline for clarity on the command line
-                err := node.Add(starter.Cfg, n)
-                if err != nil {
-                    return nil, errors.Wrap(err, "adding node")
+            } else {
+                for _, n := range existing.Nodes {
+                    if !n.ControlPlane {
+                        err := node.Add(starter.Cfg, n)
+                        if err != nil {
+                            return nil, errors.Wrap(err, "adding node")
+                        }
+                    }
                 }
             }
         }
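The change above is the core of the restart fix: on a fresh start minikube provisions numNodes-1 new workers, but on a restart it re-adds the workers already recorded in the saved cluster config (the --nodes flag is ignored on restart). A minimal, self-contained sketch of that branch; Node, addNode, and the node names are illustrative assumptions, not minikube's actual API:

package main

import "fmt"

type Node struct {
	Name         string
	ControlPlane bool
}

func addNode(cluster string, n Node) error {
	fmt.Printf("adding node %s to %s\n", n.Name, cluster)
	return nil
}

func provisionWorkers(cluster string, numNodes int, existing []Node) error {
	if existing == nil {
		// First start: create brand-new workers m02, m03, ...
		for i := 1; i < numNodes; i++ {
			n := Node{Name: fmt.Sprintf("m%02d", i+1)}
			if err := addNode(cluster, n); err != nil {
				return err
			}
		}
		return nil
	}
	// Restart: reuse the workers recorded in the config so their
	// names (and underlying machines) survive the restart.
	for _, n := range existing {
		if !n.ControlPlane {
			if err := addNode(cluster, n); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	_ = provisionWorkers("demo", 3, nil) // fresh start
	_ = provisionWorkers("demo", 3, []Node{{Name: "demo", ControlPlane: true}, {Name: "m02"}})
}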
2 changes: 1 addition & 1 deletion cmd/minikube/cmd/status.go
@@ -232,7 +232,7 @@ func status(api libmachine.API, cc config.ClusterConfig, n config.Node) (*Status
     glog.Infof("%s kubelet status = %s", name, stk)
     st.Kubelet = stk.String()
 
-    // Early exit for regular nodes
+    // Early exit for worker nodes
     if !controlPlane {
         return st, nil
     }
2 changes: 1 addition & 1 deletion pkg/minikube/bootstrapper/bsutil/kverify/api_server.go
@@ -105,7 +105,7 @@ func WaitForHealthyAPIServer(r cruntime.Manager, bs bootstrapper.Bootstrapper, c
     }
 
     if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, healthz); err != nil {
-        return fmt.Errorf("apiserver healthz never reported healthy")
+        return fmt.Errorf("apiserver healthz never reported healthy: %v", err)
     }
 
     vcheck := func() (bool, error) {
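This one-line change threads the poller's own error into the returned message, so a control-plane timeout no longer hides why the last health check failed. A self-contained sketch of the pattern, where pollImmediate is an illustrative stand-in for apimachinery's wait.PollImmediate:

package main

import (
	"errors"
	"fmt"
	"time"
)

// pollImmediate checks cond right away, then every interval, until it
// reports done or the timeout elapses (stand-in, not the real helper).
func pollImmediate(interval, timeout time.Duration, cond func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := cond()
		if err != nil {
			return err // a condition error carries detail...
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			// ...but a bare timeout says nothing about the last failure,
			// which is why the caller above wraps it with ": %v".
			return errors.New("timed out waiting for the condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	healthz := func() (bool, error) { return false, nil } // never healthy
	if err := pollImmediate(10*time.Millisecond, 50*time.Millisecond, healthz); err != nil {
		fmt.Printf("apiserver healthz never reported healthy: %v\n", err)
	}
}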
42 changes: 32 additions & 10 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm.go
@@ -302,7 +302,7 @@ func (k *Bootstrapper) StartCluster(cfg config.ClusterConfig) error {
 
     if err := bsutil.ExistingConfig(k.c); err == nil {
         glog.Infof("found existing configuration files, will attempt cluster restart")
-        rerr := k.restartCluster(cfg)
+        rerr := k.restartControlPlane(cfg)
         if rerr == nil {
             return nil
         }
@@ -484,7 +484,7 @@ func (k *Bootstrapper) needsReconfigure(conf string, hostname string, port int,
 }
 
 // restartCluster restarts the Kubernetes cluster configured by kubeadm
-func (k *Bootstrapper) restartCluster(cfg config.ClusterConfig) error {
+func (k *Bootstrapper) restartControlPlane(cfg config.ClusterConfig) error {
     glog.Infof("restartCluster start")
 
     start := time.Now()
@@ -605,10 +605,24 @@ func (k *Bootstrapper) JoinCluster(cc config.ClusterConfig, n config.Node, joinC
     }()
 
     // Join the master by specifying its token
-    joinCmd = fmt.Sprintf("%s --v=10 --node-name=%s", joinCmd, driver.MachineName(cc, n))
-    out, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", joinCmd))
-    if err != nil {
-        return errors.Wrapf(err, "cmd failed: %s\n%+v\n", joinCmd, out)
+    joinCmd = fmt.Sprintf("%s --node-name=%s", joinCmd, driver.MachineName(cc, n))
+
+    join := func() error {
+        // reset first to clear any possibly existing state
+        _, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", fmt.Sprintf("%s reset -f", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion))))
+        if err != nil {
+            glog.Infof("kubeadm reset failed, continuing anyway: %v", err)
+        }
+
+        out, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", joinCmd))
+        if err != nil {
+            return errors.Wrapf(err, "cmd failed: %s\n%+v\n", joinCmd, out.Output())
+        }
+        return nil
+    }
+
+    if err := retry.Expo(join, 10*time.Second, 1*time.Minute); err != nil {
+        return errors.Wrap(err, "joining cp")
     }
 
     if _, err := k.c.RunCmd(exec.Command("/bin/bash", "-c", "sudo systemctl daemon-reload && sudo systemctl enable kubelet && sudo systemctl start kubelet")); err != nil {
@@ -618,17 +632,21 @@ func (k *Bootstrapper) JoinCluster(cc config.ClusterConfig, n config.Node, joinC
     return nil
 }
 
-// GenerateToken creates a token and returns the appropriate kubeadm join command to run
+// GenerateToken creates a token and returns the appropriate kubeadm join command to run, or the already existing token
 func (k *Bootstrapper) GenerateToken(cc config.ClusterConfig) (string, error) {
     // Take that generated token and use it to get a kubeadm join command
     tokenCmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("%s token create --print-join-command --ttl=0", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion)))
     r, err := k.c.RunCmd(tokenCmd)
     if err != nil {
-        return "", errors.Wrap(err, "generating bootstrap token")
+        return "", errors.Wrap(err, "generating join command")
     }
 
     joinCmd := r.Stdout.String()
     joinCmd = strings.Replace(joinCmd, "kubeadm", bsutil.InvokeKubeadm(cc.KubernetesConfig.KubernetesVersion), 1)
     joinCmd = fmt.Sprintf("%s --ignore-preflight-errors=all", strings.TrimSpace(joinCmd))
+    if cc.KubernetesConfig.CRISocket != "" {
+        joinCmd = fmt.Sprintf("%s --cri-socket %s", joinCmd, cc.KubernetesConfig.CRISocket)
+    }
 
     return joinCmd, nil
 }
@@ -743,14 +761,18 @@ func (k *Bootstrapper) UpdateNode(cfg config.ClusterConfig, n config.Node, r cru
     }
 
     files := []assets.CopyableFile{
-        assets.NewMemoryAssetTarget(kubeadmCfg, bsutil.KubeadmYamlPath+".new", "0640"),
         assets.NewMemoryAssetTarget(kubeletCfg, bsutil.KubeletSystemdConfFile, "0644"),
         assets.NewMemoryAssetTarget(kubeletService, bsutil.KubeletServiceFile, "0644"),
     }
 
+    if n.ControlPlane {
+        files = append(files, assets.NewMemoryAssetTarget(kubeadmCfg, bsutil.KubeadmYamlPath+".new", "0640"))
+    }
+
     // Copy the default CNI config (k8s.conf), so that kubelet can successfully
     // start a Pod in the case a user hasn't manually installed any CNI plugin
     // and minikube was started with "--extra-config=kubelet.network-plugin=cni".
-    if cfg.KubernetesConfig.EnableDefaultCNI {
+    if cfg.KubernetesConfig.EnableDefaultCNI && !config.MultiNode(cfg) {
         files = append(files, assets.NewMemoryAssetTarget([]byte(defaultCNIConfig), bsutil.DefaultCNIConfigPath, "0644"))
     }
 
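Three things change in kubeadm.go: the restart path is renamed restartControlPlane, workers join through a reset-then-join closure retried with exponential backoff, and GenerateToken appends --cri-socket when the cluster config specifies one. Below is a hedged sketch of the retry pattern only; retryExpo is a stand-in for minikube's retry.Expo, not the real implementation, and the join closure simulates a worker that needs a few attempts after a restart:

package main

import (
	"fmt"
	"time"
)

// retryExpo retries fn with exponentially growing waits until it succeeds
// or the total time budget is spent (illustrative stand-in for retry.Expo).
func retryExpo(fn func() error, initial, maxTotal time.Duration) error {
	var err error
	wait := initial
	start := time.Now()
	for {
		if err = fn(); err == nil {
			return nil
		}
		if time.Since(start)+wait > maxTotal {
			return fmt.Errorf("retry budget exhausted: %w", err)
		}
		time.Sleep(wait)
		wait *= 2 // back off: 10ms, 20ms, 40ms, ...
	}
}

func main() {
	attempts := 0
	join := func() error {
		attempts++
		// In the real code this is where "kubeadm reset -f" clears any
		// half-joined state before "kubeadm join" runs again.
		if attempts < 3 {
			return fmt.Errorf("join attempt %d failed", attempts)
		}
		return nil
	}
	if err := retryExpo(join, 10*time.Millisecond, time.Second); err != nil {
		fmt.Println("joining cp:", err)
		return
	}
	fmt.Printf("joined after %d attempts\n", attempts)
}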
6 changes: 5 additions & 1 deletion pkg/minikube/cruntime/cruntime.go
@@ -136,7 +136,11 @@ func New(c Config) (Manager, error) {
 
     switch c.Type {
     case "", "docker":
-        return &Docker{Socket: c.Socket, Runner: c.Runner, Init: sm}, nil
+        return &Docker{
+            Socket: c.Socket,
+            Runner: c.Runner,
+            Init:   sm,
+        }, nil
     case "crio", "cri-o":
         return &CRIO{
             Socket: c.Socket,
10 changes: 5 additions & 5 deletions pkg/minikube/machine/cache_images.go
@@ -196,16 +196,16 @@ func CacheAndLoadImages(images []string) error {
 
             status, err := Status(api, m)
             if err != nil {
-                glog.Warningf("error getting status for %s: %v", pName, err)
-                failed = append(failed, pName)
+                glog.Warningf("error getting status for %s: %v", m, err)
+                failed = append(failed, m)
                 continue
             }
 
             if status == state.Running.String() { // the not running hosts will load on next start
                 h, err := api.Load(m)
                 if err != nil {
                     glog.Warningf("Failed to load machine %q: %v", m, err)
-                    failed = append(failed, pName)
+                    failed = append(failed, m)
                     continue
                 }
                 cr, err := CommandRunner(h)
@@ -214,10 +214,10 @@ func CacheAndLoadImages(images []string) error {
                 }
                 err = LoadImages(c, cr, images, constants.ImageCacheDir)
                 if err != nil {
-                    failed = append(failed, pName)
+                    failed = append(failed, m)
                     glog.Warningf("Failed to load cached images for profile %s. make sure the profile is running. %v", pName, err)
                 }
-                succeeded = append(succeeded, pName)
+                succeeded = append(succeeded, m)
             }
         }
     }
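The cache_images.go fix is bookkeeping for multinode: one profile can own several machines, so warnings and the failed/succeeded lists now record the machine name m rather than the profile name pName. A toy illustration under assumed names (a real profile maps to machines via the libmachine API, not a literal map):

package main

import "fmt"

// loadImages pretends to load cached images onto one machine; the m02
// machine is hard-coded to fail so the bookkeeping below has work to do.
func loadImages(machine string) error {
	if machine == "multinode-demo-m02" {
		return fmt.Errorf("machine not running")
	}
	return nil
}

func main() {
	// One profile, several machines (names are assumptions).
	profiles := map[string][]string{
		"multinode-demo": {"multinode-demo", "multinode-demo-m02"},
	}

	var failed, succeeded []string
	for pName, machines := range profiles {
		for _, m := range machines {
			if err := loadImages(m); err != nil {
				// Recording m (not pName) identifies the exact node that failed.
				fmt.Printf("failed to load cached images for profile %s on machine %s: %v\n", pName, m, err)
				failed = append(failed, m)
				continue
			}
			succeeded = append(succeeded, m)
		}
	}
	fmt.Println("succeeded:", succeeded, "failed:", failed)
}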
7 changes: 4 additions & 3 deletions pkg/minikube/machine/fix.go
@@ -111,7 +111,7 @@ func recreateIfNeeded(api libmachine.API, cc *config.ClusterConfig, n *config.No
     }
 
     if !me || err == constants.ErrMachineMissing {
-        out.T(out.Shrug, `{{.driver_name}} "{{.cluster}}" {{.machine_type}} is missing, will recreate.`, out.V{"driver_name": cc.Driver, "cluster": cc.Name, "machine_type": machineType})
+        out.T(out.Shrug, `{{.driver_name}} "{{.cluster}}" {{.machine_type}} is missing, will recreate.`, out.V{"driver_name": cc.Driver, "cluster": machineName, "machine_type": machineType})
         demolish(api, *cc, *n, h)
 
         glog.Infof("Sleeping 1 second for extra luck!")
@@ -133,20 +133,21 @@ func recreateIfNeeded(api libmachine.API, cc *config.ClusterConfig, n *config.No
 
     if s == state.Running {
         if !recreated {
-            out.T(out.Running, `Updating the running {{.driver_name}} "{{.cluster}}" {{.machine_type}} ...`, out.V{"driver_name": cc.Driver, "cluster": cc.Name, "machine_type": machineType})
+            out.T(out.Running, `Updating the running {{.driver_name}} "{{.cluster}}" {{.machine_type}} ...`, out.V{"driver_name": cc.Driver, "cluster": machineName, "machine_type": machineType})
         }
         return h, nil
     }
 
     if !recreated {
-        out.T(out.Restarting, `Restarting existing {{.driver_name}} {{.machine_type}} for "{{.cluster}}" ...`, out.V{"driver_name": cc.Driver, "cluster": cc.Name, "machine_type": machineType})
+        out.T(out.Restarting, `Restarting existing {{.driver_name}} {{.machine_type}} for "{{.cluster}}" ...`, out.V{"driver_name": cc.Driver, "cluster": machineName, "machine_type": machineType})
     }
     if err := h.Driver.Start(); err != nil {
         return h, errors.Wrap(err, "driver start")
     }
     if err := saveHost(api, h, cc, n); err != nil {
         return h, err
     }
+
     return h, nil
 }
 
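fix.go now interpolates machineName instead of cc.Name in user-facing messages, since each node in a multinode cluster is its own machine. The sketch below encodes one plausible naming scheme (the primary node reuses the profile name, secondary nodes get a suffix); it mirrors the intent of driver.MachineName but is an assumption, not its actual implementation:

package main

import "fmt"

// machineName returns a per-node machine name (assumed scheme): the
// primary node reuses the profile name, extra nodes append their node name.
func machineName(profile, node string, primary bool) string {
	if primary {
		return profile
	}
	return profile + "-" + node
}

func main() {
	fmt.Printf("Restarting existing docker container for %q ...\n", machineName("multinode-demo", "m02", false))
	fmt.Printf("Updating the running docker container %q ...\n", machineName("multinode-demo", "m01", true))
}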
1 change: 0 additions & 1 deletion pkg/minikube/machine/start.go
@@ -168,7 +168,6 @@ func createHost(api libmachine.API, cfg *config.ClusterConfig, n *config.Node) (
     if err := saveHost(api, h, cfg, n); err != nil {
         return h, err
     }
-
     return h, nil
 }
 
4 changes: 2 additions & 2 deletions pkg/minikube/node/start.go
@@ -21,7 +21,6 @@ import (
     "net"
     "os"
     "os/exec"
-    "runtime/debug"
     "strconv"
     "strings"
     "sync"
@@ -128,6 +127,7 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
         if err = bs.SetupCerts(starter.Cfg.KubernetesConfig, *starter.Node); err != nil {
             return nil, errors.Wrap(err, "setting up certs")
         }
+
     }
 
     var wg sync.WaitGroup
@@ -156,6 +156,7 @@ func Start(starter Starter, apiServer bool) (*kubeconfig.Settings, error) {
         if err := bs.WaitForNode(*starter.Cfg, *starter.Node, viper.GetDuration(waitTimeout)); err != nil {
             return nil, errors.Wrap(err, "Wait failed")
         }
+
     } else {
         if err := bs.UpdateNode(*starter.Cfg, *starter.Node, cr); err != nil {
             return nil, errors.Wrap(err, "Updating node")
@@ -251,7 +252,6 @@ func configureRuntimes(runner cruntime.CommandRunner, cc config.ClusterConfig, k
 
     err = cr.Enable(disableOthers, forceSystemd())
     if err != nil {
-        debug.PrintStack()
         exit.WithError("Failed to enable container runtime", err)
     }
 
83 changes: 80 additions & 3 deletions test/integration/multinode_test.go
@@ -46,6 +46,8 @@ func TestMultiNode(t *testing.T) {
         {"StopNode", validateStopRunningNode},
         {"StartAfterStop", validateStartNodeAfterStop},
         {"DeleteNode", validateDeleteNodeFromMultiNode},
+        {"StopMultiNode", validateStopMultiNodeCluster},
+        {"RestartMultiNode", validateRestartMultiNodeCluster},
     }
     for _, tc := range tests {
         tc := tc
@@ -138,12 +140,20 @@ func validateStopRunningNode(ctx context.Context, t *testing.T, profile string)
 }
 
 func validateStartNodeAfterStop(ctx context.Context, t *testing.T, profile string) {
-    // TODO (#7496): remove skip once restarts work
-    t.Skip("Restarting nodes is broken :(")
+    if DockerDriver() {
+        rr, err := Run(t, exec.Command("docker", "version", "-f", "{{.Server.Version}}"))
+        if err != nil {
+            t.Fatalf("docker is broken: %v", err)
+        }
+        if strings.Contains(rr.Stdout.String(), "azure") {
+            t.Skip("kic containers are not supported on docker's azure")
+        }
+    }
 
     // Start the node back up
-    rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "node", "start", ThirdNodeName))
+    rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "node", "start", ThirdNodeName, "--alsologtostderr"))
     if err != nil {
+        t.Logf(rr.Stderr.String())
         t.Errorf("node start returned an error. args %q: %v", rr.Command(), err)
     }
 
@@ -160,6 +170,73 @@ func validateStartNodeAfterStop(ctx context.Context, t *testing.T, profile strin
     if strings.Count(rr.Stdout.String(), "kubelet: Running") != 3 {
         t.Errorf("status says both kubelets are not running: args %q: %v", rr.Command(), rr.Stdout.String())
     }
+
+    // Make sure kubectl can connect correctly
+    rr, err = Run(t, exec.CommandContext(ctx, "kubectl", "get", "nodes"))
+    if err != nil {
+        t.Fatalf("failed to kubectl get nodes. args %q : %v", rr.Command(), err)
+    }
 }
 
+func validateStopMultiNodeCluster(ctx context.Context, t *testing.T, profile string) {
+    // Run minikube node stop on that node
+    rr, err := Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "stop"))
+    if err != nil {
+        t.Errorf("node stop returned an error. args %q: %v", rr.Command(), err)
+    }
+
+    // Run status to see the stopped hosts
+    rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status"))
+    // Exit code 7 means one host is stopped, which we are expecting
+    if err != nil && rr.ExitCode != 7 {
+        t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
+    }
+
+    // Make sure minikube status shows 2 stopped nodes
+    rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status", "--alsologtostderr"))
+    if err != nil && rr.ExitCode != 7 {
+        t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
+    }
+
+    if strings.Count(rr.Stdout.String(), "host: Stopped") != 2 {
+        t.Errorf("incorrect number of stopped hosts: args %q: %v", rr.Command(), rr.Stdout.String())
+    }
+
+    if strings.Count(rr.Stdout.String(), "kubelet: Stopped") != 2 {
+        t.Errorf("incorrect number of stopped kubelets: args %q: %v", rr.Command(), rr.Stdout.String())
+    }
+}
+
+func validateRestartMultiNodeCluster(ctx context.Context, t *testing.T, profile string) {
+    if DockerDriver() {
+        rr, err := Run(t, exec.Command("docker", "version", "-f", "{{.Server.Version}}"))
+        if err != nil {
+            t.Fatalf("docker is broken: %v", err)
+        }
+        if strings.Contains(rr.Stdout.String(), "azure") {
+            t.Skip("kic containers are not supported on docker's azure")
+        }
+    }
+    // Restart a full cluster with minikube start
+    startArgs := append([]string{"start", "-p", profile}, StartArgs()...)
+    rr, err := Run(t, exec.CommandContext(ctx, Target(), startArgs...))
+    if err != nil {
+        t.Fatalf("failed to start cluster. args %q : %v", rr.Command(), err)
+    }
+
+    // Make sure minikube status shows 2 running nodes
+    rr, err = Run(t, exec.CommandContext(ctx, Target(), "-p", profile, "status", "--alsologtostderr"))
+    if err != nil {
+        t.Fatalf("failed to run minikube status. args %q : %v", rr.Command(), err)
+    }
+
+    if strings.Count(rr.Stdout.String(), "host: Running") != 2 {
+        t.Errorf("status says both hosts are not running: args %q: %v", rr.Command(), rr.Stdout.String())
+    }
+
+    if strings.Count(rr.Stdout.String(), "kubelet: Running") != 2 {
+        t.Errorf("status says both kubelets are not running: args %q: %v", rr.Command(), rr.Stdout.String())
+    }
+}
+
 func validateDeleteNodeFromMultiNode(ctx context.Context, t *testing.T, profile string) {
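All three new validators share one probe: run minikube status for the profile and count how many nodes report a given state, tolerating the deliberately non-zero exit code when nodes are stopped. A standalone sketch of that probe; the profile name, the states, and the decision to ignore the error are illustrative:

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// countState runs "minikube status" for a profile and counts how many
// nodes report the given state. status exits non-zero when any node is
// stopped, so the error is deliberately ignored; only the text matters.
func countState(profile, state string) int {
	out, _ := exec.Command("minikube", "-p", profile, "status").CombinedOutput()
	return strings.Count(string(out), state)
}

func main() {
	profile := "multinode-demo" // illustrative profile name
	fmt.Printf("running hosts: %d\n", countState(profile, "host: Running"))
	fmt.Printf("stopped hosts: %d\n", countState(profile, "host: Stopped"))
}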
