Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 47 additions & 21 deletions pkg/roachprod/install/cluster_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ var defaultRunRetryOpt = retry.Options{
func runWithMaybeRetry(
l *logger.Logger,
retryOpts retry.Options,
shouldRetryFn func(details *RunResultDetails) bool,
shouldRetryFn func(*RunResultDetails) bool,
f func() (*RunResultDetails, error),
) (*RunResultDetails, error) {
var err error
Expand All @@ -135,11 +135,11 @@ func runWithMaybeRetry(
for r := retry.Start(retryOpts); r.Next(); {
res, err = f()
res.Attempt = r.CurrentAttempt() + 1
// nil err indicates a potentially retryable res.Err
// nil err (denoting a roachprod error) indicates a potentially retryable res.Err
if err == nil && res.Err != nil {
cmdErr = errors.CombineErrors(cmdErr, res.Err)
if shouldRetryFn != nil && shouldRetryFn(res) {
l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, defaultRunRetryOpt.MaxRetries+1)
l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1)
continue
}
}
Expand All @@ -154,8 +154,26 @@ func runWithMaybeRetry(
return res, err
}

func defaultShouldRetry(res *RunResultDetails) bool {
return errors.Is(res.Err, rperrors.ErrSSH255)
// runWithDefaultSSHRetry will retry an SSH command which returns an error with exit code 255
func runWithDefaultSSHRetry(
l *logger.Logger, f func() (*RunResultDetails, error),
) (*RunResultDetails, error) {
return runWithMaybeRetry(
l,
defaultRunRetryOpt,
func(res *RunResultDetails) bool { return errors.Is(res.Err, rperrors.ErrSSH255) },
f,
)
}

// scpWithDefaultRetry assumes that any error returned from an scp attempt is retryable
func scpWithDefaultRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) {
return runWithMaybeRetry(
l,
defaultRunRetryOpt,
func(*RunResultDetails) bool { return true },
func() (*RunResultDetails, error) { return scp(src, dest) },
)
}

// Host returns the public IP of a node.
Expand Down Expand Up @@ -1222,7 +1240,7 @@ tar cvf %[3]s certs
exit.WithCode(exit.UnspecifiedError())
}

tarfile, cleanup, err := c.getFileFromFirstNode(certsTarName)
tarfile, cleanup, err := c.getFileFromFirstNode(l, certsTarName)
if err != nil {
return err
}
Expand Down Expand Up @@ -1255,7 +1273,7 @@ func (c *SyncedCluster) DistributeTenantCerts(
return err
}

tarfile, cleanup, err := hostCluster.getFileFromFirstNode(tenantCertsTarName)
tarfile, cleanup, err := hostCluster.getFileFromFirstNode(l, tenantCertsTarName)
if err != nil {
return err
}
Expand Down Expand Up @@ -1343,7 +1361,9 @@ func (c *SyncedCluster) cockroachBinSupportsTenantScope(ctx context.Context, nod
// getFile retrieves the given file from the first node in the cluster. The
// filename is assumed to be relative from the home directory of the node's
// user.
func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error) {
func (c *SyncedCluster) getFileFromFirstNode(
l *logger.Logger, name string,
) (string, func(), error) {
var tmpfileName string
cleanup := func() {}
if c.IsLocal() {
Expand All @@ -1359,9 +1379,9 @@ func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error
}

srcFileName := fmt.Sprintf("%s@%s:%s", c.user(1), c.Host(1), name)
if err := c.scp(srcFileName, tmpfile.Name()); err != nil {
if res, _ := scpWithDefaultRetry(l, srcFileName, tmpfile.Name()); res.Err != nil {
cleanup()
return "", nil, err
return "", nil, res.Err
}
tmpfileName = tmpfile.Name()
}
Expand Down Expand Up @@ -1693,10 +1713,10 @@ func (c *SyncedCluster) Put(
return
}

err = c.scp(from, to)
results <- result{i, err}
res, _ := scpWithDefaultRetry(l, from, to)
results <- result{i, res.Err}

if err != nil {
if res.Err != nil {
// The copy failed. Re-add the original source.
pushSource(srcIndex)
} else {
Expand Down Expand Up @@ -2048,8 +2068,8 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err
return
}

err := c.scp(fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest)
if err == nil {
res, _ := scpWithDefaultRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest)
if res.Err == nil {
// Make sure all created files and directories are world readable.
// The CRDB process intentionally sets a 0007 umask (resulting in
// non-world-readable files). This creates annoyances during CI
Expand All @@ -2069,10 +2089,10 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err
}
return nil
}
err = filepath.Walk(dest, chmod)
res.Err = filepath.Walk(dest, chmod)
}

results <- result{i, err}
results <- result{i, res.Err}
}(i)
}

Expand Down Expand Up @@ -2243,7 +2263,10 @@ func (c *SyncedCluster) SSH(ctx context.Context, l *logger.Logger, sshArgs, args
return syscall.Exec(sshPath, allArgs, os.Environ())
}

func (c *SyncedCluster) scp(src, dest string) error {
// scp return type conforms to what runWithMaybeRetry expects. A nil error
// is always returned here since the only error that can happen is an scp error
// which we do want to be able to retry.
func scp(src, dest string) (*RunResultDetails, error) {
args := []string{
"scp", "-r", "-C",
"-o", "StrictHostKeyChecking=no",
Expand All @@ -2253,9 +2276,12 @@ func (c *SyncedCluster) scp(src, dest string) error {
cmd := exec.Command(args[0], args[1:]...)
out, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out)
err = errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out)
}
return nil

res := newRunResultDetails(-1, err)
res.CombinedOut = out
return res, nil
}

// ParallelResult captures the result of a user-defined function
Expand Down Expand Up @@ -2322,7 +2348,7 @@ func (c *SyncedCluster) ParallelE(
startNext := func() {
go func(i int) {
defer wg.Done()
res, err := runWithMaybeRetry(l, defaultRunRetryOpt, defaultShouldRetry, func() (*RunResultDetails, error) { return fn(i) })
res, err := runWithDefaultSSHRetry(l, func() (*RunResultDetails, error) { return fn(i) })
results <- ParallelResult{i, res.CombinedOut, err}
}(index)
index++
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachprod/install/cluster_synced_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestRunWithMaybeRetry(t *testing.T) {
attempt := 0
cases := []struct {
f func() (*RunResultDetails, error)
shouldRetryFn func(res *RunResultDetails) bool
shouldRetryFn func(*RunResultDetails) bool
expectedAttempts int
shouldError bool
}{
Expand Down