diff --git a/lib/kube/proxy/exec_test.go b/lib/kube/proxy/exec_test.go index 3aca4f095b157..ef3a019f2b212 100644 --- a/lib/kube/proxy/exec_test.go +++ b/lib/kube/proxy/exec_test.go @@ -485,7 +485,7 @@ func TestExecWebsocketEndToEndErrReturn(t *testing.T) { ) require.NoError(t, err) t.Cleanup(func() { - require.EqualValues(t, 0, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests") + require.EqualValues(t, 2, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests") require.EqualValues(t, 2, kubeMock.KubeExecRequests.Websocket.Load(), "expected one websocket request") kubeMock.Close() }) @@ -594,8 +594,6 @@ func TestExecWebsocketEndToEndErrReturn(t *testing.T) { require.Equal(t, "403", execEvent.ExitCode) require.NotEmpty(t, execEvent.Error) eventsLock.Unlock() - }) } - } diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 3ebd5edb505f5..7be267b48ec24 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -2174,29 +2174,27 @@ func isRelevantWebsocketError(err error) bool { } func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { - isWSSupported := false - if !sess.isLocalKubernetesCluster { - // We're forwarding it to another Teleport kube_service, - // which supports the websocket protocol. - isWSSupported = true - } else { - // We're accessing the Kubernetes cluster directly, check if it is version that supports new protocol. - f.rwMutexDetails.RLock() - if details, ok := f.clusterDetails[sess.kubeClusterName]; ok { - details.rwMu.RLock() - isWSSupported = kubernetesSupportsExecSubprotocolV5(details.kubeClusterVersion) - details.rwMu.RUnlock() - } - f.rwMutexDetails.RUnlock() - } - - if isWSSupported { - wsExec, err := f.getWebsocketExecutor(sess, req) - return wsExec, trace.Wrap(err) + wsExec, err := f.getWebsocketExecutor(sess, req) + if err != nil { + return nil, trace.Wrap(err, "unable to create websocket executor") } - spdyExec, err := f.getSPDYExecutor(sess, req) - return spdyExec, trace.Wrap(err) + if err != nil { + return nil, trace.Wrap(err, "unable to create spdy executor") + } + return remotecommand.NewFallbackExecutor( + wsExec, + spdyExec, + func(err error) bool { + // If the error is a known upgrade failure, we can retry with the other protocol. + result := httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err) || kubeerrors.IsForbidden(err) || isTeleportUpgradeFailure(err) + if result { + // If the error is a known upgrade failure, we can retry with the other protocol. + // To do that, we need to reset the connection monitor context. + sess.connCtx, sess.connMonitorCancel = context.WithCancelCause(req.Context()) + } + return result + }) } func (f *Forwarder) getSPDYExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { diff --git a/lib/kube/proxy/roundtrip_websocket.go b/lib/kube/proxy/roundtrip_websocket.go index 832d51c1c9cb6..ff80b28cfef00 100644 --- a/lib/kube/proxy/roundtrip_websocket.go +++ b/lib/kube/proxy/roundtrip_websocket.go @@ -26,8 +26,6 @@ import ( "github.com/gravitational/trace" "k8s.io/apimachinery/pkg/util/httpstream" utilnet "k8s.io/apimachinery/pkg/util/net" - versionUtil "k8s.io/apimachinery/pkg/util/version" - "k8s.io/apimachinery/pkg/version" kwebsocket "k8s.io/client-go/transport/websocket" "github.com/gravitational/teleport/lib/auth" @@ -112,21 +110,3 @@ func (w *WebsocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, er return wsResp, nil } - -var kubeExecSubprotocolV5MinVersion = func() *versionUtil.Version { - const kubeExecSubprotocolV5Version = "v1.30.0" - return versionUtil.MustParse(kubeExecSubprotocolV5Version) -}() - -func kubernetesSupportsExecSubprotocolV5(serverVersion *version.Info) bool { - if serverVersion == nil { - return false - } - - parsedVersion, err := versionUtil.ParseSemantic(serverVersion.GitVersion) - if err != nil { - return false - } - - return parsedVersion.AtLeast(kubeExecSubprotocolV5MinVersion) -}