diff --git a/lib/kube/proxy/cluster_details.go b/lib/kube/proxy/cluster_details.go index 50c973ebed2dc..e64ffdde11fd5 100644 --- a/lib/kube/proxy/cluster_details.go +++ b/lib/kube/proxy/cluster_details.go @@ -34,6 +34,7 @@ import ( "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/version" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -53,6 +54,8 @@ type kubeDetails struct { dynamicLabels *labels.Dynamic // kubeCluster is the dynamic kube_cluster or a static generated from kubeconfig and that only has the name populated. kubeCluster types.KubeCluster + // kubeClusterVersion is the version of the kube_cluster's related Kubernetes server. + kubeClusterVersion *version.Info // rwMu is the mutex to protect the kubeCodecs, gvkSupportedResources, and rbacSupportedTypes. rwMu sync.RWMutex @@ -124,9 +127,12 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe dynLabels.Sync() go dynLabels.Start() } + + kubeClient := creds.getKubeClient() + var isClusterOffline bool // Create the codec factory and the list of supported types for RBAC. - codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient()) + codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, kubeClient) if err != nil { cfg.log.WithError(err).Warn("Failed to create cluster schema. Possibly the cluster is offline.") // If the cluster is offline, we will not be able to create the codec factory @@ -136,11 +142,17 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe isClusterOffline = true } + kubeVersion, err := kubeClient.Discovery().ServerVersion() + if err != nil { + cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.") + } + ctx, cancel := context.WithCancel(ctx) k := &kubeDetails{ kubeCreds: creds, dynamicLabels: dynLabels, kubeCluster: cfg.cluster, + kubeClusterVersion: kubeVersion, kubeCodecs: codecFactory, rbacSupportedTypes: rbacSupportedTypes, cancelFunc: cancel, @@ -165,11 +177,17 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe continue } + kubeVersion, err := kubeClient.Discovery().ServerVersion() + if err != nil { + cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.") + } + k.rwMu.Lock() k.kubeCodecs = codecFactory k.rbacSupportedTypes = rbacSupportedTypes k.gvkSupportedResources = gvkSupportedResources k.isClusterOffline = false + k.kubeClusterVersion = kubeVersion k.rwMu.Unlock() } } diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 67db08a98366a..adfc58a768786 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -36,6 +36,7 @@ import ( "github.com/google/uuid" "github.com/gorilla/websocket" + gwebsocket "github.com/gorilla/websocket" "github.com/gravitational/trace" "github.com/gravitational/ttlmap" "github.com/jonboulle/clockwork" @@ -50,8 +51,10 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/transport/spdy" + kwebsocket "k8s.io/client-go/transport/websocket" kubeexec "k8s.io/client-go/util/exec" "github.com/gravitational/teleport" @@ -2035,7 +2038,83 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h } } +func (f *Forwarder) getWebsocketExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { + f.log.Debugf("Creating websocket remote executor for request %s %s", req.Method, req.RequestURI) + + tlsConfig, useImpersonation, err := f.getTLSConfig(sess) + if err != nil { + return nil, trace.Wrap(err) + } + + upgradeRoundTripper := NewWebsocketRoundTripperWithDialer(roundTripperConfig{ + ctx: req.Context(), + log: f.log, + sess: sess, + dialWithContext: sess.DialWithContext(), + tlsConfig: tlsConfig, + originalHeaders: req.Header, + useIdentityForwarding: useImpersonation, + proxier: sess.getProxier(), + }) + rt := http.RoundTripper(upgradeRoundTripper) + if sess.kubeAPICreds != nil { + var err error + rt, err = sess.kubeAPICreds.wrapTransport(rt) + if err != nil { + return nil, trace.Wrap(err) + } + } + rt = tracehttp.NewTransport(rt) + + cfg := &rest.Config{ + // WrapTransport will replace default roundTripper created for the WebsocketExecutor + // and on successfully established connection we will set upgrader's websocket connection. + WrapTransport: func(baseRt http.RoundTripper) http.RoundTripper { + if wrt, ok := baseRt.(*kwebsocket.RoundTripper); ok { + upgradeRoundTripper.onConnected = func(wsConn *gwebsocket.Conn) { + wrt.Conn = wsConn + } + } + + return rt + }, + } + + return remotecommand.NewWebSocketExecutor(cfg, req.Method, req.URL.String()) +} + +func isRelevantWebsocketError(err error) bool { + return err != nil && !strings.Contains(err.Error(), "next reader: EOF") +} + func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { + isWSSupported := false + if sess.noAuditEvents { + // We're forwarding it to another kube_service, check if it supports new protocol. + isWSSupported = f.allServersSupportExecSubprotocolV5(sess) + } else { + // We're accessing the Kubernetes cluster directly, check if it is version that supports new protocol. + f.rwMutexDetails.RLock() + if details, ok := f.clusterDetails[sess.kubeClusterName]; ok { + details.rwMu.RLock() + isWSSupported = kubernetesSupportsExecSubprotocolV5(details.kubeClusterVersion) + details.rwMu.RUnlock() + } + f.rwMutexDetails.RUnlock() + } + + if isWSSupported { + wsExec, err := f.getWebsocketExecutor(sess, req) + return wsExec, trace.Wrap(err) + } + + spdyExec, err := f.getSPDYExecutor(sess, req) + return spdyExec, trace.Wrap(err) +} + +func (f *Forwarder) getSPDYExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { + f.log.Debugf("Creating SPDY remote executor for request %s %s", req.Method, req.RequestURI) + tlsConfig, useImpersonation, err := f.getTLSConfig(sess) if err != nil { return nil, trace.Wrap(err) @@ -2279,7 +2358,6 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont if err != nil { return nil, trace.Wrap(err) } - connCtx, cancel := context.WithCancelCause(ctx) f.log.Debugf("Handling kubernetes session for %v using local credentials.", authCtx) return &clusterSession{ diff --git a/lib/kube/proxy/remotecommand.go b/lib/kube/proxy/remotecommand.go index 2530d2c7ee2d5..cfd31f18b95de 100644 --- a/lib/kube/proxy/remotecommand.go +++ b/lib/kube/proxy/remotecommand.go @@ -116,6 +116,9 @@ func upgradeRequestToRemoteCommandProxy(req remoteCommandRequest, exec func(*rem go proxy.resizeQueue.handleResizeEvents(proxy.resizeStream) } err = exec(proxy) + if !isRelevantWebsocketError(err) { + err = nil + } if err := proxy.sendStatus(err); err != nil { log.Warningf("Failed to send status: %v", err) } diff --git a/lib/kube/proxy/roundtrip.go b/lib/kube/proxy/roundtrip.go index f5507170ffecc..ed68364f5fe2d 100644 --- a/lib/kube/proxy/roundtrip.go +++ b/lib/kube/proxy/roundtrip.go @@ -88,6 +88,8 @@ type roundTripperConfig struct { // auth.TeleportImpersonateUserHeader and auth.TeleportImpersonateIPHeader // headers instead of relying on the certificate to transport it. useIdentityForwarding bool + // log specifies the logger. + log log.FieldLogger proxier func(*http.Request) (*url.URL, error) } diff --git a/lib/kube/proxy/roundtrip_websocket.go b/lib/kube/proxy/roundtrip_websocket.go new file mode 100644 index 0000000000000..6e232b46eb61f --- /dev/null +++ b/lib/kube/proxy/roundtrip_websocket.go @@ -0,0 +1,192 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package proxy + +import ( + "fmt" + "net/http" + + "github.com/coreos/go-semver/semver" + gwebsocket "github.com/gorilla/websocket" + "github.com/gravitational/trace" + "k8s.io/apimachinery/pkg/util/httpstream" + utilnet "k8s.io/apimachinery/pkg/util/net" + versionUtil "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apimachinery/pkg/version" + kwebsocket "k8s.io/client-go/transport/websocket" + + "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/auth" + "github.com/gravitational/teleport/lib/utils" +) + +// WebsocketRoundTripper knows how to upgrade an HTTP request to one that supports +// multiplexed streams. After RoundTrip() is invoked, Conn will be set +// and usable. WebsocketRoundTripper implements the UpgradeRoundTripper interface. +type WebsocketRoundTripper struct { + roundTripperConfig + + // conn is the websocket network connection to the remote server. + conn *gwebsocket.Conn + + // onConnected is a hook that happens when connection was successfully established, + // can be used to propagate established connection somewhere else - we are using it + // to set underlying connection of the native k8s websocket executor. + onConnected func(conn *gwebsocket.Conn) +} + +// NewWebsocketRoundTripperWithDialer creates a new WebsocketRoundTripper that will +// dial and upgrade connection, copying impersonation setup specified in the config. +func NewWebsocketRoundTripperWithDialer(cfg roundTripperConfig) *WebsocketRoundTripper { + return &WebsocketRoundTripper{roundTripperConfig: cfg} +} + +// RoundTrip executes the Request and upgrades it. +func (w *WebsocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + header := utilnet.CloneHeader(req.Header) + // copyImpersonationHeaders copies the headers from the original request to the new + // request headers. This is necessary to forward the original user's impersonation + // when multiple kubernetes_users are available. + copyImpersonationHeaders(header, w.originalHeaders) + if err := setupImpersonationHeaders(w.log, w.sess, header); err != nil { + return nil, trace.Wrap(err) + } + + var err error + + // If we're using identity forwarding, we need to add the impersonation + // headers to the request before we send the request. + if w.useIdentityForwarding { + if header, err = auth.IdentityForwardingHeaders(w.ctx, header); err != nil { + return nil, trace.Wrap(err) + } + } + + clone := utilnet.CloneRequest(req) + clone.Header = header + + nativeBufferSize := (&kwebsocket.RoundTripper{}).DataBufferSize() + + wsDialer := gwebsocket.Dialer{ + NetDialContext: w.dialWithContext, + Proxy: w.proxier, + TLSClientConfig: w.tlsConfig, + Subprotocols: header[httpstream.HeaderProtocolVersion], + ReadBufferSize: nativeBufferSize + 1024, // matching code in k8s websocket/roundripper.go + WriteBufferSize: nativeBufferSize + 1024, + } + + switch clone.URL.Scheme { + case "https": + clone.URL.Scheme = "wss" + case "http": + clone.URL.Scheme = "ws" + default: + return nil, fmt.Errorf("unknown url scheme: %s", clone.URL.Scheme) + } + + wsConn, wsResp, err := wsDialer.DialContext(w.ctx, clone.URL.String(), clone.Header) + if err != nil { + return nil, &httpstream.UpgradeFailureError{Cause: err} + } + w.conn = wsConn + if w.onConnected != nil { + w.onConnected(wsConn) + } + + return wsResp, nil +} + +// versionWithoutExecSubprotocolV5 is the version of Teleport that starts supporting websocket exec subprotocol v5. +var versionWithoutExecSubprotocolV5 = semver.New(utils.VersionBeforeAlpha("16.0.0")) + +const kubernetesExecSubprotocolV5Version = "1.30.0" + +func kubernetesSupportsExecSubprotocolV5(serverVersion *version.Info) bool { + if serverVersion == nil { + return false + } + + parsedVersion, err := versionUtil.ParseSemantic(serverVersion.GitVersion) + if err != nil { + return false + } + requiredVersion, err := versionUtil.ParseSemantic(kubernetesExecSubprotocolV5Version) + if err != nil { + return false + } + + return parsedVersion.AtLeast(requiredVersion) +} + +// teleportVersionInterface is an interface that allows to get the Teleport version of +// a kube server. +// DELETE IN 17.0.0 (anton) +type teleportVersionInterface interface { + GetTeleportVersion() string +} + +// allServersSupportExecSubprotocolV5 checks if all paths for this sessions support +// websocket exec subprotocol v5. If all of them do and target kubernetes cluster supports it as well +// we can use websocket executor, otherwise we'll use SPDY executor. +func (f *Forwarder) allServersSupportExecSubprotocolV5(sess *clusterSession) bool { + // If the cluster is remote, we need to check if all remote proxies + // support websocket exec subprotocol v5. + if sess.teleportCluster.isRemote { + proxies, err := f.getRemoteClusterProxies(sess.teleportCluster.name) + return err == nil && allServersSupportExecSubprotocolV5(proxies) + } + // If the cluster is not remote, validate the kube services support of + // websocket exec subprotocol v5. + return allServersSupportExecSubprotocolV5(sess.kubeServers) +} + +// allServersSupportExecSubprotocolV5 returns true if all servers in the list +// support websocket exec subprotocol v5. +// DELETE IN 17.0.0 (anton) +func allServersSupportExecSubprotocolV5[T teleportVersionInterface](servers []T) bool { + if len(servers) == 0 { + return false + } + + for _, server := range servers { + serverVersion := server.GetTeleportVersion() + semVer, err := semver.NewVersion(serverVersion) + if err != nil || semVer.LessThan(*versionWithoutExecSubprotocolV5) { + return false + } + } + return true +} + +// getRemoteClusterProxies returns a list of proxies registered at the remote cluster. +// It's used to determine whether the remote cluster supports websocket exec subprotocol v5. +func (f *Forwarder) getRemoteClusterProxies(clusterName string) ([]types.Server, error) { + targetCluster, err := f.cfg.ReverseTunnelSrv.GetSite(clusterName) + if err != nil { + return nil, trace.Wrap(err) + } + // Get the remote cluster's cache. + caching, err := targetCluster.CachingAccessPoint() + if err != nil { + return nil, trace.Wrap(err) + } + proxies, err := caching.GetProxies() + return proxies, trace.Wrap(err) +}