diff --git a/lib/kube/proxy/exec_test.go b/lib/kube/proxy/exec_test.go index 3aca4f095b157..ef3a019f2b212 100644 --- a/lib/kube/proxy/exec_test.go +++ b/lib/kube/proxy/exec_test.go @@ -485,7 +485,7 @@ func TestExecWebsocketEndToEndErrReturn(t *testing.T) { ) require.NoError(t, err) t.Cleanup(func() { - require.EqualValues(t, 0, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests") + require.EqualValues(t, 2, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests") require.EqualValues(t, 2, kubeMock.KubeExecRequests.Websocket.Load(), "expected one websocket request") kubeMock.Close() }) @@ -594,8 +594,6 @@ func TestExecWebsocketEndToEndErrReturn(t *testing.T) { require.Equal(t, "403", execEvent.ExitCode) require.NotEmpty(t, execEvent.Error) eventsLock.Unlock() - }) } - } diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index b679a5d406d3b..617ec61aac882 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -2150,28 +2150,27 @@ func isRelevantWebsocketError(err error) bool { } func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { - isWSSupported := false - if !sess.isLocalKubernetesCluster { - // We're forwarding it to another kube_service, check if it supports new protocol. - isWSSupported = f.allServersSupportExecSubprotocolV5(sess) - } else { - // We're accessing the Kubernetes cluster directly, check if it is version that supports new protocol. - f.rwMutexDetails.RLock() - if details, ok := f.clusterDetails[sess.kubeClusterName]; ok { - details.rwMu.RLock() - isWSSupported = kubernetesSupportsExecSubprotocolV5(details.kubeClusterVersion) - details.rwMu.RUnlock() - } - f.rwMutexDetails.RUnlock() - } - - if isWSSupported { - wsExec, err := f.getWebsocketExecutor(sess, req) - return wsExec, trace.Wrap(err) + wsExec, err := f.getWebsocketExecutor(sess, req) + if err != nil { + return nil, trace.Wrap(err, "unable to create websocket executor") } - spdyExec, err := f.getSPDYExecutor(sess, req) - return spdyExec, trace.Wrap(err) + if err != nil { + return nil, trace.Wrap(err, "unable to create spdy executor") + } + return remotecommand.NewFallbackExecutor( + wsExec, + spdyExec, + func(err error) bool { + // If the error is a known upgrade failure, we can retry with the other protocol. + result := httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err) || kubeerrors.IsForbidden(err) || isTeleportUpgradeFailure(err) + if result { + // If the error is a known upgrade failure, we can retry with the other protocol. + // To do that, we need to reset the connection monitor context. + sess.connCtx, sess.connMonitorCancel = context.WithCancelCause(req.Context()) + } + return result + }) } func (f *Forwarder) getSPDYExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { diff --git a/lib/kube/proxy/roundtrip.go b/lib/kube/proxy/roundtrip.go index 7f4d48ba1ca05..bdc557bee4d0b 100644 --- a/lib/kube/proxy/roundtrip.go +++ b/lib/kube/proxy/roundtrip.go @@ -21,6 +21,7 @@ import ( "bytes" "context" "crypto/tls" + "errors" "fmt" "io" "net" @@ -288,11 +289,35 @@ func extractKubeAPIStatusFromReq(rsp *http.Response) error { } else { if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil { if status, ok := obj.(*metav1.Status); ok { - return &apierrors.StatusError{ErrStatus: *status} + return &upgradeFailureError{Cause: &apierrors.StatusError{ErrStatus: *status}} } } responseError = string(responseErrorBytes) responseError = strings.TrimSpace(responseError) } - return fmt.Errorf("unable to upgrade connection: %s", responseError) + return &upgradeFailureError{Cause: fmt.Errorf("unable to upgrade connection: %s", responseError)} +} + +func isTeleportUpgradeFailure(err error) bool { + var upgradeErr *upgradeFailureError + return errors.As(err, &upgradeErr) +} + +// upgradeFailureError encapsulates the cause for why the streaming +// upgrade request failed. Implements error interface. +type upgradeFailureError struct { + Cause error +} + +func (e *upgradeFailureError) Error() string { + if e.Cause == nil { + return "upgrade failed" + } + return fmt.Sprintf("upgrade failed: %v", e.Cause) +} +func (e *upgradeFailureError) Unwrap() error { + if e.Cause == nil { + return nil + } + return e.Cause } diff --git a/lib/kube/proxy/roundtrip_websocket.go b/lib/kube/proxy/roundtrip_websocket.go index 1ee847384c826..6b7f5b169f10f 100644 --- a/lib/kube/proxy/roundtrip_websocket.go +++ b/lib/kube/proxy/roundtrip_websocket.go @@ -22,18 +22,13 @@ import ( "fmt" "net/http" - "github.com/coreos/go-semver/semver" gwebsocket "github.com/gorilla/websocket" "github.com/gravitational/trace" "k8s.io/apimachinery/pkg/util/httpstream" utilnet "k8s.io/apimachinery/pkg/util/net" - versionUtil "k8s.io/apimachinery/pkg/util/version" - "k8s.io/apimachinery/pkg/version" kwebsocket "k8s.io/client-go/transport/websocket" - "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/auth" - "github.com/gravitational/teleport/lib/utils" ) // WebsocketRoundTripper knows how to upgrade an HTTP request to one that supports @@ -115,81 +110,3 @@ func (w *WebsocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, er return wsResp, nil } - -// versionWithoutExecSubprotocolV5 is the version of Teleport that starts supporting websocket exec subprotocol v5. -var versionWithoutExecSubprotocolV5 = semver.New(utils.VersionBeforeAlpha("16.0.0")) - -const kubernetesExecSubprotocolV5Version = "1.30.0" - -func kubernetesSupportsExecSubprotocolV5(serverVersion *version.Info) bool { - if serverVersion == nil { - return false - } - - parsedVersion, err := versionUtil.ParseSemantic(serverVersion.GitVersion) - if err != nil { - return false - } - requiredVersion, err := versionUtil.ParseSemantic(kubernetesExecSubprotocolV5Version) - if err != nil { - return false - } - - return parsedVersion.AtLeast(requiredVersion) -} - -// teleportVersionInterface is an interface that allows to get the Teleport version of -// a kube server. -// DELETE IN 17.0.0 (anton) -type teleportVersionInterface interface { - GetTeleportVersion() string -} - -// allServersSupportExecSubprotocolV5 checks if all paths for this sessions support -// websocket exec subprotocol v5. If all of them do and target kubernetes cluster supports it as well -// we can use websocket executor, otherwise we'll use SPDY executor. -func (f *Forwarder) allServersSupportExecSubprotocolV5(sess *clusterSession) bool { - // If the cluster is remote, we need to check if all remote proxies - // support websocket exec subprotocol v5. - if sess.teleportCluster.isRemote { - proxies, err := f.getRemoteClusterProxies(sess.teleportCluster.name) - return err == nil && allServersSupportExecSubprotocolV5(proxies) - } - // If the cluster is not remote, validate the kube services support of - // websocket exec subprotocol v5. - return allServersSupportExecSubprotocolV5(sess.kubeServers) -} - -// allServersSupportExecSubprotocolV5 returns true if all servers in the list -// support websocket exec subprotocol v5. -// DELETE IN 17.0.0 (anton) -func allServersSupportExecSubprotocolV5[T teleportVersionInterface](servers []T) bool { - if len(servers) == 0 { - return false - } - - for _, server := range servers { - serverVersion := server.GetTeleportVersion() - semVer, err := semver.NewVersion(serverVersion) - if err != nil || semVer.LessThan(*versionWithoutExecSubprotocolV5) { - return false - } - } - return true -} - -// getRemoteClusterProxies returns a list of proxies registered at the remote cluster. -// It's used to determine whether the remote cluster supports websocket exec subprotocol v5. -func (f *Forwarder) getRemoteClusterProxies(clusterName string) ([]types.Server, error) { - targetCluster, err := f.cfg.ReverseTunnelSrv.GetSite(clusterName) - if err != nil { - return nil, trace.Wrap(err) - } - // Get the remote cluster's cache. - caching, err := targetCluster.CachingAccessPoint() - if err != nil { - return nil, trace.Wrap(err) - } - proxies, err := caching.GetProxies() - return proxies, trace.Wrap(err) -} diff --git a/lib/kube/proxy/testing/kube_server/kube_mock.go b/lib/kube/proxy/testing/kube_server/kube_mock.go index eb1cc97edacce..81b101ff7d802 100644 --- a/lib/kube/proxy/testing/kube_server/kube_mock.go +++ b/lib/kube/proxy/testing/kube_server/kube_mock.go @@ -247,6 +247,7 @@ func (s *KubeMockServer) formatResponseError(rw http.ResponseWriter, respErr err } func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr error, status *metav1.Status) { + status = status.DeepCopy() data, err := runtime.Encode(kubeCodecs.LegacyCodec(), status) if err != nil { s.log.Warningf("Failed encoding error into kube Status object: %v", err)