Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,11 +726,19 @@ func (f *Forwarder) writeResponseErrorToBody(rw http.ResponseWriter, respErr err
func (f *Forwarder) formatStatusResponseError(rw http.ResponseWriter, respErr error) {
// This detects failed requests that were terminated by the server due to GOAWAY. There
// is no direct way to detect these errors. No exported constants or error types exist from the
// standard library, see https://github.com/golang/net/blob/5ac9daca088ab4f378d7df849f6c7d28bea86071/http2/transport.go#L694.
// standard library, so we have to match on the error message. The two error strings come from:
// - golang.org/x/net/http2 when its internal retry path cannot replay the body:
// https://github.com/golang/net/blob/5ac9daca088ab4f378d7df849f6c7d28bea86071/http2/transport.go#L694
// - net/http (errCannotRewind) when, after the http2 conn pool is drained, the http1 retry
// path tries to rewind the body and fails because Request.GetBody is unset:
// https://github.com/golang/go/blob/go1.26.2/src/net/http/transport.go#L759
// When a failed request is found, we return a response that indicates to clients that they
// should retry the request themselves.
if errString := respErr.Error(); strings.HasSuffix(errString, `after Request.Body was written; define Request.GetBody to avoid this error`) &&
strings.Contains(errString, `http2: Transport: cannot retry err`) {
errString := respErr.Error()
isHTTP2RetryErr := strings.Contains(errString, `http2: Transport: cannot retry err`) &&
strings.HasSuffix(errString, `after Request.Body was written; define Request.GetBody to avoid this error`)
isHTTP1RewindErr := strings.Contains(errString, `net/http: cannot rewind body after connection loss`)
if isHTTP2RetryErr || isHTTP1RewindErr {

data, err := runtime.Encode(globalKubeCodecs.LegacyCodec(), &kubeerrors.NewTooManyRequests("Connection closed by upstream Kubernetes server", 1).ErrStatus)
if err != nil {
Expand Down
187 changes: 187 additions & 0 deletions lib/kube/proxy/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"os"
"slices"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1753,6 +1754,98 @@ func TestForwarderTLSConfigCAs(t *testing.T) {
require.True(t, getConnTLSRootsCalled)
}

// errRoundTripper is an [http.RoundTripper] test double that fails every
// request with a preconfigured error. Before failing, it reads the request
// body to the end and closes it, imitating an upstream transport that has
// already consumed the body by the time the failure surfaces and therefore
// cannot replay it.
type errRoundTripper struct {
	// err is the error handed back from every RoundTrip call.
	err error
}

// RoundTrip consumes and closes req.Body when one is present, then returns a
// nil response together with the configured error.
func (rt *errRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	if req.Body == nil {
		return nil, rt.err
	}
	// Drain/close failures are irrelevant to the stub: the only outcome it
	// ever reports is the configured error.
	_, _ = io.Copy(io.Discard, req.Body)
	_ = req.Body.Close()
	return nil, rt.err
}

// TestKubeForwarder_GOAWAYErrors verifies that errors emitted by the upstream transport
// when an HTTP/2 GOAWAY interrupts an in-flight request with an
// unrewindable body are translated into a 429 response with Retry-After: 1, so kube clients retry the request.
// This covers both the http2 transport's "cannot retry" error and the net/http transport's "cannot rewind body" error.
// (see formatStatusResponseError)
func TestKubeForwarder_GOAWAYErrors(t *testing.T) {
tests := []struct {
name string
err error
}{
{
// Returned by golang.org/x/net/http2 when its internal retry path
// cannot replay the request body after a GOAWAY.
name: "http2 cannot retry",
err: errors.New("http2: Transport: cannot retry err [http2: Transport received Server's graceful shutdown GOAWAY] after Request.Body was written; define Request.GetBody to avoid this error"),
},
{
// Returned by net/http when, after the http2 conn pool is drained
// by GOAWAY, the http1 retry path tries to rewind the body and
// fails because Request.GetBody is unset. See errCannotRewind in
// net/http/transport.go.
name: "net/http cannot rewind body",
err: errors.New("net/http: cannot rewind body after connection loss"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)
f := newMockForwarder(ctx, t)

// Plug a stub round tripper that returns the GOAWAY-related error
// into a fake Kubernetes cluster, so the kube proxy's full
// error-handling pipeline runs against it.
f.clusterDetails = map[string]*kubeDetails{
"kube-cluster": {
kubeCreds: &staticKubeCreds{
targetAddr: "kube.invalid:443",
tlsConfig: &tls.Config{InsecureSkipVerify: true},
transport: &errRoundTripper{err: tt.err},
},
},
}

authCtx := mockAuthCtx(t, "kube-cluster", false)
sess, err := f.newClusterSession(ctx, authCtx)
require.NoError(t, err)
t.Cleanup(sess.close)

fwd, err := f.makeSessionForwarder(sess)
require.NoError(t, err)

forwarderServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL, err = url.Parse("https://kube.invalid")
require.NoError(t, err)
fwd.ServeHTTP(w, r)
}))
t.Cleanup(forwarderServer.Close)

body := bytes.NewBuffer([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
req, err := http.NewRequest("POST", forwarderServer.URL, body)
require.NoError(t, err)
resp, err := forwarderServer.Client().Do(req)
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, resp.Body.Close()) })

require.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
require.Equal(t, "1", resp.Header.Get("Retry-After"))

var status metav1.Status
require.NoError(t, json.NewDecoder(resp.Body).Decode(&status))
require.Equal(t, metav1.StatusReasonTooManyRequests, status.Reason)
})
}
}

func TestGOAWAYHandling(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)
Expand Down Expand Up @@ -1827,6 +1920,100 @@ func TestGOAWAYHandling(t *testing.T) {
require.Equal(t, metav1.StatusReasonTooManyRequests, status.Reason)
}

// TestGOAWAYHandling_Concurrent exercises the production upstream transport
// configuration ([newH2Transport]: net/http.Transport upgraded with
// http2.ConfigureTransport) against the fake [goawayServer] with many
// concurrent requests.
// Without the rewind-body error translation in
// [Forwarder.formatStatusResponseError], a portion of the requests bubble the
// net/http rewind-body error up to clients.
// (see https://github.com/gravitational/teleport/issues/65611)
//
// Concurrency also surfaces other GOAWAY-related transport errors that are not
// translated, such as broken pipe, force-closed conns and reverseproxy
// invalid-read. The assertion is therefore narrow: the rewind-body error string
// must never reach a client. Other 500 responses are tolerated.
func TestGOAWAYHandling_Concurrent(t *testing.T) {
	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)
	f := newMockForwarder(ctx, t)

	cert, err := tls.X509KeyPair(fixtures.LocalhostCert, fixtures.LocalhostKey)
	require.NoError(t, err)

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	gs := goawayServer{
		listener: ln,
		tlsConfig: &tls.Config{
			Certificates: []tls.Certificate{cert},
			NextProtos:   []string{http2.NextProtoTLS},
		},
	}
	t.Cleanup(func() { require.NoError(t, gs.Close()) })
	go func() { _ = gs.Serve() }()

	tlsCfg := &tls.Config{InsecureSkipVerify: true}
	prodTransport, err := newH2Transport(tlsCfg, nil)
	require.NoError(t, err)

	f.clusterDetails = map[string]*kubeDetails{
		"kube-cluster": {
			kubeCreds: &staticKubeCreds{
				targetAddr: gs.URL(),
				tlsConfig:  tlsCfg,
				transport:  prodTransport,
			},
		},
	}

	authCtx := mockAuthCtx(t, "kube-cluster", false)
	sess, err := f.newClusterSession(ctx, authCtx)
	require.NoError(t, err)
	t.Cleanup(sess.close)

	fwd, err := f.makeSessionForwarder(sess)
	require.NoError(t, err)

	// Parse the upstream URL once on the test goroutine: the handler below runs
	// on server goroutines, where require (t.FailNow) must not be called.
	upstreamURL, err := url.Parse(gs.URL())
	require.NoError(t, err)

	forwarderServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Handlers run concurrently, so each request gets its own URL copy
		// instead of sharing one mutable value.
		target := *upstreamURL
		r.URL = &target
		fwd.ServeHTTP(w, r)
	}))
	t.Cleanup(forwarderServer.Close)

	const concurrency = 50
	reqCtx, reqCancel := context.WithTimeout(ctx, 10*time.Second)
	t.Cleanup(reqCancel)

	var rewindLeaks atomic.Uint32
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for range concurrency {
		go func() {
			defer wg.Done()
			body := bytes.NewBuffer(make([]byte, 64*1024))
			req, err := http.NewRequestWithContext(reqCtx, "POST", forwarderServer.URL, body)
			if !assert.NoError(t, err) {
				// req is nil on failure; bail out rather than dereference it.
				return
			}
			resp, err := forwarderServer.Client().Do(req)
			if err != nil {
				return // some requests legitimately fail at the network layer under GOAWAY storms; not what this test asserts.
			}
			defer resp.Body.Close()
			if resp.StatusCode == http.StatusTooManyRequests {
				return
			}
			// A read error is tolerated on purpose: whatever part of the body
			// did arrive is still inspected for the leaked error string.
			bodyBytes, _ := io.ReadAll(resp.Body)
			if bytes.Contains(bodyBytes, []byte("cannot rewind body after connection loss")) {
				rewindLeaks.Add(1)
			}
		}()
	}
	wg.Wait()

	require.Zero(t, rewindLeaks.Load(), "rewind-body error must never reach the client")
}

// goawayServer is a fake [http2.Server] that terminates all received client
// connections in the same manner that a Kubernetes API Server would if
// it closed the connection as a result of the GOAWAY chance being exceeded.
Expand Down
16 changes: 13 additions & 3 deletions lib/kube/proxy/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ func getResourceFromRequest(req *http.Request, kubeDetails *kubeDetails) (*types
// It reads the full body - required because data can be proto encoded -
// and decodes it into a Kubernetes object. It then extracts the resource name
// from the object.
// The body is then reset to the original request body using a new buffer.
func extractResourceNameFromPostRequest(
req *http.Request,
codecs *serializer.CodecFactory,
Expand All @@ -282,9 +281,20 @@ func extractResourceNameFromPostRequest(
if err := req.Body.Close(); err != nil {
return "", trace.Wrap(err)
}
req.Body = io.NopCloser(newBody)

// The body is replaced with a replayable reader, and [http.Request.GetBody] is
// set so the upstream transport can retry the request after a GOAWAY without
// failing on the unrewindable network-side body.
// See https://github.com/gravitational/teleport/issues/65611
bodyBytes := newBody.Bytes()
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
req.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(bodyBytes)), nil
}
req.ContentLength = int64(len(bodyBytes))

// decode memory rw body.
obj, err := decodeAndSetGVK(decoder, newBody.Bytes(), defaults)
obj, err := decodeAndSetGVK(decoder, bodyBytes, defaults)
if err != nil {
return "", trace.Wrap(err)
}
Expand Down
40 changes: 40 additions & 0 deletions lib/kube/proxy/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,46 @@ import (
"github.com/gravitational/teleport/api/types"
)

// TestExtractResourceNameFromPostRequest_Replayable checks that, after
// extractResourceNameFromPostRequest has consumed a POST body, the request is
// left replayable: ContentLength matches the buffered payload and
// [http.Request.GetBody] hands out fresh readers over the same bytes. The
// upstream transport needs this to retry a request after a GOAWAY instead of
// failing on an unrewindable network-side body. See #65611.
func TestExtractResourceNameFromPostRequest_Replayable(t *testing.T) {
	const manifest = `{"kind":"Pod","apiVersion":"v1","metadata":{"name":"test-pod","namespace":"default"}}`
	want := []byte(manifest)

	req, err := http.NewRequest(http.MethodPost, "/api/v1/namespaces/default/pods", strings.NewReader(manifest))
	require.NoError(t, err)
	req.Header.Set("Content-Type", "application/json")

	// http.NewRequest pre-populates GetBody and ContentLength for in-process
	// readers such as *strings.Reader. Clear both to mimic a request that
	// arrived over the network, where neither is set.
	req.ContentLength = 0
	req.GetBody = nil

	pod := &schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	resourceName, err := extractResourceNameFromPostRequest(req, &globalKubeCodecs, pod)
	require.NoError(t, err)
	require.Equal(t, "test-pod", resourceName)

	require.Equal(t, int64(len(want)), req.ContentLength, "ContentLength must match the buffered body")
	require.NotNil(t, req.GetBody, "GetBody must be set so the transport can replay the request on GOAWAY")

	// First pass over the body, as the transport would do when sending the
	// request for the first time.
	firstRead, err := io.ReadAll(req.Body)
	require.NoError(t, err)
	require.Equal(t, want, firstRead)

	// Every GetBody call must produce an independent reader with the full
	// payload; two consecutive calls prove the replay is repeatable.
	for call := 0; call < 2; call++ {
		rc, err := req.GetBody()
		require.NoError(t, err, "GetBody call %d", call)
		replayed, err := io.ReadAll(rc)
		require.NoError(t, err)
		require.Equal(t, want, replayed, "GetBody call %d must yield the original body", call)
		require.NoError(t, rc.Close())
	}
}

func TestParseResourcePath(t *testing.T) {
tests := []struct {
path string
Expand Down
Loading