From a2ec8516cd0067d6da7b9689f97aca909aab47ea Mon Sep 17 00:00:00 2001 From: STeve Huang Date: Fri, 23 Jun 2023 08:51:32 -0400 Subject: [PATCH 1/7] Connect Kube gateway part 1: lib/teleterm/gateway --- lib/srv/alpnproxy/kube.go | 44 +++ lib/srv/alpnproxy/local_proxy.go | 11 + lib/teleterm/api/uri/uri.go | 27 ++ lib/teleterm/api/uri/uri_test.go | 110 ++++++++ lib/teleterm/gateway/config.go | 4 + ...l_proxy_middleware.go => db_middleware.go} | 6 +- ...ddleware_test.go => db_middleware_test.go} | 4 +- lib/teleterm/gateway/gateway.go | 125 ++++----- lib/teleterm/gateway/gateway_db.go | 88 ++++++ lib/teleterm/gateway/gateway_kube.go | 156 +++++++++++ lib/teleterm/gateway/gateway_kube_test.go | 256 ++++++++++++++++++ lib/teleterm/gateway/kube_cert_reissuer.go | 52 ++++ lib/teleterm/gatewaytest/helpers.go | 45 ++- tool/tsh/common/kube_proxy.go | 47 +--- 14 files changed, 833 insertions(+), 142 deletions(-) rename lib/teleterm/gateway/{local_proxy_middleware.go => db_middleware.go} (89%) rename lib/teleterm/gateway/{local_proxy_middleware_test.go => db_middleware_test.go} (97%) create mode 100644 lib/teleterm/gateway/gateway_db.go create mode 100644 lib/teleterm/gateway/gateway_kube.go create mode 100644 lib/teleterm/gateway/gateway_kube_test.go create mode 100644 lib/teleterm/gateway/kube_cert_reissuer.go diff --git a/lib/srv/alpnproxy/kube.go b/lib/srv/alpnproxy/kube.go index 23209dedc596e..170a506b26e9a 100644 --- a/lib/srv/alpnproxy/kube.go +++ b/lib/srv/alpnproxy/kube.go @@ -20,6 +20,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "crypto/x509/pkix" "errors" "net" "net/http" @@ -37,6 +38,8 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" + "github.com/gravitational/teleport/api/utils/keys" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/srv/alpnproxy/common" "github.com/gravitational/teleport/lib/tlsca" "github.com/gravitational/teleport/lib/utils" @@ -282,6 +285,12 @@ func NewKubeForwardProxy(ctx context.Context, listenPort, forwardAddr string) (* return nil, trace.Wrap(err) } + fp, err := NewKubeForwardProxyWithListener(ctx, listener, forwardAddr) + return fp, trace.Wrap(err) +} + +// NewKubeForwardProxyWithListener creates a forward proxy with provided listener. +func NewKubeForwardProxyWithListener(ctx context.Context, listener net.Listener, forwardAddr string) (*ForwardProxy, error) { fp, err := NewForwardProxy(ForwardProxyConfig{ Listener: listener, CloseContext: ctx, @@ -297,3 +306,38 @@ func NewKubeForwardProxy(ctx context.Context, listenPort, forwardAddr string) (* } return fp, nil } + +// CreateKubeLocalCAs generate local CAs used for kube local proxy with provided key. +func CreateKubeLocalCAs(key *keys.PrivateKey, teleportClusters []string) (map[string]tls.Certificate, error) { + cas := make(map[string]tls.Certificate) + for _, teleportCluster := range teleportClusters { + ca, err := createLocalCA(key, time.Now().Add(defaults.CATTL), common.KubeLocalProxyWildcardDomain(teleportCluster)) + if err != nil { + return nil, trace.Wrap(err) + } + cas[teleportCluster] = ca + } + return cas, nil +} + +func createLocalCA(key *keys.PrivateKey, validUntil time.Time, dnsNames ...string) (tls.Certificate, error) { + cert, err := tlsca.GenerateSelfSignedCAWithConfig(tlsca.GenerateCAConfig{ + Entity: pkix.Name{ + CommonName: "localhost", + Organization: []string{"Teleport"}, + }, + Signer: key, + DNSNames: dnsNames, + IPAddresses: []net.IP{net.ParseIP(defaults.Localhost)}, + TTL: time.Until(validUntil), + }) + if err != nil { + return tls.Certificate{}, trace.Wrap(err) + } + + tlsCert, err := keys.X509KeyPair(cert, key.PrivateKeyPEM()) + if err != nil { + return tls.Certificate{}, trace.Wrap(err) + } + return tlsCert, nil +} diff --git a/lib/srv/alpnproxy/local_proxy.go b/lib/srv/alpnproxy/local_proxy.go index 813ae59e7c0a8..426005a0e1d12 100644 --- a/lib/srv/alpnproxy/local_proxy.go +++ b/lib/srv/alpnproxy/local_proxy.go @@ -161,6 +161,14 @@ func NewLocalProxy(cfg LocalProxyConfig, opts ...LocalProxyConfigOpt) (*LocalPro // Start starts the LocalProxy. func (l *LocalProxy) Start(ctx context.Context) error { + if l.cfg.HTTPMiddleware != nil { + return trace.Wrap(l.StartHTTPAccessProxy(ctx)) + } + return trace.Wrap(l.start(ctx)) +} + +// start starts the LocalProxy for raw TCP or raw TLS (non-HTTP) connections. +func (l *LocalProxy) start(ctx context.Context) error { if l.cfg.Middleware != nil { err := l.cfg.Middleware.OnStart(ctx, l) if err != nil { @@ -265,6 +273,9 @@ func (l *LocalProxy) makeHTTPReverseProxy(certs []tls.Certificate) *httputil.Rev outReq.URL.Host = l.cfg.RemoteProxyAddr }, ModifyResponse: func(response *http.Response) error { + // Ask the client to close the connection to avoid re-use. + response.Header.Add("Connection", "close") + errHeader := response.Header.Get(commonApp.TeleportAPIErrorHeader) if errHeader != "" { // TODO: find a cleaner way of formatting the error. diff --git a/lib/teleterm/api/uri/uri.go b/lib/teleterm/api/uri/uri.go index a16aa35954a94..1fc9a4a48c2fb 100644 --- a/lib/teleterm/api/uri/uri.go +++ b/lib/teleterm/api/uri/uri.go @@ -29,6 +29,8 @@ var pathServers = urlpath.New("/clusters/:cluster/servers/:serverUUID") var pathLeafServers = urlpath.New("/clusters/:cluster/leaves/:leaf/servers/:serverUUID") var pathDbs = urlpath.New("/clusters/:cluster/dbs/:dbName") var pathLeafDbs = urlpath.New("/clusters/:cluster/leaves/:leaf/dbs/:dbName") +var pathKubes = urlpath.New("/clusters/:cluster/kubes/:kubeName") +var pathLeafKubes = urlpath.New("/clusters/:cluster/leaves/:leaf/kubes/:kubeName") // New creates an instance of ResourceURI func New(path string) ResourceURI { @@ -111,6 +113,21 @@ func (r ResourceURI) GetDbName() string { return "" } +// GetKubeName extracts the kube name from r. Returns an empty string if path is not a kube URI. +func (r ResourceURI) GetKubeName() string { + result, ok := pathKubes.Match(r.path) + if ok { + return result.Params["kubeName"] + } + + result, ok = pathLeafKubes.Match(r.path) + if ok { + return result.Params["kubeName"] + } + + return "" +} + // GetServerUUID extracts the server UUID from r. Returns an empty string if path is not a server URI. func (r ResourceURI) GetServerUUID() string { result, ok := pathServers.Match(r.path) @@ -177,3 +194,13 @@ func (r ResourceURI) AppendAccessRequest(id string) ResourceURI { func (r ResourceURI) String() string { return r.path } + +// IsDB returns true if URI is a database resource. +func IsDB(resourceURI string) bool { + return New(resourceURI).GetDbName() != "" +} + +// IsDB returns true if URI is a kube resource. +func IsKube(resourceURI string) bool { + return New(resourceURI).GetKubeName() != "" +} diff --git a/lib/teleterm/api/uri/uri_test.go b/lib/teleterm/api/uri/uri_test.go index d12fd446040e5..e9daf0f178131 100644 --- a/lib/teleterm/api/uri/uri_test.go +++ b/lib/teleterm/api/uri/uri_test.go @@ -130,6 +130,52 @@ func TestGetDbName(t *testing.T) { } } +func TestGetKubeName(t *testing.T) { + tests := []struct { + name string + in uri.ResourceURI + out string + }{ + { + name: "returns root cluster kube name", + in: uri.NewClusterURI("foo").AppendKube("k8s"), + out: "k8s", + }, + { + name: "returns leaf cluster kube name", + in: uri.NewClusterURI("foo").AppendLeafCluster("bar").AppendKube("k8s"), + out: "k8s", + }, + { + name: "returns empty string when given root cluster URI", + in: uri.NewClusterURI("foo"), + out: "", + }, + { + name: "returns empty string when given leaf cluster URI", + in: uri.NewClusterURI("foo").AppendLeafCluster("bar"), + out: "", + }, + { + name: "returns empty string when given root cluster non-kube resource URI", + in: uri.NewClusterURI("foo").AppendDB("postgres"), + out: "", + }, + { + name: "returns empty string when given leaf cluster non-kube resource URI", + in: uri.NewClusterURI("foo").AppendLeafCluster("bar").AppendDB("postgres"), + out: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out := tt.in.GetKubeName() + require.Equal(t, tt.out, out) + }) + } +} + func TestGetServerUUID(t *testing.T) { tests := []struct { name string @@ -216,3 +262,67 @@ func TestGetRootClusterURI(t *testing.T) { }) } } + +func TestIsDB(t *testing.T) { + tests := []struct { + in string + check require.BoolAssertionFunc + }{ + { + in: uri.NewClusterURI("foo").AppendDB("db").String(), + check: require.True, + }, + { + in: uri.NewClusterURI("foo").AppendLeafCluster("bar").AppendDB("db").String(), + check: require.True, + }, + { + in: uri.NewClusterURI("foo").String(), + check: require.False, + }, + { + in: uri.NewClusterURI("foo").AppendLeafCluster("bar").String(), + check: require.False, + }, + { + in: uri.NewClusterURI("foo").AppendKube("kube").String(), + check: require.False, + }, + } + + for _, tt := range tests { + tt.check(t, uri.IsDB(tt.in)) + } +} + +func TestIsKube(t *testing.T) { + tests := []struct { + in string + check require.BoolAssertionFunc + }{ + { + in: uri.NewClusterURI("foo").AppendKube("kube").String(), + check: require.True, + }, + { + in: uri.NewClusterURI("foo").AppendLeafCluster("bar").AppendKube("kube").String(), + check: require.True, + }, + { + in: uri.NewClusterURI("foo").String(), + check: require.False, + }, + { + in: uri.NewClusterURI("foo").AppendLeafCluster("bar").String(), + check: require.False, + }, + { + in: uri.NewClusterURI("foo").AppendDB("db").String(), + check: require.False, + }, + } + + for _, tt := range tests { + tt.check(t, uri.IsKube(tt.in)) + } +} diff --git a/lib/teleterm/gateway/config.go b/lib/teleterm/gateway/config.go index 448850d33fc48..98a7590191fe7 100644 --- a/lib/teleterm/gateway/config.go +++ b/lib/teleterm/gateway/config.go @@ -42,6 +42,8 @@ type Config struct { TargetURI string // TargetUser is the target user name TargetUser string + // TargetGroups is a list of target groups + TargetGroups []string // TargetSubresourceName points at a subresource of the remote resource, for example a database // name on a database server. It is used only for generating the CLI command. TargetSubresourceName string @@ -58,6 +60,8 @@ type Config struct { KeyPath string // Insecure Insecure bool + // ClusterName is the Teleport cluster name + ClusterName string // WebProxyAddr WebProxyAddr string // Log is a component logger diff --git a/lib/teleterm/gateway/local_proxy_middleware.go b/lib/teleterm/gateway/db_middleware.go similarity index 89% rename from lib/teleterm/gateway/local_proxy_middleware.go rename to lib/teleterm/gateway/db_middleware.go index b163ae99d347c..1f62ae1ce7478 100644 --- a/lib/teleterm/gateway/local_proxy_middleware.go +++ b/lib/teleterm/gateway/db_middleware.go @@ -27,7 +27,7 @@ import ( "github.com/gravitational/teleport/lib/tlsca" ) -type localProxyMiddleware struct { +type dbMiddleware struct { onExpiredCert func(context.Context) error log *logrus.Entry dbRoute tlsca.RouteToDatabase @@ -39,7 +39,7 @@ type localProxyMiddleware struct { // // In the future, DBCertChecker is going to be extended so that it's used by both tsh and Connect // and this middleware will be removed. -func (m *localProxyMiddleware) OnNewConnection(ctx context.Context, lp *alpn.LocalProxy, conn net.Conn) error { +func (m *dbMiddleware) OnNewConnection(ctx context.Context, lp *alpn.LocalProxy, conn net.Conn) error { err := lp.CheckDBCerts(m.dbRoute) if err == nil { return nil @@ -58,6 +58,6 @@ func (m *localProxyMiddleware) OnNewConnection(ctx context.Context, lp *alpn.Loc // OnStart is a noop. client.DBCertChecker.OnStart checks cert validity too. However in Connect // there's no flow which would allow the user to create a local proxy without valid // certs. -func (m *localProxyMiddleware) OnStart(context.Context, *alpn.LocalProxy) error { +func (m *dbMiddleware) OnStart(context.Context, *alpn.LocalProxy) error { return nil } diff --git a/lib/teleterm/gateway/local_proxy_middleware_test.go b/lib/teleterm/gateway/db_middleware_test.go similarity index 97% rename from lib/teleterm/gateway/local_proxy_middleware_test.go rename to lib/teleterm/gateway/db_middleware_test.go index 03617d60ff50a..5166d05086175 100644 --- a/lib/teleterm/gateway/local_proxy_middleware_test.go +++ b/lib/teleterm/gateway/db_middleware_test.go @@ -34,7 +34,7 @@ import ( "github.com/gravitational/teleport/lib/utils/cert" ) -func TestLocalProxyMiddleware_OnNewConnection(t *testing.T) { +func TestDBMiddleware_OnNewConnection(t *testing.T) { testCert, err := cert.GenerateSelfSignedCert([]string{"localhost"}) require.NoError(t, err) tlsCert, err := keys.X509KeyPair(testCert.Cert, testCert.PrivateKey) @@ -103,7 +103,7 @@ func TestLocalProxyMiddleware_OnNewConnection(t *testing.T) { hasCalledOnExpiredCert := false - middleware := &localProxyMiddleware{ + middleware := &dbMiddleware{ onExpiredCert: func(context.Context) error { hasCalledOnExpiredCert = true return nil diff --git a/lib/teleterm/gateway/gateway.go b/lib/teleterm/gateway/gateway.go index 4287b916d2af2..60a1b93b18afe 100644 --- a/lib/teleterm/gateway/gateway.go +++ b/lib/teleterm/gateway/gateway.go @@ -70,55 +70,25 @@ func New(cfg Config) (*Gateway, error) { cfg.LocalPort = port - tlsCert, err := keys.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath) - if err != nil { - return nil, trace.Wrap(err) - } - - if err := checkCertSubject(tlsCert, cfg.RouteToDatabase()); err != nil { - return nil, trace.Wrap(err, - "database certificate check failed, try restarting the database connection") - } - - localProxyConfig := alpn.LocalProxyConfig{ - InsecureSkipVerify: cfg.Insecure, - RemoteProxyAddr: cfg.WebProxyAddr, - Listener: listener, - ParentContext: closeContext, - Certs: []tls.Certificate{tlsCert}, - Clock: cfg.Clock, - ALPNConnUpgradeRequired: cfg.TLSRoutingConnUpgradeRequired, - } - - localProxyMiddleware := &localProxyMiddleware{ - log: cfg.Log, - dbRoute: cfg.RouteToDatabase(), - } - - if cfg.OnExpiredCert != nil { - localProxyConfig.Middleware = localProxyMiddleware - } - - localProxy, err := alpn.NewLocalProxy(localProxyConfig, - alpn.WithDatabaseProtocol(cfg.Protocol), - alpn.WithClusterCAsIfConnUpgrade(closeContext, cfg.RootClusterCACertPoolFunc), - ) - if err != nil { - return nil, trace.Wrap(err) - } - gateway := &Gateway{ cfg: &cfg, closeContext: closeContext, closeCancel: closeCancel, - localProxy: localProxy, } - if cfg.OnExpiredCert != nil { - localProxyMiddleware.onExpiredCert = func(ctx context.Context) error { - err := cfg.OnExpiredCert(ctx, gateway) - return trace.Wrap(err) + switch { + case uri.IsDB(cfg.TargetURI): + if err := gateway.makeLocalProxyForDB(listener); err != nil { + return nil, trace.Wrap(err) + } + + case uri.IsKube(cfg.TargetURI): + if err := gateway.makeLocalProxiesForKube(listener); err != nil { + return nil, trace.Wrap(err) } + + default: + return nil, trace.NotImplemented("gateway not supported for %v", cfg.TargetURI) } ok = true @@ -143,24 +113,46 @@ func NewWithLocalPort(gateway *Gateway, port string) (*Gateway, error) { func (g *Gateway) Close() error { g.closeCancel() - if err := g.localProxy.Close(); err != nil { - return trace.Wrap(err) + var errs []error + if g.localProxy != nil { + errs = append(errs, g.localProxy.Close()) } - - return nil + if g.forwardProxy != nil { + errs = append(errs, g.forwardProxy.Close()) + } + return trace.NewAggregate(errs...) } // Serve starts the underlying ALPN proxy. Blocks until closeContext is canceled. func (g *Gateway) Serve() error { g.cfg.Log.Info("Gateway is open.") + defer g.cfg.Log.Info("Gateway has closed.") - if err := g.localProxy.Start(g.closeContext); err != nil { - return trace.Wrap(err) + if g.forwardProxy != nil { + return trace.Wrap(g.serveWithForwardProxy()) } + return trace.Wrap(g.localProxy.Start(g.closeContext)) +} - g.cfg.Log.Info("Gateway has closed.") +func (g *Gateway) serveWithForwardProxy() error { + errChan := make(chan error, 2) + go func() { + if err := g.forwardProxy.Start(); err != nil { + errChan <- err + } + }() + go func() { + if err := g.localProxy.Start(g.closeContext); err != nil { + errChan <- err + } + }() - return nil + select { + case err := <-errChan: + return trace.Wrap(err) + case <-g.closeContext.Done(): + return nil + } } func (g *Gateway) URI() uri.ResourceURI { @@ -236,20 +228,15 @@ func (g *Gateway) CLICommand() (*api.GatewayCLICommand, error) { }, nil } -// RouteToDatabase returns tlsca.RouteToDatabase based on the config of the gateway. -// -// The tlsca.RouteToDatabase.Database field is skipped, as it's an optional field and gateways can -// change their Config.TargetSubresourceName at any moment. -func (g *Gateway) RouteToDatabase() tlsca.RouteToDatabase { - return g.cfg.RouteToDatabase() -} - // ReloadCert loads the key pair from cfg.CertPath & cfg.KeyPath and updates the cert of the running // local proxy. This is typically done after the cert is reissued and saved to disk. // // In the future, we're probably going to make this method accept the cert as an arg rather than // reading from disk. func (g *Gateway) ReloadCert() error { + if g.onNewCert == nil { + return nil + } g.cfg.Log.Debug("Reloading cert") tlsCert, err := keys.LoadX509KeyPair(g.cfg.CertPath, g.cfg.KeyPath) @@ -257,14 +244,14 @@ func (g *Gateway) ReloadCert() error { return trace.Wrap(err) } - if err := checkCertSubject(tlsCert, g.RouteToDatabase()); err != nil { - return trace.Wrap(err, - "database certificate check failed, try restarting the database connection") - } - - g.localProxy.SetCerts([]tls.Certificate{tlsCert}) + return trace.Wrap(g.onNewCert(tlsCert)) +} - return nil +func (g *Gateway) onExpiredCert(ctx context.Context) error { + if g.cfg.OnExpiredCert == nil { + return nil + } + return trace.Wrap(g.cfg.OnExpiredCert(ctx, g)) } // checkCertSubject checks if the cert subject matches the expected db route. @@ -290,8 +277,12 @@ func checkCertSubject(tlsCert tls.Certificate, dbRoute tlsca.RouteToDatabase) er // // In the future if Gateway becomes more complex it might be worthwhile to add an RWMutex to it. type Gateway struct { - cfg *Config - localProxy *alpn.LocalProxy + cfg *Config + localProxy *alpn.LocalProxy + forwardProxy *alpn.ForwardProxy + // onNewCert is a callback function that updates the local proxy when TLS + // certificate is reissued. + onNewCert func(tls.Certificate) error // closeContext and closeCancel are used to signal to any waiting goroutines // that the local proxy is now closed and to release any resources. closeContext context.Context diff --git a/lib/teleterm/gateway/gateway_db.go b/lib/teleterm/gateway/gateway_db.go new file mode 100644 index 0000000000000..a33c359f44ad9 --- /dev/null +++ b/lib/teleterm/gateway/gateway_db.go @@ -0,0 +1,88 @@ +/* +Copyright 2023 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gateway + +import ( + "crypto/tls" + "net" + + "github.com/gravitational/trace" + + "github.com/gravitational/teleport/api/utils/keys" + "github.com/gravitational/teleport/lib/srv/alpnproxy" + "github.com/gravitational/teleport/lib/tlsca" +) + +// RouteToDatabase returns tlsca.RouteToDatabase based on the config of the gateway. +// +// The tlsca.RouteToDatabase.Database field is skipped, as it's an optional field and gateways can +// change their Config.TargetSubresourceName at any moment. +func (g *Gateway) RouteToDatabase() tlsca.RouteToDatabase { + return g.cfg.RouteToDatabase() +} + +func (g *Gateway) makeLocalProxyForDB(listener net.Listener) error { + tlsCert, err := keys.LoadX509KeyPair(g.cfg.CertPath, g.cfg.KeyPath) + if err != nil { + return trace.Wrap(err) + } + + if err := checkCertSubject(tlsCert, g.RouteToDatabase()); err != nil { + return trace.Wrap(err, + "database certificate check failed, try restarting the database connection") + } + + localProxyConfig := alpnproxy.LocalProxyConfig{ + InsecureSkipVerify: g.cfg.Insecure, + RemoteProxyAddr: g.cfg.WebProxyAddr, + Listener: listener, + ParentContext: g.closeContext, + Certs: []tls.Certificate{tlsCert}, + Clock: g.cfg.Clock, + ALPNConnUpgradeRequired: g.cfg.TLSRoutingConnUpgradeRequired, + } + + if g.cfg.OnExpiredCert != nil { + localProxyConfig.Middleware = &dbMiddleware{ + log: g.cfg.Log, + dbRoute: g.cfg.RouteToDatabase(), + onExpiredCert: g.onExpiredCert, + } + } + + localProxy, err := alpnproxy.NewLocalProxy(localProxyConfig, + alpnproxy.WithDatabaseProtocol(g.cfg.Protocol), + alpnproxy.WithClusterCAsIfConnUpgrade(g.closeContext, g.cfg.RootClusterCACertPoolFunc), + ) + if err != nil { + return trace.Wrap(err) + } + + g.localProxy = localProxy + g.onNewCert = g.setDBCert + return nil +} + +func (g *Gateway) setDBCert(newCert tls.Certificate) error { + if err := checkCertSubject(newCert, g.RouteToDatabase()); err != nil { + return trace.Wrap(err, + "database certificate check failed, try restarting the database connection") + } + + g.localProxy.SetCerts([]tls.Certificate{newCert}) + return nil +} diff --git a/lib/teleterm/gateway/gateway_kube.go b/lib/teleterm/gateway/gateway_kube.go new file mode 100644 index 0000000000000..64b645a2bb365 --- /dev/null +++ b/lib/teleterm/gateway/gateway_kube.go @@ -0,0 +1,156 @@ +/* +Copyright 2023 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gateway + +import ( + "crypto/tls" + "encoding/pem" + "net" + + "github.com/gravitational/trace" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + + "github.com/gravitational/teleport/api/utils/keys" + "github.com/gravitational/teleport/lib/client" + "github.com/gravitational/teleport/lib/kube/kubeconfig" + "github.com/gravitational/teleport/lib/srv/alpnproxy" + "github.com/gravitational/teleport/lib/utils" +) + +// KubeconfigPath returns the kubeconfig path that can be used for clients to +// connect to the local proxy. +func (g *Gateway) KubeconfigPath() string { + // Assumes CertPath is unique per kube cluster. + return g.cfg.CertPath + ".kubeconfig" +} + +func (g *Gateway) makeLocalProxiesForKube(listener net.Listener) error { + if g.cfg.RootClusterCACertPoolFunc == nil { + return trace.BadParameter("missing RootClusterCACertPoolFunc") + } + + // A key is required here for generating local CAs. It can be any key. + // Reading the provided key path to avoid generating a new one. + key, err := keys.LoadPrivateKey(g.cfg.KeyPath) + if err != nil { + return trace.Wrap(err) + } + + cas, err := alpnproxy.CreateKubeLocalCAs(key, []string{g.cfg.ClusterName}) + if err != nil { + return trace.Wrap(err) + } + + if err := g.makeALPNLocalProxyForKube(cas); err != nil { + return trace.Wrap(err) + } + + if err := g.makeForwardProxyForKube(listener); err != nil { + return trace.NewAggregate(err, g.Close()) + } + + if err := g.writeKubeconfig(key, cas); err != nil { + return trace.NewAggregate(err, g.Close()) + } + return nil +} + +func (g *Gateway) makeALPNLocalProxyForKube(cas map[string]tls.Certificate) error { + // ALPN local proxy can use a random port. + listener, err := alpnproxy.NewKubeListener(cas) + if err != nil { + return trace.Wrap(err) + } + + middleware, err := g.makeKubeMiddleware() + if err != nil { + return trace.NewAggregate(err, listener.Close()) + } + + g.localProxy, err = alpnproxy.NewLocalProxy(alpnproxy.LocalProxyConfig{ + InsecureSkipVerify: g.cfg.Insecure, + RemoteProxyAddr: g.cfg.WebProxyAddr, + Listener: listener, + ParentContext: g.closeContext, + Clock: g.cfg.Clock, + ALPNConnUpgradeRequired: g.cfg.TLSRoutingConnUpgradeRequired, + }, + alpnproxy.WithHTTPMiddleware(middleware), + alpnproxy.WithSNI(client.GetKubeTLSServerName(g.cfg.WebProxyAddr)), + alpnproxy.WithClusterCAs(g.closeContext, g.cfg.RootClusterCACertPoolFunc), + ) + if err != nil { + return trace.NewAggregate(err, listener.Close()) + } + return nil +} + +func (g *Gateway) makeKubeMiddleware() (alpnproxy.LocalProxyHTTPMiddleware, error) { + cert, err := keys.LoadX509KeyPair(g.cfg.CertPath, g.cfg.KeyPath) + if err != nil { + return nil, trace.Wrap(err) + } + + certReissuer := newKubeCertReissuer(cert, g.onExpiredCert) + g.onNewCert = certReissuer.updateCert + + certs := make(alpnproxy.KubeClientCerts) + certs.Add(g.cfg.ClusterName, g.cfg.TargetName, cert) + return alpnproxy.NewKubeMiddleware(certs, certReissuer.reissueCert, g.cfg.Clock, g.cfg.Log), nil +} + +func (g *Gateway) makeForwardProxyForKube(listener net.Listener) (err error) { + // Use provided listener with user configured port for the forward proxy. + g.forwardProxy, err = alpnproxy.NewKubeForwardProxyWithListener(g.closeContext, listener, g.localProxy.GetAddr()) + return trace.Wrap(err) +} + +func (g *Gateway) writeKubeconfig(key *keys.PrivateKey, cas map[string]tls.Certificate) error { + ca, ok := cas[g.cfg.ClusterName] + if !ok { + return trace.BadParameter("CA for teleport cluster %q is missing", g.cfg.ClusterName) + } + + x509Cert, err := utils.TLSCertLeaf(ca) + if err != nil { + return trace.BadParameter("could not parse CA certificate for cluster %q", g.cfg.ClusterName) + } + + values := &kubeconfig.LocalProxyValues{ + // Ideally tc.KubeClusterAddr() should be used for + // TeleportKubeClusterAddr as it matches what tsh kube login sets in + // the kubeconfig. In this case it is not a big deal since this + // ephemeral config has only a single kube cluster. Also + // tc.KubeClusterAddr() is likely the same as WebProxyAddr anyway. + TeleportKubeClusterAddr: "https://" + g.cfg.WebProxyAddr, + LocalProxyURL: "http://" + g.forwardProxy.GetAddr(), + ClientKeyData: key.PrivateKeyPEM(), + Clusters: []kubeconfig.LocalProxyCluster{{ + TeleportCluster: g.cfg.ClusterName, + KubeCluster: g.cfg.TargetName, + Impersonate: g.cfg.TargetUser, + ImpersonateGroups: g.cfg.TargetGroups, + Namespace: g.cfg.TargetSubresourceName, + }}, + LocalProxyCAs: map[string][]byte{ + g.cfg.ClusterName: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: x509Cert.Raw}), + }, + } + + config := kubeconfig.CreateLocalProxyConfig(clientcmdapi.NewConfig(), values) + return trace.Wrap(kubeconfig.Save(g.KubeconfigPath(), *config)) +} diff --git a/lib/teleterm/gateway/gateway_kube_test.go b/lib/teleterm/gateway/gateway_kube_test.go new file mode 100644 index 0000000000000..c70cc0bc7863c --- /dev/null +++ b/lib/teleterm/gateway/gateway_kube_test.go @@ -0,0 +1,256 @@ +/* +Copyright 2023 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gateway + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/json" + "net" + "net/http" + "net/url" + "path" + "testing" + "time" + + "github.com/gravitational/trace" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/gravitational/teleport/api/utils/keys" + "github.com/gravitational/teleport/lib/client" + "github.com/gravitational/teleport/lib/defaults" + "github.com/gravitational/teleport/lib/fixtures" + "github.com/gravitational/teleport/lib/kube/kubeconfig" + "github.com/gravitational/teleport/lib/srv/alpnproxy/common" + "github.com/gravitational/teleport/lib/teleterm/api/uri" + "github.com/gravitational/teleport/lib/teleterm/gatewaytest" + "github.com/gravitational/teleport/lib/tlsca" + "github.com/gravitational/teleport/lib/utils" +) + +func TestKubeGateway(t *testing.T) { + t.Parallel() + + const ( + teleportClusterName = "example.com" + kubeClusterName = "example-kube-cluster" + ) + + identity := tlsca.Identity{ + Username: "alice", + Groups: []string{"test-group"}, + KubernetesCluster: kubeClusterName, + } + clock := clockwork.NewFakeClock() + proxy := mustStartMockProxyWithKubeAPI(t, identity) + gateway, err := New( + Config{ + Clock: clock, + TargetName: kubeClusterName, + TargetURI: uri.NewClusterURI(teleportClusterName).AppendKube(kubeClusterName).String(), + TargetUser: identity.Username, + CertPath: proxy.clientCertPath(), + KeyPath: proxy.clientKeyPath(), + WebProxyAddr: proxy.webProxyAddr, + ClusterName: teleportClusterName, + CLICommandProvider: mockCLICommandProvider{}, + RootClusterCACertPoolFunc: func(_ context.Context) (*x509.CertPool, error) { + return proxy.certPool(), nil + }, + OnExpiredCert: func(_ context.Context, gateway *Gateway) error { + return trace.Wrap(gateway.ReloadCert()) + }, + }, + ) + require.NoError(t, err) + t.Cleanup(func() { + gateway.Close() + }) + go gateway.Serve() + + // First request should succeed. + kubeClient := kubeClientForLocalProxy(t, gateway.KubeconfigPath(), teleportClusterName, kubeClusterName) + sendRequestToKubeLocalProxyAndSucceed(t, kubeClient) + + // Let proxy "rotate" client cert. Request should fail as the gateway is + // still using the old cert. + proxy.mustIssueClientCert(t, identity) + sendRequestToKubeLocalProxyAndFail(t, kubeClient) + + // Expire the cert so reissue flow is triggered: + // kubeMiddleware -> kubeCertReissuer.reissueCert -> gateway.cfg.OnExpiredCert -> gateway.ReloadCert -> kubeCertReissuer.updateCert + clock.Advance(time.Hour) + sendRequestToKubeLocalProxyAndSucceed(t, kubeClient) +} + +func sendRequestToKubeLocalProxyAndSucceed(t *testing.T, client *kubernetes.Clientset) { + t.Helper() + resp, err := client.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{}) + require.NoError(t, err) + require.Equal(t, len(resp.Items), 1) + require.Equal(t, "kube-pod-name", resp.Items[0].GetName()) +} +func sendRequestToKubeLocalProxyAndFail(t *testing.T, client *kubernetes.Clientset) { + t.Helper() + _, err := client.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{}) + require.Error(t, err) +} +func kubeClientForLocalProxy(t *testing.T, kubeconfigPath, teleportCluster, kubeCluster string) *kubernetes.Clientset { + t.Helper() + + config, err := kubeconfig.Load(kubeconfigPath) + require.NoError(t, err) + + contextName := kubeconfig.ContextName(teleportCluster, kubeCluster) + proxyURL, err := url.Parse(config.Clusters[contextName].ProxyURL) + require.NoError(t, err) + + tlsClientConfig := rest.TLSClientConfig{ + CAData: config.Clusters[contextName].CertificateAuthorityData, + CertData: config.AuthInfos[contextName].ClientCertificateData, + KeyData: config.AuthInfos[contextName].ClientKeyData, + ServerName: common.KubeLocalProxySNI(teleportCluster, kubeCluster), + } + client, err := kubernetes.NewForConfig(&rest.Config{ + Host: "https://" + teleportCluster, + TLSClientConfig: tlsClientConfig, + Proxy: http.ProxyURL(proxyURL), + }) + require.NoError(t, err) + return client +} + +type mockProxyWithKubeAPI struct { + webProxyAddr string + key *keys.PrivateKey + ca *tlsca.CertAuthority + dir string +} + +func (m *mockProxyWithKubeAPI) clientCertPath() string { + return path.Join(m.dir, "cert.pem") +} +func (m *mockProxyWithKubeAPI) clientKeyPath() string { + return path.Join(m.dir, "key.pem") +} + +func (m *mockProxyWithKubeAPI) mustIssueClientCert(t *testing.T, identity tlsca.Identity) { + t.Helper() + gatewaytest.MustGenCertSignedWithCAAndSaveToPaths(t, m.ca, identity, m.clientCertPath(), m.clientKeyPath()) +} + +func (m *mockProxyWithKubeAPI) verifyConnection(state tls.ConnectionState) error { + if len(state.PeerCertificates) != 1 { + return trace.BadParameter("expecting one client cert") + } + wantCert, err := utils.ReadCertificatesFromPath(m.clientCertPath()) + if err != nil { + return trace.Wrap(err) + } + if !bytes.Equal(state.PeerCertificates[0].Raw, wantCert[0].Raw) { + return trace.AccessDenied("client cert is invalid") + } + return nil +} + +func (m *mockProxyWithKubeAPI) certPool() *x509.CertPool { + certPool := x509.NewCertPool() + certPool.AddCert(m.ca.Cert) + return certPool +} + +func mustStartMockProxyWithKubeAPI(t *testing.T, identity tlsca.Identity) *mockProxyWithKubeAPI { + t.Helper() + + netListener, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + t.Cleanup(func() { + netListener.Close() + }) + + key, err := keys.ParsePrivateKey(fixtures.LocalhostKey) + require.NoError(t, err) + serverTLSCert, serverCA := mustGenCAForProxyKubeAddr(t, key, netListener.Addr().String()) + + m := &mockProxyWithKubeAPI{ + webProxyAddr: netListener.Addr().String(), + key: key, + ca: serverCA, + dir: t.TempDir(), + } + m.mustIssueClientCert(t, identity) + + tlsListener := tls.NewListener(netListener, &tls.Config{ + Certificates: []tls.Certificate{serverTLSCert}, + VerifyConnection: m.verifyConnection, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: m.certPool(), + }) + go http.Serve(tlsListener, mockKubeAPIHandler()) + return m +} + +func mustGenCAForProxyKubeAddr(t *testing.T, key *keys.PrivateKey, host string) (tls.Certificate, *tlsca.CertAuthority) { + t.Helper() + + certPem, err := tlsca.GenerateSelfSignedCAWithConfig(tlsca.GenerateCAConfig{ + Entity: pkix.Name{ + CommonName: "localhost", + Organization: []string{"Teleport"}, + }, + Signer: key, + DNSNames: []string{client.GetKubeTLSServerName(host)}, // Use special kube SNI. + TTL: defaults.CATTL, + }) + require.NoError(t, err) + tlsCert, err := keys.X509KeyPair(certPem, key.PrivateKeyPEM()) + require.NoError(t, err) + ca, err := tlsca.FromTLSCertificate(tlsCert) + require.NoError(t, err) + return tlsCert, ca +} + +func mockKubeAPIHandler() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/namespaces/default/pods", func(rw http.ResponseWriter, r *http.Request) { + rw.Header().Set("Content-Type", "application/json") + rw.Header().Set("Connection:", "close") + json.NewEncoder(rw).Encode(&v1.PodList{ + TypeMeta: metav1.TypeMeta{ + Kind: "PodList", + APIVersion: "v1", + }, + Items: []v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "kube-pod-name", + Namespace: "default", + }, + }, + }, + }) + }) + return mux +} diff --git a/lib/teleterm/gateway/kube_cert_reissuer.go b/lib/teleterm/gateway/kube_cert_reissuer.go new file mode 100644 index 0000000000000..5281d74727fa7 --- /dev/null +++ b/lib/teleterm/gateway/kube_cert_reissuer.go @@ -0,0 +1,52 @@ +/* +Copyright 2023 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gateway + +import ( + "context" + "crypto/tls" + "sync/atomic" + + "github.com/gravitational/trace" +) + +// kubeCertReissuer implements a simple single-kube cert reissuer that can be +// used for kube local proxy middleware. +type kubeCertReissuer struct { + cert atomic.Value + onExpiredCert func(context.Context) error +} + +func newKubeCertReissuer(cert tls.Certificate, onExpiredCert func(context.Context) error) *kubeCertReissuer { + r := &kubeCertReissuer{ + onExpiredCert: onExpiredCert, + } + r.updateCert(cert) + return r +} + +func (r *kubeCertReissuer) reissueCert(ctx context.Context, _, _ string) (tls.Certificate, error) { + if err := r.onExpiredCert(ctx); err != nil { + return tls.Certificate{}, trace.Wrap(err) + } + return r.cert.Load().(tls.Certificate), nil +} + +func (r *kubeCertReissuer) updateCert(cert tls.Certificate) error { + r.cert.Store(cert) + return nil +} diff --git a/lib/teleterm/gatewaytest/helpers.go b/lib/teleterm/gatewaytest/helpers.go index e8e7be6d0a17e..e8decfcf75627 100644 --- a/lib/teleterm/gatewaytest/helpers.go +++ b/lib/teleterm/gatewaytest/helpers.go @@ -24,6 +24,7 @@ import ( "fmt" "net" "os" + "path" "testing" "time" @@ -32,6 +33,8 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/exp/slices" + "github.com/gravitational/teleport" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/tlsca" ) @@ -148,41 +151,33 @@ type KeyPairPaths struct { func MustGenAndSaveCert(t *testing.T, identity tlsca.Identity) KeyPairPaths { t.Helper() - dir := t.TempDir() - ca := mustGenCACert(t) - tlsCert := mustGenCertSignedWithCA(t, ca, identity) - - privateKey, ok := tlsCert.PrivateKey.(*rsa.PrivateKey) - require.True(t, ok, "Failed to cast tlsCert.PrivateKey") + dir := t.TempDir() + certPath := path.Join(dir, "cert.pem") + keyPath := path.Join(dir, "key.pem") - // Save the cert. + MustGenCertSignedWithCAAndSaveToPaths(t, ca, identity, certPath, keyPath) + return KeyPairPaths{ + CertPath: certPath, + KeyPath: keyPath, + } +} - certFile, err := os.CreateTemp(dir, "cert") - require.NoError(t, err) +func MustGenCertSignedWithCAAndSaveToPaths(t *testing.T, ca *tlsca.CertAuthority, identity tlsca.Identity, certPath, keyPath string) { + t.Helper() + // Save the cert. + tlsCert := mustGenCertSignedWithCA(t, ca, identity) pemCert := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: tlsCert.Certificate[0]}) - - _, err = certFile.Write(pemCert) - require.NoError(t, err) - require.NoError(t, certFile.Close()) + require.NoError(t, os.WriteFile(certPath, pemCert, teleport.FileMaskOwnerOnly)) // Save the private key. - - keyFile, err := os.CreateTemp(dir, "key") - require.NoError(t, err) + privateKey, ok := tlsCert.PrivateKey.(*rsa.PrivateKey) + require.True(t, ok, "Failed to cast tlsCert.PrivateKey") pemPrivateKey := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)}) - - _, err = keyFile.Write(pemPrivateKey) - require.NoError(t, err) - require.NoError(t, keyFile.Close()) - - return KeyPairPaths{ - CertPath: certFile.Name(), - KeyPath: keyFile.Name(), - } + require.NoError(t, os.WriteFile(keyPath, pemPrivateKey, teleport.FileMaskOwnerOnly)) } func mustGenCACert(t *testing.T) *tlsca.CertAuthority { diff --git a/tool/tsh/common/kube_proxy.go b/tool/tsh/common/kube_proxy.go index f76aab20ff02a..1a87ad6ab71d1 100644 --- a/tool/tsh/common/kube_proxy.go +++ b/tool/tsh/common/kube_proxy.go @@ -19,7 +19,6 @@ package common import ( "context" "crypto/tls" - "crypto/x509/pkix" "encoding/pem" "fmt" "net" @@ -39,11 +38,8 @@ import ( "github.com/gravitational/teleport/api/utils/keys" "github.com/gravitational/teleport/lib/asciitable" "github.com/gravitational/teleport/lib/client" - "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/kube/kubeconfig" "github.com/gravitational/teleport/lib/srv/alpnproxy" - "github.com/gravitational/teleport/lib/srv/alpnproxy/common" - "github.com/gravitational/teleport/lib/tlsca" "github.com/gravitational/teleport/lib/utils" ) @@ -202,17 +198,12 @@ func makeKubeLocalProxy(cf *CLIConf, tc *client.TeleportClient, clusters kubecon return nil, trace.Wrap(err) } - keyPem, err := utils.ReadPath(profile.KeyPath()) + localClientKey, err := keys.LoadPrivateKey(profile.KeyPath()) if err != nil { return nil, trace.Wrap(err) } - localClientKey, err := keys.ParsePrivateKey(keyPem) - if err != nil { - return nil, trace.Wrap(err) - } - - cas, err := createKubeLocalCAs(localClientKey, clusters.TeleportClusters()) + cas, err := alpnproxy.CreateKubeLocalCAs(localClientKey, clusters.TeleportClusters()) if err != nil { return nil, trace.Wrap(err) } @@ -338,40 +329,6 @@ func (k *kubeLocalProxy) WriteKubeConfig() error { return trace.Wrap(kubeconfig.Save(k.KubeConfigPath(), *k.kubeconfig)) } -func createKubeLocalCAs(userKey *keys.PrivateKey, teleportClusters []string) (map[string]tls.Certificate, error) { - cas := make(map[string]tls.Certificate) - for _, teleportCluster := range teleportClusters { - ca, err := createLocalCA(userKey, time.Now().Add(defaults.CATTL), common.KubeLocalProxyWildcardDomain(teleportCluster)) - if err != nil { - return nil, trace.Wrap(err) - } - cas[teleportCluster] = ca - } - return cas, nil -} - -func createLocalCA(key *keys.PrivateKey, validUntil time.Time, dnsNames ...string) (tls.Certificate, error) { - cert, err := tlsca.GenerateSelfSignedCAWithConfig(tlsca.GenerateCAConfig{ - Entity: pkix.Name{ - CommonName: "localhost", - Organization: []string{"Teleport"}, - }, - Signer: key, - DNSNames: dnsNames, - IPAddresses: []net.IP{net.ParseIP(defaults.Localhost)}, - TTL: time.Until(validUntil), - }) - if err != nil { - return tls.Certificate{}, trace.Wrap(err) - } - - tlsCert, err := keys.X509KeyPair(cert, key.PrivateKeyPEM()) - if err != nil { - return tls.Certificate{}, trace.Wrap(err) - } - return tlsCert, nil -} - func loadKubeUserCerts(ctx context.Context, tc *client.TeleportClient, clusters kubeconfig.LocalProxyClusters) (alpnproxy.KubeClientCerts, error) { ctx, span := tc.Tracer.Start(ctx, "loadKubeUserCerts") defer span.End() From 77579c95791636c21e03639bf12cc0c1144ec5b0 Mon Sep 17 00:00:00 2001 From: STeve Huang Date: Mon, 26 Jun 2023 14:20:32 -0400 Subject: [PATCH 2/7] fix lint --- lib/teleterm/gateway/gateway_kube_test.go | 1 - lib/teleterm/gatewaytest/helpers.go | 13 ++++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/teleterm/gateway/gateway_kube_test.go b/lib/teleterm/gateway/gateway_kube_test.go index c70cc0bc7863c..c95dfae1740f3 100644 --- a/lib/teleterm/gateway/gateway_kube_test.go +++ b/lib/teleterm/gateway/gateway_kube_test.go @@ -236,7 +236,6 @@ func mockKubeAPIHandler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/api/v1/namespaces/default/pods", func(rw http.ResponseWriter, r *http.Request) { rw.Header().Set("Content-Type", "application/json") - rw.Header().Set("Connection:", "close") json.NewEncoder(rw).Encode(&v1.PodList{ TypeMeta: metav1.TypeMeta{ Kind: "PodList", diff --git a/lib/teleterm/gatewaytest/helpers.go b/lib/teleterm/gatewaytest/helpers.go index e8decfcf75627..5e23c075eed7b 100644 --- a/lib/teleterm/gatewaytest/helpers.go +++ b/lib/teleterm/gatewaytest/helpers.go @@ -34,7 +34,6 @@ import ( "golang.org/x/exp/slices" "github.com/gravitational/teleport" - "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/tlsca" ) @@ -151,12 +150,12 @@ type KeyPairPaths struct { func MustGenAndSaveCert(t *testing.T, identity tlsca.Identity) KeyPairPaths { t.Helper() - ca := mustGenCACert(t) - dir := t.TempDir() certPath := path.Join(dir, "cert.pem") keyPath := path.Join(dir, "key.pem") + ca := mustGenCACert(t) + MustGenCertSignedWithCAAndSaveToPaths(t, ca, identity, certPath, keyPath) return KeyPairPaths{ CertPath: certPath, @@ -167,15 +166,15 @@ func MustGenAndSaveCert(t *testing.T, identity tlsca.Identity) KeyPairPaths { func MustGenCertSignedWithCAAndSaveToPaths(t *testing.T, ca *tlsca.CertAuthority, identity tlsca.Identity, certPath, keyPath string) { t.Helper() - // Save the cert. tlsCert := mustGenCertSignedWithCA(t, ca, identity) + privateKey, ok := tlsCert.PrivateKey.(*rsa.PrivateKey) + require.True(t, ok, "Failed to cast tlsCert.PrivateKey") + + // Save the cert. pemCert := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: tlsCert.Certificate[0]}) require.NoError(t, os.WriteFile(certPath, pemCert, teleport.FileMaskOwnerOnly)) // Save the private key. - privateKey, ok := tlsCert.PrivateKey.(*rsa.PrivateKey) - require.True(t, ok, "Failed to cast tlsCert.PrivateKey") - pemPrivateKey := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)}) require.NoError(t, os.WriteFile(keyPath, pemPrivateKey, teleport.FileMaskOwnerOnly)) } From 88ea4745bbfa02f9f153bf99b5723b4ac08bf3ae Mon Sep 17 00:00:00 2001 From: STeve Huang Date: Thu, 29 Jun 2023 15:11:21 -0400 Subject: [PATCH 3/7] move IsDB/IsKube to resource URI --- lib/teleterm/api/uri/uri.go | 8 ++++---- lib/teleterm/api/uri/uri_test.go | 28 ++++++++++++++-------------- lib/teleterm/gateway/gateway.go | 6 +++--- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/lib/teleterm/api/uri/uri.go b/lib/teleterm/api/uri/uri.go index 1fc9a4a48c2fb..cc872efbe81af 100644 --- a/lib/teleterm/api/uri/uri.go +++ b/lib/teleterm/api/uri/uri.go @@ -196,11 +196,11 @@ func (r ResourceURI) String() string { } // IsDB returns true if URI is a database resource. -func IsDB(resourceURI string) bool { - return New(resourceURI).GetDbName() != "" +func (r ResourceURI) IsDB() bool { + return r.GetDbName() != "" } // IsDB returns true if URI is a kube resource. -func IsKube(resourceURI string) bool { - return New(resourceURI).GetKubeName() != "" +func (r ResourceURI) IsKube() bool { + return r.GetKubeName() != "" } diff --git a/lib/teleterm/api/uri/uri_test.go b/lib/teleterm/api/uri/uri_test.go index e9daf0f178131..7c1b24b19396c 100644 --- a/lib/teleterm/api/uri/uri_test.go +++ b/lib/teleterm/api/uri/uri_test.go @@ -265,64 +265,64 @@ func TestGetRootClusterURI(t *testing.T) { func TestIsDB(t *testing.T) { tests := []struct { - in string + in uri.ResourceURI check require.BoolAssertionFunc }{ { - in: uri.NewClusterURI("foo").AppendDB("db").String(), + in: uri.NewClusterURI("foo").AppendDB("db"), check: require.True, }, { - in: uri.NewClusterURI("foo").AppendLeafCluster("bar").AppendDB("db").String(), + in: uri.NewClusterURI("foo").AppendLeafCluster("bar").AppendDB("db"), check: require.True, }, { - in: uri.NewClusterURI("foo").String(), + in: uri.NewClusterURI("foo"), check: require.False, }, { - in: uri.NewClusterURI("foo").AppendLeafCluster("bar").String(), + in: uri.NewClusterURI("foo").AppendLeafCluster("bar"), check: require.False, }, { - in: uri.NewClusterURI("foo").AppendKube("kube").String(), + in: uri.NewClusterURI("foo").AppendKube("kube"), check: require.False, }, } for _, tt := range tests { - tt.check(t, uri.IsDB(tt.in)) + tt.check(t, tt.in.IsDB()) } } func TestIsKube(t *testing.T) { tests := []struct { - in string + in uri.ResourceURI check require.BoolAssertionFunc }{ { - in: uri.NewClusterURI("foo").AppendKube("kube").String(), + in: uri.NewClusterURI("foo").AppendKube("kube"), check: require.True, }, { - in: uri.NewClusterURI("foo").AppendLeafCluster("bar").AppendKube("kube").String(), + in: uri.NewClusterURI("foo").AppendLeafCluster("bar").AppendKube("kube"), check: require.True, }, { - in: uri.NewClusterURI("foo").String(), + in: uri.NewClusterURI("foo"), check: require.False, }, { - in: uri.NewClusterURI("foo").AppendLeafCluster("bar").String(), + in: uri.NewClusterURI("foo").AppendLeafCluster("bar"), check: require.False, }, { - in: uri.NewClusterURI("foo").AppendDB("db").String(), + in: uri.NewClusterURI("foo").AppendDB("db"), check: require.False, }, } for _, tt := range tests { - tt.check(t, uri.IsKube(tt.in)) + tt.check(t, tt.in.IsKube()) } } diff --git a/lib/teleterm/gateway/gateway.go b/lib/teleterm/gateway/gateway.go index 60a1b93b18afe..e6672b258db18 100644 --- a/lib/teleterm/gateway/gateway.go +++ b/lib/teleterm/gateway/gateway.go @@ -76,13 +76,13 @@ func New(cfg Config) (*Gateway, error) { closeCancel: closeCancel, } - switch { - case uri.IsDB(cfg.TargetURI): + switch targetURI := uri.New(cfg.TargetURI); { + case targetURI.IsDB(): if err := gateway.makeLocalProxyForDB(listener); err != nil { return nil, trace.Wrap(err) } - case uri.IsKube(cfg.TargetURI): + case targetURI.IsKube(): if err := gateway.makeLocalProxiesForKube(listener); err != nil { return nil, trace.Wrap(err) } From 8693a3ea8bf3d2c0588f3ee457c545ed806f95e7 Mon Sep 17 00:00:00 2001 From: STeve Huang Date: Tue, 4 Jul 2023 11:55:41 -0400 Subject: [PATCH 4/7] address review comments --- lib/srv/alpnproxy/forward_proxy.go | 2 +- lib/srv/alpnproxy/local_proxy.go | 2 +- lib/teleterm/gateway/config.go | 16 ++++++++++-- lib/teleterm/gateway/gateway.go | 2 +- lib/teleterm/gateway/gateway_kube.go | 29 +++++++++++++++------- lib/teleterm/gateway/gateway_kube_test.go | 12 ++++++--- lib/teleterm/gateway/kube_cert_reissuer.go | 3 +++ 7 files changed, 48 insertions(+), 18 deletions(-) diff --git a/lib/srv/alpnproxy/forward_proxy.go b/lib/srv/alpnproxy/forward_proxy.go index 684a75298fba2..06f5f3384884b 100644 --- a/lib/srv/alpnproxy/forward_proxy.go +++ b/lib/srv/alpnproxy/forward_proxy.go @@ -100,7 +100,7 @@ func (p *ForwardProxy) Start() error { // Close closes the forward proxy. func (p *ForwardProxy) Close() error { - if err := p.cfg.Listener.Close(); err != nil { + if err := p.cfg.Listener.Close(); err != nil && !utils.IsUseOfClosedNetworkError(err) { return trace.Wrap(err) } return nil diff --git a/lib/srv/alpnproxy/local_proxy.go b/lib/srv/alpnproxy/local_proxy.go index 426005a0e1d12..db95d830f0507 100644 --- a/lib/srv/alpnproxy/local_proxy.go +++ b/lib/srv/alpnproxy/local_proxy.go @@ -246,7 +246,7 @@ func (l *LocalProxy) handleDownstreamConnection(ctx context.Context, downstreamC func (l *LocalProxy) Close() error { l.cancel() if l.cfg.Listener != nil { - if err := l.cfg.Listener.Close(); err != nil { + if err := l.cfg.Listener.Close(); err != nil && !utils.IsUseOfClosedNetworkError(err) { return trace.Wrap(err) } } diff --git a/lib/teleterm/gateway/config.go b/lib/teleterm/gateway/config.go index 98a7590191fe7..3eff69e1f22db 100644 --- a/lib/teleterm/gateway/config.go +++ b/lib/teleterm/gateway/config.go @@ -18,6 +18,7 @@ package gateway import ( "context" + "crypto/x509" "runtime" "github.com/google/uuid" @@ -54,9 +55,12 @@ type Config struct { LocalAddress string // Protocol is the gateway protocol Protocol string - // CertPath + // CertPath specifies the path to the user certificate that the local proxy + // uses to connect to the Teleport Proxy. The path may depend on the type + // and the parameters of the gateway. CertPath string - // KeyPath + // KeyPath specifies the path to the private key of the cert specified in + // the CertPath. This is usually the private key of the user profile. KeyPath string // Insecure Insecure bool @@ -139,6 +143,14 @@ func (c *Config) CheckAndSetDefaults() error { c.Clock = clockwork.NewRealClock() } + if c.RootClusterCACertPoolFunc == nil { + if !c.Insecure { + return trace.BadParameter("missing RootClusterCACertPoolFunc") + } + c.RootClusterCACertPoolFunc = func(_ context.Context) (*x509.CertPool, error) { + return x509.NewCertPool(), nil + } + } return nil } diff --git a/lib/teleterm/gateway/gateway.go b/lib/teleterm/gateway/gateway.go index e6672b258db18..88e891c74cc6a 100644 --- a/lib/teleterm/gateway/gateway.go +++ b/lib/teleterm/gateway/gateway.go @@ -149,7 +149,7 @@ func (g *Gateway) serveWithForwardProxy() error { select { case err := <-errChan: - return trace.Wrap(err) + return trace.NewAggregate(err, g.Close()) case <-g.closeContext.Done(): return nil } diff --git a/lib/teleterm/gateway/gateway_kube.go b/lib/teleterm/gateway/gateway_kube.go index 64b645a2bb365..10301e27c1fd9 100644 --- a/lib/teleterm/gateway/gateway_kube.go +++ b/lib/teleterm/gateway/gateway_kube.go @@ -39,10 +39,6 @@ func (g *Gateway) KubeconfigPath() string { } func (g *Gateway) makeLocalProxiesForKube(listener net.Listener) error { - if g.cfg.RootClusterCACertPoolFunc == nil { - return trace.BadParameter("missing RootClusterCACertPoolFunc") - } - // A key is required here for generating local CAs. It can be any key. // Reading the provided key path to avoid generating a new one. key, err := keys.LoadPrivateKey(g.cfg.KeyPath) @@ -70,7 +66,9 @@ func (g *Gateway) makeLocalProxiesForKube(listener net.Listener) error { } func (g *Gateway) makeALPNLocalProxyForKube(cas map[string]tls.Certificate) error { - // ALPN local proxy can use a random port. + // ALPN local proxy can use a random port as it receives requests from the + // forward proxy so there should be no requests coming from users' clients + // directly. listener, err := alpnproxy.NewKubeListener(cas) if err != nil { return trace.Wrap(err) @@ -132,10 +130,23 @@ func (g *Gateway) writeKubeconfig(key *keys.PrivateKey, cas map[string]tls.Certi values := &kubeconfig.LocalProxyValues{ // Ideally tc.KubeClusterAddr() should be used for - // TeleportKubeClusterAddr as it matches what tsh kube login sets in - // the kubeconfig. In this case it is not a big deal since this - // ephemeral config has only a single kube cluster. Also - // tc.KubeClusterAddr() is likely the same as WebProxyAddr anyway. + // TeleportKubeClusterAddr here. + // + // Kube cluster address is used as server address when `tsh kube login` + // adds cluster entries in the default kubeconfig. When creating + // kubeconfig for a local proxy, TeleportKubeClusterAddr is mainly used + // to identify which clusters in the kubeconfig belong to the current + // tsh profile, in case the default kubeconfig has other clusters. It + // also serves as a reference so that the server address of a cluster + // in the kubeconfig of `tsh proxy kube` and `tsh kube login` are the + // same. + // + // In this case here, since the kubeconfig for the local proxy is only + // for a single kube cluster and it is not created from the default + // kubeconfig, specifying the kube cluster address is not necessary. + // + // In most cases, tc.KubeClusterAddr() is the same as + // g.cfg.WebProxyAddr anyway. TeleportKubeClusterAddr: "https://" + g.cfg.WebProxyAddr, LocalProxyURL: "http://" + g.forwardProxy.GetAddr(), ClientKeyData: key.PrivateKeyPEM(), diff --git a/lib/teleterm/gateway/gateway_kube_test.go b/lib/teleterm/gateway/gateway_kube_test.go index c95dfae1740f3..349938d44ffb9 100644 --- a/lib/teleterm/gateway/gateway_kube_test.go +++ b/lib/teleterm/gateway/gateway_kube_test.go @@ -85,10 +85,11 @@ func TestKubeGateway(t *testing.T) { }, ) require.NoError(t, err) - t.Cleanup(func() { - gateway.Close() - }) - go gateway.Serve() + serveErr := make(chan error) + go func() { + err := gateway.Serve() + serveErr <- err + }() // First request should succeed. kubeClient := kubeClientForLocalProxy(t, gateway.KubeconfigPath(), teleportClusterName, kubeClusterName) @@ -103,6 +104,9 @@ func TestKubeGateway(t *testing.T) { // kubeMiddleware -> kubeCertReissuer.reissueCert -> gateway.cfg.OnExpiredCert -> gateway.ReloadCert -> kubeCertReissuer.updateCert clock.Advance(time.Hour) sendRequestToKubeLocalProxyAndSucceed(t, kubeClient) + + require.NoError(t, gateway.Close()) + require.NoError(t, <-serveErr) } func sendRequestToKubeLocalProxyAndSucceed(t *testing.T, client *kubernetes.Clientset) { diff --git a/lib/teleterm/gateway/kube_cert_reissuer.go b/lib/teleterm/gateway/kube_cert_reissuer.go index 5281d74727fa7..4e40da7082ba5 100644 --- a/lib/teleterm/gateway/kube_cert_reissuer.go +++ b/lib/teleterm/gateway/kube_cert_reissuer.go @@ -39,6 +39,9 @@ func newKubeCertReissuer(cert tls.Certificate, onExpiredCert func(context.Contex return r } +// reissueCert implements alpnproxy.KubeCertReissuer. Arguments +// "teleportCluster" and "kubeCluster" are omitted as this reissuer is bound to +// a single kube cluster. func (r *kubeCertReissuer) reissueCert(ctx context.Context, _, _ string) (tls.Certificate, error) { if err := r.onExpiredCert(ctx); err != nil { return tls.Certificate{}, trace.Wrap(err) From cef661e0b355d573ad7cc27e1276b0cd5a3ebb50 Mon Sep 17 00:00:00 2001 From: STeve Huang Date: Tue, 4 Jul 2023 12:35:20 -0400 Subject: [PATCH 5/7] config dir --- lib/teleterm/gateway/config.go | 5 +++++ lib/teleterm/gateway/gateway.go | 6 ++++++ lib/teleterm/gateway/gateway_kube.go | 20 +++++++++++++++++--- lib/teleterm/gateway/gateway_kube_test.go | 3 ++- lib/utils/fs.go | 7 ++++--- tool/tsh/common/app.go | 5 ++++- tool/tsh/common/kube_proxy.go | 7 +++++-- 7 files changed, 43 insertions(+), 10 deletions(-) diff --git a/lib/teleterm/gateway/config.go b/lib/teleterm/gateway/config.go index 3eff69e1f22db..aa495d40e780a 100644 --- a/lib/teleterm/gateway/config.go +++ b/lib/teleterm/gateway/config.go @@ -66,6 +66,8 @@ type Config struct { Insecure bool // ClusterName is the Teleport cluster name ClusterName string + // Username is the username of the profile. + Username string // WebProxyAddr WebProxyAddr string // Log is a component logger @@ -88,6 +90,9 @@ type Config struct { // RootClusterCACertPoolFunc is callback function to fetch Root cluster CAs // when ALPN connection upgrade is required. RootClusterCACertPoolFunc alpnproxy.GetClusterCACertPoolFunc + // ConfigDir specifies a dir used for saving various configuration used by + // the gateway. + ConfigDir string } // OnExpiredCertFunc is the type of a function that is called when a new downstream connection is diff --git a/lib/teleterm/gateway/gateway.go b/lib/teleterm/gateway/gateway.go index 88e891c74cc6a..9fc449b9bfeff 100644 --- a/lib/teleterm/gateway/gateway.go +++ b/lib/teleterm/gateway/gateway.go @@ -120,6 +120,10 @@ func (g *Gateway) Close() error { if g.forwardProxy != nil { errs = append(errs, g.forwardProxy.Close()) } + + for _, cleanup := range g.cleanupFuncs { + errs = append(errs, cleanup()) + } return trace.NewAggregate(errs...) } @@ -283,6 +287,8 @@ type Gateway struct { // onNewCert is a callback function that updates the local proxy when TLS // certificate is reissued. onNewCert func(tls.Certificate) error + // cleanupFuncs contains a list of extra cleanup functions called during Close. + cleanupFuncs []func() error // closeContext and closeCancel are used to signal to any waiting goroutines // that the local proxy is now closed and to release any resources. closeContext context.Context diff --git a/lib/teleterm/gateway/gateway_kube.go b/lib/teleterm/gateway/gateway_kube.go index 10301e27c1fd9..90d6b12899d8e 100644 --- a/lib/teleterm/gateway/gateway_kube.go +++ b/lib/teleterm/gateway/gateway_kube.go @@ -24,18 +24,25 @@ import ( "github.com/gravitational/trace" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "github.com/gravitational/teleport/api/utils/keypaths" "github.com/gravitational/teleport/api/utils/keys" "github.com/gravitational/teleport/lib/client" "github.com/gravitational/teleport/lib/kube/kubeconfig" "github.com/gravitational/teleport/lib/srv/alpnproxy" + "github.com/gravitational/teleport/lib/teleterm/api/uri" "github.com/gravitational/teleport/lib/utils" ) // KubeconfigPath returns the kubeconfig path that can be used for clients to // connect to the local proxy. func (g *Gateway) KubeconfigPath() string { - // Assumes CertPath is unique per kube cluster. - return g.cfg.CertPath + ".kubeconfig" + return keypaths.KubeConfigPath( + g.cfg.ConfigDir, + uri.New(g.cfg.TargetURI).GetProfileName(), + g.cfg.Username, + g.cfg.ClusterName, + g.cfg.TargetName, + ) } func (g *Gateway) makeLocalProxiesForKube(listener net.Listener) error { @@ -163,5 +170,12 @@ func (g *Gateway) writeKubeconfig(key *keys.PrivateKey, cas map[string]tls.Certi } config := kubeconfig.CreateLocalProxyConfig(clientcmdapi.NewConfig(), values) - return trace.Wrap(kubeconfig.Save(g.KubeconfigPath(), *config)) + if err := kubeconfig.Save(g.KubeconfigPath(), *config); err != nil { + return trace.Wrap(err) + } + + g.cleanupFuncs = append(g.cleanupFuncs, func() error { + return trace.Wrap(utils.RemoveFileIfExist(g.KubeconfigPath())) + }) + return nil } diff --git a/lib/teleterm/gateway/gateway_kube_test.go b/lib/teleterm/gateway/gateway_kube_test.go index 349938d44ffb9..d4f88c0d7199c 100644 --- a/lib/teleterm/gateway/gateway_kube_test.go +++ b/lib/teleterm/gateway/gateway_kube_test.go @@ -70,12 +70,13 @@ func TestKubeGateway(t *testing.T) { Clock: clock, TargetName: kubeClusterName, TargetURI: uri.NewClusterURI(teleportClusterName).AppendKube(kubeClusterName).String(), - TargetUser: identity.Username, CertPath: proxy.clientCertPath(), KeyPath: proxy.clientKeyPath(), WebProxyAddr: proxy.webProxyAddr, ClusterName: teleportClusterName, CLICommandProvider: mockCLICommandProvider{}, + Username: identity.Username, + ConfigDir: t.TempDir(), RootClusterCACertPoolFunc: func(_ context.Context) (*x509.CertPool, error) { return proxy.certPool(), nil }, diff --git a/lib/utils/fs.go b/lib/utils/fs.go index bd4a2a358be65..e5eb3bdddde09 100644 --- a/lib/utils/fs.go +++ b/lib/utils/fs.go @@ -238,11 +238,12 @@ func overwriteFile(filePath string) (err error) { } // RemoveFileIfExist removes file if exits. -func RemoveFileIfExist(filePath string) { +func RemoveFileIfExist(filePath string) error { if !FileExists(filePath) { - return + return nil } if err := os.Remove(filePath); err != nil { - log.WithError(err).Warnf("Failed to remove %v", filePath) + return trace.ConvertSystemError(err) } + return nil } diff --git a/tool/tsh/common/app.go b/tool/tsh/common/app.go index dcc32ed708bbf..539a31664d529 100644 --- a/tool/tsh/common/app.go +++ b/tool/tsh/common/app.go @@ -495,7 +495,10 @@ func pickActiveApp(cf *CLIConf) (*tlsca.RouteToApp, error) { // removeAppLocalFiles removes generated local files for the provided app. func removeAppLocalFiles(profile *client.ProfileStatus, appName string) { - utils.RemoveFileIfExist(profile.AppLocalCAPath(appName)) + err := utils.RemoveFileIfExist(profile.AppLocalCAPath(appName)) + if err != nil { + log.WithError(err).Warnf("Failed to remove %v", profile.AppLocalCAPath(appName)) + } } // loadAppSelfSignedCA loads self-signed CA for provided app, or tries to diff --git a/tool/tsh/common/kube_proxy.go b/tool/tsh/common/kube_proxy.go index 1a87ad6ab71d1..eab78a6924eeb 100644 --- a/tool/tsh/common/kube_proxy.go +++ b/tool/tsh/common/kube_proxy.go @@ -278,8 +278,11 @@ func (k *kubeLocalProxy) Start(ctx context.Context) error { // Close removes the temporary kubeconfig and closes the listeners. func (k *kubeLocalProxy) Close() error { - utils.RemoveFileIfExist(k.KubeConfigPath()) - return trace.NewAggregate(k.forwardProxy.Close(), k.localProxy.Close()) + return trace.NewAggregate( + k.forwardProxy.Close(), + k.localProxy.Close(), + utils.RemoveFileIfExist(k.KubeConfigPath()), + ) } // GetAddr returns the address of the forward proxy for client to connect. From 2af169983d006b24c090faa6b8f4011f1c270a72 Mon Sep 17 00:00:00 2001 From: STeve Huang Date: Thu, 6 Jul 2023 12:10:44 -0400 Subject: [PATCH 6/7] use ProfileDir instead of ConfigDir --- lib/teleterm/api/uri/uri.go | 2 +- lib/teleterm/gateway/config.go | 7 +++---- lib/teleterm/gateway/gateway.go | 21 +++++++++++++-------- lib/teleterm/gateway/gateway_db.go | 2 +- lib/teleterm/gateway/gateway_kube.go | 14 ++++++++++---- lib/teleterm/gateway/gateway_kube_test.go | 8 +++++++- 6 files changed, 35 insertions(+), 19 deletions(-) diff --git a/lib/teleterm/api/uri/uri.go b/lib/teleterm/api/uri/uri.go index cc872efbe81af..8a061593580ae 100644 --- a/lib/teleterm/api/uri/uri.go +++ b/lib/teleterm/api/uri/uri.go @@ -200,7 +200,7 @@ func (r ResourceURI) IsDB() bool { return r.GetDbName() != "" } -// IsDB returns true if URI is a kube resource. +// IsKube returns true if URI is a kube resource. func (r ResourceURI) IsKube() bool { return r.GetKubeName() != "" } diff --git a/lib/teleterm/gateway/config.go b/lib/teleterm/gateway/config.go index aa495d40e780a..31c0718a5c4c6 100644 --- a/lib/teleterm/gateway/config.go +++ b/lib/teleterm/gateway/config.go @@ -64,7 +64,7 @@ type Config struct { KeyPath string // Insecure Insecure bool - // ClusterName is the Teleport cluster name + // ClusterName is the Teleport cluster name. ClusterName string // Username is the username of the profile. Username string @@ -90,9 +90,8 @@ type Config struct { // RootClusterCACertPoolFunc is callback function to fetch Root cluster CAs // when ALPN connection upgrade is required. RootClusterCACertPoolFunc alpnproxy.GetClusterCACertPoolFunc - // ConfigDir specifies a dir used for saving various configuration used by - // the gateway. - ConfigDir string + // ProfileDir specifies the tsh home dir of the user profile. + ProfileDir string } // OnExpiredCertFunc is the type of a function that is called when a new downstream connection is diff --git a/lib/teleterm/gateway/gateway.go b/lib/teleterm/gateway/gateway.go index 9fc449b9bfeff..789cd9d019017 100644 --- a/lib/teleterm/gateway/gateway.go +++ b/lib/teleterm/gateway/gateway.go @@ -121,7 +121,7 @@ func (g *Gateway) Close() error { errs = append(errs, g.forwardProxy.Close()) } - for _, cleanup := range g.cleanupFuncs { + for _, cleanup := range g.onCloseFuncs { errs = append(errs, cleanup()) } return trace.NewAggregate(errs...) @@ -238,7 +238,7 @@ func (g *Gateway) CLICommand() (*api.GatewayCLICommand, error) { // In the future, we're probably going to make this method accept the cert as an arg rather than // reading from disk. func (g *Gateway) ReloadCert() error { - if g.onNewCert == nil { + if len(g.onNewCertFuncs) == 0 { return nil } g.cfg.Log.Debug("Reloading cert") @@ -248,7 +248,12 @@ func (g *Gateway) ReloadCert() error { return trace.Wrap(err) } - return trace.Wrap(g.onNewCert(tlsCert)) + var errs []error + for _, onNewCert := range g.onNewCertFuncs { + errs = append(errs, onNewCert(tlsCert)) + } + + return trace.NewAggregate(errs...) } func (g *Gateway) onExpiredCert(ctx context.Context) error { @@ -284,11 +289,11 @@ type Gateway struct { cfg *Config localProxy *alpn.LocalProxy forwardProxy *alpn.ForwardProxy - // onNewCert is a callback function that updates the local proxy when TLS - // certificate is reissued. - onNewCert func(tls.Certificate) error - // cleanupFuncs contains a list of extra cleanup functions called during Close. - cleanupFuncs []func() error + // onNewCertFuncs contains a list of callback functions that update the local + // proxy when TLS certificate is reissued. + onNewCertFuncs []func(tls.Certificate) error + // onCloseFuncs contains a list of extra cleanup functions called during Close. + onCloseFuncs []func() error // closeContext and closeCancel are used to signal to any waiting goroutines // that the local proxy is now closed and to release any resources. closeContext context.Context diff --git a/lib/teleterm/gateway/gateway_db.go b/lib/teleterm/gateway/gateway_db.go index a33c359f44ad9..586bd32eb8c14 100644 --- a/lib/teleterm/gateway/gateway_db.go +++ b/lib/teleterm/gateway/gateway_db.go @@ -73,7 +73,7 @@ func (g *Gateway) makeLocalProxyForDB(listener net.Listener) error { } g.localProxy = localProxy - g.onNewCert = g.setDBCert + g.onNewCertFuncs = append(g.onNewCertFuncs, g.setDBCert) return nil } diff --git a/lib/teleterm/gateway/gateway_kube.go b/lib/teleterm/gateway/gateway_kube.go index 90d6b12899d8e..a4e9d7f228433 100644 --- a/lib/teleterm/gateway/gateway_kube.go +++ b/lib/teleterm/gateway/gateway_kube.go @@ -37,7 +37,7 @@ import ( // connect to the local proxy. func (g *Gateway) KubeconfigPath() string { return keypaths.KubeConfigPath( - g.cfg.ConfigDir, + g.cfg.ProfileDir, uri.New(g.cfg.TargetURI).GetProfileName(), g.cfg.Username, g.cfg.ClusterName, @@ -69,6 +69,11 @@ func (g *Gateway) makeLocalProxiesForKube(listener net.Listener) error { if err := g.writeKubeconfig(key, cas); err != nil { return trace.NewAggregate(err, g.Close()) } + // make sure kubeconfig is written again on new cert as a relogin may + // cleanup profile dir. + g.onNewCertFuncs = append(g.onNewCertFuncs, func(_ tls.Certificate) error { + return trace.Wrap(g.writeKubeconfig(key, cas)) + }) return nil } @@ -111,7 +116,7 @@ func (g *Gateway) makeKubeMiddleware() (alpnproxy.LocalProxyHTTPMiddleware, erro } certReissuer := newKubeCertReissuer(cert, g.onExpiredCert) - g.onNewCert = certReissuer.updateCert + g.onNewCertFuncs = append(g.onNewCertFuncs, certReissuer.updateCert) certs := make(alpnproxy.KubeClientCerts) certs.Add(g.cfg.ClusterName, g.cfg.TargetName, cert) @@ -150,7 +155,8 @@ func (g *Gateway) writeKubeconfig(key *keys.PrivateKey, cas map[string]tls.Certi // // In this case here, since the kubeconfig for the local proxy is only // for a single kube cluster and it is not created from the default - // kubeconfig, specifying the kube cluster address is not necessary. + // kubeconfig, specifying the correct kube cluster address is not + // necessary. // // In most cases, tc.KubeClusterAddr() is the same as // g.cfg.WebProxyAddr anyway. @@ -174,7 +180,7 @@ func (g *Gateway) writeKubeconfig(key *keys.PrivateKey, cas map[string]tls.Certi return trace.Wrap(err) } - g.cleanupFuncs = append(g.cleanupFuncs, func() error { + g.onCloseFuncs = append(g.onCloseFuncs, func() error { return trace.Wrap(utils.RemoveFileIfExist(g.KubeconfigPath())) }) return nil diff --git a/lib/teleterm/gateway/gateway_kube_test.go b/lib/teleterm/gateway/gateway_kube_test.go index d4f88c0d7199c..f8444ef777d49 100644 --- a/lib/teleterm/gateway/gateway_kube_test.go +++ b/lib/teleterm/gateway/gateway_kube_test.go @@ -26,6 +26,7 @@ import ( "net" "net/http" "net/url" + "os" "path" "testing" "time" @@ -65,6 +66,7 @@ func TestKubeGateway(t *testing.T) { } clock := clockwork.NewFakeClock() proxy := mustStartMockProxyWithKubeAPI(t, identity) + profileDir := t.TempDir() gateway, err := New( Config{ Clock: clock, @@ -76,11 +78,13 @@ func TestKubeGateway(t *testing.T) { ClusterName: teleportClusterName, CLICommandProvider: mockCLICommandProvider{}, Username: identity.Username, - ConfigDir: t.TempDir(), + ProfileDir: profileDir, RootClusterCACertPoolFunc: func(_ context.Context) (*x509.CertPool, error) { return proxy.certPool(), nil }, OnExpiredCert: func(_ context.Context, gateway *Gateway) error { + // Remove the profile dir to see if kubeconfig gets rewritten. + os.RemoveAll(profileDir) return trace.Wrap(gateway.ReloadCert()) }, }, @@ -105,9 +109,11 @@ func TestKubeGateway(t *testing.T) { // kubeMiddleware -> kubeCertReissuer.reissueCert -> gateway.cfg.OnExpiredCert -> gateway.ReloadCert -> kubeCertReissuer.updateCert clock.Advance(time.Hour) sendRequestToKubeLocalProxyAndSucceed(t, kubeClient) + require.True(t, utils.FileExists(gateway.KubeconfigPath())) require.NoError(t, gateway.Close()) require.NoError(t, <-serveErr) + require.False(t, utils.FileExists(gateway.KubeconfigPath())) } func sendRequestToKubeLocalProxyAndSucceed(t *testing.T, client *kubernetes.Clientset) { From 6a92097b3b42a5a430b2ea21b23cbd1148559171 Mon Sep 17 00:00:00 2001 From: STeve Huang Date: Thu, 6 Jul 2023 12:58:28 -0400 Subject: [PATCH 7/7] remove NewKubeForwardProxyWithListener --- integration/proxy/proxy_helpers.go | 4 +- lib/srv/alpnproxy/kube.go | 56 +++++++++++++++++++--------- lib/teleterm/gateway/gateway_kube.go | 6 ++- tool/tsh/common/kube_proxy.go | 6 ++- 4 files changed, 52 insertions(+), 20 deletions(-) diff --git a/integration/proxy/proxy_helpers.go b/integration/proxy/proxy_helpers.go index ee42ca1bf82f3..c56a8cb165cda 100644 --- a/integration/proxy/proxy_helpers.go +++ b/integration/proxy/proxy_helpers.go @@ -563,7 +563,9 @@ func mustStartALPNLocalProxyWithConfig(t *testing.T, config alpnproxy.LocalProxy func mustStartKubeForwardProxy(t *testing.T, lpAddr string) *alpnproxy.ForwardProxy { t.Helper() - fp, err := alpnproxy.NewKubeForwardProxy(context.Background(), "", lpAddr) + fp, err := alpnproxy.NewKubeForwardProxy(alpnproxy.KubeForwardProxyConfig{ + ForwardAddr: lpAddr, + }) require.NoError(t, err) t.Cleanup(func() { fp.Close() diff --git a/lib/srv/alpnproxy/kube.go b/lib/srv/alpnproxy/kube.go index 170a506b26e9a..21a737b8b7814 100644 --- a/lib/srv/alpnproxy/kube.go +++ b/lib/srv/alpnproxy/kube.go @@ -273,36 +273,58 @@ func NewKubeListener(casByTeleportCluster map[string]tls.Certificate) (net.Liste return listener, trace.Wrap(err) } -// NewKubeForwardProxy creates a forward proxy for kube access. -func NewKubeForwardProxy(ctx context.Context, listenPort, forwardAddr string) (*ForwardProxy, error) { - listenAddr := "localhost:0" - if listenPort != "" { - listenAddr = "localhost:" + listenPort - } +// KubeForwardProxyConfig is the config for making kube forward proxy. +type KubeForwardProxyConfig struct { + // CloseContext is the close context. + CloseContext context.Context + // ListenPort is the localhost port to listen. + ListenPort string + // Listener is the listener for the forward proxy. A listener is created + // from ListenPort if Listener is not provided. + Listener net.Listener + // ForwardAddr is the target address the requests get forwarded to. + ForwardAddr string +} - listener, err := net.Listen("tcp", listenAddr) - if err != nil { - return nil, trace.Wrap(err) +// CheckAndSetDefaults checks and sets default config values. +func (c *KubeForwardProxyConfig) CheckAndSetDefaults() error { + if c.ForwardAddr == "" { + return trace.BadParameter("missing forward address") + } + if c.CloseContext == nil { + c.CloseContext = context.Background() } + if c.Listener == nil { + if c.ListenPort == "" { + c.ListenPort = "0" + } - fp, err := NewKubeForwardProxyWithListener(ctx, listener, forwardAddr) - return fp, trace.Wrap(err) + listener, err := net.Listen("tcp", "localhost:"+c.ListenPort) + if err != nil { + return trace.Wrap(err) + } + c.Listener = listener + } + return nil } -// NewKubeForwardProxyWithListener creates a forward proxy with provided listener. -func NewKubeForwardProxyWithListener(ctx context.Context, listener net.Listener, forwardAddr string) (*ForwardProxy, error) { +// NewKubeForwardProxy creates a forward proxy for kube access. +func NewKubeForwardProxy(config KubeForwardProxyConfig) (*ForwardProxy, error) { + if err := config.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err) + } fp, err := NewForwardProxy(ForwardProxyConfig{ - Listener: listener, - CloseContext: ctx, + Listener: config.Listener, + CloseContext: config.CloseContext, Handlers: []ConnectRequestHandler{ NewForwardToHostHandler(ForwardToHostHandlerConfig{ MatchFunc: MatchAllRequests, - Host: forwardAddr, + Host: config.ForwardAddr, }), }, }) if err != nil { - return nil, trace.NewAggregate(listener.Close(), err) + return nil, trace.NewAggregate(config.Listener.Close(), err) } return fp, nil } diff --git a/lib/teleterm/gateway/gateway_kube.go b/lib/teleterm/gateway/gateway_kube.go index a4e9d7f228433..16cde17f98637 100644 --- a/lib/teleterm/gateway/gateway_kube.go +++ b/lib/teleterm/gateway/gateway_kube.go @@ -125,7 +125,11 @@ func (g *Gateway) makeKubeMiddleware() (alpnproxy.LocalProxyHTTPMiddleware, erro func (g *Gateway) makeForwardProxyForKube(listener net.Listener) (err error) { // Use provided listener with user configured port for the forward proxy. - g.forwardProxy, err = alpnproxy.NewKubeForwardProxyWithListener(g.closeContext, listener, g.localProxy.GetAddr()) + g.forwardProxy, err = alpnproxy.NewKubeForwardProxy(alpnproxy.KubeForwardProxyConfig{ + CloseContext: g.closeContext, + Listener: listener, + ForwardAddr: g.localProxy.GetAddr(), + }) return trace.Wrap(err) } diff --git a/tool/tsh/common/kube_proxy.go b/tool/tsh/common/kube_proxy.go index eab78a6924eeb..291f9bcf6218c 100644 --- a/tool/tsh/common/kube_proxy.go +++ b/tool/tsh/common/kube_proxy.go @@ -231,7 +231,11 @@ func makeKubeLocalProxy(cf *CLIConf, tc *client.TeleportClient, clusters kubecon } kubeProxy.localProxy = localProxy - kubeProxy.forwardProxy, err = alpnproxy.NewKubeForwardProxy(cf.Context, port, localProxy.GetAddr()) + kubeProxy.forwardProxy, err = alpnproxy.NewKubeForwardProxy(alpnproxy.KubeForwardProxyConfig{ + CloseContext: cf.Context, + ListenPort: port, + ForwardAddr: localProxy.GetAddr(), + }) if err != nil { return nil, trace.Wrap(err) }