diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index 930cfade3628..7dd34b8ff453 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -124,7 +124,7 @@ var defaultRunRetryOpt = retry.Options{ func runWithMaybeRetry( l *logger.Logger, retryOpts retry.Options, - shouldRetryFn func(details *RunResultDetails) bool, + shouldRetryFn func(*RunResultDetails) bool, f func() (*RunResultDetails, error), ) (*RunResultDetails, error) { var err error @@ -135,11 +135,11 @@ func runWithMaybeRetry( for r := retry.Start(retryOpts); r.Next(); { res, err = f() res.Attempt = r.CurrentAttempt() + 1 - // nil err indicates a potentially retryable res.Err + // nil err (denoting a roachprod error) indicates a potentially retryable res.Err if err == nil && res.Err != nil { cmdErr = errors.CombineErrors(cmdErr, res.Err) if shouldRetryFn != nil && shouldRetryFn(res) { - l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, defaultRunRetryOpt.MaxRetries+1) + l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1) continue } } @@ -154,8 +154,26 @@ func runWithMaybeRetry( return res, err } -func defaultShouldRetry(res *RunResultDetails) bool { - return errors.Is(res.Err, rperrors.ErrSSH255) +// runWithDefaultSSHRetry will retry an SSH command which returns an error with exit code 255 +func runWithDefaultSSHRetry( + l *logger.Logger, f func() (*RunResultDetails, error), +) (*RunResultDetails, error) { + return runWithMaybeRetry( + l, + defaultRunRetryOpt, + func(res *RunResultDetails) bool { return errors.Is(res.Err, rperrors.ErrSSH255) }, + f, + ) +} + +// scpWithDefaultRetry assumes that any error returned from an scp attempt is retryable +func scpWithDefaultRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) { + return runWithMaybeRetry( + l, + defaultRunRetryOpt, + func(*RunResultDetails) bool { return true }, + func() (*RunResultDetails, error) { return scp(src, dest) }, + ) } // Host returns the public IP of a node. @@ -1222,7 +1240,7 @@ tar cvf %[3]s certs exit.WithCode(exit.UnspecifiedError()) } - tarfile, cleanup, err := c.getFileFromFirstNode(certsTarName) + tarfile, cleanup, err := c.getFileFromFirstNode(l, certsTarName) if err != nil { return err } @@ -1255,7 +1273,7 @@ func (c *SyncedCluster) DistributeTenantCerts( return err } - tarfile, cleanup, err := hostCluster.getFileFromFirstNode(tenantCertsTarName) + tarfile, cleanup, err := hostCluster.getFileFromFirstNode(l, tenantCertsTarName) if err != nil { return err } @@ -1343,7 +1361,9 @@ func (c *SyncedCluster) cockroachBinSupportsTenantScope(ctx context.Context, nod // getFile retrieves the given file from the first node in the cluster. The // filename is assumed to be relative from the home directory of the node's // user. -func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error) { +func (c *SyncedCluster) getFileFromFirstNode( + l *logger.Logger, name string, +) (string, func(), error) { var tmpfileName string cleanup := func() {} if c.IsLocal() { @@ -1359,9 +1379,9 @@ func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error } srcFileName := fmt.Sprintf("%s@%s:%s", c.user(1), c.Host(1), name) - if err := c.scp(srcFileName, tmpfile.Name()); err != nil { + if res, _ := scpWithDefaultRetry(l, srcFileName, tmpfile.Name()); res.Err != nil { cleanup() - return "", nil, err + return "", nil, res.Err } tmpfileName = tmpfile.Name() } @@ -1693,10 +1713,10 @@ func (c *SyncedCluster) Put( return } - err = c.scp(from, to) - results <- result{i, err} + res, _ := scpWithDefaultRetry(l, from, to) + results <- result{i, res.Err} - if err != nil { + if res.Err != nil { // The copy failed. Re-add the original source. pushSource(srcIndex) } else { @@ -2048,8 +2068,8 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err return } - err := c.scp(fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) - if err == nil { + res, _ := scpWithDefaultRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) + if res.Err == nil { // Make sure all created files and directories are world readable. // The CRDB process intentionally sets a 0007 umask (resulting in // non-world-readable files). This creates annoyances during CI @@ -2069,10 +2089,10 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err } return nil } - err = filepath.Walk(dest, chmod) + res.Err = filepath.Walk(dest, chmod) } - results <- result{i, err} + results <- result{i, res.Err} }(i) } @@ -2243,7 +2263,10 @@ func (c *SyncedCluster) SSH(ctx context.Context, l *logger.Logger, sshArgs, args return syscall.Exec(sshPath, allArgs, os.Environ()) } -func (c *SyncedCluster) scp(src, dest string) error { +// scp return type conforms to what runWithMaybeRetry expects. A nil error +// is always returned here since the only error that can happen is an scp error +// which we do want to be able to retry. +func scp(src, dest string) (*RunResultDetails, error) { args := []string{ "scp", "-r", "-C", "-o", "StrictHostKeyChecking=no", @@ -2253,9 +2276,12 @@ func (c *SyncedCluster) scp(src, dest string) error { cmd := exec.Command(args[0], args[1:]...) out, err := cmd.CombinedOutput() if err != nil { - return errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out) + err = errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out) } - return nil + + res := newRunResultDetails(-1, err) + res.CombinedOut = out + return res, nil } // ParallelResult captures the result of a user-defined function @@ -2322,7 +2348,7 @@ func (c *SyncedCluster) ParallelE( startNext := func() { go func(i int) { defer wg.Done() - res, err := runWithMaybeRetry(l, defaultRunRetryOpt, defaultShouldRetry, func() (*RunResultDetails, error) { return fn(i) }) + res, err := runWithDefaultSSHRetry(l, func() (*RunResultDetails, error) { return fn(i) }) results <- ParallelResult{i, res.CombinedOut, err} }(index) index++ diff --git a/pkg/roachprod/install/cluster_synced_test.go b/pkg/roachprod/install/cluster_synced_test.go index c0490e159ef0..a5e58706a69b 100644 --- a/pkg/roachprod/install/cluster_synced_test.go +++ b/pkg/roachprod/install/cluster_synced_test.go @@ -97,7 +97,7 @@ func TestRunWithMaybeRetry(t *testing.T) { attempt := 0 cases := []struct { f func() (*RunResultDetails, error) - shouldRetryFn func(res *RunResultDetails) bool + shouldRetryFn func(*RunResultDetails) bool expectedAttempts int shouldError bool }{