From d0c72c23dcb5afc80918686f18c14c870eacbd1c Mon Sep 17 00:00:00 2001 From: "Bryan C. Mills" Date: Fri, 3 Mar 2023 14:48:19 +0000 Subject: [PATCH] cmd/go: add a GODEBUG to limit the number of concurrent network connections I implemented this in order to debug connection failures on a new-to-me VM development environment that uses Cloud NAT. It doesn't directly fix the bug, but perhaps folks will find it useful to diagnose port-exhaustion-related flakiness in other environments. For #52545. Change-Id: Icd3f13dcf62e718560c4f4a965a4df7c1bd785ce Reviewed-on: https://go-review.googlesource.com/c/go/+/473277 Run-TryBot: Bryan Mills TryBot-Result: Gopher Robot Reviewed-by: Michael Matloob Auto-Submit: Bryan Mills --- src/cmd/go/go_test.go | 62 +++++++++++++-- src/cmd/go/internal/base/limit.go | 84 ++++++++++++++++++++ src/cmd/go/internal/modfetch/codehost/git.go | 23 +++++- src/cmd/go/internal/modfetch/codehost/svn.go | 12 +++ src/cmd/go/internal/modfetch/codehost/vcs.go | 16 +++- src/cmd/go/internal/vcs/vcs.go | 25 ++++++ src/cmd/go/internal/web/http.go | 29 +++++++ src/cmd/go/script_test.go | 21 ++++- src/cmd/go/scriptconds_test.go | 23 ++++++ 9 files changed, 284 insertions(+), 11 deletions(-) create mode 100644 src/cmd/go/internal/base/limit.go diff --git a/src/cmd/go/go_test.go b/src/cmd/go/go_test.go index 291f51244727d2..da7336da9eb27b 100644 --- a/src/cmd/go/go_test.go +++ b/src/cmd/go/go_test.go @@ -19,6 +19,7 @@ import ( "io" "io/fs" "log" + "math" "os" "os/exec" "path/filepath" @@ -29,6 +30,7 @@ import ( "testing" "time" + "cmd/go/internal/base" "cmd/go/internal/cache" "cmd/go/internal/cfg" "cmd/go/internal/robustio" @@ -61,6 +63,12 @@ var ( cgoEnabled string // raw value from 'go env CGO_ENABLED' ) +// netTestSem is a semaphore limiting the number of tests that may use the +// external network in parallel. If non-nil, it contains one buffer slot per +// test (send to acquire), with a low enough limit that the overall number of +// connections (summed across subprocesses) stays at or below base.NetLimit. +var netTestSem chan struct{} + var exeSuffix string = func() string { if runtime.GOOS == "windows" { return ".exe" @@ -282,6 +290,17 @@ func TestMain(m *testing.M) { } } + if n, limited := base.NetLimit(); limited && n > 0 { + // Split the network limit into chunks, so that each parallel script can + // have one chunk. We want to run as many parallel scripts as possible, but + // also want to give each script as high a limit as possible. + // We arbitrarily split by sqrt(n) to try to balance those two goals. + netTestLimit := int(math.Sqrt(float64(n))) + netTestSem = make(chan struct{}, netTestLimit) + reducedLimit := fmt.Sprintf(",%s=%d", base.NetLimitGodebug.Name(), n/netTestLimit) + os.Setenv("GODEBUG", os.Getenv("GODEBUG")+reducedLimit) + } + // Don't let these environment variables confuse the test. os.Setenv("GOENV", "off") os.Unsetenv("GOFLAGS") @@ -369,6 +388,7 @@ type testgoData struct { tempdir string ran bool inParallel bool + hasNet bool stdout, stderr bytes.Buffer execDir string // dir for tg.run } @@ -411,6 +431,9 @@ func (tg *testgoData) parallel() { if tg.ran { tg.t.Fatal("internal testsuite error: call to parallel after run") } + if tg.hasNet { + tg.t.Fatal("internal testsuite error: call to parallel after acquireNet") + } for _, e := range tg.env { if strings.HasPrefix(e, "GOROOT=") || strings.HasPrefix(e, "GOPATH=") || strings.HasPrefix(e, "GOBIN=") { val := e[strings.Index(e, "=")+1:] @@ -423,6 +446,25 @@ func (tg *testgoData) parallel() { tg.t.Parallel() } +// acquireNet skips t if the network is unavailable, and otherwise acquires a +// netTestSem token for t to be released at the end of the test. +// +// t.Parallel must not be called after acquireNet. +func (tg *testgoData) acquireNet() { + tg.t.Helper() + if tg.hasNet { + return + } + + testenv.MustHaveExternalNetwork(tg.t) + if netTestSem != nil { + netTestSem <- struct{}{} + tg.t.Cleanup(func() { <-netTestSem }) + } + tg.setenv("TESTGONETWORK", "") + tg.hasNet = true +} + // pwd returns the current directory. func (tg *testgoData) pwd() string { tg.t.Helper() @@ -444,9 +486,6 @@ func (tg *testgoData) sleep() { // command. func (tg *testgoData) setenv(name, val string) { tg.t.Helper() - if tg.inParallel && (name == "GOROOT" || name == "GOPATH" || name == "GOBIN") && (strings.HasPrefix(val, "testdata") || strings.HasPrefix(val, "./testdata")) { - tg.t.Fatalf("internal testsuite error: call to setenv with testdata (%s=%s) after parallel", name, val) - } tg.unsetenv(name) tg.env = append(tg.env, name+"="+val) } @@ -455,7 +494,10 @@ func (tg *testgoData) setenv(name, val string) { func (tg *testgoData) unsetenv(name string) { if tg.env == nil { tg.env = append([]string(nil), os.Environ()...) - tg.env = append(tg.env, "GO111MODULE=off") + tg.env = append(tg.env, "GO111MODULE=off", "TESTGONETWORK=panic") + if testing.Short() { + tg.env = append(tg.env, "TESTGOVCS=panic") + } } for i, v := range tg.env { if strings.HasPrefix(v, name+"=") { @@ -1012,12 +1054,13 @@ func TestNewReleaseRebuildsStalePackagesInGOPATH(t *testing.T) { // cmd/go: custom import path checking should not apply to Go packages without import comment. func TestIssue10952(t *testing.T) { - testenv.MustHaveExternalNetwork(t) testenv.MustHaveExecPath(t, "git") tg := testgo(t) defer tg.cleanup() tg.parallel() + tg.acquireNet() + tg.tempDir("src") tg.setenv("GOPATH", tg.path(".")) const importPath = "github.com/zombiezen/go-get-issue-10952" @@ -1029,12 +1072,13 @@ func TestIssue10952(t *testing.T) { // Test git clone URL that uses SCP-like syntax and custom import path checking. func TestIssue11457(t *testing.T) { - testenv.MustHaveExternalNetwork(t) testenv.MustHaveExecPath(t, "git") tg := testgo(t) defer tg.cleanup() tg.parallel() + tg.acquireNet() + tg.tempDir("src") tg.setenv("GOPATH", tg.path(".")) const importPath = "rsc.io/go-get-issue-11457" @@ -1054,12 +1098,13 @@ func TestIssue11457(t *testing.T) { } func TestGetGitDefaultBranch(t *testing.T) { - testenv.MustHaveExternalNetwork(t) testenv.MustHaveExecPath(t, "git") tg := testgo(t) defer tg.cleanup() tg.parallel() + tg.acquireNet() + tg.tempDir("src") tg.setenv("GOPATH", tg.path(".")) @@ -1395,12 +1440,13 @@ func TestDefaultGOPATH(t *testing.T) { } func TestDefaultGOPATHGet(t *testing.T) { - testenv.MustHaveExternalNetwork(t) testenv.MustHaveExecPath(t, "git") tg := testgo(t) defer tg.cleanup() tg.parallel() + tg.acquireNet() + tg.setenv("GOPATH", "") tg.tempDir("home") tg.setenv(homeEnvName(), tg.path("home")) diff --git a/src/cmd/go/internal/base/limit.go b/src/cmd/go/internal/base/limit.go new file mode 100644 index 00000000000000..b4160bde021c0d --- /dev/null +++ b/src/cmd/go/internal/base/limit.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package base + +import ( + "fmt" + "internal/godebug" + "runtime" + "strconv" + "sync" +) + +var NetLimitGodebug = godebug.New("#cmdgonetlimit") + +// NetLimit returns the limit on concurrent network operations +// configured by GODEBUG=cmdgonetlimit, if any. +// +// A limit of 0 (indicated by 0, true) means that network operations should not +// be allowed. +func NetLimit() (int, bool) { + netLimitOnce.Do(func() { + s := NetLimitGodebug.Value() + if s == "" { + return + } + + n, err := strconv.Atoi(s) + if err != nil { + Fatalf("invalid %s: %v", NetLimitGodebug.Name(), err) + } + if n < 0 { + // Treat negative values as unlimited. + return + } + netLimitSem = make(chan struct{}, n) + }) + + return cap(netLimitSem), netLimitSem != nil +} + +// AcquireNet acquires a semaphore token for a network operation. +func AcquireNet() (release func(), err error) { + hasToken := false + if n, ok := NetLimit(); ok { + if n == 0 { + return nil, fmt.Errorf("network disabled by %v=%v", NetLimitGodebug.Name(), NetLimitGodebug.Value()) + } + netLimitSem <- struct{}{} + hasToken = true + } + + checker := new(netTokenChecker) + runtime.SetFinalizer(checker, (*netTokenChecker).panicUnreleased) + + return func() { + if checker.released { + panic("internal error: net token released twice") + } + checker.released = true + if hasToken { + <-netLimitSem + } + runtime.SetFinalizer(checker, nil) + }, nil +} + +var ( + netLimitOnce sync.Once + netLimitSem chan struct{} +) + +type netTokenChecker struct { + released bool + // We want to use a finalizer to check that all acquired tokens are returned, + // so we arbitrarily pad the tokens with a string to defeat the runtime's + // “tiny allocator”. + unusedAvoidTinyAllocator string +} + +func (c *netTokenChecker) panicUnreleased() { + panic("internal error: net token acquired but not released") +} diff --git a/src/cmd/go/internal/modfetch/codehost/git.go b/src/cmd/go/internal/modfetch/codehost/git.go index 60ec616c696783..d1a18a8d58925e 100644 --- a/src/cmd/go/internal/modfetch/codehost/git.go +++ b/src/cmd/go/internal/modfetch/codehost/git.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "cmd/go/internal/base" "cmd/go/internal/lockedfile" "cmd/go/internal/par" "cmd/go/internal/web" @@ -241,7 +242,14 @@ func (r *gitRepo) loadRefs(ctx context.Context) (map[string]string, error) { // The git protocol sends all known refs and ls-remote filters them on the client side, // so we might as well record both heads and tags in one shot. // Most of the time we only care about tags but sometimes we care about heads too. + release, err := base.AcquireNet() + if err != nil { + r.refsErr = err + return + } out, gitErr := Run(ctx, r.dir, "git", "ls-remote", "-q", r.remote) + release() + if gitErr != nil { if rerr, ok := gitErr.(*RunError); ok { if bytes.Contains(rerr.Stderr, []byte("fatal: could not read Username")) { @@ -531,7 +539,14 @@ func (r *gitRepo) stat(ctx context.Context, rev string) (info *RevInfo, err erro ref = hash refspec = hash + ":refs/dummy" } - _, err := Run(ctx, r.dir, "git", "fetch", "-f", "--depth=1", r.remote, refspec) + + release, err := base.AcquireNet() + if err != nil { + return nil, err + } + _, err = Run(ctx, r.dir, "git", "fetch", "-f", "--depth=1", r.remote, refspec) + release() + if err == nil { return r.statLocal(ctx, rev, ref) } @@ -566,6 +581,12 @@ func (r *gitRepo) fetchRefsLocked(ctx context.Context) error { // golang.org/issue/34266 and // https://github.com/git/git/blob/4c86140027f4a0d2caaa3ab4bd8bfc5ce3c11c8a/transport.c#L1303-L1309.) + release, err := base.AcquireNet() + if err != nil { + return err + } + defer release() + if _, err := Run(ctx, r.dir, "git", "fetch", "-f", r.remote, "refs/heads/*:refs/heads/*", "refs/tags/*:refs/tags/*"); err != nil { return err } diff --git a/src/cmd/go/internal/modfetch/codehost/svn.go b/src/cmd/go/internal/modfetch/codehost/svn.go index fe5b74f71b42cd..9c1c10097bb437 100644 --- a/src/cmd/go/internal/modfetch/codehost/svn.go +++ b/src/cmd/go/internal/modfetch/codehost/svn.go @@ -15,6 +15,8 @@ import ( "path/filepath" "strconv" "time" + + "cmd/go/internal/base" ) func svnParseStat(rev, out string) (*RevInfo, error) { @@ -66,6 +68,10 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote remotePath += "/" + subdir } + release, err := base.AcquireNet() + if err != nil { + return err + } out, err := Run(ctx, workDir, []string{ "svn", "list", "--non-interactive", @@ -75,6 +81,7 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote "--revision", rev, "--", remotePath, }) + release() if err != nil { return err } @@ -98,6 +105,10 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote } defer os.RemoveAll(exportDir) // best-effort + release, err = base.AcquireNet() + if err != nil { + return err + } _, err = Run(ctx, workDir, []string{ "svn", "export", "--non-interactive", @@ -112,6 +123,7 @@ func svnReadZip(ctx context.Context, dst io.Writer, workDir, rev, subdir, remote "--", remotePath, exportDir, }) + release() if err != nil { return err } diff --git a/src/cmd/go/internal/modfetch/codehost/vcs.go b/src/cmd/go/internal/modfetch/codehost/vcs.go index 3c0c24a8914041..5bd100556b59c7 100644 --- a/src/cmd/go/internal/modfetch/codehost/vcs.go +++ b/src/cmd/go/internal/modfetch/codehost/vcs.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "cmd/go/internal/base" "cmd/go/internal/lockedfile" "cmd/go/internal/par" "cmd/go/internal/str" @@ -109,7 +110,14 @@ func newVCSRepo(ctx context.Context, vcs, remote string) (Repo, error) { defer unlock() if _, err := os.Stat(filepath.Join(r.dir, "."+vcs)); err != nil { - if _, err := Run(ctx, r.dir, cmd.init(r.remote)); err != nil { + release, err := base.AcquireNet() + if err != nil { + return nil, err + } + _, err = Run(ctx, r.dir, cmd.init(r.remote)) + release() + + if err != nil { os.RemoveAll(r.dir) return nil, err } @@ -355,7 +363,13 @@ func (r *vcsRepo) Stat(ctx context.Context, rev string) (*RevInfo, error) { func (r *vcsRepo) fetch(ctx context.Context) { if len(r.cmd.fetch) > 0 { + release, err := base.AcquireNet() + if err != nil { + r.fetchErr = err + return + } _, r.fetchErr = Run(ctx, r.dir, r.cmd.fetch) + release() } } diff --git a/src/cmd/go/internal/vcs/vcs.go b/src/cmd/go/internal/vcs/vcs.go index 2ef115da3160f2..c65dd0f624a1b6 100644 --- a/src/cmd/go/internal/vcs/vcs.go +++ b/src/cmd/go/internal/vcs/vcs.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "cmd/go/internal/base" "cmd/go/internal/cfg" "cmd/go/internal/search" "cmd/go/internal/str" @@ -719,12 +720,24 @@ func (v *Cmd) Ping(scheme, repo string) error { } os.MkdirAll(dir, 0777) // Ignore errors — if unsuccessful, the command will likely fail. + release, err := base.AcquireNet() + if err != nil { + return err + } + defer release() + return v.runVerboseOnly(dir, v.PingCmd, "scheme", scheme, "repo", repo) } // Create creates a new copy of repo in dir. // The parent of dir must exist; dir must not. func (v *Cmd) Create(dir, repo string) error { + release, err := base.AcquireNet() + if err != nil { + return err + } + defer release() + for _, cmd := range v.CreateCmd { if err := v.run(filepath.Dir(dir), cmd, "dir", dir, "repo", repo); err != nil { return err @@ -735,6 +748,12 @@ func (v *Cmd) Create(dir, repo string) error { // Download downloads any new changes for the repo in dir. func (v *Cmd) Download(dir string) error { + release, err := base.AcquireNet() + if err != nil { + return err + } + defer release() + for _, cmd := range v.DownloadCmd { if err := v.run(dir, cmd); err != nil { return err @@ -780,6 +799,12 @@ func (v *Cmd) TagSync(dir, tag string) error { } } + release, err := base.AcquireNet() + if err != nil { + return err + } + defer release() + if tag == "" && v.TagSyncDefault != nil { for _, cmd := range v.TagSyncDefault { if err := v.run(dir, cmd); err != nil { diff --git a/src/cmd/go/internal/web/http.go b/src/cmd/go/internal/web/http.go index e7935ea1842770..76b767c751e4e4 100644 --- a/src/cmd/go/internal/web/http.go +++ b/src/cmd/go/internal/web/http.go @@ -15,6 +15,7 @@ import ( "crypto/tls" "errors" "fmt" + "io" "mime" "net" "net/http" @@ -24,6 +25,7 @@ import ( "time" "cmd/go/internal/auth" + "cmd/go/internal/base" "cmd/go/internal/cfg" "cmd/internal/browser" ) @@ -193,6 +195,11 @@ func get(security SecurityMode, url *urlpkg.URL) (*Response, error) { req.URL.Host = t.ToHost } + release, err := base.AcquireNet() + if err != nil { + return nil, nil, err + } + var res *http.Response if security == Insecure && url.Scheme == "https" { // fail earlier res, err = impatientInsecureHTTPClient.Do(req) @@ -204,6 +211,17 @@ func get(security SecurityMode, url *urlpkg.URL) (*Response, error) { res, err = securityPreservingDefaultClient.Do(req) } } + + if res == nil || res.Body == nil { + release() + } else { + body := res.Body + res.Body = hookCloser{ + ReadCloser: body, + afterClose: release, + } + } + return url, res, err } @@ -358,3 +376,14 @@ func isLocalHost(u *urlpkg.URL) bool { } return false } + +type hookCloser struct { + io.ReadCloser + afterClose func() +} + +func (c hookCloser) Close() error { + err := c.ReadCloser.Close() + c.afterClose() + return err +} diff --git a/src/cmd/go/script_test.go b/src/cmd/go/script_test.go index 072a2dfef547fa..e21e57002b311d 100644 --- a/src/cmd/go/script_test.go +++ b/src/cmd/go/script_test.go @@ -116,7 +116,7 @@ func TestScript(t *testing.T) { defer removeAll(workdir) } - s, err := script.NewState(ctx, workdir, env) + s, err := script.NewState(tbContext(ctx, t), workdir, env) if err != nil { t.Fatal(err) } @@ -156,6 +156,23 @@ func TestScript(t *testing.T) { } } +// testingTBKey is the Context key for a testing.TB. +type testingTBKey struct{} + +// tbContext returns a Context derived from ctx and associated with t. +func tbContext(ctx context.Context, t testing.TB) context.Context { + return context.WithValue(ctx, testingTBKey{}, t) +} + +// tbFromContext returns the testing.TB associated with ctx, if any. +func tbFromContext(ctx context.Context) (testing.TB, bool) { + t := ctx.Value(testingTBKey{}) + if t == nil { + return nil, false + } + return t.(testing.TB), true +} + // initScriptState creates the initial directory structure in s for unpacking a // cmd/go script. func initScriptDirs(t testing.TB, s *script.State) { @@ -216,6 +233,7 @@ func scriptEnv(srv *vcstest.Server, srvCertFile string) ([]string, error) { "TESTGO_VCSTEST_HOST=" + httpURL.Host, "TESTGO_VCSTEST_TLS_HOST=" + httpsURL.Host, "TESTGO_VCSTEST_CERT=" + srvCertFile, + "TESTGONETWORK=panic", // cleared by the [net] condition "GOSUMDB=" + testSumDBVerifierKey, "GONOPROXY=", "GONOSUMDB=", @@ -239,6 +257,7 @@ func scriptEnv(srv *vcstest.Server, srvCertFile string) ([]string, error) { // Require all tests that use VCS commands to be skipped in short mode. env = append(env, "TESTGOVCS=panic") } + if os.Getenv("CGO_ENABLED") != "" || runtime.GOOS != goHostOS || runtime.GOARCH != goHostArch { // If the actual CGO_ENABLED might not match the cmd/go default, set it // explicitly in the environment. Otherwise, leave it unset so that we also diff --git a/src/cmd/go/scriptconds_test.go b/src/cmd/go/scriptconds_test.go index 3c893eeae56138..641f69f312bf1f 100644 --- a/src/cmd/go/scriptconds_test.go +++ b/src/cmd/go/scriptconds_test.go @@ -19,6 +19,7 @@ import ( "runtime" "runtime/debug" "strings" + "sync" ) func scriptConditions() map[string]script.Cond { @@ -107,6 +108,8 @@ func hasBuildmode(s *script.State, mode string) (bool, error) { return platform.BuildModeSupported(runtime.Compiler, mode, GOOS, GOARCH), nil } +var scriptNetEnabled sync.Map // testing.TB → already enabled + func hasNet(s *script.State, host string) (bool, error) { if !testenv.HasExternalNetwork() { return false, nil @@ -115,6 +118,26 @@ func hasNet(s *script.State, host string) (bool, error) { // TODO(bcmills): Add a flag or environment variable to allow skipping tests // for specific hosts and/or skipping all net tests except for specific hosts. + t, ok := tbFromContext(s.Context()) + if !ok { + return false, errors.New("script Context unexpectedly missing testing.TB key") + } + + if netTestSem != nil { + // When the number of external network connections is limited, we limit the + // number of net tests that can run concurrently so that the overall number + // of network connections won't exceed the limit. + _, dup := scriptNetEnabled.LoadOrStore(t, true) + if !dup { + // Acquire a net token for this test until the test completes. + netTestSem <- struct{}{} + t.Cleanup(func() { + <-netTestSem + scriptNetEnabled.Delete(t) + }) + } + } + // Since we have confirmed that the network is available, // allow cmd/go to use it. s.Setenv("TESTGONETWORK", "")