Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion lib/kube/proxy/cluster_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

Expand All @@ -53,6 +54,8 @@ type kubeDetails struct {
dynamicLabels *labels.Dynamic
// kubeCluster is the dynamic kube_cluster or a static generated from kubeconfig and that only has the name populated.
kubeCluster types.KubeCluster
// kubeClusterVersion is the version of the kube_cluster's related Kubernetes server.
kubeClusterVersion *version.Info

// rwMu is the mutex to protect the kubeCodecs, gvkSupportedResources, and rbacSupportedTypes.
rwMu sync.RWMutex
Expand Down Expand Up @@ -124,9 +127,12 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
dynLabels.Sync()
go dynLabels.Start()
}

kubeClient := creds.getKubeClient()

var isClusterOffline bool
// Create the codec factory and the list of supported types for RBAC.
codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, kubeClient)
if err != nil {
cfg.log.WithError(err).Warn("Failed to create cluster schema. Possibly the cluster is offline.")
// If the cluster is offline, we will not be able to create the codec factory
Expand All @@ -136,11 +142,17 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
isClusterOffline = true
}

kubeVersion, err := kubeClient.Discovery().ServerVersion()
if err != nil {
cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.")
}

ctx, cancel := context.WithCancel(ctx)
k := &kubeDetails{
kubeCreds: creds,
dynamicLabels: dynLabels,
kubeCluster: cfg.cluster,
kubeClusterVersion: kubeVersion,
kubeCodecs: codecFactory,
rbacSupportedTypes: rbacSupportedTypes,
cancelFunc: cancel,
Expand All @@ -165,11 +177,17 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
continue
}

kubeVersion, err := kubeClient.Discovery().ServerVersion()
if err != nil {
cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.")
}

k.rwMu.Lock()
k.kubeCodecs = codecFactory
k.rbacSupportedTypes = rbacSupportedTypes
k.gvkSupportedResources = gvkSupportedResources
k.isClusterOffline = false
k.kubeClusterVersion = kubeVersion
k.rwMu.Unlock()
}
}
Expand Down
80 changes: 79 additions & 1 deletion lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/google/uuid"
"github.com/gorilla/websocket"
gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
"github.com/gravitational/ttlmap"
"github.com/jonboulle/clockwork"
Expand All @@ -50,8 +51,10 @@ import (
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
kwebsocket "k8s.io/client-go/transport/websocket"
kubeexec "k8s.io/client-go/util/exec"

"github.com/gravitational/teleport"
Expand Down Expand Up @@ -2035,7 +2038,83 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h
}
}

func (f *Forwarder) getWebsocketExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) {
f.log.Debugf("Creating websocket remote executor for request %s %s", req.Method, req.RequestURI)

tlsConfig, useImpersonation, err := f.getTLSConfig(sess)
if err != nil {
return nil, trace.Wrap(err)
}

upgradeRoundTripper := NewWebsocketRoundTripperWithDialer(roundTripperConfig{
Comment thread
zmb3 marked this conversation as resolved.
ctx: req.Context(),
log: f.log,
sess: sess,
dialWithContext: sess.DialWithContext(),
tlsConfig: tlsConfig,
originalHeaders: req.Header,
useIdentityForwarding: useImpersonation,
proxier: sess.getProxier(),
})
rt := http.RoundTripper(upgradeRoundTripper)
if sess.kubeAPICreds != nil {
var err error
rt, err = sess.kubeAPICreds.wrapTransport(rt)
if err != nil {
return nil, trace.Wrap(err)
}
}
rt = tracehttp.NewTransport(rt)

cfg := &rest.Config{
// WrapTransport will replace default roundTripper created for the WebsocketExecutor
// and on successfully established connection we will set upgrader's websocket connection.
WrapTransport: func(baseRt http.RoundTripper) http.RoundTripper {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we supposed to do this?
What's the reasoning behind this wrap?
How many requests do we do using this wrapper? Doesn't the default round tripper built by remotecommand.NewWebSocketExecutor, does a request?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to use our custom dialer to be able to reach kube service over the reverse tunnel, default round tripper won't be able to dial. We do the same thing for the SPDY executor - we provide our custom dialer:
image

The only thing that for SPDY they provide NewSPDYExecutorForTransports where we can set our custom transport directly, but for the websocket executor they didn't expose this API, so we replace transport in the wrap to achieve the same effect.

if wrt, ok := baseRt.(*kwebsocket.RoundTripper); ok {
upgradeRoundTripper.onConnected = func(wsConn *gwebsocket.Conn) {
wrt.Conn = wsConn
}
}

return rt
},
}

return remotecommand.NewWebSocketExecutor(cfg, req.Method, req.URL.String())
}

func isRelevantWebsocketError(err error) bool {
return err != nil && !strings.Contains(err.Error(), "next reader: EOF")
}

func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) {
isWSSupported := false
if sess.noAuditEvents {
// We're forwarding it to another kube_service, check if it supports new protocol.
isWSSupported = f.allServersSupportExecSubprotocolV5(sess)
} else {
// We're accessing the Kubernetes cluster directly, check if it is version that supports new protocol.
f.rwMutexDetails.RLock()
if details, ok := f.clusterDetails[sess.kubeClusterName]; ok {
details.rwMu.RLock()
isWSSupported = kubernetesSupportsExecSubprotocolV5(details.kubeClusterVersion)
details.rwMu.RUnlock()
}
f.rwMutexDetails.RUnlock()
}

if isWSSupported {
wsExec, err := f.getWebsocketExecutor(sess, req)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't use this websocket executor because if the kubernetes_service doesn't support protocol v5, it will fallback to websocket v4 which doesn't support termination signals.

I see interest in dropping SPDY executor between Teleport components so we can say goodbye to the legacy and unmaintained SPDY protocol. But in order to do it, we need to ensure the other teleport component we are dialing supports protocol v5, otherwise kubectl will be mostly broken when stdin terminates.

You can either:

  • check the version of the kubernetes_service and if > some version, we allow the websocket executor because we know it supports it.
  • We implement our own fallback if the server doesn't support v5
  • we delay this until teleport v16

I think the teleport v16 approach is the sanest for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be clear, we can still forward the requests internally using SPDY while the users request websockets. When I wrote the WS implementation, I had that in mind and teleport accepts connections for SPDY and Websocket but always initiates the requests as SPDY between components and even the kube cluster.

The only part that needs to be delayed is the internal requests/kube requests using web sockets. We can still support websocket v5 as user's frontend.

Nothing else needs to be changed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I know, I'm looking into fallback approach and generally it works great, but there's a problem with whole session context cancellation by the trackingConnection (s.connMonitorCancel), it was added to fix a flaky test, maybe there's a way around it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented fallback approach and tested it in all configurations and permutations - proxying to a v5 capable kube, v5 capable our kube_service, legacy service with both tty and tty-less execs, everything seems to be working fine, so we don't have to check if target kube/kube_service supports v5 protocol, and we can merge this and backport it to at least v15.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per discussion with Tiago - I've split this PR in two parts, first (now PR #39755 ) will add support for receiving incoming connection (e.g. kubectl) in websocket subprotocol v5, and it will be backported to all supported Teleport versions. It's a safe change, since we only receive subprotocol v5, but then make proxying requests using old stable SPDY implementation. And in this PR we add making proxying requests (to Teleport kube_services and target Kubernetes clusters) using v5 subprotocol and we'll leave it to v16.

return wsExec, trace.Wrap(err)
}

spdyExec, err := f.getSPDYExecutor(sess, req)
return spdyExec, trace.Wrap(err)
}

func (f *Forwarder) getSPDYExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) {
f.log.Debugf("Creating SPDY remote executor for request %s %s", req.Method, req.RequestURI)

tlsConfig, useImpersonation, err := f.getTLSConfig(sess)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -2279,7 +2358,6 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont
if err != nil {
return nil, trace.Wrap(err)
}

connCtx, cancel := context.WithCancelCause(ctx)
f.log.Debugf("Handling kubernetes session for %v using local credentials.", authCtx)
return &clusterSession{
Expand Down
3 changes: 3 additions & 0 deletions lib/kube/proxy/remotecommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ func upgradeRequestToRemoteCommandProxy(req remoteCommandRequest, exec func(*rem
go proxy.resizeQueue.handleResizeEvents(proxy.resizeStream)
}
err = exec(proxy)
if !isRelevantWebsocketError(err) {
err = nil
}
Comment thread
zmb3 marked this conversation as resolved.
if err := proxy.sendStatus(err); err != nil {
log.Warningf("Failed to send status: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions lib/kube/proxy/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type roundTripperConfig struct {
// auth.TeleportImpersonateUserHeader and auth.TeleportImpersonateIPHeader
// headers instead of relying on the certificate to transport it.
useIdentityForwarding bool
// log specifies the logger.
log log.FieldLogger

proxier func(*http.Request) (*url.URL, error)
}
Expand Down
192 changes: 192 additions & 0 deletions lib/kube/proxy/roundtrip_websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Teleport
* Copyright (C) 2024 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package proxy

import (
"fmt"
"net/http"

"github.com/coreos/go-semver/semver"
gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
versionUtil "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/version"
kwebsocket "k8s.io/client-go/transport/websocket"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/utils"
)

// WebsocketRoundTripper knows how to upgrade an HTTP request to one that supports
// multiplexed streams. After RoundTrip() is invoked, conn will be set
// and usable. WebsocketRoundTripper implements the UpgradeRoundTripper interface.
type WebsocketRoundTripper struct {
	roundTripperConfig

	// conn is the websocket network connection to the remote server,
	// populated by RoundTrip after a successful upgrade.
	conn *gwebsocket.Conn

	// onConnected is a hook invoked when the connection is successfully
	// established; it can be used to propagate the established connection
	// elsewhere — we use it to set the underlying connection of the native
	// k8s websocket executor.
	onConnected func(conn *gwebsocket.Conn)
}

// NewWebsocketRoundTripperWithDialer creates a new WebsocketRoundTripper that
// will dial and upgrade the connection, copying the impersonation setup
// specified in the config.
func NewWebsocketRoundTripperWithDialer(cfg roundTripperConfig) *WebsocketRoundTripper {
	return &WebsocketRoundTripper{roundTripperConfig: cfg}
}

// RoundTrip executes the request and upgrades the connection to a websocket.
// On success the established connection is stored on the round tripper and
// handed to the onConnected hook (if set) so the caller can adopt it.
func (w *WebsocketRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	header := utilnet.CloneHeader(req.Header)
	// copyImpersonationHeaders copies the headers from the original request to the new
	// request headers. This is necessary to forward the original user's impersonation
	// when multiple kubernetes_users are available.
	copyImpersonationHeaders(header, w.originalHeaders)
	if err := setupImpersonationHeaders(w.log, w.sess, header); err != nil {
		return nil, trace.Wrap(err)
	}

	var err error

	// If we're using identity forwarding, we need to add the impersonation
	// headers to the request before we send the request.
	if w.useIdentityForwarding {
		if header, err = auth.IdentityForwardingHeaders(w.ctx, header); err != nil {
			return nil, trace.Wrap(err)
		}
	}

	clone := utilnet.CloneRequest(req)
	clone.Header = header

	nativeBufferSize := (&kwebsocket.RoundTripper{}).DataBufferSize()

	wsDialer := gwebsocket.Dialer{
		NetDialContext:  w.dialWithContext,
		Proxy:           w.proxier,
		TLSClientConfig: w.tlsConfig,
		Subprotocols:    header[httpstream.HeaderProtocolVersion],
		// Buffer sizes match the native code in k8s websocket/roundtripper.go.
		ReadBufferSize:  nativeBufferSize + 1024,
		WriteBufferSize: nativeBufferSize + 1024,
	}

	// Websocket dialers require ws/wss URL schemes instead of http/https.
	switch clone.URL.Scheme {
	case "https":
		clone.URL.Scheme = "wss"
	case "http":
		clone.URL.Scheme = "ws"
	default:
		return nil, trace.BadParameter("unknown url scheme: %s", clone.URL.Scheme)
	}

	wsConn, wsResp, err := wsDialer.DialContext(w.ctx, clone.URL.String(), clone.Header)
	if err != nil {
		// Wrap so callers can recognize a failed protocol upgrade.
		return nil, &httpstream.UpgradeFailureError{Cause: err}
	}
	w.conn = wsConn
	if w.onConnected != nil {
		w.onConnected(wsConn)
	}

	return wsResp, nil
}

// versionWithoutExecSubprotocolV5 is the version of Teleport that starts supporting websocket exec subprotocol v5.
// NOTE(review): the name reads inverted relative to the comparison at the call
// site (semVer.LessThan(versionWithoutExecSubprotocolV5) => no v5 support);
// consider renaming to versionWithExecSubprotocolV5.
var versionWithoutExecSubprotocolV5 = semver.New(utils.VersionBeforeAlpha("16.0.0"))

// kubernetesExecSubprotocolV5Version is the minimum Kubernetes server version
// that is treated as supporting websocket exec subprotocol v5.
const kubernetesExecSubprotocolV5Version = "1.30.0"

// kubernetesSupportsExecSubprotocolV5 reports whether the given Kubernetes
// server version is recent enough to support websocket exec subprotocol v5.
// An unknown (nil) or unparsable version is treated as unsupported.
func kubernetesSupportsExecSubprotocolV5(serverVersion *version.Info) bool {
	if serverVersion == nil {
		return false
	}

	actual, err := versionUtil.ParseSemantic(serverVersion.GitVersion)
	if err != nil {
		return false
	}

	required, err := versionUtil.ParseSemantic(kubernetesExecSubprotocolV5Version)
	if err != nil {
		return false
	}

	return actual.AtLeast(required)
}

// teleportVersionInterface is an interface that allows to get the Teleport version of
// a kube server. It is satisfied by both kube server and proxy resources.
// DELETE IN 17.0.0 (anton)
type teleportVersionInterface interface {
	GetTeleportVersion() string
}

// allServersSupportExecSubprotocolV5 checks whether every hop on this
// session's path supports websocket exec subprotocol v5; only then can the
// websocket executor be used instead of the SPDY one.
func (f *Forwarder) allServersSupportExecSubprotocolV5(sess *clusterSession) bool {
	// Local cluster: validate the kube services' support of websocket exec
	// subprotocol v5.
	if !sess.teleportCluster.isRemote {
		return allServersSupportExecSubprotocolV5(sess.kubeServers)
	}
	// Remote cluster: every remote proxy must support websocket exec
	// subprotocol v5.
	proxies, err := f.getRemoteClusterProxies(sess.teleportCluster.name)
	if err != nil {
		return false
	}
	return allServersSupportExecSubprotocolV5(proxies)
}

// allServersSupportExecSubprotocolV5 returns true if every server in the list
// advertises a Teleport version that supports websocket exec subprotocol v5.
// An empty list, an unparsable version, or any too-old server yields false.
// DELETE IN 17.0.0 (anton)
func allServersSupportExecSubprotocolV5[T teleportVersionInterface](servers []T) bool {
	if len(servers) == 0 {
		return false
	}

	for _, srv := range servers {
		ver, err := semver.NewVersion(srv.GetTeleportVersion())
		if err != nil {
			return false
		}
		if ver.LessThan(*versionWithoutExecSubprotocolV5) {
			return false
		}
	}
	return true
}

// getRemoteClusterProxies returns the proxies registered with the named
// remote cluster, read from that cluster's cache. The result is used to
// decide whether the remote side supports websocket exec subprotocol v5.
func (f *Forwarder) getRemoteClusterProxies(clusterName string) ([]types.Server, error) {
	targetCluster, err := f.cfg.ReverseTunnelSrv.GetSite(clusterName)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	// Read through the remote cluster's caching access point.
	caching, err := targetCluster.CachingAccessPoint()
	if err != nil {
		return nil, trace.Wrap(err)
	}

	proxies, err := caching.GetProxies()
	if err != nil {
		return nil, trace.Wrap(err)
	}
	return proxies, nil
}