Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions lib/kube/proxy/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func TestExecWebsocketEndToEndErrReturn(t *testing.T) {
)
require.NoError(t, err)
t.Cleanup(func() {
require.EqualValues(t, 0, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests")
require.EqualValues(t, 2, kubeMock.KubeExecRequests.SPDY.Load(), "expected no SPDY requests")
require.EqualValues(t, 2, kubeMock.KubeExecRequests.Websocket.Load(), "expected one websocket request")
kubeMock.Close()
})
Expand Down Expand Up @@ -594,8 +594,6 @@ func TestExecWebsocketEndToEndErrReturn(t *testing.T) {
require.Equal(t, "403", execEvent.ExitCode)
require.NotEmpty(t, execEvent.Error)
eventsLock.Unlock()

})
}

}
39 changes: 19 additions & 20 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2150,28 +2150,27 @@ func isRelevantWebsocketError(err error) bool {
}

func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) {
isWSSupported := false
if !sess.isLocalKubernetesCluster {
// We're forwarding it to another kube_service, check if it supports new protocol.
isWSSupported = f.allServersSupportExecSubprotocolV5(sess)
} else {
// We're accessing the Kubernetes cluster directly, check if it is version that supports new protocol.
f.rwMutexDetails.RLock()
if details, ok := f.clusterDetails[sess.kubeClusterName]; ok {
details.rwMu.RLock()
isWSSupported = kubernetesSupportsExecSubprotocolV5(details.kubeClusterVersion)
details.rwMu.RUnlock()
}
f.rwMutexDetails.RUnlock()
}

if isWSSupported {
wsExec, err := f.getWebsocketExecutor(sess, req)
return wsExec, trace.Wrap(err)
wsExec, err := f.getWebsocketExecutor(sess, req)
if err != nil {
return nil, trace.Wrap(err, "unable to create websocket executor")
}

spdyExec, err := f.getSPDYExecutor(sess, req)
return spdyExec, trace.Wrap(err)
if err != nil {
return nil, trace.Wrap(err, "unable to create spdy executor")
}
return remotecommand.NewFallbackExecutor(
wsExec,
spdyExec,
func(err error) bool {
// If the error is a known upgrade failure, we can retry with the other protocol.
result := httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err) || kubeerrors.IsForbidden(err) || isTeleportUpgradeFailure(err)
if result {
// If the error is a known upgrade failure, we can retry with the other protocol.
// To do that, we need to reset the connection monitor context.
sess.connCtx, sess.connMonitorCancel = context.WithCancelCause(req.Context())
}
return result
})
}

func (f *Forwarder) getSPDYExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) {
Expand Down
29 changes: 27 additions & 2 deletions lib/kube/proxy/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -288,11 +289,35 @@ func extractKubeAPIStatusFromReq(rsp *http.Response) error {
} else {
if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil {
if status, ok := obj.(*metav1.Status); ok {
return &apierrors.StatusError{ErrStatus: *status}
return &upgradeFailureError{Cause: &apierrors.StatusError{ErrStatus: *status}}
}
}
responseError = string(responseErrorBytes)
responseError = strings.TrimSpace(responseError)
}
return fmt.Errorf("unable to upgrade connection: %s", responseError)
return &upgradeFailureError{Cause: fmt.Errorf("unable to upgrade connection: %s", responseError)}
}

func isTeleportUpgradeFailure(err error) bool {
var upgradeErr *upgradeFailureError
return errors.As(err, &upgradeErr)
}

// upgradeFailureError encapsulates the cause for why the streaming
// upgrade request failed. Implements error interface.
type upgradeFailureError struct {
Cause error
}

func (e *upgradeFailureError) Error() string {
if e.Cause == nil {
return "upgrade failed"
}
return fmt.Sprintf("upgrade failed: %v", e.Cause)
}
func (e *upgradeFailureError) Unwrap() error {
if e.Cause == nil {
return nil
}
return e.Cause
}
83 changes: 0 additions & 83 deletions lib/kube/proxy/roundtrip_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,13 @@ import (
"fmt"
"net/http"

"github.com/coreos/go-semver/semver"
gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
versionUtil "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/version"
kwebsocket "k8s.io/client-go/transport/websocket"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/utils"
)

// WebsocketRoundTripper knows how to upgrade an HTTP request to one that supports
Expand Down Expand Up @@ -115,81 +110,3 @@ func (w *WebsocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, er

return wsResp, nil
}

// versionWithoutExecSubprotocolV5 is the version of Teleport that starts supporting websocket exec subprotocol v5.
var versionWithoutExecSubprotocolV5 = semver.New(utils.VersionBeforeAlpha("16.0.0"))

const kubernetesExecSubprotocolV5Version = "1.30.0"

func kubernetesSupportsExecSubprotocolV5(serverVersion *version.Info) bool {
if serverVersion == nil {
return false
}

parsedVersion, err := versionUtil.ParseSemantic(serverVersion.GitVersion)
if err != nil {
return false
}
requiredVersion, err := versionUtil.ParseSemantic(kubernetesExecSubprotocolV5Version)
if err != nil {
return false
}

return parsedVersion.AtLeast(requiredVersion)
}

// teleportVersionInterface is an interface that allows to get the Teleport version of
// a kube server.
// DELETE IN 17.0.0 (anton)
type teleportVersionInterface interface {
GetTeleportVersion() string
}

// allServersSupportExecSubprotocolV5 checks if all paths for this sessions support
// websocket exec subprotocol v5. If all of them do and target kubernetes cluster supports it as well
// we can use websocket executor, otherwise we'll use SPDY executor.
func (f *Forwarder) allServersSupportExecSubprotocolV5(sess *clusterSession) bool {
// If the cluster is remote, we need to check if all remote proxies
// support websocket exec subprotocol v5.
if sess.teleportCluster.isRemote {
proxies, err := f.getRemoteClusterProxies(sess.teleportCluster.name)
return err == nil && allServersSupportExecSubprotocolV5(proxies)
}
// If the cluster is not remote, validate the kube services support of
// websocket exec subprotocol v5.
return allServersSupportExecSubprotocolV5(sess.kubeServers)
}

// allServersSupportExecSubprotocolV5 returns true if all servers in the list
// support websocket exec subprotocol v5.
// DELETE IN 17.0.0 (anton)
func allServersSupportExecSubprotocolV5[T teleportVersionInterface](servers []T) bool {
if len(servers) == 0 {
return false
}

for _, server := range servers {
serverVersion := server.GetTeleportVersion()
semVer, err := semver.NewVersion(serverVersion)
if err != nil || semVer.LessThan(*versionWithoutExecSubprotocolV5) {
return false
}
}
return true
}

// getRemoteClusterProxies returns a list of proxies registered at the remote cluster.
// It's used to determine whether the remote cluster supports websocket exec subprotocol v5.
func (f *Forwarder) getRemoteClusterProxies(clusterName string) ([]types.Server, error) {
targetCluster, err := f.cfg.ReverseTunnelSrv.GetSite(clusterName)
if err != nil {
return nil, trace.Wrap(err)
}
// Get the remote cluster's cache.
caching, err := targetCluster.CachingAccessPoint()
if err != nil {
return nil, trace.Wrap(err)
}
proxies, err := caching.GetProxies()
return proxies, trace.Wrap(err)
}
1 change: 1 addition & 0 deletions lib/kube/proxy/testing/kube_server/kube_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (s *KubeMockServer) formatResponseError(rw http.ResponseWriter, respErr err
}

func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr error, status *metav1.Status) {
status = status.DeepCopy()
data, err := runtime.Encode(kubeCodecs.LegacyCodec(), status)
if err != nil {
s.log.Warningf("Failed encoding error into kube Status object: %v", err)
Expand Down
Loading