diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index 2e272fc812c5..72da64eb3546 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//pkg/internal/team", "//pkg/roachprod", "//pkg/roachprod/config", + "//pkg/roachprod/errors", "//pkg/roachprod/install", "//pkg/roachprod/logger", "//pkg/roachprod/prometheus", diff --git a/pkg/cmd/roachtest/github.go b/pkg/cmd/roachtest/github.go index b7e543754522..05768e49d606 100644 --- a/pkg/cmd/roachtest/github.go +++ b/pkg/cmd/roachtest/github.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/internal/team" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/vm" ) @@ -32,6 +33,14 @@ type githubIssues struct { teamLoader func() (team.Map, error) } +type issueCategory int + +const ( + otherErr issueCategory = iota + clusterCreationErr + sshErr +) + func newGithubIssues( disable bool, c *clusterImpl, vmCreateOpts *vm.CreateOpts, l *logger.Logger, ) *githubIssues { @@ -59,10 +68,32 @@ func (g *githubIssues) shouldPost(t test.Test) bool { t.Spec().(*registry.TestSpec).Cluster.NodeCount > 0 } -func (g *githubIssues) createPostRequest(t test.Test, message string) issues.PostRequest { +func (g *githubIssues) createPostRequest( + t test.Test, cat issueCategory, message string, +) issues.PostRequest { var mention []string var projColID int + issueOwner := t.Spec().(*registry.TestSpec).Owner + issueName := t.Name() + + messagePrefix := "" + // Overrides to shield eng teams from potential flakes + if cat == clusterCreationErr { + issueOwner = registry.OwnerDevInf + issueName = "cluster_creation" + messagePrefix = fmt.Sprintf("test %s was skipped due to ", t.Name()) + } else if cat == sshErr { + 
issueOwner = registry.OwnerTestEng + issueName = "ssh_problem" + messagePrefix = fmt.Sprintf("test %s failed due to ", t.Name()) + } + + teams, err := g.teamLoader() + if err != nil { + t.Fatalf("could not load teams: %v", err) + } + // Issues posted from roachtest are identifiable as such and // they are also release blockers (this label may be removed // by a human upon closer investigation). @@ -72,12 +103,7 @@ func (g *githubIssues) createPostRequest(t test.Test, message string) issues.Pos labels = append(labels, "release-blocker") } - teams, err := g.teamLoader() - if err != nil { - t.Fatalf("could not load teams: %v", err) - } - - if sl, ok := teams.GetAliasesForPurpose(ownerToAlias(t.Spec().(*registry.TestSpec).Owner), team.PurposeRoachtest); ok { + if sl, ok := teams.GetAliasesForPurpose(ownerToAlias(issueOwner), team.PurposeRoachtest); ok { for _, alias := range sl { mention = append(mention, "@"+string(alias)) if label := teams[alias].Label; label != "" { @@ -115,8 +141,8 @@ func (g *githubIssues) createPostRequest(t test.Test, message string) issues.Pos MentionOnCreate: mention, ProjectColumnID: projColID, PackageName: "roachtest", - TestName: t.Name(), - Message: message, + TestName: issueName, + Message: messagePrefix + message, Artifacts: artifacts, ExtraLabels: labels, ExtraParams: clusterParams, @@ -133,14 +159,24 @@ func (g *githubIssues) createPostRequest(t test.Test, message string) issues.Pos } } -func (g *githubIssues) MaybePost(t test.Test, message string) error { +func (g *githubIssues) MaybePost(t *testImpl, message string) error { if !g.shouldPost(t) { return nil } + cat := otherErr + + // Overrides to shield eng teams from potential flakes + firstFailure := t.firstFailure() + if failureContainsError(firstFailure, errClusterProvisioningFailed) { + cat = clusterCreationErr + } else if failureContainsError(firstFailure, rperrors.ErrSSH255) { + cat = sshErr + } + return g.issuePoster( context.Background(), issues.UnitTestFormatter, - 
g.createPostRequest(t, message), + g.createPostRequest(t, cat, message), ) } diff --git a/pkg/cmd/roachtest/github_test.go b/pkg/cmd/roachtest/github_test.go index baae12205bc4..094aed9ba900 100644 --- a/pkg/cmd/roachtest/github_test.go +++ b/pkg/cmd/roachtest/github_test.go @@ -30,7 +30,11 @@ var ( aliases: cockroachdb/rfc-prs: other triage_column_id: 0 - label: T-testeng` +cockroachdb/test-eng: + label: T-testeng + triage_column_id: 14041337 +cockroachdb/dev-inf: + triage_column_id: 10210759` validTeamsFn = func() (team.Map, error) { return loadYamlTeams(teamsYaml) } invalidTeamsFn = func() (team.Map, error) { return loadYamlTeams("invalid yaml") } @@ -58,7 +62,7 @@ func TestShouldPost(t *testing.T) { envTcBuildBranch string expected bool }{ - /* Cases 1 - 4 verify that issues are not posted if any of on the relevant criteria checks fail */ + /* Cases 1 - 4 verify that issues are not posted if any of the relevant criteria checks fail */ // disable {true, 1, "token", "master", false}, // nodeCount @@ -103,10 +107,11 @@ func TestCreatePostRequest(t *testing.T) { clusterCreationFailed bool loadTeamsFailed bool localSSD bool + category issueCategory expectedPost bool expectedParams map[string]string }{ - {true, false, false, false, true, + {true, false, false, false, otherErr, true, prefixAll(map[string]string{ "cloud": "gce", "encrypted": "false", @@ -116,7 +121,7 @@ func TestCreatePostRequest(t *testing.T) { "localSSD": "false", }), }, - {true, false, false, true, true, + {true, false, false, true, clusterCreationErr, true, prefixAll(map[string]string{ "cloud": "gce", "encrypted": "false", @@ -129,7 +134,7 @@ func TestCreatePostRequest(t *testing.T) { // Assert that release-blocker label exists when !nonReleaseBlocker // Also ensure that in the event of a failed cluster creation, // nil `vmOptions` and `clusterImpl` are not dereferenced - {false, true, false, false, true, + {false, true, false, false, sshErr, true, prefixAll(map[string]string{ "cloud": "gce", 
"ssd": "0", @@ -137,7 +142,7 @@ func TestCreatePostRequest(t *testing.T) { }), }, //Simulate failure loading TEAMS.yaml - {true, false, true, false, false, nil}, + {true, false, true, false, otherErr, false, nil}, } reg, _ := makeTestRegistry(spec.GCE, "", "", false) @@ -146,7 +151,7 @@ func TestCreatePostRequest(t *testing.T) { clusterSpec := reg.MakeClusterSpec(1) testSpec := ®istry.TestSpec{ - Name: "githubPost", + Name: "github_test", Owner: OwnerUnitTest, Cluster: clusterSpec, NonReleaseBlocker: c.nonReleaseBlocker, @@ -184,9 +189,9 @@ func TestCreatePostRequest(t *testing.T) { if c.loadTeamsFailed { // Assert that if TEAMS.yaml cannot be loaded then function panics. - assert.Panics(t, func() { github.createPostRequest(ti, "message") }) + assert.Panics(t, func() { github.createPostRequest(ti, c.category, "message") }) } else { - req := github.createPostRequest(ti, "message") + req := github.createPostRequest(ti, c.category, "message") if c.expectedParams != nil { require.Equal(t, c.expectedParams, req.ExtraParams) @@ -197,7 +202,29 @@ func TestCreatePostRequest(t *testing.T) { if !c.nonReleaseBlocker { require.True(t, contains(req.ExtraLabels, nil, "release-blocker")) } - require.Contains(t, req.ExtraLabels, "T-testeng") + + expectedTeam := "@cockroachdb/unowned" + expectedName := "github_test" + expectedLabel := "" + expectedMessagePrefix := "" + + if c.category == clusterCreationErr { + expectedTeam = "@cockroachdb/dev-inf" + expectedName = "cluster_creation" + expectedMessagePrefix = "test github_test was skipped due to " + } else if c.category == sshErr { + expectedTeam = "@cockroachdb/test-eng" + expectedName = "ssh_problem" + expectedLabel = "T-testeng" + expectedMessagePrefix = "test github_test failed due to " + } + + require.Contains(t, req.MentionOnCreate, expectedTeam) + require.Equal(t, expectedName, req.TestName) + require.True(t, strings.HasPrefix(req.Message, expectedMessagePrefix), req.Message) + if expectedLabel != "" { + require.Contains(t, 
req.ExtraLabels, expectedLabel) + } } } } diff --git a/pkg/cmd/roachtest/main.go b/pkg/cmd/roachtest/main.go index 2ef3a680e96f..8694704fc7c3 100644 --- a/pkg/cmd/roachtest/main.go +++ b/pkg/cmd/roachtest/main.go @@ -338,7 +338,7 @@ runner itself. if errors.Is(err, errTestsFailed) { code = ExitCodeTestsFailed } - if errors.Is(err, errClusterProvisioningFailed) { + if errors.Is(err, errSomeClusterProvisioningFailed) { code = ExitCodeClusterProvisioningFailed } // Cobra has already printed the error message. @@ -376,8 +376,15 @@ func runTests(register func(registry.Registry), cfg cliCfg) error { filter := registry.NewTestFilter(cfg.args) clusterType := roachprodCluster + bindTo := "" if local { clusterType = localCluster + + // This will suppress the annoying "Allow incoming network connections" popup from + // OSX when running a roachtest + bindTo = "localhost" + + fmt.Printf("--local specified. Binding http listener to localhost only") if cfg.parallelism != 1 { fmt.Printf("--local specified. 
Overriding --parallelism to 1.\n") cfg.parallelism = 1 @@ -392,7 +399,7 @@ func runTests(register func(registry.Registry), cfg cliCfg) error { keepClustersOnTestFailure: cfg.debugEnabled, clusterID: cfg.clusterID, } - if err := runner.runHTTPServer(cfg.httpPort, os.Stdout); err != nil { + if err := runner.runHTTPServer(cfg.httpPort, os.Stdout, bindTo); err != nil { return err } diff --git a/pkg/cmd/roachtest/test/test_interface.go b/pkg/cmd/roachtest/test/test_interface.go index 6ccbc94adbb9..7da9b1c030d5 100644 --- a/pkg/cmd/roachtest/test/test_interface.go +++ b/pkg/cmd/roachtest/test/test_interface.go @@ -37,6 +37,7 @@ type Test interface { VersionsBinaryOverride() map[string]string Skip(args ...interface{}) Skipf(format string, args ...interface{}) + Error(args ...interface{}) Errorf(string, ...interface{}) FailNow() Fatal(args ...interface{}) diff --git a/pkg/cmd/roachtest/test_impl.go b/pkg/cmd/roachtest/test_impl.go index d3725a639a54..43ac5482b1a7 100644 --- a/pkg/cmd/roachtest/test_impl.go +++ b/pkg/cmd/roachtest/test_impl.go @@ -11,13 +11,10 @@ package main import ( - "bytes" "context" "fmt" "io" - // For the debug http handlers. - _ "net/http/pprof" - "runtime" + "os" "strings" "time" @@ -27,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/version" + "github.com/cockroachdb/errors" "github.com/petermattis/goid" ) @@ -41,6 +39,18 @@ type testStatus struct { progress float64 } +// Holds all error information from a single invocation of t.{Fatal,Error}{,f} to +// preserve any structured errors +// e.g. 
t.Fatalf("foo %s %s %s", "hello", err1, err2) would mean that +// failure.errors == [err1, err2], with all args (including the non error "hello") +// being captured in the squashedErr +type failure struct { + // This is the single error created from variadic args passed to t.{Fatal,Error}{,f} + squashedErr error + // errors are all the `errors` present in the variadic args + errors []error +} + type testImpl struct { spec *registry.TestSpec @@ -71,23 +81,26 @@ type testImpl struct { mu struct { syncutil.RWMutex - done bool - failed bool - timeout bool // if failed == true, this indicates whether the test timed out + done bool + // cancel, if set, is called from the t.Fatal() family of functions when the // test is being marked as failed (i.e. when the failed field above is also // set). This is used to cancel the context passed to t.spec.Run(), so async // test goroutines can be notified. - cancel func() - failLoc struct { - file string - line int - } - failureMsg string + cancel func() + + // failures added via addFailures, in order + // A test can have multiple calls to t.Fail()/Error(), with each call + // referencing 0+ errors. failure captures all the errors + failures []failure + // status is a map from goroutine id to status set by that goroutine. A // special goroutine is indicated by runnerID; that one provides the test's // "main status". status map[int64]testStatus + + // TODO(test-eng): this should just be an in-mem (ring) buffer attached to + // `t.L()`. output []byte } // Map from version to path to the cockroach binary to be used when @@ -99,6 +112,10 @@ type testImpl struct { versionsBinaryOverride map[string]string } +func newFailure(squashedErr error, errs []error) failure { + return failure{squashedErr: squashedErr, errors: errs} +} + // BuildVersion exposes the build version of the cluster // in this test. 
func (t *testImpl) BuildVersion() *version.Version { @@ -246,6 +263,17 @@ func (t *testImpl) Skipf(format string, args ...interface{}) { panic(errTestFatal) } +// collectErrors extracts any arg that is an error +func collectErrors(args []interface{}) []error { + var errs []error + for _, a := range args { + if err, ok := a.(error); ok { + errs = append(errs, err) + } + } + return errs +} + // Fatal marks the test as failed, prints the args to t.L(), and calls // panic(errTestFatal). It can be called multiple times. // @@ -255,148 +283,88 @@ func (t *testImpl) Skipf(format string, args ...interface{}) { // ATTENTION: Since this calls panic(errTestFatal), it should only be called // from a test's closure. The test runner itself should never call this. func (t *testImpl) Fatal(args ...interface{}) { - t.markFailedInner("" /* format */, args...) + t.addFailure("", args...) panic(errTestFatal) } // Fatalf is like Fatal, but takes a format string. func (t *testImpl) Fatalf(format string, args ...interface{}) { - t.markFailedInner(format, args...) + t.addFailure(format, args...) panic(errTestFatal) } // FailNow implements the TestingT interface. func (t *testImpl) FailNow() { - t.Fatal() + t.addFailure("FailNow called") + panic(errTestFatal) } -// Errorf implements the TestingT interface. -func (t *testImpl) Errorf(format string, args ...interface{}) { - t.markFailedInner(format, args...) +// Error implements the TestingT interface +func (t *testImpl) Error(args ...interface{}) { + t.addFailure("", args...) } -func (t *testImpl) markFailedInner(format string, args ...interface{}) { - // Skip two frames: our own and the caller. - if format != "" { - t.printfAndFail(2 /* skip */, format, args...) - } else { - t.printAndFail(2 /* skip */, args...) - } +// Errorf implements the TestingT interface. +func (t *testImpl) Errorf(format string, args ...interface{}) { + t.addFailure(format, args...) 
} -func (t *testImpl) printAndFail(skip int, args ...interface{}) { - var msg string - if len(args) == 1 { - // If we were passed only an error, then format it with "%+v" in order to - // get any stack traces. - if err, ok := args[0].(error); ok { - msg = fmt.Sprintf("%+v", err) +// We take the first error from each failure which is the +// "squashed" error that contains all information of a failure +func formatFailure(b *strings.Builder, reportFailures ...failure) { + for i, failure := range reportFailures { + if i > 0 { + fmt.Fprintln(b) } + file, line, fn, ok := errors.GetOneLineSource(failure.squashedErr) + if !ok { + file, line, fn = "", 0, "unknown" + } + fmt.Fprintf(b, "(%s:%d).%s: %v", file, line, fn, failure.squashedErr) } - if msg == "" { - msg = fmt.Sprint(args...) - } - t.failWithMsg(t.decorate(skip+1, msg)) } -func (t *testImpl) printfAndFail(skip int, format string, args ...interface{}) { +func (t *testImpl) addFailure(format string, args ...interface{}) { if format == "" { - panic(fmt.Sprintf("invalid empty format. args: %s", args)) + format = strings.Repeat(" %v", len(args))[1:] } - t.failWithMsg(t.decorate(skip+1, fmt.Sprintf(format, args...))) -} + reportFailure := newFailure(errors.NewWithDepthf(1, format, args...), collectErrors(args)) -func (t *testImpl) failWithMsg(msg string) { t.mu.Lock() defer t.mu.Unlock() - prefix := "" - if t.mu.failed { - prefix = "[not the first failure] " - // NB: the first failure is not always the relevant one due to: - // https://github.com/cockroachdb/cockroach/issues/44436 - // - // So we chain all failures together in the order in which we see - // them. - msg = "\n" + msg + t.mu.failures = append(t.mu.failures, reportFailure) + + var b strings.Builder + formatFailure(&b, reportFailure) + msg := b.String() + + t.L().Printf("test failure #%d: %s", len(t.mu.failures), msg) + // Also dump the verbose error (incl. all stack traces) to a log file, in case + // we need it. 
The stacks are sometimes helpful, but we don't want them in the + // main log as they are highly verbose. + { + cl, err := t.L().ChildLogger( + fmt.Sprintf("failure_%d", len(t.mu.failures)), + logger.QuietStderr, logger.QuietStdout, + ) + if err == nil { + // We don't actually log through this logger since it adds an unrelated + // file:line caller (namely ours). The error already has stack traces + // so it's better to write only it to the file to avoid confusion. + path := cl.File.Name() + cl.Close() // we just wanted the filename + _ = os.WriteFile(path, []byte(fmt.Sprintf("%+v", reportFailure.squashedErr)), 0644) + } } - t.L().Printf("%stest failure: %s", prefix, msg) - t.mu.failed = true - t.mu.failureMsg += msg t.mu.output = append(t.mu.output, msg...) + t.mu.output = append(t.mu.output, '\n') if t.mu.cancel != nil { t.mu.cancel() } } -// Args: -// skip: The number of stack frames to exclude from the result. 0 means that -// -// the caller will be the first frame identified. 1 means the caller's caller -// will be the first, etc. -func (t *testImpl) decorate(skip int, s string) string { - // Skip two extra frames to account for this function and runtime.Callers - // itself. - var pc [50]uintptr - n := runtime.Callers(2+skip, pc[:]) - if n == 0 { - panic("zero callers found") - } - - buf := new(bytes.Buffer) - frames := runtime.CallersFrames(pc[:n]) - sep := "\t" - runnerFound := false - for { - if runnerFound { - break - } - - frame, more := frames.Next() - if !more { - break - } - if frame.Function == t.runner { - runnerFound = true - - // Handle the special case of the runner function being the caller of - // t.Fatal(). In that case, that's the line to be used for issue creation. - if t.mu.failLoc.file == "" { - t.mu.failLoc.file = frame.File - t.mu.failLoc.line = frame.Line - } - } - if !t.mu.failed && !runnerFound { - // Keep track of the highest stack frame that is lower than the t.runner - // stack frame. 
This is used to determine the author of that line of code - // and issue assignment. - t.mu.failLoc.file = frame.File - t.mu.failLoc.line = frame.Line - } - file := frame.File - if index := strings.LastIndexByte(file, '/'); index >= 0 { - file = file[index+1:] - } - fmt.Fprintf(buf, "%s%s:%d", sep, file, frame.Line) - sep = "," - } - buf.WriteString(": ") - - lines := strings.Split(s, "\n") - if l := len(lines); l > 1 && lines[l-1] == "" { - lines = lines[:l-1] - } - for i, line := range lines { - if i > 0 { - buf.WriteString("\n\t\t") - } - buf.WriteString(line) - } - buf.WriteByte('\n') - return buf.String() -} - func (t *testImpl) duration() time.Duration { return t.end.Sub(t.start) } @@ -404,13 +372,39 @@ func (t *testImpl) duration() time.Duration { func (t *testImpl) Failed() bool { t.mu.RLock() defer t.mu.RUnlock() - return t.mu.failed + return t.failedRLocked() +} + +func (t *testImpl) failedRLocked() bool { + return len(t.mu.failures) > 0 } -func (t *testImpl) FailureMsg() string { +func (t *testImpl) firstFailure() failure { t.mu.RLock() defer t.mu.RUnlock() - return t.mu.failureMsg + if len(t.mu.failures) <= 0 { + return failure{} + } + return t.mu.failures[0] +} + +func (t *testImpl) failureMsg() string { + t.mu.RLock() + defer t.mu.RUnlock() + var b strings.Builder + formatFailure(&b, t.mu.failures...) 
+ return b.String() +} + +// failureContainsError returns true if any of the errors in a given failure +// matches the reference error +func failureContainsError(f failure, refError error) bool { + for _, err := range f.errors { + if errors.Is(err, refError) { + return true + } + } + return errors.Is(f.squashedErr, refError) } func (t *testImpl) ArtifactsDir() string { diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go index 1b62b7527816..27eb3ad73fb6 100644 --- a/pkg/cmd/roachtest/test_runner.go +++ b/pkg/cmd/roachtest/test_runner.go @@ -50,8 +50,13 @@ import ( ) var ( - errTestsFailed = fmt.Errorf("some tests failed") - errClusterProvisioningFailed = fmt.Errorf("some clusters could not be created") + errTestsFailed = fmt.Errorf("some tests failed") + + // reference error used by main.go at the end of a run of tests + errSomeClusterProvisioningFailed = fmt.Errorf("some clusters could not be created") + + // reference error used when cluster creation fails for a test + errClusterProvisioningFailed = fmt.Errorf("cluster could not be created") ) // testRunner runs tests. @@ -309,7 +314,7 @@ func (r *testRunner) Run( if r.numClusterErrs > 0 { shout(ctx, l, lopt.stdout, "%d clusters could not be created", r.numClusterErrs) - return errClusterProvisioningFailed + return errSomeClusterProvisioningFailed } if len(r.status.fail) > 0 { @@ -571,27 +576,29 @@ func (r *testRunner) runWorker( wStatus.SetStatus("creating cluster") c, vmCreateOpts, clusterCreateErr = allocateCluster(ctx, testToRun.spec, testToRun.alloc, artifactsRootDir, wStatus) if clusterCreateErr != nil { + clusterCreateErr = errors.Mark(clusterCreateErr, errClusterProvisioningFailed) atomic.AddInt32(&r.numClusterErrs, 1) shout(ctx, l, stdout, "Unable to create (or reuse) cluster for test %s due to: %s.", testToRun.spec.Name, clusterCreateErr) } } - // Prepare the test's logger. 
- logPath := "" - var artifactsDir string - var artifactsSpec string - if artifactsRootDir != "" { - escapedTestName := teamCityNameEscape(testToRun.spec.Name) - runSuffix := "run_" + strconv.Itoa(testToRun.runNum) - - artifactsDir = filepath.Join(filepath.Join(artifactsRootDir, escapedTestName), runSuffix) - logPath = filepath.Join(artifactsDir, "test.log") - - // Map artifacts/TestFoo/run_?/** => TestFoo/run_?/**, i.e. collect the artifacts - // for this test exactly as they are laid out on disk (when the time - // comes). - artifactsSpec = fmt.Sprintf("%s/%s/** => %s/%s", filepath.Join(literalArtifactsDir, escapedTestName), runSuffix, escapedTestName, runSuffix) + // Prepare the test's logger. Always set this up with real files, using a + // temp dir if necessary. This simplifies testing. + if artifactsRootDir == "" { + artifactsRootDir, _ = os.MkdirTemp("", "roachtest-logger") } + + escapedTestName := teamCityNameEscape(testToRun.spec.Name) + runSuffix := "run_" + strconv.Itoa(testToRun.runNum) + + artifactsDir := filepath.Join(filepath.Join(artifactsRootDir, escapedTestName), runSuffix) + logPath := filepath.Join(artifactsDir, "test.log") + + // Map artifacts/TestFoo/run_?/** => TestFoo/run_?/**, i.e. collect the artifacts + // for this test exactly as they are laid out on disk (when the time + // comes). + artifactsSpec := fmt.Sprintf("%s/%s/** => %s/%s", filepath.Join(literalArtifactsDir, escapedTestName), runSuffix, escapedTestName, runSuffix) + testL, err := logger.RootLogger(logPath, teeOpt) if err != nil { return err @@ -617,24 +624,13 @@ func (r *testRunner) runWorker( // N.B. cluster creation must have failed... // We don't want to prematurely abort the test suite since it's likely a transient issue. // Instead, let's report an infrastructure issue, mark the test as failed and continue with the next test. - // Note, we fake the test name so that all cluster creation errors are posted to the same github issue. 
- oldName := t.spec.Name - oldOwner := t.spec.Owner + // Generate failure reason and mark the test failed to preclude fetching (cluster) artifacts. - t.printAndFail(0, clusterCreateErr) - issueOutput := "test %s was skipped due to %s" - issueOutput = fmt.Sprintf(issueOutput, oldName, t.FailureMsg()) + t.Error(clusterCreateErr) // N.B. issue title is of the form "roachtest: ${t.spec.Name} failed" (see UnitTestFormatter). - t.spec.Name = "cluster_creation" - t.spec.Owner = registry.OwnerDevInf - - if err := github.MaybePost(t, issueOutput); err != nil { + if err := github.MaybePost(t, t.failureMsg()); err != nil { shout(ctx, l, stdout, "failed to post issue: %s", err) } - - // Restore test name and owner. - t.spec.Name = oldName - t.spec.Owner = oldOwner } else { c.setTest(t) err = c.PutLibraries(ctx, "./lib", t.spec.NativeLibs) @@ -668,7 +664,7 @@ func (r *testRunner) runWorker( shout(ctx, l, stdout, "test returned error: %s: %s", t.Name(), err) // Mark the test as failed if it isn't already. if !t.Failed() { - t.printAndFail(0 /* skip */, err) + t.Error(err) } } else { msg := "test passed: %s (run %d)" @@ -684,7 +680,7 @@ func (r *testRunner) runWorker( if err != nil { failureMsg += fmt.Sprintf("%+v", err) } else { - failureMsg += t.FailureMsg() + failureMsg += t.failureMsg() } if c != nil { if debug { @@ -807,10 +803,7 @@ func (r *testRunner) runTest( // goroutine accidentally ends up calling t.Fatal; the test runs in a // different goroutine. if err := recover(); err != nil && err != errTestFatal { - t.mu.Lock() - t.mu.failed = true - t.mu.output = append(t.mu.output, t.decorate(0 /* skip */, fmt.Sprint(err))...) 
- t.mu.Unlock() + t.Error(err) } t.mu.Lock() @@ -819,9 +812,7 @@ func (r *testRunner) runTest( durationStr := fmt.Sprintf("%.2fs", t.duration().Seconds()) if t.Failed() { - t.mu.Lock() - output := fmt.Sprintf("test artifacts and logs in: %s\n", t.ArtifactsDir()) + string(t.mu.output) - t.mu.Unlock() + output := fmt.Sprintf("test artifacts and logs in: %s\n%s", t.ArtifactsDir(), t.failureMsg()) if teamCity { shout(ctx, l, stdout, "##teamcity[testFailed name='%s' details='%s' flowId='%s']", @@ -867,7 +858,7 @@ func (r *testRunner) runTest( start: t.start, end: t.end, pass: !t.Failed(), - failure: t.FailureMsg(), + failure: t.failureMsg(), }) r.status.Lock() delete(r.status.running, t) @@ -922,6 +913,8 @@ func (r *testRunner) runTest( // produced by t.Fatal*(). if r := recover(); r != nil && r != errTestFatal { // NB: we're careful to avoid t.Fatalf here, which re-panics. + // Note that the error will be logged to a file, and the stack will + // contain the source of the panic. t.Errorf("test panicked: %v", r) } }() @@ -1193,14 +1186,15 @@ func (r *testRunner) removeWorker(ctx context.Context, name string) { // runHTTPServer starts a server running in the background. // // httpPort: The port on which to serve the web interface. Pass 0 for allocating +// bindTo: The host/ip on which to bind. Leave empty to bind on all local ips // // a port automatically (which will be printed to stdout). -func (r *testRunner) runHTTPServer(httpPort int, stdout io.Writer) error { +func (r *testRunner) runHTTPServer(httpPort int, stdout io.Writer, bindTo string) error { http.HandleFunc("/", r.serveHTTP) // Run an http server in the background. // We handle the case where httpPort is 0, which means we automatically // allocate a port. 
- listener, err := net.Listen("tcp", fmt.Sprintf(":%d", httpPort)) + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", bindTo, httpPort)) if err != nil { return err } @@ -1210,7 +1204,11 @@ func (r *testRunner) runHTTPServer(httpPort int, stdout io.Writer) error { panic(err) } }() - fmt.Fprintf(stdout, "HTTP server listening on all network interfaces, port %d.\n", httpPort) + bindToDesc := "all network interfaces" + if bindTo != "" { + bindToDesc = bindTo + } + fmt.Fprintf(stdout, "HTTP server listening on %s, port %d.\n", bindToDesc, httpPort) return nil } diff --git a/pkg/cmd/roachtest/test_test.go b/pkg/cmd/roachtest/test_test.go index 0ed208df8c98..b61659325479 100644 --- a/pkg/cmd/roachtest/test_test.go +++ b/pkg/cmd/roachtest/test_test.go @@ -15,6 +15,7 @@ import ( "context" "io" "math/rand" + "path/filepath" "regexp" "sort" "strings" @@ -184,6 +185,7 @@ func TestRunnerRun(t *testing.T) { if exp := c.expOut; exp != "" && !strings.Contains(out, exp) { t.Fatalf("'%s' not found in output:\n%s", exp, out) } + t.Log(out) }) } } @@ -253,7 +255,13 @@ func setupRunnerTest(t *testing.T, r testRegistryImpl, testFilters []string) *ru var stdout syncedBuffer var stderr syncedBuffer lopt := loggingOpt{ - l: nilLogger(), + l: func() *logger.Logger { + l, err := logger.RootLogger(filepath.Join(t.TempDir(), "test.log"), logger.NoTee) + if err != nil { + panic(err) + } + return l + }(), tee: logger.NoTee, stdout: &stdout, stderr: &stderr, diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 5ebbdb67619b..bfa1237799d1 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -178,6 +178,7 @@ go_library( "//pkg/kv", "//pkg/roachpb", "//pkg/roachprod", + "//pkg/roachprod/errors", "//pkg/roachprod/install", "//pkg/roachprod/logger", "//pkg/roachprod/prometheus", diff --git a/pkg/cmd/roachtest/tests/activerecord.go b/pkg/cmd/roachtest/tests/activerecord.go index 90c2d2dcfd93..61a29b633621 100644 --- 
a/pkg/cmd/roachtest/tests/activerecord.go +++ b/pkg/cmd/roachtest/tests/activerecord.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/errors" ) @@ -181,16 +182,10 @@ func registerActiveRecord(r registry.Registry) { `sudo RUBYOPT="-W0" TESTOPTS="-v" bundle exec rake test`, ) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. - if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. + // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } // Result error contains stdout, stderr, and any error content returned by exec package. 
diff --git a/pkg/cmd/roachtest/tests/django.go b/pkg/cmd/roachtest/tests/django.go index db00c1493949..d6f4155a04e2 100644 --- a/pkg/cmd/roachtest/tests/django.go +++ b/pkg/cmd/roachtest/tests/django.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/errors" ) @@ -186,16 +187,10 @@ func registerDjango(r registry.Registry) { t.Status("Running django test app ", testName) result, err := c.RunWithDetailsSingleNode(ctx, t.L(), node, fmt.Sprintf(djangoRunTestCmd, testName)) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. - if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. 
+ // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } rawResults := []byte(result.Stdout + result.Stderr) diff --git a/pkg/cmd/roachtest/tests/gopg.go b/pkg/cmd/roachtest/tests/gopg.go index b1e54a4ac741..1424536cda3f 100644 --- a/pkg/cmd/roachtest/tests/gopg.go +++ b/pkg/cmd/roachtest/tests/gopg.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/errors" ) @@ -115,16 +116,10 @@ func registerGopg(r registry.Registry) { destPath, removeColorCodes, resultsFilePath), ) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. - if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. + // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } rawResults := []byte(result.Stdout + result.Stderr) @@ -152,16 +147,10 @@ func registerGopg(r registry.Registry) { destPath, goPath, resultsFilePath, goPath), ) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. 
- if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. + // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } xmlResults := []byte(result.Stdout + result.Stderr) diff --git a/pkg/cmd/roachtest/tests/jepsen.go b/pkg/cmd/roachtest/tests/jepsen.go index 1b68f357ce54..c7d760c4173b 100644 --- a/pkg/cmd/roachtest/tests/jepsen.go +++ b/pkg/cmd/roachtest/tests/jepsen.go @@ -107,7 +107,7 @@ func initJepsen(ctx context.Context, t test.Test, c cluster.Cluster) { ctx, t.L(), controller, "sh", "-c", `"sudo DEBIAN_FRONTEND=noninteractive apt-get -qqy install openjdk-8-jre openjdk-8-jre-headless libjna-java gnuplot > /dev/null 2>&1"`, ); err != nil { - if result.RemoteExitStatus == "100" { + if result.RemoteExitStatus == 100 { t.Skip("apt-get failure (#31944)", result.Stdout+result.Stderr) } t.Fatal(err) diff --git a/pkg/cmd/roachtest/tests/mixed_version_decl_schemachange_compat.go b/pkg/cmd/roachtest/tests/mixed_version_decl_schemachange_compat.go index 39237170a775..48bd885ff987 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_decl_schemachange_compat.go +++ b/pkg/cmd/roachtest/tests/mixed_version_decl_schemachange_compat.go @@ -84,14 +84,14 @@ func validateCorpusFile( // Detect validation failures in standard output first, and dump those out. 
failureRegex := regexp.MustCompile(`failed to validate.*`) if matches := failureRegex.FindAllString(details.Stdout, -1); len(matches) > 0 { - t.Fatalf("Validation of corpus has failed (exit status %s): \n%s", + t.Fatalf("Validation of corpus has failed (exit status %d): \n%s", details.RemoteExitStatus, strings.Join(matches, "\n")) } // If no error is logged dump out both stdout and std error. - if details.RemoteExitStatus != "0" { - t.Fatalf("Validation command failed with exist status %s, output:\n %s\n%s\n", + if details.RemoteExitStatus != 0 { + t.Fatalf("Validation command failed with exit status %d, output:\n %s\n%s\n", details.RemoteExitStatus, details.Stdout, details.Stderr, diff --git a/pkg/cmd/roachtest/tests/nodejs_postgres.go b/pkg/cmd/roachtest/tests/nodejs_postgres.go index 223e5b1d536c..9065511956aa 100644 --- a/pkg/cmd/roachtest/tests/nodejs_postgres.go +++ b/pkg/cmd/roachtest/tests/nodejs_postgres.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -141,16 +142,10 @@ PGSSLCERT=$HOME/certs/client.%s.crt PGSSLKEY=$HOME/certs/client.%s.key PGSSLROOT ), ) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. - if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. 
+ // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } rawResultsStr := result.Stdout + result.Stderr diff --git a/pkg/cmd/roachtest/tests/pgx.go b/pkg/cmd/roachtest/tests/pgx.go index 6144ad3583ef..c59b6954d2aa 100644 --- a/pkg/cmd/roachtest/tests/pgx.go +++ b/pkg/cmd/roachtest/tests/pgx.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/errors" ) @@ -121,16 +122,10 @@ func registerPgx(r registry.Registry) { "`go env GOPATH`/bin/go-junit-report", ) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. - if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. + // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } // Result error contains stdout, stderr, and any error content returned by exec package. 
diff --git a/pkg/cmd/roachtest/tests/psycopg.go b/pkg/cmd/roachtest/tests/psycopg.go index b556720c02f6..b9de3e7ca68f 100644 --- a/pkg/cmd/roachtest/tests/psycopg.go +++ b/pkg/cmd/roachtest/tests/psycopg.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/errors" ) @@ -124,16 +125,10 @@ func registerPsycopg(r registry.Registry) { make check PYTHON_VERSION=3`, ) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. - if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. + // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } // Result error contains stdout, stderr, and any error content returned by exec package. 
diff --git a/pkg/cmd/roachtest/tests/ruby_pg.go b/pkg/cmd/roachtest/tests/ruby_pg.go index 7418263e99fc..5e3874e541ec 100644 --- a/pkg/cmd/roachtest/tests/ruby_pg.go +++ b/pkg/cmd/roachtest/tests/ruby_pg.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -152,16 +153,10 @@ func registerRubyPG(r registry.Registry) { `cd /mnt/data1/ruby-pg/ && bundle exec rake compile test`, ) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. - if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. 
+ // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } rawResults := []byte(result.Stdout + result.Stderr) diff --git a/pkg/cmd/roachtest/tests/sqlalchemy.go b/pkg/cmd/roachtest/tests/sqlalchemy.go index 0951fea093da..da9604893c8b 100644 --- a/pkg/cmd/roachtest/tests/sqlalchemy.go +++ b/pkg/cmd/roachtest/tests/sqlalchemy.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/errors" ) @@ -171,16 +172,10 @@ func runSQLAlchemy(ctx context.Context, t test.Test, c cluster.Cluster) { test/test_suite_sqlalchemy.py `) - // Expected to fail but we should still scan the error to check if - // there's an SSH/roachprod error. - if err != nil { - // install.NonZeroExitCode includes unrelated to SSH errors ("255") - // or roachprod errors, so we call t.Fatal if the error is not an - // install.NonZeroExitCode error - commandError := (*install.NonZeroExitCode)(nil) - if !errors.As(err, &commandError) { - t.Fatal(err) - } + // Fatal for a roachprod or SSH error. A roachprod error is when result.Err==nil. + // Proceed for any other (command) errors + if err != nil && (result.Err == nil || errors.Is(err, rperrors.ErrSSH255)) { + t.Fatal(err) } rawResults := []byte(result.Stdout + result.Stderr) diff --git a/pkg/roachprod/errors/errors.go b/pkg/roachprod/errors/errors.go index 544e460bbcdb..a6f5823200ea 100644 --- a/pkg/roachprod/errors/errors.go +++ b/pkg/roachprod/errors/errors.go @@ -33,6 +33,10 @@ const ( unclassifiedExitCode = 1 ) +// ErrSSH255 is a reference error used to mark an SSH error with an exit +// code of 255. This could be indicative of an SSH flake. 
+var ErrSSH255 = errors.New("SSH error occurred with exit code 255") + // Cmd wraps errors that result from a command run against the cluster. type Cmd struct { Err error @@ -114,9 +118,9 @@ func ClassifyCmdError(err error) Error { return nil } - if exitErr, ok := asExitError(err); ok { - if exitErr.ExitCode() == 255 { - return SSH{err} + if exitCode, ok := GetExitCode(err); ok { + if exitCode == 255 { + return SSH{errors.Mark(err, ErrSSH255)} } return Cmd{err} } @@ -124,6 +128,16 @@ func ClassifyCmdError(err error) Error { return Unclassified{err} } +// GetExitCode returns an exit code, true if the error is an instance +// of an ExitError, or -1, false otherwise +func GetExitCode(err error) (int, bool) { + if exitErr, ok := asExitError(err); ok { + return exitErr.ExitCode(), true + } + + return -1, false +} + // Extract the ExitError from err's error tree or (nil, false) if none exists. func asExitError(err error) (*exec.ExitError, bool) { var exitErr *exec.ExitError @@ -141,40 +155,3 @@ func AsError(err error) (Error, bool) { } return nil, false } - -// SelectPriorityError selects an error from the list in this priority order: -// -// - the Error with the highest exit code -// - one of the `error`s -// - nil -func SelectPriorityError(errors []error) error { - var result Error - for _, err := range errors { - if err == nil { - continue - } - - rpErr, _ := AsError(err) - if result == nil { - result = rpErr - continue - } - - if rpErr.ExitCode() > result.ExitCode() { - result = rpErr - } - } - - if result != nil { - return result - } - - for _, err := range errors { - if err != nil { - return err - } - } - return nil -} - -var _ = SelectPriorityError diff --git a/pkg/roachprod/install/BUILD.bazel b/pkg/roachprod/install/BUILD.bazel index 184678ddc639..1b13417f9c6f 100644 --- a/pkg/roachprod/install/BUILD.bazel +++ b/pkg/roachprod/install/BUILD.bazel @@ -53,8 +53,11 @@ go_test( data = glob(["testdata/**"]), embed = [":install"], deps = [ + "//pkg/roachprod/logger", 
"//pkg/testutils", + "//pkg/util/retry", "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index df0bac7e9590..4d543d64cc1d 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -23,7 +23,6 @@ import ( "os/exec" "os/signal" "path/filepath" - "reflect" "sort" "strings" "sync" @@ -101,6 +100,82 @@ func NewSyncedCluster( return c, nil } +// ErrAfterRetry marks an error that has occurred/persisted after retries +var ErrAfterRetry = errors.New("error occurred after retries") + +// The first retry is after 5s, the second and final is after 25s +var defaultRunRetryOpt = retry.Options{ + InitialBackoff: 5 * time.Second, + Multiplier: 5, + MaxBackoff: 1 * time.Minute, + // This will run a total of 3 times `runWithMaybeRetry` + MaxRetries: 2, +} + +// runWithMaybeRetry will run the specified function `f` at least once. +// Any returned error from `f` is passed to the `shouldRetryFn` which, +// if it returns true, will result in `f` being retried using the `retryOpts` +// If the `shouldRetryFn` is not specified (nil), then no retries will be +// performed. +// +// We operate on a pointer to RunResultDetails as it has already have been +// captured in a *RunResultDetails[] in Run, but here we may enrich with attempt +// number and a wrapper error. 
+func runWithMaybeRetry( + l *logger.Logger, + retryOpts retry.Options, + shouldRetryFn func(*RunResultDetails) bool, + f func() (*RunResultDetails, error), +) (*RunResultDetails, error) { + var err error + var res *RunResultDetails + + var cmdErr error + + for r := retry.Start(retryOpts); r.Next(); { + res, err = f() + res.Attempt = r.CurrentAttempt() + 1 + // A nil err means roachprod itself did not fail; a non-nil res.Err is then a potentially retryable command error + if err == nil && res.Err != nil { + cmdErr = errors.CombineErrors(cmdErr, res.Err) + if shouldRetryFn != nil && shouldRetryFn(res) { + l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1) + continue + } + } + break + } + + if res.Err != nil && res.Attempt > 1 { + // An error cannot be marked with more than one reference error. Since res.Err may already be marked, we create + // a new error here and mark it. + res.Err = errors.Mark(errors.Wrapf(cmdErr, "error persisted after %v attempts", res.Attempt), ErrAfterRetry) + } + return res, err +} + +// runWithDefaultSSHRetry will retry an SSH command which returns an error with exit code 255 +func runWithDefaultSSHRetry( + l *logger.Logger, f func() (*RunResultDetails, error), +) (*RunResultDetails, error) { + return runWithMaybeRetry( + l, + defaultRunRetryOpt, + func(res *RunResultDetails) bool { return errors.Is(res.Err, rperrors.ErrSSH255) }, + f, + ) +} + +// scpWithDefaultRetry assumes that any error returned from an scp attempt is retryable +func scpWithDefaultRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) { + return runWithMaybeRetry( + l, + defaultRunRetryOpt, + func(*RunResultDetails) bool { return true }, + func() (*RunResultDetails, error) { return scp(src, dest) }, + ) +} + // Host returns the public IP of a node. 
func (c *SyncedCluster) Host(n Node) string { return c.VMs[n-1].PublicIP @@ -238,10 +313,11 @@ func (c *SyncedCluster) Stop( if wait { display += " and waiting" } - return c.Parallel(l, display, len(c.Nodes), 0, func(i int) ([]byte, error) { - sess, err := c.newSession(c.Nodes[i]) + return c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() @@ -264,8 +340,8 @@ func (c *SyncedCluster) Stop( done echo "${pid}: dead" >> %[1]s/roachprod.log done`, - c.LogDir(c.Nodes[i]), // [1] - maxWait, // [2] + c.LogDir(node), // [1] + maxWait, // [2] ) } @@ -282,12 +358,15 @@ if [ -n "${pids}" ]; then kill -%[3]d ${pids} %[4]s fi`, - c.LogDir(c.Nodes[i]), // [1] - c.roachprodEnvRegex(c.Nodes[i]), // [2] - sig, // [3] - waitCmd, // [4] + c.LogDir(node), // [1] + c.roachprodEnvRegex(node), // [2] + sig, // [3] + waitCmd, // [4] ) - return sess.CombinedOutput(ctx, cmd) + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + return res, res.Err }) } @@ -297,10 +376,11 @@ func (c *SyncedCluster) Wipe(ctx context.Context, l *logger.Logger, preserveCert if err := c.Stop(ctx, l, 9, true /* wait */, 0 /* maxWait */); err != nil { return err } - return c.Parallel(l, display, len(c.Nodes), 0, func(i int) ([]byte, error) { - sess, err := c.newSession(c.Nodes[i]) + return c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() @@ -325,7 +405,10 @@ sudo rm -fr logs && cmd += "sudo rm -fr tenant-certs* ;\n" } } - return sess.CombinedOutput(ctx, cmd) + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + return res, 
res.Err }) } @@ -342,19 +425,19 @@ type NodeStatus struct { func (c *SyncedCluster) Status(ctx context.Context, l *logger.Logger) ([]NodeStatus, error) { display := fmt.Sprintf("%s: status", c.Name) results := make([]NodeStatus, len(c.Nodes)) - if err := c.Parallel(l, display, len(c.Nodes), 0, func(i int) ([]byte, error) { - sess, err := c.newSession(c.Nodes[i]) + if err := c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + sess, err := c.newSession(node) if err != nil { - results[i] = NodeStatus{Err: err} - return nil, nil + return newRunResultDetails(node, err), err } defer sess.Close() - binary := cockroachNodeBinary(c, c.Nodes[i]) + binary := cockroachNodeBinary(c, node) cmd := fmt.Sprintf(`out=$(ps axeww -o pid -o ucomm -o command | \ sed 's/export ROACHPROD=//g' | \ awk '/%s/ {print $2, $1}'`, - c.roachprodEnvRegex(c.Nodes[i])) + c.roachprodEnvRegex(node)) cmd += ` | sort | uniq); vers=$(` + binary + ` version 2>/dev/null | awk '/Build Tag:/ {print $NF}') if [ -n "${out}" -a -n "${vers}" ]; then @@ -363,19 +446,23 @@ else echo ${out} fi ` - out, err := sess.CombinedOutput(ctx, cmd) - var msg string - if err != nil { - return nil, errors.Wrapf(err, "~ %s\n%s", cmd, out) + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + + if res.Err != nil { + return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) } - msg = strings.TrimSpace(string(out)) + + msg := strings.TrimSpace(string(res.CombinedOut)) if msg == "" { results[i] = NodeStatus{Running: false} - return nil, nil + return res, nil } info := strings.Split(msg, " ") results[i] = NodeStatus{Running: true, Version: info[0], Pid: info[1]} - return nil, nil + + return res, nil }); err != nil { return nil, err } @@ -575,49 +662,45 @@ type RunResultDetails struct { Node Node Stdout string Stderr string + CombinedOut []byte Err error - RemoteExitStatus string + RemoteExitStatus int + Attempt 
int } -func processStdout(stdout string) (string, string) { - retStdout := stdout - exitStatusPattern := "LAST EXIT STATUS: " - exitStatusIndex := strings.LastIndex(retStdout, exitStatusPattern) - remoteExitStatus := "-1" - // If exitStatusIndex is -1 then "echo LAST EXIT STATUS: $?" didn't run - // mostly due to an ssh error but avoid speculation and temporarily - // use "-1" for unknown error before checking if it's SSH related later. - if exitStatusIndex != -1 { - retStdout = stdout[:exitStatusIndex] - remoteExitStatus = strings.TrimSpace(stdout[exitStatusIndex+len(exitStatusPattern):]) - } - return retStdout, remoteExitStatus +func newRunResultDetails(node Node, err error) *RunResultDetails { + res := RunResultDetails{ + Node: node, + Err: err, + } + if exitCode, success := rperrors.GetExitCode(err); success { + res.RemoteExitStatus = exitCode + } + + return &res } -func runCmdOnSingleNode( - ctx context.Context, l *logger.Logger, c *SyncedCluster, node Node, cmd string, -) (RunResultDetails, error) { - result := RunResultDetails{Node: node} +func (c *SyncedCluster) runCmdOnSingleNode( + ctx context.Context, + l *logger.Logger, + node Node, + cmd string, + combined bool, + stdout, stderr io.Writer, +) (*RunResultDetails, error) { sess, err := c.newSession(node) if err != nil { - return result, err + return newRunResultDetails(node, err), err } defer sess.Close() - sess.SetWithExitStatus(true) - var stdoutBuffer, stderrBuffer bytes.Buffer - multStdout := io.MultiWriter(&stdoutBuffer, l.Stdout) - multStderr := io.MultiWriter(&stderrBuffer, l.Stderr) - sess.SetStdout(multStdout) - sess.SetStderr(multStderr) - // Argument template expansion is node specific (e.g. for {store-dir}). e := expander{ node: node, } expandedCmd, err := e.expand(ctx, l, c, cmd) if err != nil { - return result, err + return newRunResultDetails(node, err), err } // Be careful about changing these command strings. 
In particular, we need @@ -634,31 +717,29 @@ func runCmdOnSingleNode( nodeCmd = fmt.Sprintf("cd %s; %s", c.localVMDir(node), nodeCmd) } - err = sess.Run(ctx, nodeCmd) - result.Stderr = stderrBuffer.String() - result.Stdout, result.RemoteExitStatus = processStdout(stdoutBuffer.String()) + var res *RunResultDetails + if combined { + out, cmdErr := sess.CombinedOutput(ctx, nodeCmd) + res = newRunResultDetails(node, cmdErr) + res.CombinedOut = out + } else { + // We stream the output if running on a single node. + var stdoutBuffer, stderrBuffer bytes.Buffer + multStdout := io.MultiWriter(&stdoutBuffer, stdout) + multStderr := io.MultiWriter(&stderrBuffer, stderr) + sess.SetStdout(multStdout) + sess.SetStderr(multStderr) - if err != nil { - detailMsg := fmt.Sprintf("Node %d. Command with error:\n```\n%s\n```\n", node, cmd) - err = errors.WithDetail(err, detailMsg) - err = rperrors.ClassifyCmdError(err) - if reflect.TypeOf(err) == reflect.TypeOf(rperrors.SSH{}) { - result.RemoteExitStatus = "255" - } - result.Err = err - } else if result.RemoteExitStatus != "0" { - result.Err = &NonZeroExitCode{fmt.Sprintf("Non-zero exit code: %s", result.RemoteExitStatus)} + res = newRunResultDetails(node, sess.Run(ctx, nodeCmd)) + res.Stderr = stderrBuffer.String() + res.Stdout = stdoutBuffer.String() } - return result, nil -} - -// NonZeroExitCode is returned when a command executed by Run() exits with a non-zero status. -type NonZeroExitCode struct { - message string -} -func (e *NonZeroExitCode) Error() string { - return e.message + if res.Err != nil { + detailMsg := fmt.Sprintf("Node %d. Command with error:\n```\n%s\n```\n", node, cmd) + res.Err = errors.WithDetail(res.Err, detailMsg) + } + return res, nil } // Run a command on >= 1 node in the cluster. 
@@ -682,74 +763,47 @@ func (c *SyncedCluster) Run( display = fmt.Sprintf("%s: %s", c.Name, title) } - errs := make([]error, len(nodes)) - results := make([]string, len(nodes)) - if err := c.Parallel(l, display, len(nodes), 0, func(i int) ([]byte, error) { - sess, err := c.newSession(nodes[i]) - if err != nil { - errs[i] = err - results[i] = err.Error() - return nil, nil - } - defer sess.Close() + results := make([]*RunResultDetails, len(nodes)) - // Argument template expansion is node specific (e.g. for {store-dir}). - e := expander{ - node: nodes[i], - } - expandedCmd, err := e.expand(ctx, l, c, cmd) - if err != nil { - return nil, err + // A result is the output of running a command (could be interpreted as an error) + if _, err := c.ParallelE(l, display, len(nodes), 0, func(i int) (*RunResultDetails, error) { + // An err returned here is an unexpected state within roachprod (non-command error). + // For errors that occur as part of running a command over ssh, the `result` will contain + // the actual error on a specific node. + result, err := c.runCmdOnSingleNode(ctx, l, nodes[i], cmd, !stream, stdout, stderr) + results[i] = result + return result, err + }); err != nil { + return err + } + + return processResults(results, stream, stdout) +} + +// processResults returns the error from the RunResultDetails with the highest RemoteExitStatus +func processResults(results []*RunResultDetails, stream bool, stdout io.Writer) error { + var resultWithError *RunResultDetails + for i, r := range results { + if !stream { + fmt.Fprintf(stdout, " %2d: %s\n%v\n", i+1, strings.TrimSpace(string(r.CombinedOut)), r.Err) } - // Be careful about changing these command strings. In particular, we need - // to support running commands in the background on both local and remote - // nodes. For example: - // - // roachprod run cluster -- "sleep 60 &> /dev/null < /dev/null &" - // - // That command should return immediately. 
And a "roachprod status" should - // reveal that the sleep command is running on the cluster. - nodeCmd := fmt.Sprintf(`export ROACHPROD=%s GOTRACEBACK=crash && bash -c %s`, - c.roachprodEnvValue(nodes[i]), ssh.Escape1(expandedCmd)) - if c.IsLocal() { - nodeCmd = fmt.Sprintf("cd %s; %s", c.localVMDir(nodes[i]), nodeCmd) - } - - if stream { - sess.SetStdout(stdout) - sess.SetStderr(stderr) - errs[i] = sess.Run(ctx, nodeCmd) - if errs[i] != nil { - detailMsg := fmt.Sprintf("Node %d. Command with error:\n```\n%s\n```\n", nodes[i], cmd) - err = errors.WithDetail(errs[i], detailMsg) - err = rperrors.ClassifyCmdError(err) - errs[i] = err + if r.Err != nil { + if resultWithError == nil { + resultWithError = r + continue } - return nil, nil - } - out, err := sess.CombinedOutput(ctx, nodeCmd) - msg := strings.TrimSpace(string(out)) - if err != nil { - detailMsg := fmt.Sprintf("Node %d. Command with error:\n```\n%s\n```\n", nodes[i], cmd) - err = errors.WithDetail(err, detailMsg) - err = rperrors.ClassifyCmdError(err) - errs[i] = err - msg += fmt.Sprintf("\n%v", err) - } - results[i] = msg - return nil, nil - }); err != nil { - return err + if r.RemoteExitStatus > resultWithError.RemoteExitStatus { + resultWithError = r + } + } } - if !stream { - for i, r := range results { - fmt.Fprintf(stdout, " %2d: %s\n", nodes[i], r) - } + if resultWithError != nil { + return resultWithError.Err } - return rperrors.SelectPriorityError(errs) + return nil } // RunWithDetails runs a command on the specified nodes and returns results details and an error. 
@@ -757,19 +811,24 @@ func (c *SyncedCluster) RunWithDetails( ctx context.Context, l *logger.Logger, nodes Nodes, title, cmd string, ) ([]RunResultDetails, error) { display := fmt.Sprintf("%s: %s", c.Name, title) - results := make([]RunResultDetails, len(nodes)) - failed, err := c.ParallelE(l, display, len(nodes), 0, func(i int) ([]byte, error) { - result, err := runCmdOnSingleNode(ctx, l, c, nodes[i], cmd) - if err != nil { - return nil, err - } - results[i] = result - return nil, nil + // We use pointers here as we are capturing the state of a result even though it may + // be processed further by the caller. + resultPtrs := make([]*RunResultDetails, len(nodes)) + + // Both return values are explicitly ignored because, in this case, resultPtrs + // capture both error and result state for each node + _, _ = c.ParallelE(l, display, len(nodes), 0, func(i int) (*RunResultDetails, error) { //nolint:errcheck + result, err := c.runCmdOnSingleNode(ctx, l, nodes[i], cmd, false, l.Stdout, l.Stderr) + resultPtrs[i] = result + return result, err }) - if err != nil { - for _, node := range failed { - results[node.Index].Err = node.Err + + // Return values to preserve API + results := make([]RunResultDetails, len(nodes)) + for i, v := range resultPtrs { + if v != nil { + results[i] = *v } } return results, nil @@ -808,9 +867,11 @@ func (c *SyncedCluster) RepeatRun( func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { display := fmt.Sprintf("%s: waiting for nodes to start", c.Name) errs := make([]error, len(c.Nodes)) - if err := c.Parallel(l, display, len(c.Nodes), 0, func(i int) ([]byte, error) { + if err := c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + res := &RunResultDetails{Node: node} for j := 0; j < 600; j++ { - sess, err := c.newSession(c.Nodes[i]) + sess, err := c.newSession(node) if err != nil { time.Sleep(500 * time.Millisecond) continue @@ -822,10 +883,11 @@ func (c *SyncedCluster) 
Wait(ctx context.Context, l *logger.Logger) error { time.Sleep(500 * time.Millisecond) continue } - return nil, nil + return res, nil } errs[i] = errors.New("timed out after 5m") - return nil, nil + res.Err = errs[i] + return res, nil }); err != nil { return err } @@ -843,6 +905,16 @@ func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { return nil } +// setupSession is a helper which creates a new session and +// returns it together with any error that occurs (unlikely +// given the code in `newSession`); the caller wraps that error in a RunResultDetails. +// RunResultDetails is used across all functions which +// create a session and holds error and stdout information +func (c *SyncedCluster) setupSession(node Node) (session, error) { + sess, err := c.newSession(node) + return sess, err +} + // SetupSSH configures the cluster for use with SSH. This is generally run after // the cloud.Cluster has been synced which resets the SSH credentials on the // machines and sets them up for the current user. This method enables the @@ -871,10 +943,10 @@ func (c *SyncedCluster) SetupSSH(ctx context.Context, l *logger.Logger) error { // Generate an ssh key that we'll distribute to all of the nodes in the // cluster in order to allow inter-node ssh. 
var sshTar []byte - if err := c.Parallel(l, "generating ssh key", 1, 0, func(i int) ([]byte, error) { - sess, err := c.newSession(1) + if err := c.Parallel(l, "generating ssh key", 1, 0, func(i int) (*RunResultDetails, error) { + sess, err := c.setupSession(1) if err != nil { - return nil, err + return newRunResultDetails(1, err), err } defer sess.Close() @@ -893,30 +965,40 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys sess.SetStdout(&stdout) sess.SetStderr(&stderr) - if err := sess.Run(ctx, cmd); err != nil { - return nil, errors.Wrapf(err, "%s: stderr:\n%s", cmd, stderr.String()) + res := newRunResultDetails(1, sess.Run(ctx, cmd)) + + res.Stdout = stdout.String() + res.Stderr = stderr.String() + if res.Err != nil { + return res, errors.Wrapf(res.Err, "%s: stderr:\n%s", cmd, res.Stderr) } - sshTar = stdout.Bytes() - return nil, nil + sshTar = []byte(res.Stdout) + return res, nil }); err != nil { return err } // Skip the first node which is where we generated the key. nodes := c.Nodes[1:] - if err := c.Parallel(l, "distributing ssh key", len(nodes), 0, func(i int) ([]byte, error) { - sess, err := c.newSession(nodes[i]) + if err := c.Parallel(l, "distributing ssh key", len(nodes), 0, func(i int) (*RunResultDetails, error) { + node := nodes[i] + sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() sess.SetStdin(bytes.NewReader(sshTar)) cmd := `tar xf -` - if out, err := sess.CombinedOutput(ctx, cmd); err != nil { - return nil, errors.Wrapf(err, "%s: output:\n%s", cmd, out) + + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + + if res.Err != nil { + return res, errors.Wrapf(res.Err, "%s: output:\n%s", cmd, res.CombinedOut) } - return nil, nil + return res, nil }); err != nil { return err } @@ -926,19 +1008,23 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys // known hosts file in unhashed format, 
working around a limitation of jsch // (which is used in jepsen tests). ips := make([]string, len(c.Nodes), len(c.Nodes)*2) - if err := c.Parallel(l, "retrieving hosts", len(c.Nodes), 0, func(i int) ([]byte, error) { + if err := c.Parallel(l, "retrieving hosts", len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + res := &RunResultDetails{Node: node} for j := 0; j < 20 && ips[i] == ""; j++ { var err error - ips[i], err = c.GetInternalIP(ctx, c.Nodes[i]) + ips[i], err = c.GetInternalIP(ctx, node) if err != nil { - return nil, errors.Wrapf(err, "pgurls") + res.Err = errors.Wrapf(err, "pgurls") + return res, res.Err } time.Sleep(time.Second) } if ips[i] == "" { - return nil, fmt.Errorf("retrieved empty IP address") + res.Err = fmt.Errorf("retrieved empty IP address") + return res, res.Err } - return nil, nil + return res, nil }); err != nil { return err } @@ -947,10 +1033,11 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys ips = append(ips, c.Host(i)) } var knownHostsData []byte - if err := c.Parallel(l, "scanning hosts", 1, 0, func(i int) ([]byte, error) { - sess, err := c.newSession(c.Nodes[i]) + if err := c.Parallel(l, "scanning hosts", 1, 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() @@ -980,19 +1067,25 @@ exit 1 var stderr bytes.Buffer sess.SetStdout(&stdout) sess.SetStderr(&stderr) - if err := sess.Run(ctx, cmd); err != nil { - return nil, errors.Wrapf(err, "%s: stderr:\n%s", cmd, stderr.String()) + + res := newRunResultDetails(node, sess.Run(ctx, cmd)) + + res.Stdout = stdout.String() + res.Stderr = stderr.String() + if res.Err != nil { + return res, errors.Wrapf(res.Err, "%s: stderr:\n%s", cmd, res.Stderr) } knownHostsData = stdout.Bytes() - return nil, nil + return res, nil }); err != nil { return err } - if err := c.Parallel(l, "distributing known_hosts", len(c.Nodes), 
0, func(i int) ([]byte, error) { - sess, err := c.newSession(c.Nodes[i]) + if err := c.Parallel(l, "distributing known_hosts", len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() @@ -1024,10 +1117,14 @@ if [[ "$(whoami)" != "` + config.SharedUser + `" ]]; then '"'"'{}'"'"' ~` + config.SharedUser + `/.ssh' \; fi ` - if out, err := sess.CombinedOutput(ctx, cmd); err != nil { - return nil, errors.Wrapf(err, "%s: output:\n%s", cmd, out) + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + + if res.Err != nil { + return res, errors.Wrapf(res.Err, "%s: output:\n%s", cmd, res.CombinedOut) } - return nil, nil + return res, nil }); err != nil { return err } @@ -1038,10 +1135,11 @@ fi // additional authorized_keys to both the current user (your username on // gce and the shared user on aws) as well as to the shared user on both // platforms. 
- if err := c.Parallel(l, "adding additional authorized keys", len(c.Nodes), 0, func(i int) ([]byte, error) { - sess, err := c.newSession(c.Nodes[i]) + if err := c.Parallel(l, "adding additional authorized keys", len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() @@ -1068,10 +1166,14 @@ if [[ "$(whoami)" != "` + config.SharedUser + `" ]]; then "${tmp2}" ~` + config.SharedUser + `/.ssh/authorized_keys fi ` - if out, err := sess.CombinedOutput(ctx, cmd); err != nil { - return nil, errors.Wrapf(err, "~ %s\n%s", cmd, out) + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + + if res.Err != nil { + return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) } - return nil, nil + return res, nil }); err != nil { return err } @@ -1100,10 +1202,10 @@ func (c *SyncedCluster) DistributeCerts(ctx context.Context, l *logger.Logger) e // Generate the ca, client and node certificates on the first node. 
var msg string display := fmt.Sprintf("%s: initializing certs", c.Name) - if err := c.Parallel(l, display, 1, 0, func(i int) ([]byte, error) { - sess, err := c.newSession(1) + if err := c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { + sess, err := c.setupSession(1) if err != nil { - return nil, err + return newRunResultDetails(1, err), err } defer sess.Close() @@ -1120,10 +1222,15 @@ mkdir -p certs %[1]s cert create-node %[2]s --certs-dir=certs --ca-key=certs/ca.key tar cvf %[3]s certs `, cockroachNodeBinary(c, 1), strings.Join(nodeNames, " "), certsTarName) - if out, err := sess.CombinedOutput(ctx, cmd); err != nil { - msg = fmt.Sprintf("%s: %v", out, err) + + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(1, cmdErr) + res.CombinedOut = out + + if res.Err != nil { + msg = fmt.Sprintf("%s: %v", res.CombinedOut, res.Err) } - return nil, nil + return res, nil }); err != nil { return err } @@ -1133,7 +1240,7 @@ tar cvf %[3]s certs exit.WithCode(exit.UnspecifiedError()) } - tarfile, cleanup, err := c.getFileFromFirstNode(certsTarName) + tarfile, cleanup, err := c.getFileFromFirstNode(l, certsTarName) if err != nil { return err } @@ -1166,7 +1273,7 @@ func (c *SyncedCluster) DistributeTenantCerts( return err } - tarfile, cleanup, err := hostCluster.getFileFromFirstNode(tenantCertsTarName) + tarfile, cleanup, err := hostCluster.getFileFromFirstNode(l, tenantCertsTarName) if err != nil { return err } @@ -1184,11 +1291,11 @@ func (c *SyncedCluster) createTenantCertBundle( ctx context.Context, l *logger.Logger, bundleName string, tenantID int, nodeNames []string, ) error { display := fmt.Sprintf("%s: initializing tenant certs", c.Name) - return c.Parallel(l, display, 1, 0, func(i int) ([]byte, error) { + return c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer 
sess.Close() @@ -1222,10 +1329,14 @@ tar cvf %[5]s $CERT_DIR bundleName, ) - if out, err := sess.CombinedOutput(ctx, cmd); err != nil { - return nil, errors.Wrapf(err, "certificate creation error: %s", out) + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + + if res.Err != nil { + return res, errors.Wrapf(res.Err, "certificate creation error: %s", res.CombinedOut) } - return nil, nil + return res, nil }) } @@ -1250,7 +1361,9 @@ func (c *SyncedCluster) cockroachBinSupportsTenantScope(ctx context.Context, nod // getFile retrieves the given file from the first node in the cluster. The // filename is assumed to be relative from the home directory of the node's // user. -func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error) { +func (c *SyncedCluster) getFileFromFirstNode( + l *logger.Logger, name string, +) (string, func(), error) { var tmpfileName string cleanup := func() {} if c.IsLocal() { @@ -1266,9 +1379,9 @@ func (c *SyncedCluster) getFileFromFirstNode(name string) (string, func(), error } srcFileName := fmt.Sprintf("%s@%s:%s", c.user(1), c.Host(1), name) - if err := c.scp(srcFileName, tmpfile.Name()); err != nil { + if res, _ := scpWithDefaultRetry(l, srcFileName, tmpfile.Name()); res.Err != nil { cleanup() - return "", nil, err + return "", nil, res.Err } tmpfileName = tmpfile.Name() } @@ -1300,14 +1413,20 @@ func (c *SyncedCluster) fileExistsOnFirstNode( ) bool { var existsErr error display := fmt.Sprintf("%s: checking %s", c.Name, path) - if err := c.Parallel(l, display, 1, 0, func(i int) ([]byte, error) { - sess, err := c.newSession(c.Nodes[i]) + if err := c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { + node := c.Nodes[i] + sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() - _, existsErr = sess.CombinedOutput(ctx, `test -e `+path) - return nil, nil + + out, cmdErr 
:= sess.CombinedOutput(ctx, `test -e `+path) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + + existsErr = res.Err + return res, nil }); err != nil { return false } @@ -1326,10 +1445,13 @@ func (c *SyncedCluster) createNodeCertArguments( nodes := allNodes(len(c.VMs)) if !c.IsLocal() { ips = make([]string, len(nodes)) - if err := c.Parallel(l, "", len(nodes), 0, func(i int) ([]byte, error) { - var err error - ips[i], err = c.GetInternalIP(ctx, nodes[i]) - return nil, errors.Wrapf(err, "IPs") + if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*RunResultDetails, error) { + node := nodes[i] + res := &RunResultDetails{Node: node} + + res.Stdout, res.Err = c.GetInternalIP(ctx, node) + ips[i] = res.Stdout + return res, errors.Wrapf(res.Err, "IPs") }); err != nil { return nil, err } @@ -1370,27 +1492,33 @@ func (c *SyncedCluster) distributeLocalCertsTar( } display := c.Name + ": distributing certs" - return c.Parallel(l, display, len(nodes), 0, func(i int) ([]byte, error) { - sess, err := c.newSession(nodes[i]) + return c.Parallel(l, display, len(nodes), 0, func(i int) (*RunResultDetails, error) { + node := nodes[i] + sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() sess.SetStdin(bytes.NewReader(certsTar)) var cmd string if c.IsLocal() { - cmd = fmt.Sprintf("cd %s ; ", c.localVMDir(nodes[i])) + cmd = fmt.Sprintf("cd %s ; ", c.localVMDir(node)) } if stripComponents > 0 { cmd += fmt.Sprintf("tar --strip-components=%d -xf -", stripComponents) } else { cmd += "tar xf -" } - if out, err := sess.CombinedOutput(ctx, cmd); err != nil { - return nil, errors.Wrapf(err, "~ %s\n%s", cmd, out) + + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out + + if res.Err != nil { + return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) } - return nil, nil + return res, nil }) } @@ -1585,10 +1713,10 @@ func (c 
*SyncedCluster) Put( return } - err = c.scp(from, to) - results <- result{i, err} + res, _ := scpWithDefaultRetry(l, from, to) + results <- result{i, res.Err} - if err != nil { + if res.Err != nil { // The copy failed. Re-add the original source. pushSource(srcIndex) } else { @@ -1940,8 +2068,8 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err return } - err := c.scp(fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) - if err == nil { + res, _ := scpWithDefaultRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) + if res.Err == nil { // Make sure all created files and directories are world readable. // The CRDB process intentionally sets a 0007 umask (resulting in // non-world-readable files). This creates annoyances during CI @@ -1961,10 +2089,10 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err } return nil } - err = filepath.Walk(dest, chmod) + res.Err = filepath.Walk(dest, chmod) } - results <- result{i, err} + results <- result{i, res.Err} }(i) } @@ -2056,10 +2184,12 @@ func (c *SyncedCluster) pghosts( ctx context.Context, l *logger.Logger, nodes Nodes, ) (map[Node]string, error) { ips := make([]string, len(nodes)) - if err := c.Parallel(l, "", len(nodes), 0, func(i int) ([]byte, error) { - var err error - ips[i], err = c.GetInternalIP(ctx, nodes[i]) - return nil, errors.Wrapf(err, "pghosts") + if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*RunResultDetails, error) { + node := nodes[i] + res := &RunResultDetails{Node: node} + res.Stdout, res.Err = c.GetInternalIP(ctx, node) + ips[i] = res.Stdout + return res, errors.Wrapf(res.Err, "pghosts") }); err != nil { return nil, err } @@ -2133,7 +2263,10 @@ func (c *SyncedCluster) SSH(ctx context.Context, l *logger.Logger, sshArgs, args return syscall.Exec(sshPath, allArgs, os.Environ()) } -func (c *SyncedCluster) scp(src, dest string) error { +// scp return type conforms to what 
runWithMaybeRetry expects. A nil error +// is always returned here since the only error that can happen is an scp error +// which we do want to be able to retry. +func scp(src, dest string) (*RunResultDetails, error) { args := []string{ "scp", "-r", "-C", "-o", "StrictHostKeyChecking=no", @@ -2143,9 +2276,12 @@ func (c *SyncedCluster) scp(src, dest string) error { cmd := exec.Command(args[0], args[1:]...) out, err := cmd.CombinedOutput() if err != nil { - return errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out) + err = errors.Wrapf(err, "~ %s\n%s", strings.Join(args, " "), out) } - return nil + + res := newRunResultDetails(-1, err) + res.CombinedOut = out + return res, nil } // ParallelResult captures the result of a user-defined function @@ -2162,7 +2298,10 @@ type ParallelResult struct { // // See ParallelE for more information. func (c *SyncedCluster) Parallel( - l *logger.Logger, display string, count, concurrency int, fn func(i int) ([]byte, error), + l *logger.Logger, + display string, + count, concurrency int, + fn func(i int) (*RunResultDetails, error), ) error { failed, err := c.ParallelE(l, display, count, concurrency, fn) if err != nil { @@ -2183,10 +2322,16 @@ func (c *SyncedCluster) Parallel( // `config.MaxConcurrency` if it is lower) in parallel. If `concurrency` is // 0, then it defaults to `count`. // +// The function returns a pointer to RunResultDetails as we may enrich +// the result with retry information (attempt number, wrapper error) +// // If err is non-nil, the slice of ParallelResults will contain the // results from any of the failed invocations. 
func (c *SyncedCluster) ParallelE( - l *logger.Logger, display string, count, concurrency int, fn func(i int) ([]byte, error), + l *logger.Logger, + display string, + count, concurrency int, + fn func(i int) (*RunResultDetails, error), ) ([]ParallelResult, error) { if concurrency == 0 || concurrency > count { concurrency = count @@ -2203,8 +2348,8 @@ func (c *SyncedCluster) ParallelE( startNext := func() { go func(i int) { defer wg.Done() - out, err := fn(i) - results <- ParallelResult{i, out, err} + res, err := runWithDefaultSSHRetry(l, func() (*RunResultDetails, error) { return fn(i) }) + results <- ParallelResult{i, res.CombinedOut, err} }(index) index++ } @@ -2283,7 +2428,11 @@ func (c *SyncedCluster) ParallelE( } if len(failed) > 0 { - return failed, errors.New("one or more parallel execution failure") + var err error + for _, res := range failed { + err = errors.CombineErrors(err, res.Err) + } + return failed, errors.Wrap(err, "parallel execution failure") } return nil, nil } diff --git a/pkg/roachprod/install/cluster_synced_test.go b/pkg/roachprod/install/cluster_synced_test.go index fb25e09ea689..a5e58706a69b 100644 --- a/pkg/roachprod/install/cluster_synced_test.go +++ b/pkg/roachprod/install/cluster_synced_test.go @@ -12,7 +12,14 @@ package install import ( "fmt" + "io" "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) // TestRoachprodEnv tests the roachprodEnvRegex and roachprodEnvValue methods. 
@@ -75,3 +82,92 @@ func TestRoachprodEnv(t *testing.T) { }) } } + +func TestRunWithMaybeRetry(t *testing.T) { + var testRetryOpts = retry.Options{ + InitialBackoff: 10 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 1 * time.Second, + // This will run a total of 3 times `runWithMaybeRetry` + MaxRetries: 2, + } + + l := nilLogger() + + attempt := 0 + cases := []struct { + f func() (*RunResultDetails, error) + shouldRetryFn func(*RunResultDetails) bool + expectedAttempts int + shouldError bool + }{ + { // Happy path: no error, no retry required + f: func() (*RunResultDetails, error) { + return newResult(0), nil + }, + expectedAttempts: 1, + shouldError: false, + }, + { // Error, but not retry function specified + f: func() (*RunResultDetails, error) { + return newResult(1), nil + }, + expectedAttempts: 1, + shouldError: true, + }, + { // Error, with retries exhausted + f: func() (*RunResultDetails, error) { + return newResult(255), nil + }, + shouldRetryFn: func(d *RunResultDetails) bool { return d.RemoteExitStatus == 255 }, + expectedAttempts: 3, + shouldError: true, + }, + { // Eventual success after retries + f: func() (*RunResultDetails, error) { + attempt++ + if attempt == 3 { + return newResult(0), nil + } + return newResult(255), nil + }, + shouldRetryFn: func(d *RunResultDetails) bool { return d.RemoteExitStatus == 255 }, + expectedAttempts: 3, + shouldError: false, + }, + } + + for idx, tc := range cases { + attempt = 0 + t.Run(fmt.Sprintf("%d", idx+1), func(t *testing.T) { + res, _ := runWithMaybeRetry(l, testRetryOpts, tc.shouldRetryFn, tc.f) + + require.Equal(t, tc.shouldError, res.Err != nil) + require.Equal(t, tc.expectedAttempts, res.Attempt) + + if tc.shouldError && tc.expectedAttempts == 3 { + require.True(t, errors.Is(res.Err, ErrAfterRetry)) + } + }) + } +} + +func newResult(exitCode int) *RunResultDetails { + var err error + if exitCode != 0 { + err = errors.Newf("Error with exit code %v", exitCode) + } + return 
&RunResultDetails{RemoteExitStatus: exitCode, Err: err} +} + +func nilLogger() *logger.Logger { + lcfg := logger.Config{ + Stdout: io.Discard, + Stderr: io.Discard, + } + l, err := lcfg.NewLogger("" /* path */) + if err != nil { + panic(err) + } + return l +} diff --git a/pkg/roachprod/install/cockroach.go b/pkg/roachprod/install/cockroach.go index 9ae9591baaa8..bb77afb9c46e 100644 --- a/pkg/roachprod/install/cockroach.go +++ b/pkg/roachprod/install/cockroach.go @@ -152,24 +152,25 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S } l.Printf("%s: starting nodes", c.Name) - return c.Parallel(l, "", len(nodes), parallelism, func(nodeIdx int) ([]byte, error) { + return c.Parallel(l, "", len(nodes), parallelism, func(nodeIdx int) (*RunResultDetails, error) { node := nodes[nodeIdx] - + res := &RunResultDetails{Node: node} // NB: if cockroach started successfully, we ignore the output as it is // some harmless start messaging. if _, err := c.startNode(ctx, l, node, startOpts); err != nil { - return nil, err + res.Err = err + return res, err } // Code that follows applies only for regular nodes. if startOpts.Target != StartDefault { - return nil, nil + return res, nil } // We reserve a few special operations (bootstrapping, and setting // cluster settings) for node 1. if node != 1 { - return nil, nil + return res, nil } // NB: The code blocks below are not parallelized, so it's safe for us @@ -179,7 +180,7 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S // 2. We don't init when invoking with `start-single-node`. 
if startOpts.SkipInit { - return nil, nil + return res, nil } shouldInit := !c.useStartSingleNode() @@ -187,7 +188,8 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S l.Printf("%s: initializing cluster", c.Name) initOut, err := c.initializeCluster(ctx, node) if err != nil { - return nil, errors.WithDetail(err, "unable to initialize cluster") + res.Err = err + return res, errors.Wrap(err, "failed to initialize cluster") } if initOut != "" { @@ -201,12 +203,13 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S l.Printf("%s: setting cluster settings", c.Name) clusterSettingsOut, err := c.setClusterSettings(ctx, l, node) if err != nil { - return nil, errors.Wrap(err, "unable to set cluster settings") + res.Err = err + return res, errors.Wrap(err, "unable to set cluster settings") } if clusterSettingsOut != "" { l.Printf(clusterSettingsOut) } - return nil, nil + return res, nil }) } @@ -302,11 +305,11 @@ func (c *SyncedCluster) RunSQL(ctx context.Context, l *logger.Logger, args []str resultChan := make(chan result, len(c.Nodes)) display := fmt.Sprintf("%s: executing sql", c.Name) - if err := c.Parallel(l, display, len(c.Nodes), 0, func(nodeIdx int) ([]byte, error) { + if err := c.Parallel(l, display, len(c.Nodes), 0, func(nodeIdx int) (*RunResultDetails, error) { node := c.Nodes[nodeIdx] sess, err := c.newSession(node) if err != nil { - return nil, err + return newRunResultDetails(node, err), err } defer sess.Close() @@ -318,13 +321,15 @@ func (c *SyncedCluster) RunSQL(ctx context.Context, l *logger.Logger, args []str c.NodeURL("localhost", c.NodePort(node)) + " " + ssh.Escape(args) - out, err := sess.CombinedOutput(ctx, cmd) - if err != nil { - return nil, errors.Wrapf(err, "~ %s\n%s", cmd, out) - } + out, cmdErr := sess.CombinedOutput(ctx, cmd) + res := newRunResultDetails(node, cmdErr) + res.CombinedOut = out - resultChan <- result{node: node, output: string(out)} - return nil, nil + if res.Err != nil 
{ + return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) + } + resultChan <- result{node: node, output: string(res.CombinedOut)} + return res, nil }); err != nil { return err } diff --git a/pkg/roachprod/install/session.go b/pkg/roachprod/install/session.go index f65092d57b34..ea2fd8fe9b17 100644 --- a/pkg/roachprod/install/session.go +++ b/pkg/roachprod/install/session.go @@ -16,17 +16,16 @@ import ( "os" "os/exec" "path/filepath" - "strings" "sync" "github.com/cockroachdb/cockroach/pkg/roachprod/config" + rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/errors" ) type session interface { CombinedOutput(ctx context.Context, cmd string) ([]byte, error) Run(ctx context.Context, cmd string) error - SetWithExitStatus(value bool) SetStdin(r io.Reader) SetStdout(w io.Writer) SetStderr(w io.Writer) @@ -41,9 +40,8 @@ type session interface { type remoteSession struct { *exec.Cmd - cancel func() - logfile string // captures ssh -vvv - withExitStatus bool + cancel func() + logfile string // captures ssh -vvv } func newRemoteSession(user, host string, logdir string) (*remoteSession, error) { @@ -56,7 +54,6 @@ func newRemoteSession(user, host string, logdir string) (*remoteSession, error) // fmt.Sprintf("ssh_%s_%s", host, timeutil.Now().Format(time.RFC3339)), // ) const logfile = "" - withExitStatus := false args := []string{ user + "@" + host, @@ -81,7 +78,7 @@ func newRemoteSession(user, host string, logdir string) (*remoteSession, error) args = append(args, sshAuthArgs()...) ctx, cancel := context.WithCancel(context.Background()) cmd := exec.CommandContext(ctx, "ssh", args...) 
- return &remoteSession{cmd, cancel, logfile, withExitStatus}, nil + return &remoteSession{cmd, cancel, logfile}, nil } func (s *remoteSession) errWithDebug(err error) error { @@ -93,13 +90,6 @@ func (s *remoteSession) errWithDebug(err error) error { } func (s *remoteSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, error) { - if s.withExitStatus { - cmd = strings.TrimSpace(cmd) - if !strings.HasSuffix(cmd, ";") { - cmd += ";" - } - cmd += "echo -n 'LAST EXIT STATUS: '$?;" - } s.Cmd.Args = append(s.Cmd.Args, cmd) var b []byte @@ -117,18 +107,11 @@ func (s *remoteSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, s.Close() return nil, ctx.Err() case <-commandFinished: - return b, err + return b, rperrors.ClassifyCmdError(err) } } func (s *remoteSession) Run(ctx context.Context, cmd string) error { - if s.withExitStatus { - cmd = strings.TrimSpace(cmd) - if !strings.HasSuffix(cmd, ";") { - cmd += ";" - } - cmd += "echo -n 'LAST EXIT STATUS: '$?;" - } s.Cmd.Args = append(s.Cmd.Args, cmd) var err error @@ -143,24 +126,13 @@ func (s *remoteSession) Run(ctx context.Context, cmd string) error { s.Close() return ctx.Err() case <-commandFinished: - return err + return rperrors.ClassifyCmdError(err) } } func (s *remoteSession) Start(cmd string) error { - if s.withExitStatus { - cmd = strings.TrimSpace(cmd) - if !strings.HasSuffix(cmd, ";") { - cmd += ";" - } - cmd += "echo -n 'LAST EXIT STATUS: '$?;" - } s.Cmd.Args = append(s.Cmd.Args, cmd) - return s.Cmd.Start() -} - -func (s *remoteSession) SetWithExitStatus(value bool) { - s.withExitStatus = value + return rperrors.ClassifyCmdError(s.Cmd.Start()) } func (s *remoteSession) SetStdin(r io.Reader) { @@ -207,25 +179,16 @@ func (s *remoteSession) Close() { type localSession struct { *exec.Cmd - cancel func() - withExitStatus bool + cancel func() } func newLocalSession() *localSession { ctx, cancel := context.WithCancel(context.Background()) - withExitStatus := false cmd := exec.CommandContext(ctx, 
"/bin/bash", "-c") - return &localSession{cmd, cancel, withExitStatus} + return &localSession{cmd, cancel} } func (s *localSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, error) { - if s.withExitStatus { - cmd = strings.TrimSpace(cmd) - if !strings.HasSuffix(cmd, ";") { - cmd += ";" - } - cmd += "echo -n 'LAST EXIT STATUS: '$?;" - } s.Cmd.Args = append(s.Cmd.Args, cmd) var b []byte @@ -242,18 +205,11 @@ func (s *localSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, s.Close() return nil, ctx.Err() case <-commandFinished: - return b, err + return b, rperrors.ClassifyCmdError(err) } } func (s *localSession) Run(ctx context.Context, cmd string) error { - if s.withExitStatus { - cmd = strings.TrimSpace(cmd) - if !strings.HasSuffix(cmd, ";") { - cmd += ";" - } - cmd += "echo -n 'LAST EXIT STATUS: '$?;" - } s.Cmd.Args = append(s.Cmd.Args, cmd) var err error @@ -268,24 +224,13 @@ func (s *localSession) Run(ctx context.Context, cmd string) error { s.Close() return ctx.Err() case <-commandFinished: - return err + return rperrors.ClassifyCmdError(err) } } func (s *localSession) Start(cmd string) error { - if s.withExitStatus { - cmd = strings.TrimSpace(cmd) - if !strings.HasSuffix(cmd, ";") { - cmd += ";" - } - cmd += "echo -n 'LAST EXIT STATUS: '$?;" - } s.Cmd.Args = append(s.Cmd.Args, cmd) - return s.Cmd.Start() -} - -func (s *localSession) SetWithExitStatus(value bool) { - s.withExitStatus = value + return rperrors.ClassifyCmdError(s.Cmd.Start()) } func (s *localSession) SetStdin(r io.Reader) { diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go index 90c5ab364d33..62256476f180 100644 --- a/pkg/roachprod/roachprod.go +++ b/pkg/roachprod/roachprod.go @@ -449,9 +449,12 @@ func IP( } } else { var err error - if err := c.Parallel(l, "", len(nodes), 0, func(i int) ([]byte, error) { - ips[i], err = c.GetInternalIP(ctx, nodes[i]) - return nil, err + if err := c.Parallel(l, "", len(nodes), 0, func(i int) 
(*install.RunResultDetails, error) { + node := nodes[i] + res := &install.RunResultDetails{Node: node} + res.Stdout, res.Err = c.GetInternalIP(ctx, node) + ips[i] = res.Stdout + return res, err }); err != nil { return nil, err } @@ -858,9 +861,12 @@ func PgURL( } } else { var err error - if err := c.Parallel(l, "", len(nodes), 0, func(i int) ([]byte, error) { - ips[i], err = c.GetInternalIP(ctx, nodes[i]) - return nil, err + if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*install.RunResultDetails, error) { + node := nodes[i] + res := &install.RunResultDetails{Node: node} + res.Stdout, res.Err = c.GetInternalIP(ctx, node) + ips[i] = res.Stdout + return res, err }); err != nil { return nil, err } @@ -988,8 +994,9 @@ func Pprof(l *logger.Logger, clusterName string, opts PprofOpts) error { httpClient := httputil.NewClientWithTimeout(timeout) startTime := timeutil.Now().Unix() nodes := c.TargetNodes() - failed, err := c.ParallelE(l, description, len(nodes), 0, func(i int) ([]byte, error) { + failed, err := c.ParallelE(l, description, len(nodes), 0, func(i int) (*install.RunResultDetails, error) { node := nodes[i] + res := &install.RunResultDetails{Node: node} host := c.Host(node) port := c.NodeUIPort(node) scheme := "http" @@ -1000,7 +1007,8 @@ func Pprof(l *logger.Logger, clusterName string, opts PprofOpts) error { outputDir := filepath.Dir(outputFile) file, err := os.CreateTemp(outputDir, ".pprof") if err != nil { - return nil, errors.Wrap(err, "create tmpfile for pprof download") + res.Err = errors.Wrap(err, "create tmpfile for pprof download") + return res, res.Err } defer func() { @@ -1017,31 +1025,37 @@ func Pprof(l *logger.Logger, clusterName string, opts PprofOpts) error { pprofURL := fmt.Sprintf("%s://%s:%d/%s", scheme, host, port, pprofPath) resp, err := httpClient.Get(context.Background(), pprofURL) if err != nil { - return nil, err + res.Err = err + return res, res.Err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return nil, 
errors.Newf("unexpected status from pprof endpoint: %s", resp.Status) + res.Err = errors.Newf("unexpected status from pprof endpoint: %s", resp.Status) + return res, res.Err } if _, err := io.Copy(file, resp.Body); err != nil { - return nil, err + res.Err = err + return res, res.Err } if err := file.Sync(); err != nil { - return nil, err + res.Err = err + return res, res.Err } if err := file.Close(); err != nil { - return nil, err + res.Err = err + return res, res.Err } if err := os.Rename(file.Name(), outputFile); err != nil { - return nil, err + res.Err = err + return res, res.Err } mu.Lock() outputFiles = append(outputFiles, outputFile) mu.Unlock() - return nil, nil + return res, nil }) for _, s := range outputFiles { diff --git a/pkg/testutils/lint/passes/fmtsafe/fmtsafe.go b/pkg/testutils/lint/passes/fmtsafe/fmtsafe.go index f55046ac48ee..b73d041a5928 100644 --- a/pkg/testutils/lint/passes/fmtsafe/fmtsafe.go +++ b/pkg/testutils/lint/passes/fmtsafe/fmtsafe.go @@ -245,13 +245,13 @@ func checkCallExpr(pass *analysis.Pass, enclosingFnName string, call *ast.CallEx return } - pass.Reportf(call.Lparen, escNl("%s(): %s argument is not a constant expression"+Tip), - enclosingFnName, argType) + pass.Reportf(call.Lparen, escNl("%s(): %s argument is not a constant expression"+Tip), enclosingFnName, argType) } // Tip is exported for use in tests. var Tip = ` -Tip: use YourFuncf("descriptive prefix %%s", ...) or list new formatting wrappers in pkg/testutils/lint/passes/fmtsafe/functions.go.` +Tip: use YourFuncf("descriptive prefix %%s", ...) or list new formatting wrappers in pkg/testutils/lint/passes/fmtsafe/functions.go. +N.B. additional entry is required functions in the main package. 
See functions.go#requireConstFmt` func hasNoLintComment(pass *analysis.Pass, call *ast.CallExpr, idx int) bool { fPos, f := findContainingFile(pass, call) diff --git a/pkg/testutils/lint/passes/fmtsafe/functions.go b/pkg/testutils/lint/passes/fmtsafe/functions.go index 8b3fbb104f59..d5a2ffa82b73 100644 --- a/pkg/testutils/lint/passes/fmtsafe/functions.go +++ b/pkg/testutils/lint/passes/fmtsafe/functions.go @@ -32,8 +32,14 @@ var requireConstMsg = map[string]bool{ "(*github.com/cockroachdb/cockroach/pkg/sql.optPlanningCtx).log": true, } -// requireConstFmt records functions for which the string arg -// before the final ellipsis must be a constant string. +/* +requireConstFmt records functions for which the string arg +before the final ellipsis must be a constant string. + +Definitions surrounded in parentheses are functions attached to a struct. +For functions defined in the main package, a *second* entry is required +in the form (main.yourStruct).yourFuncF +*/ var requireConstFmt = map[string]bool{ // Logging things. 
"log.Printf": true, @@ -85,6 +91,17 @@ var requireConstFmt = map[string]bool{ "(*github.com/cockroachdb/cockroach/pkg/util/grpcutil.grpcLogger).Errorf": true, "(*github.com/cockroachdb/cockroach/pkg/util/grpcutil.grpcLogger).Fatalf": true, + // Both of these signatures need to be included for the linter to not flag + // roachtest testImpl.addFailure since it is in the main package + "(*github.com/cockroachdb/cockroach/pkg/cmd/roachtest.testImpl).addFailure": true, + "(*main.testImpl).addFailure": true, + + "(*main.testImpl).Fatalf": true, + "(*github.com/cockroachdb/cockroach/pkg/cmd/roachtest.testImpl).Fatalf": true, + + "(*main.testImpl).Errorf": true, + "(*github.com/cockroachdb/cockroach/pkg/cmd/roachtest.testImpl).Errorf": true, + "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Debugf": true, "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Infof": true, "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Warningf": true, diff --git a/pkg/util/retry/retry.go b/pkg/util/retry/retry.go index f6010085160c..b082a462c7bf 100644 --- a/pkg/util/retry/retry.go +++ b/pkg/util/retry/retry.go @@ -158,6 +158,11 @@ func (r *Retry) NextCh() <-chan time.Time { return time.After(r.retryIn()) } +// CurrentAttempt returns the current attempt +func (r *Retry) CurrentAttempt() int { + return r.currentAttempt +} + // Do invokes the closure according to the retry options until it returns // success or no more retries are possible. Always returns an error unless the // return is prompted by a successful invocation of `fn`.