Skip to content

Commit

Permalink
Added sub-step logging to adm init step on start
Browse files Browse the repository at this point in the history
  • Loading branch information
spowelljr committed Dec 9, 2020
1 parent c0a092e commit cf35c15
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 17 deletions.
45 changes: 42 additions & 3 deletions pkg/minikube/bootstrapper/kubeadm/kubeadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package kubeadm

import (
"bufio"
"context"
"fmt"
"io"
"net"
"os/exec"
"path"
Expand Down Expand Up @@ -226,19 +228,25 @@ func (k *Bootstrapper) init(cfg config.ClusterConfig) error {
conf := bsutil.KubeadmYamlPath
ctx, cancel := context.WithTimeout(context.Background(), initTimeoutMinutes*time.Minute)
defer cancel()
admInitLogReader, admInitLogWriter := io.Pipe()
c := exec.CommandContext(ctx, "/bin/bash", "-c", fmt.Sprintf("%s init --config %s %s --ignore-preflight-errors=%s",
bsutil.InvokeKubeadm(cfg.KubernetesConfig.KubernetesVersion), conf, extraFlags, strings.Join(ignore, ",")))
if _, err := k.c.RunCmd(c); err != nil {
c.Stdout = admInitLogWriter
sc, err := k.c.StartCmd(c)
if err != nil {
return errors.Wrap(err, "start")
}
go outputAdmInitSteps(admInitLogReader)
if _, err := k.c.WaitCmd(sc); err != nil {
if ctx.Err() == context.DeadlineExceeded {
return ErrInitTimedout
}

if strings.Contains(err.Error(), "'kubeadm': Permission denied") {
return ErrNoExecLinux
}
return errors.Wrap(err, "run")
return errors.Wrap(err, "wait")
}

if err := k.applyCNI(cfg); err != nil {
return errors.Wrap(err, "apply cni")
}
Expand Down Expand Up @@ -272,6 +280,37 @@ func (k *Bootstrapper) init(cfg config.ClusterConfig) error {
return nil
}

// ouputAdmInitSteps streams the pipe and outputs the current step
func outputAdmInitSteps(logs io.Reader) {
type step struct {
logTag string
registerStep register.RegStep
stepMessage string
}

steps := []step{
{logTag: "certs", registerStep: register.PreparingKubernetesCerts, stepMessage: "Generating certificates and keys ..."},
{logTag: "control-plane", registerStep: register.PreparingKubernetesControlPlane, stepMessage: "Booting up control plane ..."},
{logTag: "bootstrap-token", registerStep: register.PreparingKubernetesBootstrapToken, stepMessage: "Configuring RBAC rules ..."},
}
nextStepIndex := 0

scanner := bufio.NewScanner(logs)
for scanner.Scan() {
if nextStepIndex >= len(steps) {
scanner.Text()
continue
}
nextStep := steps[nextStepIndex]
if !strings.Contains(scanner.Text(), fmt.Sprintf("[%s]", nextStep.logTag)) {
continue
}
register.Reg.SetStep(nextStep.registerStep)
out.Step(style.Option, nextStep.stepMessage)
nextStepIndex++
}
}

// applyCNI applies CNI to a cluster. Needs to be done every time a VM is powered up.
func (k *Bootstrapper) applyCNI(cfg config.ClusterConfig) error {
cnm, err := cni.New(cfg)
Expand Down
14 changes: 14 additions & 0 deletions pkg/minikube/command/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,26 @@ type RunResult struct {
Args []string // the args that was passed to Runner
}

// StartedCmd holds the contents of a started command
type StartedCmd struct {
cmd *exec.Cmd
rr *RunResult
}

// Runner represents an interface to run commands.
type Runner interface {
// RunCmd runs a cmd of exec.Cmd type. allowing user to set cmd.Stdin, cmd.Stdout,...
// not all implementors are guaranteed to handle all the properties of cmd.
RunCmd(cmd *exec.Cmd) (*RunResult, error)

// StartCmd starts a cmd of exec.Cmd type.
// This func in non-blocking, use WaitCmd to block until complete.
// Not all implementors are guaranteed to handle all the properties of cmd.
StartCmd(cmd *exec.Cmd) (*StartedCmd, error)

// WaitCmd will prevent further execution until the started command has completed.
WaitCmd(startedCmd *StartedCmd) (*RunResult, error)

// Copy is a convenience method that runs a command to copy a file
Copy(assets.CopyableFile) error

Expand Down
47 changes: 47 additions & 0 deletions pkg/minikube/command/exec_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,53 @@ func (*execRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) {
return rr, fmt.Errorf("%s: %v\nstdout:\n%s\nstderr:\n%s", rr.Command(), err, rr.Stdout.String(), rr.Stderr.String())
}

// StartCmd implements the Command Runner interface to start a exec.Cmd object
func (*execRunner) StartCmd(cmd *exec.Cmd) (*StartedCmd, error) {
rr := &RunResult{Args: cmd.Args}
sc := &StartedCmd{cmd: cmd, rr: rr}
klog.Infof("Start: %v", rr.Command())

var outb, errb io.Writer
if cmd.Stdout == nil {
var so bytes.Buffer
outb = io.MultiWriter(&so, &rr.Stdout)
} else {
outb = io.MultiWriter(cmd.Stdout, &rr.Stdout)
}

if cmd.Stderr == nil {
var se bytes.Buffer
errb = io.MultiWriter(&se, &rr.Stderr)
} else {
errb = io.MultiWriter(cmd.Stderr, &rr.Stderr)
}

cmd.Stdout = outb
cmd.Stderr = errb

if err := cmd.Start(); err != nil {
return sc, errors.Wrap(err, "start")
}

return sc, nil
}

// WaitCmd implements the Command Runner interface to wait until a started exec.Cmd object finishes
func (*execRunner) WaitCmd(sc *StartedCmd) (*RunResult, error) {
rr := sc.rr

err := sc.cmd.Wait()
if exitError, ok := err.(*exec.ExitError); ok {
rr.ExitCode = exitError.ExitCode()
}

if err == nil {
return rr, nil
}

return rr, fmt.Errorf("%s: %v\nstdout:\n%s\nstderr:\n%s", rr.Command(), err, rr.Stdout.String(), rr.Stderr.String())
}

// Copy copies a file and its permissions
func (e *execRunner) Copy(f assets.CopyableFile) error {
dst := path.Join(f.GetTargetDir(), f.GetTargetName())
Expand Down
41 changes: 41 additions & 0 deletions pkg/minikube/command/fake_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,47 @@ func (f *FakeCommandRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) {
return rr, nil
}

// StartCmd implements the Command Runner interface to start a exec.Cmd object
func (f *FakeCommandRunner) StartCmd(cmd *exec.Cmd) (*StartedCmd, error) {
rr := &RunResult{Args: cmd.Args}
sc := &StartedCmd{cmd: cmd, rr: rr}
klog.Infof("(FakeCommandRunner) Start: %v", rr.Command())

key := rr.Command()
out, ok := f.cmdMap.Load(key)
if !ok {
cmds := f.commands()
if len(cmds) == 0 {
return sc, fmt.Errorf("asked to execute %s, but FakeCommandRunner has no commands stored", rr.Command())
}

var txt strings.Builder
for _, c := range f.commands() {
txt.WriteString(fmt.Sprintf(" `%s`\n", c))
}
return sc, fmt.Errorf("unregistered command:\n `%s`\nexpected one of:\n%s", key, txt.String())
}

var buf bytes.Buffer
outStr := ""
if out != nil {
outStr = out.(string)
}
_, err := buf.WriteString(outStr)
if err != nil {
return sc, errors.Wrap(err, "Writing outStr to FakeCommandRunner's buffer")
}
rr.Stdout = buf
rr.Stderr = buf

return sc, nil
}

// WaitCmd implements the Command Runner interface to wait until a started exec.Cmd object finishes
func (f *FakeCommandRunner) WaitCmd(sc *StartedCmd) (*RunResult, error) {
return sc.rr, nil
}

// Copy adds the filename, file contents key value pair to the stored map.
func (f *FakeCommandRunner) Copy(file assets.CopyableFile) error {
var b bytes.Buffer
Expand Down
8 changes: 8 additions & 0 deletions pkg/minikube/command/kic_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ func (k *kicRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) {

}

func (k *kicRunner) StartCmd(cmd *exec.Cmd) (*StartedCmd, error) {
return nil, fmt.Errorf("kicRunner does not support StartCmd - you could be the first to add it")
}

func (k *kicRunner) WaitCmd(sc *StartedCmd) (*RunResult, error) {
return nil, fmt.Errorf("kicRunner does not support WaitCmd - you could be the first to add it")
}

// Copy copies a file and its permissions
func (k *kicRunner) Copy(f assets.CopyableFile) error {
dst := path.Join(path.Join(f.GetTargetDir(), f.GetTargetName()))
Expand Down
95 changes: 95 additions & 0 deletions pkg/minikube/command/ssh_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
type SSHRunner struct {
d drivers.Driver
c *ssh.Client
s *ssh.Session
}

// NewSSHRunner returns a new SSHRunner that will run commands
Expand Down Expand Up @@ -194,6 +195,100 @@ func (s *SSHRunner) RunCmd(cmd *exec.Cmd) (*RunResult, error) {
return rr, fmt.Errorf("%s: %v\nstdout:\n%s\nstderr:\n%s", rr.Command(), err, rr.Stdout.String(), rr.Stderr.String())
}

// teeSSHStart starts a non-blocking SSH command, streaming stdout, stderr to logs
func teeSSHStart(s *ssh.Session, cmd string, outB io.Writer, errB io.Writer) error {
outPipe, err := s.StdoutPipe()
if err != nil {
return errors.Wrap(err, "stdout")
}

errPipe, err := s.StderrPipe()
if err != nil {
return errors.Wrap(err, "stderr")
}

go func() {
if err := teePrefix(ErrPrefix, errPipe, errB, klog.V(8).Infof); err != nil {
klog.Errorf("tee stderr: %v", err)
}
}()
go func() {
if err := teePrefix(OutPrefix, outPipe, outB, klog.V(8).Infof); err != nil {
klog.Errorf("tee stdout: %v", err)
}
}()

return s.Start(cmd)
}

// StartCmd implements the Command Runner interface to start a exec.Cmd object
func (s *SSHRunner) StartCmd(cmd *exec.Cmd) (*StartedCmd, error) {
if cmd.Stdin != nil {
return nil, fmt.Errorf("SSHRunner does not support stdin - you could be the first to add it")
}

if s.s != nil {
return nil, fmt.Errorf("another SSH command has been started and is currently running")
}

rr := &RunResult{Args: cmd.Args}
sc := &StartedCmd{cmd: cmd, rr: rr}
klog.Infof("Start: %v", rr.Command())

var outb, errb io.Writer

if cmd.Stdout == nil {
var so bytes.Buffer
outb = io.MultiWriter(&so, &rr.Stdout)
} else {
outb = io.MultiWriter(cmd.Stdout, &rr.Stdout)
}

if cmd.Stderr == nil {
var se bytes.Buffer
errb = io.MultiWriter(&se, &rr.Stderr)
} else {
errb = io.MultiWriter(cmd.Stderr, &rr.Stderr)
}

sess, err := s.session()
if err != nil {
return sc, errors.Wrap(err, "NewSession")
}

s.s = sess

err = teeSSHStart(s.s, shellquote.Join(cmd.Args...), outb, errb)

return sc, err
}

// WaitCmd implements the Command Runner interface to wait until a started exec.Cmd object finishes
func (s *SSHRunner) WaitCmd(sc *StartedCmd) (*RunResult, error) {
if s.s == nil {
return nil, fmt.Errorf("there is no SSH command started")
}

rr := sc.rr

err := s.s.Wait()
if exitError, ok := err.(*exec.ExitError); ok {
rr.ExitCode = exitError.ExitCode()
}

if err := s.s.Close(); err != io.EOF {
klog.Errorf("session close: %v", err)
}

s.s = nil

if err == nil {
return rr, nil
}

return rr, fmt.Errorf("%s: %v\nstdout:\n%s\nstderr:\n%s", rr.Command(), err, rr.Stdout.String(), rr.Stderr.String())
}

// Copy copies a file to the remote over SSH.
func (s *SSHRunner) Copy(f assets.CopyableFile) error {
dst := path.Join(path.Join(f.GetTargetDir(), f.GetTargetName()))
Expand Down
4 changes: 4 additions & 0 deletions pkg/minikube/cruntime/cruntime.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func ValidRuntimes() []string {
// CommandRunner is the subset of command.Runner this package consumes
type CommandRunner interface {
RunCmd(cmd *exec.Cmd) (*command.RunResult, error)
// StartCmd is a non-blocking method that starts a command
StartCmd(cmd *exec.Cmd) (*command.StartedCmd, error)
// WaitCmd blocks until the started command completes
WaitCmd(sc *command.StartedCmd) (*command.RunResult, error)
// Copy is a convenience method that runs a command to copy a file
Copy(assets.CopyableFile) error
// Remove is a convenience method that runs a command to remove a file
Expand Down
8 changes: 8 additions & 0 deletions pkg/minikube/cruntime/cruntime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ func (f *FakeRunner) RunCmd(cmd *exec.Cmd) (*command.RunResult, error) {
}
}

func (f *FakeRunner) StartCmd(cmd *exec.Cmd) (*command.StartedCmd, error) {
return &command.StartedCmd{}, nil
}

func (f *FakeRunner) WaitCmd(sc *command.StartedCmd) (*command.RunResult, error) {
return &command.RunResult{}, nil
}

func (f *FakeRunner) Copy(assets.CopyableFile) error {
return nil
}
Expand Down
Loading

0 comments on commit cf35c15

Please sign in to comment.