diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 1470e086b77bc..99d5c9748d2cb 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -726,11 +726,19 @@ func (f *Forwarder) writeResponseErrorToBody(rw http.ResponseWriter, respErr err func (f *Forwarder) formatStatusResponseError(rw http.ResponseWriter, respErr error) { // This detects failed requests that were terminated by the server due to GOAWAY. There // is no direct way to detect these errors. No exported constants or error types exist from the - // standard library, see https://github.com/golang/net/blob/5ac9daca088ab4f378d7df849f6c7d28bea86071/http2/transport.go#L694. + // standard library, so we have to match on the error message. The two error strings come from: + // - golang.org/x/net/http2 when its internal retry path cannot replay the body: + // https://github.com/golang/net/blob/5ac9daca088ab4f378d7df849f6c7d28bea86071/http2/transport.go#L694 + // - net/http (errCannotRewind) when, after the http2 conn pool is drained, the http1 retry + // path tries to rewind the body and fails because Request.GetBody is unset: + // https://github.com/golang/go/blob/go1.26.2/src/net/http/transport.go#L759 // When a failed request is found, we return a response that indicates to clients that they // should retry the request themselves. - if errString := respErr.Error(); strings.HasSuffix(errString, `after Request.Body was written; define Request.GetBody to avoid this error`) && - strings.Contains(errString, `http2: Transport: cannot retry err`) { + errString := respErr.Error() + isHTTP2RetryErr := strings.Contains(errString, `http2: Transport: cannot retry err`) && + strings.HasSuffix(errString, `after Request.Body was written; define Request.GetBody to avoid this error`) + isHTTP1RewindErr := strings.Contains(errString, `net/http: cannot rewind body after connection loss`) + if isHTTP2RetryErr || isHTTP1RewindErr { data, err := runtime.Encode(globalKubeCodecs.LegacyCodec(), &kubeerrors.NewTooManyRequests("Connection closed by upstream Kubernetes server", 1).ErrStatus) if err != nil { diff --git a/lib/kube/proxy/forwarder_test.go b/lib/kube/proxy/forwarder_test.go index 6d5c6ef48d785..be4e2e7b6a9e6 100644 --- a/lib/kube/proxy/forwarder_test.go +++ b/lib/kube/proxy/forwarder_test.go @@ -36,6 +36,7 @@ import ( "os" "slices" "sort" + "sync" "sync/atomic" "testing" "time" @@ -1753,6 +1754,98 @@ func TestForwarderTLSConfigCAs(t *testing.T) { require.True(t, getConnTLSRootsCalled) } +// errRoundTripper is a stub [http.RoundTripper] that drains the request body +// and returns a configured error. It mimics the behavior of a real upstream +// transport that has read the body before failing, leaving it unrewindable. +type errRoundTripper struct { + err error +} + +func (e *errRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + _ = req.Body.Close() + } + return nil, e.err +} + +// TestKubeForwarder_GOAWAYErrors verifies that errors emitted by the upstream transport +// when an HTTP/2 GOAWAY interrupts an in-flight request with an +// unrewindable body are translated into a 429 response with Retry-After: 1, so kube clients retry the request. +// This covers both the http2 transport's "cannot retry" error and the net/http transport's "cannot rewind body" error. +// (see formatStatusResponseError) +func TestKubeForwarder_GOAWAYErrors(t *testing.T) { + tests := []struct { + name string + err error + }{ + { + // Returned by golang.org/x/net/http2 when its internal retry path + // cannot replay the request body after a GOAWAY. + name: "http2 cannot retry", + err: errors.New("http2: Transport: cannot retry err [http2: Transport received Server's graceful shutdown GOAWAY] after Request.Body was written; define Request.GetBody to avoid this error"), + }, + { + // Returned by net/http when, after the http2 conn pool is drained + // by GOAWAY, the http1 retry path tries to rewind the body and + // fails because Request.GetBody is unset. See errCannotRewind in + // net/http/transport.go. + name: "net/http cannot rewind body", + err: errors.New("net/http: cannot rewind body after connection loss"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + f := newMockForwarder(ctx, t) + + // Plug a stub round tripper that returns the GOAWAY-related error + // into a fake Kubernetes cluster, so the kube proxy's full + // error-handling pipeline runs against it. + f.clusterDetails = map[string]*kubeDetails{ + "kube-cluster": { + kubeCreds: &staticKubeCreds{ + targetAddr: "kube.invalid:443", + tlsConfig: &tls.Config{InsecureSkipVerify: true}, + transport: &errRoundTripper{err: tt.err}, + }, + }, + } + + authCtx := mockAuthCtx(t, "kube-cluster", false) + sess, err := f.newClusterSession(ctx, authCtx) + require.NoError(t, err) + t.Cleanup(sess.close) + + fwd, err := f.makeSessionForwarder(sess) + require.NoError(t, err) + + forwarderServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.URL, err = url.Parse("https://kube.invalid") + require.NoError(t, err) + fwd.ServeHTTP(w, r) + })) + t.Cleanup(forwarderServer.Close) + + body := bytes.NewBuffer([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + req, err := http.NewRequest("POST", forwarderServer.URL, body) + require.NoError(t, err) + resp, err := forwarderServer.Client().Do(req) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, resp.Body.Close()) }) + + require.Equal(t, http.StatusTooManyRequests, resp.StatusCode) + require.Equal(t, "1", resp.Header.Get("Retry-After")) + + var status metav1.Status + require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) + require.Equal(t, metav1.StatusReasonTooManyRequests, status.Reason) + }) + } +} + func TestGOAWAYHandling(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) t.Cleanup(cancel) @@ -1827,6 +1920,100 @@ func TestGOAWAYHandling(t *testing.T) { require.Equal(t, metav1.StatusReasonTooManyRequests, status.Reason) } +// TestGOAWAYHandling_Concurrent exercises the production upstream transport configuration +// ([newH2Transport]: net/http.Transport upgraded with http2.ConfigureTransport) +// against the fake [goawayServer], with many concurrent requests. +// Without the rewind-body error translation in [Forwarder.formatStatusResponseError], +// a portion of the requests bubble up the net/http rewind-body error to clients. +// (see https://github.com/gravitational/teleport/issues/65611) +// +// Concurrency surfaces other GOAWAY-related transport errors that this PR does not translate such as: +// broken pipe, force-closed conns, reverseproxy invalid-read. +// The assertion is therefore narrow: the rewind-body error string must never reach a client. +// Other 500 responses are tolerated. +func TestGOAWAYHandling_Concurrent(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + f := newMockForwarder(ctx, t) + + cert, err := tls.X509KeyPair(fixtures.LocalhostCert, fixtures.LocalhostKey) + require.NoError(t, err) + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + gs := goawayServer{ + listener: ln, + tlsConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{http2.NextProtoTLS}, + }, + } + t.Cleanup(func() { require.NoError(t, gs.Close()) }) + go func() { _ = gs.Serve() }() + + tlsCfg := &tls.Config{InsecureSkipVerify: true} + prodTransport, err := newH2Transport(tlsCfg, nil) + require.NoError(t, err) + + f.clusterDetails = map[string]*kubeDetails{ + "kube-cluster": { + kubeCreds: &staticKubeCreds{ + targetAddr: gs.URL(), + tlsConfig: tlsCfg, + transport: prodTransport, + }, + }, + } + + authCtx := mockAuthCtx(t, "kube-cluster", false) + sess, err := f.newClusterSession(ctx, authCtx) + require.NoError(t, err) + t.Cleanup(sess.close) + + fwd, err := f.makeSessionForwarder(sess) + require.NoError(t, err) + + forwarderServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + u, err := url.Parse(gs.URL()) + require.NoError(t, err) + r.URL = u + fwd.ServeHTTP(w, r) + })) + t.Cleanup(forwarderServer.Close) + + const concurrency = 50 + reqCtx, reqCancel := context.WithTimeout(ctx, 10*time.Second) + t.Cleanup(reqCancel) + + var rewindLeaks atomic.Uint32 + var wg sync.WaitGroup + wg.Add(concurrency) + for range concurrency { + go func() { + defer wg.Done() + body := bytes.NewBuffer(make([]byte, 64*1024)) + req, err := http.NewRequestWithContext(reqCtx, "POST", forwarderServer.URL, body) + assert.NoError(t, err) + resp, err := forwarderServer.Client().Do(req) + if err != nil { + return // some requests legitimately fail at the network layer under GOAWAY storms; not what this test asserts. + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusTooManyRequests { + return + } + bodyBytes, _ := io.ReadAll(resp.Body) + if bytes.Contains(bodyBytes, []byte("cannot rewind body after connection loss")) { + rewindLeaks.Add(1) + } + }() + } + wg.Wait() + + require.Zero(t, rewindLeaks.Load(), "rewind-body error must never reach the client") +} + // goawayServer is a fake [http2.Server] that terminates all received client // connections in the same manner that a Kubernetes API Server would if // it closed the connection as a result of the GOAWAY chance being exceeded. diff --git a/lib/kube/proxy/url.go b/lib/kube/proxy/url.go index 6b50c7427fccf..32d5d755a4b61 100644 --- a/lib/kube/proxy/url.go +++ b/lib/kube/proxy/url.go @@ -256,7 +256,6 @@ func getResourceFromRequest(req *http.Request, kubeDetails *kubeDetails) (*types // It reads the full body - required because data can be proto encoded - // and decodes it into a Kubernetes object. It then extracts the resource name // from the object. -// The body is then reset to the original request body using a new buffer. func extractResourceNameFromPostRequest( req *http.Request, codecs *serializer.CodecFactory, @@ -282,9 +281,20 @@ func extractResourceNameFromPostRequest( if err := req.Body.Close(); err != nil { return "", trace.Wrap(err) } - req.Body = io.NopCloser(newBody) + + // The body is replaced with a replayable reader, and [http.Request.GetBody] is + // set so the upstream transport can retry the request after a GOAWAY without + // failing on the unrewindable network-side body. + // See https://github.com/gravitational/teleport/issues/65611 + bodyBytes := newBody.Bytes() + req.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + req.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(bodyBytes)), nil + } + req.ContentLength = int64(len(bodyBytes)) + // decode memory rw body. - obj, err := decodeAndSetGVK(decoder, newBody.Bytes(), defaults) + obj, err := decodeAndSetGVK(decoder, bodyBytes, defaults) if err != nil { return "", trace.Wrap(err) } diff --git a/lib/kube/proxy/url_test.go b/lib/kube/proxy/url_test.go index 64bd96ed72375..f88d8addee174 100644 --- a/lib/kube/proxy/url_test.go +++ b/lib/kube/proxy/url_test.go @@ -32,6 +32,46 @@ import ( "github.com/gravitational/teleport/api/types" ) +// TestExtractResourceNameFromPostRequest_Replayable verifies that +// extractResourceNameFromPostRequest sets a working [http.Request.GetBody] +// and ContentLength so the upstream transport can retry the request after a +// GOAWAY without hitting the unrewindable network-side body. See #65611. +func TestExtractResourceNameFromPostRequest_Replayable(t *testing.T) { + bodyJSON := []byte(`{"kind":"Pod","apiVersion":"v1","metadata":{"name":"test-pod","namespace":"default"}}`) + + req, err := http.NewRequest(http.MethodPost, "/api/v1/namespaces/default/pods", strings.NewReader(string(bodyJSON))) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + + // Simulate a request received over the network: stdlib only auto-sets + // GetBody/ContentLength for in-process body types like *strings.Reader. + // Server-side requests carry an opaque io.ReadCloser with neither set. + req.GetBody = nil + req.ContentLength = 0 + + name, err := extractResourceNameFromPostRequest(req, &globalKubeCodecs, &schema.GroupVersionKind{Version: "v1", Kind: "Pod"}) + require.NoError(t, err) + require.Equal(t, "test-pod", name) + + require.Equal(t, int64(len(bodyJSON)), req.ContentLength, "ContentLength must match the buffered body") + require.NotNil(t, req.GetBody, "GetBody must be set so the transport can replay the request on GOAWAY") + + // Drain the body to simulate the transport sending the request once. + drained, err := io.ReadAll(req.Body) + require.NoError(t, err) + require.Equal(t, bodyJSON, drained) + + // GetBody must return a fresh reader yielding the same bytes, twice in a row. + for i := range 2 { + replay, err := req.GetBody() + require.NoError(t, err, "GetBody call %d", i) + got, err := io.ReadAll(replay) + require.NoError(t, err) + require.Equal(t, bodyJSON, got, "GetBody call %d must yield the original body", i) + require.NoError(t, replay.Close()) + } +} + func TestParseResourcePath(t *testing.T) { tests := []struct { path string