diff --git a/pkg/minikube/bootstrapper/kubeadm/kubeadm.go b/pkg/minikube/bootstrapper/kubeadm/kubeadm.go index a85c04402431..5974b1787103 100644 --- a/pkg/minikube/bootstrapper/kubeadm/kubeadm.go +++ b/pkg/minikube/bootstrapper/kubeadm/kubeadm.go @@ -17,8 +17,10 @@ limitations under the License. package kubeadm import ( + "bufio" "context" "fmt" + "io" "net" "os/exec" "path" @@ -226,9 +228,17 @@ func (k *Bootstrapper) init(cfg config.ClusterConfig) error { conf := bsutil.KubeadmYamlPath ctx, cancel := context.WithTimeout(context.Background(), initTimeoutMinutes*time.Minute) defer cancel() + admInitLogReader, admInitLogWriter := io.Pipe() c := exec.CommandContext(ctx, "/bin/bash", "-c", fmt.Sprintf("%s init --config %s %s --ignore-preflight-errors=%s", bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), conf, extraFlags, strings.Join(ignore, ","))) - if _, err := k.c.RunCmd(c); err != nil { + c.Stdout = admInitLogWriter + c.Stderr = admInitLogWriter + sc, err := k.c.StartCmd(c) + if err != nil { + return errors.Wrap(err, "start") + } + go outputAdmInitSteps(admInitLogReader) + if _, err := k.c.WaitCmd(sc); err != nil { if ctx.Err() == context.DeadlineExceeded { return ErrInitTimedout } @@ -236,9 +246,8 @@ func (k *Bootstrapper) init(cfg config.ClusterConfig) error { if strings.Contains(err.Error(), "'kubeadm': Permission denied") { return ErrNoExecLinux } - return errors.Wrap(err, "run") + return errors.Wrap(err, "wait") } - if err := k.applyCNI(cfg); err != nil { return errors.Wrap(err, "apply cni") } @@ -272,6 +281,37 @@ func (k *Bootstrapper) init(cfg config.ClusterConfig) error { return nil } +// ouputAdmInitSteps streams the pipe and outputs the current step +func outputAdmInitSteps(logs io.Reader) { + type step struct { + logTag string + registerStep register.RegStep + stepMessage string + } + + steps := []step{ + {logTag: "certs", registerStep: register.PreparingKubernetesCerts, stepMessage: "Generating certificates and keys ..."}, + {logTag: "control-plane", registerStep: register.PreparingKubernetesControlPlane, stepMessage: "Booting up control plane ..."}, + {logTag: "bootstrap-token", registerStep: register.PreparingKubernetesBootstrapToken, stepMessage: "Configuring RBAC rules ..."}, + } + nextStepIndex := 0 + + scanner := bufio.NewScanner(logs) + for scanner.Scan() { + if nextStepIndex >= len(steps) { + scanner.Text() + continue + } + nextStep := steps[nextStepIndex] + if !strings.Contains(scanner.Text(), fmt.Sprintf("[%s]", nextStep.logTag)) { + continue + } + register.Reg.SetStep(nextStep.registerStep) + out.Step(style.Option, nextStep.stepMessage) + nextStepIndex++ + } +} + // applyCNI applies CNI to a cluster. Needs to be done every time a VM is powered up. func (k *Bootstrapper) applyCNI(cfg config.ClusterConfig) error { cnm, err := cni.New(cfg) diff --git a/pkg/minikube/command/command_runner.go b/pkg/minikube/command/command_runner.go index 7de15d0d5fae..88c66c2680eb 100644 --- a/pkg/minikube/command/command_runner.go +++ b/pkg/minikube/command/command_runner.go @@ -51,12 +51,26 @@ type RunResult struct { Args []string // the args that was passed to Runner } +// StartedCmd holds the contents of a started command +type StartedCmd struct { + cmd *exec.Cmd + rr *RunResult +} + // Runner represents an interface to run commands. type Runner interface { // RunCmd runs a cmd of exec.Cmd type. allowing user to set cmd.Stdin, cmd.Stdout,... // not all implementors are guaranteed to handle all the properties of cmd. RunCmd(cmd *exec.Cmd) (*RunResult, error) + // StartCmd starts a cmd of exec.Cmd type. + // This func in non-blocking, use WaitCmd to block until complete. + // Not all implementors are guaranteed to handle all the properties of cmd. + StartCmd(cmd *exec.Cmd) (*StartedCmd, error) + + // WaitCmd will prevent further execution until the started command has completed. + WaitCmd(startedCmd *StartedCmd) (*RunResult, error) + // Copy is a convenience method that runs a command to copy a file Copy(assets.CopyableFile) error diff --git a/pkg/minikube/command/exec_runner.go b/pkg/minikube/command/exec_runner.go index 6e5caf5708a2..58a1e26a7062 100644 --- a/pkg/minikube/command/exec_runner.go +++ b/pkg/minikube/command/exec_runner.go @@ -86,6 +86,53 @@ func (*execRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) { return rr, fmt.Errorf("%s: %v\nstdout:\n%s\nstderr:\n%s", rr.Command(), err, rr.Stdout.String(), rr.Stderr.String()) } +// StartCmd implements the Command Runner interface to start a exec.Cmd object +func (*execRunner) StartCmd(cmd *exec.Cmd) (*StartedCmd, error) { + rr := &RunResult{Args: cmd.Args} + sc := &StartedCmd{cmd: cmd, rr: rr} + klog.Infof("Start: %v", rr.Command()) + + var outb, errb io.Writer + if cmd.Stdout == nil { + var so bytes.Buffer + outb = io.MultiWriter(&so, &rr.Stdout) + } else { + outb = io.MultiWriter(cmd.Stdout, &rr.Stdout) + } + + if cmd.Stderr == nil { + var se bytes.Buffer + errb = io.MultiWriter(&se, &rr.Stderr) + } else { + errb = io.MultiWriter(cmd.Stderr, &rr.Stderr) + } + + cmd.Stdout = outb + cmd.Stderr = errb + + if err := cmd.Start(); err != nil { + return sc, errors.Wrap(err, "start") + } + + return sc, nil +} + +// WaitCmd implements the Command Runner interface to wait until a started exec.Cmd object finishes +func (*execRunner) WaitCmd(sc *StartedCmd) (*RunResult, error) { + rr := sc.rr + + err := sc.cmd.Wait() + if exitError, ok := err.(*exec.ExitError); ok { + rr.ExitCode = exitError.ExitCode() + } + + if err == nil { + return rr, nil + } + + return rr, fmt.Errorf("%s: %v\nstdout:\n%s\nstderr:\n%s", rr.Command(), err, rr.Stdout.String(), rr.Stderr.String()) +} + // Copy copies a file and its permissions func (e *execRunner) Copy(f assets.CopyableFile) error { dst := path.Join(f.GetTargetDir(), f.GetTargetName()) diff --git a/pkg/minikube/command/fake_runner.go b/pkg/minikube/command/fake_runner.go index e1b8f8615004..b663ff7a38fe 100644 --- a/pkg/minikube/command/fake_runner.go +++ b/pkg/minikube/command/fake_runner.go @@ -90,6 +90,47 @@ func (f *FakeCommandRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) { return rr, nil } +// StartCmd implements the Command Runner interface to start a exec.Cmd object +func (f *FakeCommandRunner) StartCmd(cmd *exec.Cmd) (*StartedCmd, error) { + rr := &RunResult{Args: cmd.Args} + sc := &StartedCmd{cmd: cmd, rr: rr} + klog.Infof("(FakeCommandRunner) Start: %v", rr.Command()) + + key := rr.Command() + out, ok := f.cmdMap.Load(key) + if !ok { + cmds := f.commands() + if len(cmds) == 0 { + return sc, fmt.Errorf("asked to execute %s, but FakeCommandRunner has no commands stored", rr.Command()) + } + + var txt strings.Builder + for _, c := range f.commands() { + txt.WriteString(fmt.Sprintf(" `%s`\n", c)) + } + return sc, fmt.Errorf("unregistered command:\n `%s`\nexpected one of:\n%s", key, txt.String()) + } + + var buf bytes.Buffer + outStr := "" + if out != nil { + outStr = out.(string) + } + _, err := buf.WriteString(outStr) + if err != nil { + return sc, errors.Wrap(err, "Writing outStr to FakeCommandRunner's buffer") + } + rr.Stdout = buf + rr.Stderr = buf + + return sc, nil +} + +// WaitCmd implements the Command Runner interface to wait until a started exec.Cmd object finishes +func (f *FakeCommandRunner) WaitCmd(sc *StartedCmd) (*RunResult, error) { + return sc.rr, nil +} + // Copy adds the filename, file contents key value pair to the stored map. func (f *FakeCommandRunner) Copy(file assets.CopyableFile) error { var b bytes.Buffer diff --git a/pkg/minikube/command/kic_runner.go b/pkg/minikube/command/kic_runner.go index 5cc78687bc6e..ae14308a22cb 100644 --- a/pkg/minikube/command/kic_runner.go +++ b/pkg/minikube/command/kic_runner.go @@ -131,6 +131,14 @@ func (k *kicRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) { } +func (k *kicRunner) StartCmd(cmd *exec.Cmd) (*StartedCmd, error) { + return nil, fmt.Errorf("kicRunner does not support StartCmd - you could be the first to add it") +} + +func (k *kicRunner) WaitCmd(sc *StartedCmd) (*RunResult, error) { + return nil, fmt.Errorf("kicRunner does not support WaitCmd - you could be the first to add it") +} + // Copy copies a file and its permissions func (k *kicRunner) Copy(f assets.CopyableFile) error { dst := path.Join(path.Join(f.GetTargetDir(), f.GetTargetName())) diff --git a/pkg/minikube/command/ssh_runner.go b/pkg/minikube/command/ssh_runner.go index 8dc2c15c8e58..e2e6bc389e59 100644 --- a/pkg/minikube/command/ssh_runner.go +++ b/pkg/minikube/command/ssh_runner.go @@ -46,6 +46,7 @@ var ( type SSHRunner struct { d drivers.Driver c *ssh.Client + s *ssh.Session } // NewSSHRunner returns a new SSHRunner that will run commands @@ -194,6 +195,100 @@ func (s *SSHRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) { return rr, fmt.Errorf("%s: %v\nstdout:\n%s\nstderr:\n%s", rr.Command(), err, rr.Stdout.String(), rr.Stderr.String()) } +// teeSSHStart starts a non-blocking SSH command, streaming stdout, stderr to logs +func teeSSHStart(s *ssh.Session, cmd string, outB io.Writer, errB io.Writer) error { + outPipe, err := s.StdoutPipe() + if err != nil { + return errors.Wrap(err, "stdout") + } + + errPipe, err := s.StderrPipe() + if err != nil { + return errors.Wrap(err, "stderr") + } + + go func() { + if err := teePrefix(ErrPrefix, errPipe, errB, klog.V(8).Infof); err != nil { + klog.Errorf("tee stderr: %v", err) + } + }() + go func() { + if err := teePrefix(OutPrefix, outPipe, outB, klog.V(8).Infof); err != nil { + klog.Errorf("tee stdout: %v", err) + } + }() + + return s.Start(cmd) +} + +// StartCmd implements the Command Runner interface to start a exec.Cmd object +func (s *SSHRunner) StartCmd(cmd *exec.Cmd) (*StartedCmd, error) { + if cmd.Stdin != nil { + return nil, fmt.Errorf("SSHRunner does not support stdin - you could be the first to add it") + } + + if s.s != nil { + return nil, fmt.Errorf("another SSH command has been started and is currently running") + } + + rr := &RunResult{Args: cmd.Args} + sc := &StartedCmd{cmd: cmd, rr: rr} + klog.Infof("Start: %v", rr.Command()) + + var outb, errb io.Writer + + if cmd.Stdout == nil { + var so bytes.Buffer + outb = io.MultiWriter(&so, &rr.Stdout) + } else { + outb = io.MultiWriter(cmd.Stdout, &rr.Stdout) + } + + if cmd.Stderr == nil { + var se bytes.Buffer + errb = io.MultiWriter(&se, &rr.Stderr) + } else { + errb = io.MultiWriter(cmd.Stderr, &rr.Stderr) + } + + sess, err := s.session() + if err != nil { + return sc, errors.Wrap(err, "NewSession") + } + + s.s = sess + + err = teeSSHStart(s.s, shellquote.Join(cmd.Args...), outb, errb) + + return sc, err +} + +// WaitCmd implements the Command Runner interface to wait until a started exec.Cmd object finishes +func (s *SSHRunner) WaitCmd(sc *StartedCmd) (*RunResult, error) { + if s.s == nil { + return nil, fmt.Errorf("there is no SSH command started") + } + + rr := sc.rr + + err := s.s.Wait() + if exitError, ok := err.(*exec.ExitError); ok { + rr.ExitCode = exitError.ExitCode() + } + + if err := s.s.Close(); err != io.EOF { + klog.Errorf("session close: %v", err) + } + + s.s = nil + + if err == nil { + return rr, nil + } + + return rr, fmt.Errorf("%s: %v\nstdout:\n%s\nstderr:\n%s", rr.Command(), err, rr.Stdout.String(), rr.Stderr.String()) +} + // Copy copies a file to the remote over SSH. func (s *SSHRunner) Copy(f assets.CopyableFile) error { dst := path.Join(path.Join(f.GetTargetDir(), f.GetTargetName())) diff --git a/pkg/minikube/cruntime/cruntime.go b/pkg/minikube/cruntime/cruntime.go index 332dc1be4c78..5a939c2f1a79 100644 --- a/pkg/minikube/cruntime/cruntime.go +++ b/pkg/minikube/cruntime/cruntime.go @@ -54,7 +54,15 @@ func ValidRuntimes() []string { // CommandRunner is the subset of command.Runner this package consumes type CommandRunner interface { + // RunCmd is a blocking method that runs a command + // Use this if you don't need to stream stdout and stderr in real-time RunCmd(cmd *exec.Cmd) (*command.RunResult, error) + // StartCmd is a non-blocking method that starts a command + // Use WaitCmd to block until the command is complete + // Use this if you need to stream stdout and/or stderr in real-time + StartCmd(cmd *exec.Cmd) (*command.StartedCmd, error) + // WaitCmd blocks until the started command completes + WaitCmd(sc *command.StartedCmd) (*command.RunResult, error) // Copy is a convenience method that runs a command to copy a file Copy(assets.CopyableFile) error // Remove is a convenience method that runs a command to remove a file diff --git a/pkg/minikube/cruntime/cruntime_test.go b/pkg/minikube/cruntime/cruntime_test.go index 3bd39bac7834..11acd37c1330 100644 --- a/pkg/minikube/cruntime/cruntime_test.go +++ b/pkg/minikube/cruntime/cruntime_test.go @@ -217,6 +217,14 @@ func (f *FakeRunner) RunCmd(cmd *exec.Cmd) (*command.RunResult, error) { } } +func (f *FakeRunner) StartCmd(cmd *exec.Cmd) (*command.StartedCmd, error) { + return &command.StartedCmd{}, nil +} + +func (f *FakeRunner) WaitCmd(sc *command.StartedCmd) (*command.RunResult, error) { + return &command.RunResult{}, nil +} + func (f *FakeRunner) Copy(assets.CopyableFile) error { return nil } diff --git a/pkg/minikube/out/register/register.go b/pkg/minikube/out/register/register.go index 985d81463da2..ebb6c13a4cd7 100644 --- a/pkg/minikube/out/register/register.go +++ b/pkg/minikube/out/register/register.go @@ -26,20 +26,23 @@ import ( // If you add a new step here, please also add it to register.Reg registry inside the init() function const ( - InitialSetup RegStep = "Initial Minikube Setup" - SelectingDriver RegStep = "Selecting Driver" - DownloadingArtifacts RegStep = "Downloading Artifacts" - StartingNode RegStep = "Starting Node" - PullingBaseImage RegStep = "Pulling Base Image" - RunningLocalhost RegStep = "Running on Localhost" - LocalOSRelease RegStep = "Local OS Release" - CreatingContainer RegStep = "Creating Container" - CreatingVM RegStep = "Creating VM" - ConfiguringLHEnv RegStep = "Configuring Localhost Environment" - PreparingKubernetes RegStep = "Preparing Kubernetes" - VerifyingKubernetes RegStep = "Verifying Kubernetes" - EnablingAddons RegStep = "Enabling Addons" - Done RegStep = "Done" + InitialSetup RegStep = "Initial Minikube Setup" + SelectingDriver RegStep = "Selecting Driver" + DownloadingArtifacts RegStep = "Downloading Artifacts" + StartingNode RegStep = "Starting Node" + PullingBaseImage RegStep = "Pulling Base Image" + RunningLocalhost RegStep = "Running on Localhost" + LocalOSRelease RegStep = "Local OS Release" + CreatingContainer RegStep = "Creating Container" + CreatingVM RegStep = "Creating VM" + ConfiguringLHEnv RegStep = "Configuring Localhost Environment" + PreparingKubernetes RegStep = "Preparing Kubernetes" + PreparingKubernetesCerts RegStep = "Generating certificates" + PreparingKubernetesControlPlane RegStep = "Booting control plane" + PreparingKubernetesBootstrapToken RegStep = "Configuring RBAC rules" + VerifyingKubernetes RegStep = "Verifying Kubernetes" + EnablingAddons RegStep = "Enabling Addons" + Done RegStep = "Done" Stopping RegStep = "Stopping" PowerOff RegStep = "PowerOff" @@ -77,6 +80,9 @@ func init() { CreatingContainer, CreatingVM, PreparingKubernetes, + PreparingKubernetesCerts, + PreparingKubernetesControlPlane, + PreparingKubernetesBootstrapToken, ConfiguringLHEnv, VerifyingKubernetes, EnablingAddons,