diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index 8a2e72b44a52..c50a3cfc9510 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -2071,29 +2071,13 @@ func (c *clusterImpl) Install( return errors.Wrap(roachprod.Install(ctx, l, c.MakeNodes(nodes), software), "cluster.Install") } -var reOnlyAlphanumeric = regexp.MustCompile(`[^a-zA-Z0-9]+`) - // cmdLogFileName comes up with a log file to use for the given argument string. func cmdLogFileName(t time.Time, nodes option.NodeListOption, args ...string) string { - // Make sure we treat {"./cockroach start"} like {"./cockroach", "start"}. - args = strings.Split(strings.Join(args, " "), " ") - prefix := []string{reOnlyAlphanumeric.ReplaceAllString(args[0], "")} - for _, arg := range args[1:] { - if s := reOnlyAlphanumeric.ReplaceAllString(arg, ""); s != arg { - break - } - prefix = append(prefix, arg) - } - s := strings.Join(prefix, "_") - const maxLen = 70 - if len(s) > maxLen { - s = s[:maxLen] - } logFile := fmt.Sprintf( "run_%s_n%s_%s", t.Format(`150405.000000000`), nodes.String()[1:], - s, + install.GenFilenameFromArgs(20, args...), ) return logFile } diff --git a/pkg/cmd/roachtest/cluster_test.go b/pkg/cmd/roachtest/cluster_test.go index 787d705f9649..86ff4b27b840 100644 --- a/pkg/cmd/roachtest/cluster_test.go +++ b/pkg/cmd/roachtest/cluster_test.go @@ -172,7 +172,7 @@ func TestClusterMachineType(t *testing.T) { func TestCmdLogFileName(t *testing.T) { ts := time.Date(2000, 1, 1, 15, 4, 12, 0, time.Local) - const exp = `run_150412.000000000_n1,3-4,9_cockroach_bla` + const exp = `run_150412.000000000_n1,3-4,9_cockroach-bla-foo-ba` nodes := option.NodeListOption{1, 3, 4, 9} assert.Equal(t, exp, diff --git a/pkg/cmd/roachtest/test_impl.go b/pkg/cmd/roachtest/test_impl.go index 853c42d502b1..641dd53f6636 100644 --- a/pkg/cmd/roachtest/test_impl.go +++ b/pkg/cmd/roachtest/test_impl.go @@ -344,15 +344,14 @@ func (t *testImpl) addFailure(format string, args ...interface{}) { formatFailure(&b, reportFailure) msg := b.String() - t.L().Printf("test failure #%d: %s", len(t.mu.failures), msg) + failureNum := len(t.mu.failures) + failureLog := fmt.Sprintf("failure_%d", failureNum) + t.L().Printf("test failure #%d: full stack retained in %s.log: %s", failureNum, failureLog, msg) // Also dump the verbose error (incl. all stack traces) to a log file, in case // we need it. The stacks are sometimes helpful, but we don't want them in the // main log as they are highly verbose. { - cl, err := t.L().ChildLogger( - fmt.Sprintf("failure_%d", len(t.mu.failures)), - logger.QuietStderr, logger.QuietStdout, - ) + cl, err := t.L().ChildLogger(failureLog, logger.QuietStderr, logger.QuietStdout) if err == nil { // We don't actually log through this logger since it adds an unrelated // file:line caller (namely ours). The error already has stack traces diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index 7dd34b8ff453..2807f7d83bb9 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -104,7 +104,7 @@ func NewSyncedCluster( var ErrAfterRetry = errors.New("error occurred after retries") // The first retry is after 5s, the second and final is after 25s -var defaultRunRetryOpt = retry.Options{ +var defaultRetryOpt = retry.Options{ InitialBackoff: 5 * time.Second, Multiplier: 5, MaxBackoff: 1 * time.Minute, @@ -112,68 +112,76 @@ var defaultRunRetryOpt = retry.Options{ MaxRetries: 2, } -// runWithMaybeRetry will run the specified function `f` at least once. -// Any returned error from `f` is passed to the `shouldRetryFn` which, +type RunRetryOpts struct { + retry.Options + shouldRetryFn func(*RunResultDetails) bool +} + +func newRunRetryOpts( + retryOpts retry.Options, shouldRetryFn func(*RunResultDetails) bool, +) *RunRetryOpts { + return &RunRetryOpts{ + Options: retryOpts, + shouldRetryFn: shouldRetryFn, + } +} + +var DefaultSSHRetryOpts = newRunRetryOpts(defaultRetryOpt, func(res *RunResultDetails) bool { return errors.Is(res.Err, rperrors.ErrSSH255) }) + +// defaultSCPRetry assumes any error is retryable +var defaultSCPRetry = newRunRetryOpts(defaultRetryOpt, func(res *RunResultDetails) bool { return true }) + +// runWithMaybeRetry will run the specified function `f` at least once, or only +// once if `runRetryOpts` is nil +// Any returned error from `f` is passed to `runRetryOpts.shouldRetryFn` which, // if it returns true, will result in `f` being retried using the `retryOpts` -// If the `shouldRetryFn` is not specified (nil), then no retries will be -// performed. +// If the `shouldRetryFn` is not specified (nil), then retries will be performed +// regardless of the previous result / error // -// We operate on a pointer to RunResultDetails as it has already have been +// We operate on a pointer to RunResultDetails as it has already been // captured in a *RunResultDetails[] in Run, but here we may enrich with attempt // number and a wrapper error. func runWithMaybeRetry( - l *logger.Logger, - retryOpts retry.Options, - shouldRetryFn func(*RunResultDetails) bool, - f func() (*RunResultDetails, error), + l *logger.Logger, retryOpts *RunRetryOpts, f func() (*RunResultDetails, error), ) (*RunResultDetails, error) { - var err error - var res *RunResultDetails + if retryOpts == nil { + res, err := f() + res.Attempt = 1 + return res, err + } + var res *RunResultDetails + var err error var cmdErr error - for r := retry.Start(retryOpts); r.Next(); { + for r := retry.Start(retryOpts.Options); r.Next(); { res, err = f() res.Attempt = r.CurrentAttempt() + 1 // nil err (denoting a roachprod error) indicates a potentially retryable res.Err if err == nil && res.Err != nil { cmdErr = errors.CombineErrors(cmdErr, res.Err) - if shouldRetryFn != nil && shouldRetryFn(res) { - l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1) + if retryOpts.shouldRetryFn == nil || retryOpts.shouldRetryFn(res) { + l.Printf("encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1) continue } } break } - if res.Err != nil && res.Attempt > 1 { - // An error cannot be marked with more than one reference error. Since res.Err may already be marked, we create - // a new error here and mark it. - res.Err = errors.Mark(errors.Wrapf(cmdErr, "error persisted after %v attempts", res.Attempt), ErrAfterRetry) + if res.Attempt > 1 { + if res.Err != nil { + // An error cannot be marked with more than one reference error. Since res.Err may already be marked, we create + // a new error here and mark it. + res.Err = errors.Mark(errors.Wrapf(cmdErr, "error persisted after %v attempts", res.Attempt), ErrAfterRetry) + } else { + l.Printf("command successful after %v attempts", res.Attempt) + } } return res, err } -// runWithDefaultSSHRetry will retry an SSH command which returns an error with exit code 255 -func runWithDefaultSSHRetry( - l *logger.Logger, f func() (*RunResultDetails, error), -) (*RunResultDetails, error) { - return runWithMaybeRetry( - l, - defaultRunRetryOpt, - func(res *RunResultDetails) bool { return errors.Is(res.Err, rperrors.ErrSSH255) }, - f, - ) -} - -// scpWithDefaultRetry assumes that any error returned from an scp attempt is retryable -func scpWithDefaultRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) { - return runWithMaybeRetry( - l, - defaultRunRetryOpt, - func(*RunResultDetails) bool { return true }, - func() (*RunResultDetails, error) { return scp(src, dest) }, - ) +func scpWithRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) { + return runWithMaybeRetry(l, defaultSCPRetry, func() (*RunResultDetails, error) { return scp(src, dest) }) } // Host returns the public IP of a node. @@ -210,22 +218,20 @@ func (c *SyncedCluster) TargetNodes() Nodes { } // GetInternalIP returns the internal IP address of the specified node. -func (c *SyncedCluster) GetInternalIP(ctx context.Context, n Node) (string, error) { +func (c *SyncedCluster) GetInternalIP( + l *logger.Logger, ctx context.Context, n Node, +) (string, error) { if c.IsLocal() { return c.Host(n), nil } - session, err := c.newSession(n) - if err != nil { - return "", errors.Wrapf(err, "GetInternalIP: failed dial %s:%d", c.Name, n) - } - defer session.Close() + sess := c.newSession(l, n, `hostname --all-ip-addresses`, "get-internal-ip") + defer sess.Close() var stdout, stderr strings.Builder - session.SetStdout(&stdout) - session.SetStderr(&stderr) - cmd := `hostname --all-ip-addresses` - if err := session.Run(ctx, cmd); err != nil { + sess.SetStdout(&stdout) + sess.SetStderr(&stderr) + if err := sess.Run(ctx); err != nil { return "", errors.Wrapf(err, "GetInternalIP: failed to execute hostname on %s:%d:\n(stdout) %s\n(stderr) %s", c.Name, n, stdout.String(), stderr.String()) @@ -285,11 +291,22 @@ func (c *SyncedCluster) roachprodEnvRegex(node Node) string { return fmt.Sprintf(`ROACHPROD=%s[ \/]`, escaped) } -func (c *SyncedCluster) newSession(node Node) (session, error) { +// cmdDebugName is the suffix of the generated ssh debug file +// If it is "", a suffix will be generated from the cmd string +func (c *SyncedCluster) newSession( + l *logger.Logger, node Node, cmd string, cmdDebugName string, +) session { if c.IsLocal() { - return newLocalSession(), nil + return newLocalSession(cmd) + } + command := remoteCommand{ + node: node, + user: c.user(node), + host: c.Host(node), + cmd: cmd, + debugName: cmdDebugName, } - return newRemoteSession(c.user(node), c.Host(node), c.DebugDir) + return newRemoteSession(l, command) } // Stop is used to stop cockroach on all nodes in the cluster. @@ -315,11 +332,6 @@ func (c *SyncedCluster) Stop( } return c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() var waitCmd string if wait { @@ -363,11 +375,15 @@ fi`, sig, // [3] waitCmd, // [4] ) - out, cmdErr := sess.CombinedOutput(ctx, cmd) + + sess := c.newSession(l, node, cmd, "node-stop") + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out return res, res.Err - }) + }, nil) // Disable SSH Retries } // Wipe TODO(peter): document @@ -378,12 +394,6 @@ func (c *SyncedCluster) Wipe(ctx context.Context, l *logger.Logger, preserveCert } return c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - var cmd string if c.IsLocal() { // Not all shells like brace expansion, so we'll do it here @@ -405,11 +415,14 @@ sudo rm -fr logs && cmd += "sudo rm -fr tenant-certs* ;\n" } } - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, "node-wipe") + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out return res, res.Err - }) + }, DefaultSSHRetryOpts) } // NodeStatus contains details about the status of a node. @@ -427,11 +440,6 @@ func (c *SyncedCluster) Status(ctx context.Context, l *logger.Logger) ([]NodeSta results := make([]NodeStatus, len(c.Nodes)) if err := c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() binary := cockroachNodeBinary(c, node) cmd := fmt.Sprintf(`out=$(ps axeww -o pid -o ucomm -o command | \ @@ -446,7 +454,10 @@ else echo ${out} fi ` - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, "node-status") + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -463,7 +474,7 @@ fi results[i] = NodeStatus{Running: true, Version: info[0], Pid: info[1]} return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return nil, err } for i := 0; i < len(results); i++ { @@ -505,7 +516,9 @@ type MonitorOpts struct { // If ignoreEmptyNodes is true, nodes on which no CockroachDB data is found // (in {store-dir}) will not be probed and single message, "skipped", will // be emitted for them. -func (c *SyncedCluster) Monitor(ctx context.Context, opts MonitorOpts) chan NodeMonitorInfo { +func (c *SyncedCluster) Monitor( + l *logger.Logger, ctx context.Context, opts MonitorOpts, +) chan NodeMonitorInfo { ch := make(chan NodeMonitorInfo) nodes := c.TargetNodes() var wg sync.WaitGroup @@ -514,20 +527,7 @@ func (c *SyncedCluster) Monitor(ctx context.Context, opts MonitorOpts) chan Node wg.Add(1) go func(i int) { defer wg.Done() - sess, err := c.newSession(nodes[i]) - if err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} - wg.Done() - return - } - defer sess.Close() - - p, err := sess.StdoutPipe() - if err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} - wg.Done() - return - } + node := nodes[i] // On each monitored node, we loop looking for a cockroach process. data := struct { @@ -539,8 +539,8 @@ func (c *SyncedCluster) Monitor(ctx context.Context, opts MonitorOpts) chan Node }{ OneShot: opts.OneShot, IgnoreEmpty: opts.IgnoreEmptyNodes, - Store: c.NodeDir(nodes[i], 1 /* storeIndex */), - Port: c.NodePort(nodes[i]), + Store: c.NodeDir(node, 1 /* storeIndex */), + Port: c.NodePort(node), Local: c.IsLocal(), } @@ -603,14 +603,23 @@ done t := template.Must(template.New("script").Parse(snippet)) var buf bytes.Buffer if err := t.Execute(&buf, data); err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} + ch <- NodeMonitorInfo{Node: node, Err: err} return } + sess := c.newSession(l, node, buf.String(), "node-monitor") + defer sess.Close() + + p, err := sess.StdoutPipe() + if err != nil { + ch <- NodeMonitorInfo{Node: node, Err: err} + wg.Done() + return + } // Request a PTY so that the script will receive a SIGPIPE when the // session is closed. if err := sess.RequestPty(); err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} + ch <- NodeMonitorInfo{Node: node, Err: err} return } @@ -624,12 +633,12 @@ done if err == io.EOF { return } - ch <- NodeMonitorInfo{Node: nodes[i], Msg: string(line)} + ch <- NodeMonitorInfo{Node: node, Msg: string(line)} } }(p) - if err := sess.Start(buf.String()); err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} + if err := sess.Start(); err != nil { + ch <- NodeMonitorInfo{Node: node, Err: err} return } @@ -644,7 +653,7 @@ done // pipe. Otherwise it can be closed under us, causing the reader to loop // infinitely receiving a non-`io.EOF` error. if err := sess.Wait(); err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} + ch <- NodeMonitorInfo{Node: node, Err: err} return } }(i) @@ -688,12 +697,6 @@ func (c *SyncedCluster) runCmdOnSingleNode( combined bool, stdout, stderr io.Writer, ) (*RunResultDetails, error) { - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - // Argument template expansion is node specific (e.g. for {store-dir}). e := expander{ node: node, @@ -717,9 +720,12 @@ func (c *SyncedCluster) runCmdOnSingleNode( nodeCmd = fmt.Sprintf("cd %s; %s", c.localVMDir(node), nodeCmd) } + sess := c.newSession(l, node, nodeCmd, GenFilenameFromArgs(20, expandedCmd)) + defer sess.Close() + var res *RunResultDetails if combined { - out, cmdErr := sess.CombinedOutput(ctx, nodeCmd) + out, cmdErr := sess.CombinedOutput(ctx) res = newRunResultDetails(node, cmdErr) res.CombinedOut = out } else { @@ -730,7 +736,7 @@ func (c *SyncedCluster) runCmdOnSingleNode( sess.SetStdout(multStdout) sess.SetStderr(multStderr) - res = newRunResultDetails(node, sess.Run(ctx, nodeCmd)) + res = newRunResultDetails(node, sess.Run(ctx)) res.Stderr = stderrBuffer.String() res.Stdout = stdoutBuffer.String() } @@ -773,7 +779,7 @@ func (c *SyncedCluster) Run( result, err := c.runCmdOnSingleNode(ctx, l, nodes[i], cmd, !stream, stdout, stderr) results[i] = result return result, err - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -822,7 +828,7 @@ func (c *SyncedCluster) RunWithDetails( result, err := c.runCmdOnSingleNode(ctx, l, nodes[i], cmd, false, l.Stdout, l.Stderr) resultPtrs[i] = result return result, err - }) + }, DefaultSSHRetryOpts) // Return values to preserve API results := make([]RunResultDetails, len(nodes)) @@ -870,15 +876,12 @@ func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { if err := c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] res := &RunResultDetails{Node: node} + cmd := "test -e /mnt/data1/.roachprod-initialized" for j := 0; j < 600; j++ { - sess, err := c.newSession(node) - if err != nil { - time.Sleep(500 * time.Millisecond) - continue - } + sess := c.newSession(l, node, cmd, "node-wait") defer sess.Close() - _, err = sess.CombinedOutput(ctx, "test -e /mnt/data1/.roachprod-initialized") + _, err := sess.CombinedOutput(ctx) if err != nil { time.Sleep(500 * time.Millisecond) continue @@ -888,7 +891,7 @@ func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { errs[i] = errors.New("timed out after 5m") res.Err = errs[i] return res, nil - }); err != nil { + }, nil); err != nil { return err } @@ -905,16 +908,6 @@ func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { return nil } -// setupSession is a helper which creates a new session and -// populates RunResultDetails with an error if one occurrs (unlikely -// given the code in `newSession`) -// RunResultDetails is used across all functions which -// create a session and holds error and stdout information -func (c *SyncedCluster) setupSession(node Node) (session, error) { - sess, err := c.newSession(node) - return sess, err -} - // SetupSSH configures the cluster for use with SSH. This is generally run after // the cloud.Cluster has been synced which resets the SSH credentials on the // machines and sets them up for the current user. This method enables the @@ -944,12 +937,6 @@ func (c *SyncedCluster) SetupSSH(ctx context.Context, l *logger.Logger) error { // cluster in order to allow inter-node ssh. var sshTar []byte if err := c.Parallel(l, "generating ssh key", 1, 0, func(i int) (*RunResultDetails, error) { - sess, err := c.setupSession(1) - if err != nil { - return newRunResultDetails(1, err), err - } - defer sess.Close() - // Create the ssh key and then tar up the public, private and // authorized_keys files and output them to stdout. We'll take this output // and pipe it back into tar on the other nodes in the cluster. @@ -960,12 +947,15 @@ test -f .ssh/id_rsa || \ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys ` + sess := c.newSession(l, 1, cmd, "ssh-gen-key") + defer sess.Close() + var stdout bytes.Buffer var stderr bytes.Buffer sess.SetStdout(&stdout) sess.SetStderr(&stderr) - res := newRunResultDetails(1, sess.Run(ctx, cmd)) + res := newRunResultDetails(1, sess.Run(ctx)) res.Stdout = stdout.String() res.Stderr = stderr.String() @@ -974,7 +964,7 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys } sshTar = []byte(res.Stdout) return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -982,16 +972,14 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys nodes := c.Nodes[1:] if err := c.Parallel(l, "distributing ssh key", len(nodes), 0, func(i int) (*RunResultDetails, error) { node := nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } + cmd := `tar xf -` + + sess := c.newSession(l, node, cmd, "ssh-dist-key") defer sess.Close() sess.SetStdin(bytes.NewReader(sshTar)) - cmd := `tar xf -` - out, cmdErr := sess.CombinedOutput(ctx, cmd) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -999,7 +987,7 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys return res, errors.Wrapf(res.Err, "%s: output:\n%s", cmd, res.CombinedOut) } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -1013,7 +1001,7 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys res := &RunResultDetails{Node: node} for j := 0; j < 20 && ips[i] == ""; j++ { var err error - ips[i], err = c.GetInternalIP(ctx, node) + ips[i], err = c.GetInternalIP(l, ctx, node) if err != nil { res.Err = errors.Wrapf(err, "pgurls") return res, res.Err @@ -1025,7 +1013,7 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys return res, res.Err } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -1035,11 +1023,6 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys var knownHostsData []byte if err := c.Parallel(l, "scanning hosts", 1, 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() // ssh-keyscan may return fewer than the desired number of entries if the // remote nodes are not responding yet, so we loop until we have a scan that @@ -1063,12 +1046,16 @@ for i in {1..20}; do done exit 1 ` + + sess := c.newSession(l, node, cmd, "ssh-scan-hosts") + defer sess.Close() + var stdout bytes.Buffer var stderr bytes.Buffer sess.SetStdout(&stdout) sess.SetStderr(&stderr) - res := newRunResultDetails(node, sess.Run(ctx, cmd)) + res := newRunResultDetails(node, sess.Run(ctx)) res.Stdout = stdout.String() res.Stderr = stderr.String() @@ -1077,19 +1064,12 @@ exit 1 } knownHostsData = stdout.Bytes() return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } if err := c.Parallel(l, "distributing known_hosts", len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - - sess.SetStdin(bytes.NewReader(knownHostsData)) const cmd = ` known_hosts_data="$(cat)" set -e @@ -1117,7 +1097,12 @@ if [[ "$(whoami)" != "` + config.SharedUser + `" ]]; then '"'"'{}'"'"' ~` + config.SharedUser + `/.ssh' \; fi ` - out, cmdErr := sess.CombinedOutput(ctx, cmd) + + sess := c.newSession(l, node, cmd, "ssh-dist-known-hosts") + defer sess.Close() + + sess.SetStdin(bytes.NewReader(knownHostsData)) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -1125,7 +1110,7 @@ fi return res, errors.Wrapf(res.Err, "%s: output:\n%s", cmd, res.CombinedOut) } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -1137,13 +1122,6 @@ fi // platforms. if err := c.Parallel(l, "adding additional authorized keys", len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - - sess.SetStdin(bytes.NewReader(c.AuthorizedKeys)) const cmd = ` keys_data="$(cat)" set -e @@ -1166,7 +1144,12 @@ if [[ "$(whoami)" != "` + config.SharedUser + `" ]]; then "${tmp2}" ~` + config.SharedUser + `/.ssh/authorized_keys fi ` - out, cmdErr := sess.CombinedOutput(ctx, cmd) + + sess := c.newSession(l, node, cmd, "ssh-add-extra-keys") + defer sess.Close() + + sess.SetStdin(bytes.NewReader(c.AuthorizedKeys)) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -1174,7 +1157,7 @@ fi return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } } @@ -1203,12 +1186,6 @@ func (c *SyncedCluster) DistributeCerts(ctx context.Context, l *logger.Logger) e var msg string display := fmt.Sprintf("%s: initializing certs", c.Name) if err := c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { - sess, err := c.setupSession(1) - if err != nil { - return newRunResultDetails(1, err), err - } - defer sess.Close() - var cmd string if c.IsLocal() { cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(1)) @@ -1223,7 +1200,10 @@ mkdir -p certs tar cvf %[3]s certs `, cockroachNodeBinary(c, 1), strings.Join(nodeNames, " "), certsTarName) - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, 1, cmd, "init-certs") + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(1, cmdErr) res.CombinedOut = out @@ -1231,7 +1211,7 @@ tar cvf %[3]s certs msg = fmt.Sprintf("%s: %v", res.CombinedOut, res.Err) } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -1293,14 +1273,9 @@ func (c *SyncedCluster) createTenantCertBundle( display := fmt.Sprintf("%s: initializing tenant certs", c.Name) return c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() var tenantScopeArg string - if c.cockroachBinSupportsTenantScope(ctx, node) { + if c.cockroachBinSupportsTenantScope(l, ctx, node) { tenantScopeArg = fmt.Sprintf("--tenant-scope %d", tenantID) } @@ -1329,7 +1304,10 @@ tar cvf %[5]s $CERT_DIR bundleName, ) - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, "create-tenant-cert-bundle") + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -1337,7 +1315,7 @@ tar cvf %[5]s $CERT_DIR return res, errors.Wrapf(res.Err, "certificate creation error: %s", res.CombinedOut) } return res, nil - }) + }, DefaultSSHRetryOpts) } // cockroachBinSupportsTenantScope is a hack to figure out if the version of @@ -1347,15 +1325,14 @@ tar cvf %[5]s $CERT_DIR // contain an integer count of commits, which does not sort correctly. Once // this feature ships in a release, it will be easier to do a version comparison // on whether this command line flag is supported. -func (c *SyncedCluster) cockroachBinSupportsTenantScope(ctx context.Context, node Node) bool { - sess, err := c.newSession(node) - if err != nil { - return false - } +func (c *SyncedCluster) cockroachBinSupportsTenantScope( + l *logger.Logger, ctx context.Context, node Node, +) bool { + cmd := fmt.Sprintf("%s cert create-client --help | grep '\\--tenant-scope'", cockroachNodeBinary(c, node)) + sess := c.newSession(l, node, cmd, "") defer sess.Close() - cmd := fmt.Sprintf("%s cert create-client --help | grep '\\--tenant-scope'", cockroachNodeBinary(c, node)) - return sess.Run(ctx, cmd) == nil + return sess.Run(ctx) == nil } // getFile retrieves the given file from the first node in the cluster. The @@ -1379,7 +1356,7 @@ func (c *SyncedCluster) getFileFromFirstNode( } srcFileName := fmt.Sprintf("%s@%s:%s", c.user(1), c.Host(1), name) - if res, _ := scpWithDefaultRetry(l, srcFileName, tmpfile.Name()); res.Err != nil { + if res, _ := scpWithRetry(l, srcFileName, tmpfile.Name()); res.Err != nil { cleanup() return "", nil, res.Err } @@ -1415,19 +1392,16 @@ func (c *SyncedCluster) fileExistsOnFirstNode( display := fmt.Sprintf("%s: checking %s", c.Name, path) if err := c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } + sess := c.newSession(l, node, `test -e `+path, "") defer sess.Close() - out, cmdErr := sess.CombinedOutput(ctx, `test -e `+path) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out existsErr = res.Err return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return false } return existsErr == nil @@ -1449,10 +1423,10 @@ func (c *SyncedCluster) createNodeCertArguments( node := nodes[i] res := &RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(ctx, node) + res.Stdout, res.Err = c.GetInternalIP(l, ctx, node) ips[i] = res.Stdout return res, errors.Wrapf(res.Err, "IPs") - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return nil, err } } @@ -1494,13 +1468,6 @@ func (c *SyncedCluster) distributeLocalCertsTar( display := c.Name + ": distributing certs" return c.Parallel(l, display, len(nodes), 0, func(i int) (*RunResultDetails, error) { node := nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - - sess.SetStdin(bytes.NewReader(certsTar)) var cmd string if c.IsLocal() { cmd = fmt.Sprintf("cd %s ; ", c.localVMDir(node)) @@ -1511,7 +1478,11 @@ func (c *SyncedCluster) distributeLocalCertsTar( cmd += "tar xf -" } - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, "dist-local-certs") + defer sess.Close() + + sess.SetStdin(bytes.NewReader(certsTar)) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -1519,7 +1490,7 @@ func (c *SyncedCluster) distributeLocalCertsTar( return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) } return res, nil - }) + }, DefaultSSHRetryOpts) } const progressDone = "=======================================>" @@ -1713,7 +1684,7 @@ func (c *SyncedCluster) Put( return } - res, _ := scpWithDefaultRetry(l, from, to) + res, _ := scpWithRetry(l, from, to) results <- result{i, res.Err} if res.Err != nil { @@ -2068,7 +2039,7 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err return } - res, _ := scpWithDefaultRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) + res, _ := scpWithRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) if res.Err == nil { // Make sure all created files and directories are world readable. // The CRDB process intentionally sets a 0007 umask (resulting in @@ -2187,10 +2158,10 @@ func (c *SyncedCluster) pghosts( if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*RunResultDetails, error) { node := nodes[i] res := &RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(ctx, node) + res.Stdout, res.Err = c.GetInternalIP(l, ctx, node) ips[i] = res.Stdout return res, errors.Wrapf(res.Err, "pghosts") - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return nil, err } @@ -2302,8 +2273,9 @@ func (c *SyncedCluster) Parallel( display string, count, concurrency int, fn func(i int) (*RunResultDetails, error), + runRetryOpts *RunRetryOpts, ) error { - failed, err := c.ParallelE(l, display, count, concurrency, fn) + failed, err := c.ParallelE(l, display, count, concurrency, fn, runRetryOpts) if err != nil { sort.Slice(failed, func(i, j int) bool { return failed[i].Index < failed[j].Index }) for _, f := range failed { @@ -2332,6 +2304,7 @@ func (c *SyncedCluster) ParallelE( display string, count, concurrency int, fn func(i int) (*RunResultDetails, error), + runRetryOpts *RunRetryOpts, ) ([]ParallelResult, error) { if concurrency == 0 || concurrency > count { concurrency = count @@ -2348,7 +2321,7 @@ func (c *SyncedCluster) ParallelE( startNext := func() { go func(i int) { defer wg.Done() - res, err := runWithDefaultSSHRetry(l, func() (*RunResultDetails, error) { return fn(i) }) + res, err := runWithMaybeRetry(l, runRetryOpts, func() (*RunResultDetails, error) { return fn(i) }) results <- ParallelResult{i, res.CombinedOut, err} }(index) index++ @@ -2451,3 +2424,34 @@ func (c *SyncedCluster) Init(ctx context.Context, l *logger.Logger, node Node) e return nil } + +// GenFilenameFromArgs given a list of cmd args, returns an alphahumeric string up to +// `maxLen` in length with hyphen delimiters, suitable for use in a filename. +// e.g. ["/bin/bash", "-c", "'sudo dmesg > dmesg.txt'"] -> binbash-c-sudo-dmesg +func GenFilenameFromArgs(maxLen int, args ...string) string { + cmd := strings.Join(args, " ") + var sb strings.Builder + lastCharSpace := true + + writeByte := func(b byte) { + if b == ' ' { + if lastCharSpace { + return + } + sb.WriteByte('-') + lastCharSpace = true + } else if ('a' <= b && b <= 'z') || ('A' <= b && b <= 'Z') || ('0' <= b && b <= '9') { + sb.WriteByte(b) + lastCharSpace = false + } + } + + for i := 0; i < len(cmd); i++ { + writeByte(cmd[i]) + if sb.Len() == maxLen { + return sb.String() + } + } + + return sb.String() +} diff --git a/pkg/roachprod/install/cluster_synced_test.go b/pkg/roachprod/install/cluster_synced_test.go index a5e58706a69b..d02ff540008e 100644 --- a/pkg/roachprod/install/cluster_synced_test.go +++ b/pkg/roachprod/install/cluster_synced_test.go @@ -98,24 +98,35 @@ func TestRunWithMaybeRetry(t *testing.T) { cases := []struct { f func() (*RunResultDetails, error) shouldRetryFn func(*RunResultDetails) bool + nilRetryOpts bool expectedAttempts int shouldError bool }{ - { // Happy path: no error, no retry required + { // 1. Happy path: no error, no retry required f: func() (*RunResultDetails, error) { return newResult(0), nil }, expectedAttempts: 1, shouldError: false, }, - { // Error, but not retry function specified + { // 2. Error, but with no retries f: func() (*RunResultDetails, error) { return newResult(1), nil }, + shouldRetryFn: func(*RunResultDetails) bool { + return false + }, expectedAttempts: 1, shouldError: true, }, - { // Error, with retries exhausted + { // 3. Error, but no retry function specified + f: func() (*RunResultDetails, error) { + return newResult(1), nil + }, + expectedAttempts: 3, + shouldError: true, + }, + { // 4. Error, with retries exhausted f: func() (*RunResultDetails, error) { return newResult(255), nil }, @@ -123,7 +134,7 @@ func TestRunWithMaybeRetry(t *testing.T) { expectedAttempts: 3, shouldError: true, }, - { // Eventual success after retries + { // 5. Eventual success after retries f: func() (*RunResultDetails, error) { attempt++ if attempt == 3 { @@ -135,12 +146,24 @@ func TestRunWithMaybeRetry(t *testing.T) { expectedAttempts: 3, shouldError: false, }, + { // 6. Error, runs once because nil retryOpts + f: func() (*RunResultDetails, error) { + return newResult(255), nil + }, + nilRetryOpts: true, + expectedAttempts: 1, + shouldError: true, + }, } for idx, tc := range cases { attempt = 0 t.Run(fmt.Sprintf("%d", idx+1), func(t *testing.T) { - res, _ := runWithMaybeRetry(l, testRetryOpts, tc.shouldRetryFn, tc.f) + var retryOpts *RunRetryOpts + if !tc.nilRetryOpts { + retryOpts = newRunRetryOpts(testRetryOpts, tc.shouldRetryFn) + } + res, _ := runWithMaybeRetry(l, retryOpts, tc.f) require.Equal(t, tc.shouldError, res.Err != nil) require.Equal(t, tc.expectedAttempts, res.Attempt) @@ -171,3 +194,10 @@ func nilLogger() *logger.Logger { } return l } + +func TestGenFilenameFromArgs(t *testing.T) { + const exp = "mkdir-p-logsredacted" + require.Equal(t, exp, GenFilenameFromArgs(20, "mkdir -p logs/redacted && ./cockroach")) + require.Equal(t, exp, GenFilenameFromArgs(20, "mkdir", "-p logs/redacted", "&& ./cockroach")) + require.Equal(t, exp, GenFilenameFromArgs(20, "mkdir -p logs/redacted && ./cockroach ")) +} diff --git a/pkg/roachprod/install/cockroach.go b/pkg/roachprod/install/cockroach.go index 1afa27771e28..a975dab69f18 100644 --- a/pkg/roachprod/install/cockroach.go +++ b/pkg/roachprod/install/cockroach.go @@ -172,6 +172,8 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S } l.Printf("%s: starting nodes", c.Name) + + // SSH retries are disabled by passing nil RunRetryOpts if err := c.Parallel(l, "", len(nodes), parallelism, func(nodeIdx int) (*RunResultDetails, error) { node := nodes[nodeIdx] res := &RunResultDetails{Node: node} @@ -216,7 +218,7 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S return res, errors.Wrap(err, "failed to set cluster settings") } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } if startOpts.ScheduleBackups { @@ -319,11 +321,6 @@ func (c *SyncedCluster) RunSQL(ctx context.Context, l *logger.Logger, args []str display := fmt.Sprintf("%s: executing sql", c.Name) if err := c.Parallel(l, display, len(c.Nodes), 0, func(nodeIdx int) (*RunResultDetails, error) { node := c.Nodes[nodeIdx] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() var cmd string if c.IsLocal() { @@ -333,7 +330,10 @@ func (c *SyncedCluster) RunSQL(ctx context.Context, l *logger.Logger, args []str c.NodeURL("localhost", c.NodePort(node)) + " " + ssh.Escape(args) - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, "run-sql") + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -342,7 +342,7 @@ func (c *SyncedCluster) RunSQL(ctx context.Context, l *logger.Logger, args []str } resultChan <- result{node: node, output: string(res.CombinedOut)} return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -369,19 +369,17 @@ func (c *SyncedCluster) startNode( } if err := func() error { - sess, err := c.newSession(node) - if err != nil { - return err - } - defer sess.Close() - - sess.SetStdin(strings.NewReader(startCmd)) var cmd string if c.IsLocal() { cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) } cmd += `cat > cockroach.sh && chmod +x cockroach.sh` - if out, err := sess.CombinedOutput(ctx, cmd); err != nil { + + sess := c.newSession(l, node, cmd, "") + defer sess.Close() + + sess.SetStdin(strings.NewReader(startCmd)) + if out, err := sess.CombinedOutput(ctx); err != nil { return errors.Wrapf(err, "failed to upload start script: %s", out) } @@ -390,18 +388,16 @@ func (c *SyncedCluster) startNode( return "", err } - sess, err := c.newSession(node) - if err != nil { - return "", err - } - defer sess.Close() - var cmd string if c.IsLocal() { cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) } cmd += "./cockroach.sh" - out, err := sess.CombinedOutput(ctx, cmd) + + sess := c.newSession(l, node, cmd, "") + defer sess.Close() + + out, err := sess.CombinedOutput(ctx) if err != nil { return "", errors.Wrapf(err, "~ %s\n%s", cmd, out) } @@ -630,13 +626,10 @@ func (c *SyncedCluster) initializeCluster(ctx context.Context, l *logger.Logger, l.Printf("%s: initializing cluster\n", c.Name) initCmd := c.generateInitCmd(node) - sess, err := c.newSession(node) - if err != nil { - return err - } + sess := c.newSession(l, node, initCmd, "init-cluster") defer sess.Close() - out, err := sess.CombinedOutput(ctx, initCmd) + out, err := sess.CombinedOutput(ctx) if err != nil { return errors.Wrapf(err, "~ %s\n%s", initCmd, out) } @@ -651,13 +644,10 @@ func (c *SyncedCluster) setClusterSettings(ctx context.Context, l *logger.Logger l.Printf("%s: setting cluster settings", c.Name) clusterSettingCmd := c.generateClusterSettingCmd(l, node) - sess, err := c.newSession(node) - if err != nil { - return err - } + sess := c.newSession(l, node, clusterSettingCmd, "set-cluster-settings") defer sess.Close() - out, err := sess.CombinedOutput(ctx, clusterSettingCmd) + out, err := sess.CombinedOutput(ctx) if err != nil { return errors.Wrapf(err, "~ %s\n%s", clusterSettingCmd, out) } diff --git a/pkg/roachprod/install/session.go b/pkg/roachprod/install/session.go index ea2fd8fe9b17..a33ba329a313 100644 --- a/pkg/roachprod/install/session.go +++ b/pkg/roachprod/install/session.go @@ -12,6 +12,7 @@ package install import ( "context" + "fmt" "io" "os" "os/exec" @@ -20,16 +21,18 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachprod/config" rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) type session interface { - CombinedOutput(ctx context.Context, cmd string) ([]byte, error) - Run(ctx context.Context, cmd string) error + CombinedOutput(ctx context.Context) ([]byte, error) + Run(ctx context.Context) error SetStdin(r io.Reader) SetStdout(w io.Writer) SetStderr(w io.Writer) - Start(cmd string) error + Start() error StdinPipe() (io.WriteCloser, error) StdoutPipe() (io.Reader, error) StderrPipe() (io.Reader, error) @@ -44,25 +47,44 @@ type remoteSession struct { logfile string // captures ssh -vvv } -func newRemoteSession(user, host string, logdir string) (*remoteSession, error) { - // TODO(tbg): this is disabled at the time of writing. It was difficult - // to assign the logfiles to the roachtest and as a bonus our CI harness - // never actually managed to collect the files since they had wrong - // permissions; instead they clogged up the roachprod dir. - // logfile := filepath.Join( - // logdir, - // fmt.Sprintf("ssh_%s_%s", host, timeutil.Now().Format(time.RFC3339)), - // ) - const logfile = "" - args := []string{ - user + "@" + host, +type remoteCommand struct { + node Node + user string + host string + cmd string + debugName string +} - // TODO(tbg): see above. - //"-vvv", "-E", logfile, - // NB: -q suppresses -E, at least on OSX. Difficult decisions will have - // to be made if omitting -q leads to annoyance on stdout/stderr. +func newRemoteSession(l *logger.Logger, command remoteCommand) *remoteSession { + var loggingArgs []string + + if command.debugName == "" { + command.debugName = GenFilenameFromArgs(20, command.cmd) + } + + cl, err := l.ChildLogger(filepath.Join("ssh", fmt.Sprintf( + "ssh_%s_n%v_%s", + timeutil.Now().Format(`150405.000000000`), + command.node, + command.debugName, + ))) + + // Check the logger file since running roachprod from the cli will result in a fileless logger. + logfile := "" + if err == nil && l.File != nil { + logfile = cl.File.Name() + loggingArgs = []string{ + "-vvv", "-E", logfile, + } + cl.Close() + } else { + // NB: -q suppresses -E, at least on *nix. + loggingArgs = []string{"-q"} + } + //const logfile = "" + args := []string{ + command.user + "@" + command.host, - "-q", "-o", "UserKnownHostsFile=/dev/null", "-o", "StrictHostKeyChecking=no", // Send keep alives every minute to prevent connections without activity @@ -75,23 +97,23 @@ func newRemoteSession(user, host string, logdir string) (*remoteSession, error) // context cancellation killing hanging roachprod processes. "-o", "ConnectTimeout=5", } + args = append(args, loggingArgs...) args = append(args, sshAuthArgs()...) + args = append(args, command.cmd) ctx, cancel := context.WithCancel(context.Background()) - cmd := exec.CommandContext(ctx, "ssh", args...) - return &remoteSession{cmd, cancel, logfile}, nil + fullCmd := exec.CommandContext(ctx, "ssh", args...) + return &remoteSession{fullCmd, cancel, logfile} } func (s *remoteSession) errWithDebug(err error) error { if err != nil && s.logfile != "" { - err = errors.Wrapf(err, "ssh verbose log retained in %s", s.logfile) + err = errors.Wrapf(err, "ssh verbose log retained in %s", filepath.Base(s.logfile)) s.logfile = "" // prevent removal on close } return err } -func (s *remoteSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, error) { - s.Cmd.Args = append(s.Cmd.Args, cmd) - +func (s *remoteSession) CombinedOutput(ctx context.Context) ([]byte, error) { var b []byte var err error commandFinished := make(chan struct{}) @@ -111,9 +133,7 @@ func (s *remoteSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, } } -func (s *remoteSession) Run(ctx context.Context, cmd string) error { - s.Cmd.Args = append(s.Cmd.Args, cmd) - +func (s *remoteSession) Run(ctx context.Context) error { var err error commandFinished := make(chan struct{}) go func() { @@ -130,9 +150,8 @@ func (s *remoteSession) Run(ctx context.Context, cmd string) error { } } -func (s *remoteSession) Start(cmd string) error { - s.Cmd.Args = append(s.Cmd.Args, cmd) - return rperrors.ClassifyCmdError(s.Cmd.Start()) +func (s *remoteSession) Start() error { + return rperrors.ClassifyCmdError(s.errWithDebug(s.Cmd.Start())) } func (s *remoteSession) SetStdin(r io.Reader) { @@ -182,15 +201,13 @@ type localSession struct { cancel func() } -func newLocalSession() *localSession { +func newLocalSession(cmd string) *localSession { ctx, cancel := context.WithCancel(context.Background()) - cmd := exec.CommandContext(ctx, "/bin/bash", "-c") - return &localSession{cmd, cancel} + fullCmd := exec.CommandContext(ctx, "/bin/bash", "-c", cmd) + return &localSession{fullCmd, cancel} } -func (s *localSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, error) { - s.Cmd.Args = append(s.Cmd.Args, cmd) - +func (s *localSession) CombinedOutput(ctx context.Context) ([]byte, error) { var b []byte var err error commandFinished := make(chan struct{}) @@ -209,9 +226,7 @@ func (s *localSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, } } -func (s *localSession) Run(ctx context.Context, cmd string) error { - s.Cmd.Args = append(s.Cmd.Args, cmd) - +func (s *localSession) Run(ctx context.Context) error { var err error commandFinished := make(chan struct{}) go func() { @@ -228,8 +243,7 @@ func (s *localSession) Run(ctx context.Context, cmd string) error { } } -func (s *localSession) Start(cmd string) error { - s.Cmd.Args = append(s.Cmd.Args, cmd) +func (s *localSession) Start() error { return rperrors.ClassifyCmdError(s.Cmd.Start()) } diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go index 810fbe15baba..fc7d38872781 100644 --- a/pkg/roachprod/roachprod.go +++ b/pkg/roachprod/roachprod.go @@ -457,10 +457,10 @@ func IP( if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*install.RunResultDetails, error) { node := nodes[i] res := &install.RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(ctx, node) + res.Stdout, res.Err = c.GetInternalIP(l, ctx, node) ips[i] = res.Stdout return res, err - }); err != nil { + }, install.DefaultSSHRetryOpts); err != nil { return nil, err } } @@ -681,7 +681,7 @@ func Monitor( if err != nil { return nil, err } - return c.Monitor(ctx, opts), nil + return c.Monitor(l, ctx, opts), nil } // StopOpts is used to pass options to Stop. @@ -872,10 +872,10 @@ func PgURL( if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*install.RunResultDetails, error) { node := nodes[i] res := &install.RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(ctx, node) + res.Stdout, res.Err = c.GetInternalIP(l, ctx, node) ips[i] = res.Stdout return res, err - }); err != nil { + }, install.DefaultSSHRetryOpts); err != nil { return nil, err } } @@ -1063,7 +1063,7 @@ func Pprof(l *logger.Logger, clusterName string, opts PprofOpts) error { outputFiles = append(outputFiles, outputFile) mu.Unlock() return res, nil - }) + }, install.DefaultSSHRetryOpts) for _, s := range outputFiles { l.Printf("Created %s", s) @@ -1724,7 +1724,7 @@ func sendCaptureCommand( } } return res, res.Err - }) + }, install.DefaultSSHRetryOpts) return err }