From 6a65a5aec22488877176f8e8ffacf28e8d466f6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Cie=C5=9Blak?= Date: Thu, 10 Nov 2022 13:19:54 +0100 Subject: [PATCH 1/4] Rename StorageByResourceURI to ClusterGetter That was a weird name. I think it was originally meant to be something else, then I did search and replace in this file and it change it to that name. --- lib/teleterm/clusters/dbcmd_cli_command_provider.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/teleterm/clusters/dbcmd_cli_command_provider.go b/lib/teleterm/clusters/dbcmd_cli_command_provider.go index 9934766904ef1..592d2353dfcb1 100644 --- a/lib/teleterm/clusters/dbcmd_cli_command_provider.go +++ b/lib/teleterm/clusters/dbcmd_cli_command_provider.go @@ -28,15 +28,15 @@ import ( // DbcmdCLICommandProvider provides CLI commands for database gateways. It needs Storage to read // fresh profile state from the disk. type DbcmdCLICommandProvider struct { - storage StorageByResourceURI + storage ClusterGetter execer dbcmd.Execer } -type StorageByResourceURI interface { +type ClusterGetter interface { GetByResourceURI(string) (*Cluster, error) } -func NewDbcmdCLICommandProvider(storage StorageByResourceURI, execer dbcmd.Execer) DbcmdCLICommandProvider { +func NewDbcmdCLICommandProvider(storage ClusterGetter, execer dbcmd.Execer) DbcmdCLICommandProvider { return DbcmdCLICommandProvider{ storage: storage, execer: execer, From 46a9c8c8227dbad3a84ffa37137e7531e1ec6a9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Cie=C5=9Blak?= Date: Thu, 10 Nov 2022 13:21:04 +0100 Subject: [PATCH 2/4] Allow passing CLICommandProvider to daemon.Service This will help us in tests in the upcoming commits. --- lib/teleterm/daemon/config.go | 12 +++++++++--- lib/teleterm/daemon/daemon.go | 4 +--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/teleterm/daemon/config.go b/lib/teleterm/daemon/config.go index d1a764f6cccd1..ba977f6d2b6fa 100644 --- a/lib/teleterm/daemon/config.go +++ b/lib/teleterm/daemon/config.go @@ -21,6 +21,7 @@ import ( "github.com/sirupsen/logrus" "google.golang.org/grpc" + "github.com/gravitational/teleport/lib/client/db/dbcmd" "github.com/gravitational/teleport/lib/teleterm/clusters" "github.com/gravitational/teleport/lib/teleterm/gateway" ) @@ -30,9 +31,10 @@ type Config struct { // Storage is a storage service that reads/writes to tsh profiles Storage *clusters.Storage // Log is a component logger - Log *logrus.Entry - GatewayCreator GatewayCreator - TCPPortAllocator gateway.TCPPortAllocator + Log *logrus.Entry + GatewayCreator GatewayCreator + TCPPortAllocator gateway.TCPPortAllocator + CLICommandProvider gateway.CLICommandProvider // CreateTshdEventsClientCredsFunc lazily creates creds for the tshd events server ran by the // Electron app. This is to ensure that the server public key is written to the disk under the // expected location by the time we get around to creating the client. @@ -55,6 +57,10 @@ func (c *Config) CheckAndSetDefaults() error { c.TCPPortAllocator = gateway.NetTCPPortAllocator{} } + if c.CLICommandProvider == nil { + c.CLICommandProvider = clusters.NewDbcmdCLICommandProvider(c.Storage, dbcmd.SystemExecer{}) + } + if c.Log == nil { c.Log = logrus.NewEntry(logrus.StandardLogger()).WithField(trace.Component, "daemon") } diff --git a/lib/teleterm/daemon/daemon.go b/lib/teleterm/daemon/daemon.go index 909dce5d41565..de34d1fa80252 100644 --- a/lib/teleterm/daemon/daemon.go +++ b/lib/teleterm/daemon/daemon.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/client/db/dbcmd" api "github.com/gravitational/teleport/lib/teleterm/api/protogen/golang/v1" "github.com/gravitational/teleport/lib/teleterm/clusters" "github.com/gravitational/teleport/lib/teleterm/gateway" @@ -166,13 +165,12 @@ type GatewayCreator interface { // createGateway assumes that mu is already held by a public method. func (s *Service) createGateway(ctx context.Context, params CreateGatewayParams) (*gateway.Gateway, error) { - cliCommandProvider := clusters.NewDbcmdCLICommandProvider(s.cfg.Storage, dbcmd.SystemExecer{}) clusterCreateGatewayParams := clusters.CreateGatewayParams{ TargetURI: params.TargetURI, TargetUser: params.TargetUser, TargetSubresourceName: params.TargetSubresourceName, LocalPort: params.LocalPort, - CLICommandProvider: cliCommandProvider, + CLICommandProvider: s.cfg.CLICommandProvider, TCPPortAllocator: s.cfg.TCPPortAllocator, } From f2a4fbe7933598d63a243a03fc3aad19e86a1f7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Cie=C5=9Blak?= Date: Thu, 10 Nov 2022 12:57:12 +0100 Subject: [PATCH 3/4] Add mutex to gateway.Gateway --- lib/teleterm/gateway/gateway.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/lib/teleterm/gateway/gateway.go b/lib/teleterm/gateway/gateway.go index 3ebe263ccefa0..4582d47d69924 100644 --- a/lib/teleterm/gateway/gateway.go +++ b/lib/teleterm/gateway/gateway.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "strconv" + "sync" "github.com/gravitational/trace" "github.com/sirupsen/logrus" @@ -170,10 +171,16 @@ func (g *Gateway) Serve() error { } func (g *Gateway) URI() uri.ResourceURI { + g.mu.RLock() + defer g.mu.RUnlock() + return g.cfg.URI } func (g *Gateway) SetURI(newURI uri.ResourceURI) { + g.mu.Lock() + defer g.mu.Unlock() + g.cfg.URI = newURI } @@ -194,10 +201,16 @@ func (g *Gateway) TargetUser() string { } func (g *Gateway) TargetSubresourceName() string { + g.mu.RLock() + defer g.mu.RUnlock() + return g.cfg.TargetSubresourceName } func (g *Gateway) SetTargetSubresourceName(value string) { + g.mu.Lock() + defer g.mu.Unlock() + g.cfg.TargetSubresourceName = value } @@ -280,13 +293,10 @@ func checkCertSubject(tlsCert tls.Certificate, dbRoute tlsca.RouteToDatabase) er } // Gateway describes local proxy that creates a gateway to the remote Teleport resource. -// -// Gateway is not safe for concurrent use in itself. However, all access to gateways is gated by -// daemon.Service which obtains a lock for any operation pertaining to gateways. -// -// In the future if Gateway becomes more complex it might be worthwhile to add an RWMutex to it. type Gateway struct { - cfg *Config + cfg *Config + // mu guards cfg fields mutated by Gateway setters. + mu sync.RWMutex localProxy *alpn.LocalProxy // closeContext and closeCancel are used to signal to any waiting goroutines // that the local proxy is now closed and to release any resources. From 479d77230cd0e01866723290d3c7fb46d56b06da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Cie=C5=9Blak?= Date: Thu, 10 Nov 2022 13:32:07 +0100 Subject: [PATCH 4/4] daemon.Service: Make gateway-related methods return protos daemon.Service.ListGateways used to copy each gateway by dereferencing a pointer and then returning a new slice with all gateways. This was done to protect from callsites mutating the original slice with gateways without obtaining a lock on daemon.Service.mu. However, methods such as `daemon.Service.CreateGateway` still returned a pointer to the gateway, technically allowing other callsites to modify the gateway without going through daemon.Service methods. Since in the previous commit we added a mutex to gateway.Gateway, we can no longer copy the struct by value. To work around this problem, we're going to return protos from daemon.Service methods which previously returned gateway pointers. Callsites that use those methods don't need real gateways anyway, they just need to read a couple of fields from them to return a response from the API endpoint. --- .../apiserver/handler/handler_gateways.go | 56 +++---------------- lib/teleterm/daemon/daemon.go | 33 +++++++---- lib/teleterm/daemon/daemon_test.go | 38 ++++++++----- lib/teleterm/gateway/gateway.go | 21 +++++++ 4 files changed, 74 insertions(+), 74 deletions(-) diff --git a/lib/teleterm/apiserver/handler/handler_gateways.go b/lib/teleterm/apiserver/handler/handler_gateways.go index 22bc8a30dc4eb..3c87f37c79d89 100644 --- a/lib/teleterm/apiserver/handler/handler_gateways.go +++ b/lib/teleterm/apiserver/handler/handler_gateways.go @@ -21,7 +21,6 @@ import ( api "github.com/gravitational/teleport/lib/teleterm/api/protogen/golang/v1" "github.com/gravitational/teleport/lib/teleterm/daemon" - "github.com/gravitational/teleport/lib/teleterm/gateway" ) // CreateGateway creates a gateway @@ -38,30 +37,18 @@ func (s *Handler) CreateGateway(ctx context.Context, req *api.CreateGatewayReque return nil, trace.Wrap(err) } - apiGateway, err := newAPIGateway(*gateway) - if err != nil { - return nil, trace.Wrap(err) - } - - return apiGateway, nil + return gateway, nil } // ListGateways lists all gateways func (s *Handler) ListGateways(ctx context.Context, req *api.ListGatewaysRequest) (*api.ListGatewaysResponse, error) { - gws := s.DaemonService.ListGateways() - - apiGws := make([]*api.Gateway, 0, len(gws)) - for _, gw := range gws { - apiGateway, err := newAPIGateway(gw) - if err != nil { - return nil, trace.Wrap(err) - } - - apiGws = append(apiGws, apiGateway) + gateways, err := s.DaemonService.ListGateways() + if err != nil { + return nil, trace.Wrap(err) } return &api.ListGatewaysResponse{ - Gateways: apiGws, + Gateways: gateways, }, nil } @@ -74,25 +61,6 @@ func (s *Handler) RemoveGateway(ctx context.Context, req *api.RemoveGatewayReque return &api.EmptyResponse{}, nil } -func newAPIGateway(gateway gateway.Gateway) (*api.Gateway, error) { - command, err := gateway.CLICommand() - if err != nil { - return nil, trace.Wrap(err) - } - - return &api.Gateway{ - Uri: gateway.URI().String(), - TargetUri: gateway.TargetURI(), - TargetName: gateway.TargetName(), - TargetUser: gateway.TargetUser(), - TargetSubresourceName: gateway.TargetSubresourceName(), - Protocol: gateway.Protocol(), - LocalAddress: gateway.LocalAddress(), - LocalPort: gateway.LocalPort(), - CliCommand: command, - }, nil -} - // RestartGateway stops a gateway and starts a new with identical parameters but fresh certs, // keeping the original URI. func (s *Handler) RestartGateway(ctx context.Context, req *api.RestartGatewayRequest) (*api.EmptyResponse, error) { @@ -113,12 +81,7 @@ func (s *Handler) SetGatewayTargetSubresourceName(ctx context.Context, req *api. return nil, trace.Wrap(err) } - apiGateway, err := newAPIGateway(*gateway) - if err != nil { - return nil, trace.Wrap(err) - } - - return apiGateway, nil + return gateway, nil } // SetGatewayLocalPort restarts the gateway under the new port without fetching new certs. @@ -128,10 +91,5 @@ func (s *Handler) SetGatewayLocalPort(ctx context.Context, req *api.SetGatewayLo return nil, trace.Wrap(err) } - apiGateway, err := newAPIGateway(*gateway) - if err != nil { - return nil, trace.Wrap(err) - } - - return apiGateway, nil + return gateway, nil } diff --git a/lib/teleterm/daemon/daemon.go b/lib/teleterm/daemon/daemon.go index de34d1fa80252..cbf592dc22d19 100644 --- a/lib/teleterm/daemon/daemon.go +++ b/lib/teleterm/daemon/daemon.go @@ -147,7 +147,7 @@ func (s *Service) ClusterLogout(ctx context.Context, uri string) error { } // CreateGateway creates a gateway to given targetURI -func (s *Service) CreateGateway(ctx context.Context, params CreateGatewayParams) (*gateway.Gateway, error) { +func (s *Service) CreateGateway(ctx context.Context, params CreateGatewayParams) (*api.Gateway, error) { s.mu.Lock() defer s.mu.Unlock() @@ -156,7 +156,8 @@ func (s *Service) CreateGateway(ctx context.Context, params CreateGatewayParams) return nil, trace.Wrap(err) } - return gateway, nil + protoGateway, err := gateway.ToProto() + return protoGateway, trace.Wrap(err) } type GatewayCreator interface { @@ -265,21 +266,25 @@ func (s *Service) findGateway(gatewayURI string) (*gateway.Gateway, error) { } // ListGateways lists gateways -func (s *Service) ListGateways() []gateway.Gateway { +func (s *Service) ListGateways() ([]*api.Gateway, error) { s.mu.RLock() defer s.mu.RUnlock() - gws := make([]gateway.Gateway, 0, len(s.gateways)) + gws := make([]*api.Gateway, 0, len(s.gateways)) for _, gateway := range s.gateways { - gws = append(gws, *gateway) + protoGateway, err := gateway.ToProto() + if err != nil { + return nil, trace.Wrap(err) + } + gws = append(gws, protoGateway) } - return gws + return gws, nil } // SetGatewayTargetSubresourceName updates the TargetSubresourceName field of a gateway stored in // s.gateways. -func (s *Service) SetGatewayTargetSubresourceName(gatewayURI, targetSubresourceName string) (*gateway.Gateway, error) { +func (s *Service) SetGatewayTargetSubresourceName(gatewayURI, targetSubresourceName string) (*api.Gateway, error) { s.mu.Lock() defer s.mu.Unlock() @@ -290,7 +295,8 @@ func (s *Service) SetGatewayTargetSubresourceName(gatewayURI, targetSubresourceN gateway.SetTargetSubresourceName(targetSubresourceName) - return gateway, nil + protoGateway, err := gateway.ToProto() + return protoGateway, trace.Wrap(err) } // SetGatewayLocalPort creates a new gateway with the given port, swaps it with the old gateway @@ -302,7 +308,7 @@ func (s *Service) SetGatewayTargetSubresourceName(gatewayURI, targetSubresourceN // correct that mistake and choose a different port. // // SetGatewayLocalPort is a noop if port is equal to the existing port. -func (s *Service) SetGatewayLocalPort(gatewayURI, localPort string) (*gateway.Gateway, error) { +func (s *Service) SetGatewayLocalPort(gatewayURI, localPort string) (*api.Gateway, error) { s.mu.Lock() defer s.mu.Unlock() @@ -312,7 +318,8 @@ func (s *Service) SetGatewayLocalPort(gatewayURI, localPort string) (*gateway.Ga } if localPort == oldGateway.LocalPort() { - return oldGateway, nil + protoOldGateway, err := oldGateway.ToProto() + return protoOldGateway, trace.Wrap(err) } newGateway, err := gateway.NewWithLocalPort(oldGateway, localPort) @@ -343,7 +350,8 @@ func (s *Service) SetGatewayLocalPort(gatewayURI, localPort string) (*gateway.Ga } }() - return newGateway, nil + protoNewGateway, err := newGateway.ToProto() + return protoNewGateway, trace.Wrap(err) } // GetAllServers returns a full list of nodes without pagination or sorting. @@ -586,7 +594,8 @@ func (s *Service) TransferFile(ctx context.Context, request *api.FileTransferReq // Service is the daemon service type Service struct { cfg *Config - mu sync.RWMutex + // mu guards gateways. + mu sync.RWMutex // closeContext is canceled when Service is getting stopped. It is used as a context for the calls // to the tshd events gRPC client. closeContext context.Context diff --git a/lib/teleterm/daemon/daemon_test.go b/lib/teleterm/daemon/daemon_test.go index 111c21d9a58ab..18da318649e3b 100644 --- a/lib/teleterm/daemon/daemon_test.go +++ b/lib/teleterm/daemon/daemon_test.go @@ -16,6 +16,7 @@ package daemon import ( "context" + "fmt" "net" "net/http" "net/http/httptest" @@ -85,6 +86,13 @@ func (m *mockGatewayCreator) CreateGateway(ctx context.Context, params clusters. return gateway, nil } +type mockCLICommandProvider struct{} + +func (m mockCLICommandProvider) GetCommand(gateway *gateway.Gateway) (string, error) { + command := fmt.Sprintf("%s/%s", gateway.TargetName(), gateway.TargetSubresourceName()) + return command, nil +} + type gatewayCRUDTestContext struct { nameToGateway map[string]*gateway.Gateway mockGatewayCreator *mockGatewayCreator @@ -115,16 +123,17 @@ func TestGatewayCRUD(t *testing.T) { name: "ListGateways", gatewayNamesToCreate: []string{"gateway1", "gateway2"}, testFunc: func(t *testing.T, c *gatewayCRUDTestContext, daemon *Service) { - gateways := daemon.ListGateways() - gatewayURIs := map[uri.ResourceURI]struct{}{} + protoGateways, err := daemon.ListGateways() + require.NoError(t, err) + gatewayURIs := map[string]struct{}{} - for _, gateway := range gateways { - gatewayURIs[gateway.URI()] = struct{}{} + for _, protoGateway := range protoGateways { + gatewayURIs[protoGateway.Uri] = struct{}{} } - require.Equal(t, 2, len(gateways)) - require.Contains(t, gatewayURIs, c.nameToGateway["gateway1"].URI()) - require.Contains(t, gatewayURIs, c.nameToGateway["gateway2"].URI()) + require.Equal(t, 2, len(protoGateways)) + require.Contains(t, gatewayURIs, c.nameToGateway["gateway1"].URI().String()) + require.Contains(t, gatewayURIs, c.nameToGateway["gateway2"].URI().String()) }, }, { @@ -170,9 +179,9 @@ func TestGatewayCRUD(t *testing.T) { require.Equal(t, 0, oldListener.CloseCallCount) - updatedGateway, err := daemon.SetGatewayLocalPort(oldGateway.URI().String(), "12345") + updatedProtoGateway, err := daemon.SetGatewayLocalPort(oldGateway.URI().String(), "12345") require.NoError(t, err) - require.Equal(t, "12345", updatedGateway.LocalPort()) + require.Equal(t, "12345", updatedProtoGateway.LocalPort) updatedGatewayAddress := c.mockTCPPortAllocator.RecentListener().RealAddr().String() // Check if the restarted gateway is still available under the same URI. @@ -241,9 +250,10 @@ func TestGatewayCRUD(t *testing.T) { require.NoError(t, err) daemon, err := New(Config{ - Storage: storage, - GatewayCreator: mockGatewayCreator, - TCPPortAllocator: tt.tcpPortAllocator, + Storage: storage, + GatewayCreator: mockGatewayCreator, + TCPPortAllocator: tt.tcpPortAllocator, + CLICommandProvider: &mockCLICommandProvider{}, }) require.NoError(t, err) @@ -251,13 +261,15 @@ func TestGatewayCRUD(t *testing.T) { for _, gatewayName := range tt.gatewayNamesToCreate { gatewayName := gatewayName - gateway, err := daemon.CreateGateway(context.Background(), CreateGatewayParams{ + protoGateway, err := daemon.CreateGateway(context.Background(), CreateGatewayParams{ TargetURI: uri.NewClusterURI("foo").AppendDB(gatewayName).String(), TargetUser: "alice", TargetSubresourceName: "", LocalPort: "", }) require.NoError(t, err) + gateway, err := daemon.findGateway(protoGateway.Uri) + require.NoError(t, err) nameToGateway[gatewayName] = gateway } diff --git a/lib/teleterm/gateway/gateway.go b/lib/teleterm/gateway/gateway.go index 4582d47d69924..d93570c7ef718 100644 --- a/lib/teleterm/gateway/gateway.go +++ b/lib/teleterm/gateway/gateway.go @@ -30,6 +30,7 @@ import ( "github.com/gravitational/teleport/api/utils/keys" alpn "github.com/gravitational/teleport/lib/srv/alpnproxy" alpncommon "github.com/gravitational/teleport/lib/srv/alpnproxy/common" + api "github.com/gravitational/teleport/lib/teleterm/api/protogen/golang/v1" "github.com/gravitational/teleport/lib/teleterm/api/uri" "github.com/gravitational/teleport/lib/tlsca" "github.com/gravitational/teleport/lib/utils" @@ -146,6 +147,26 @@ func NewWithLocalPort(gateway *Gateway, port string) (*Gateway, error) { return newGateway, trace.Wrap(err) } +// ToProto converts the gateway to its protobuf form. +func (g *Gateway) ToProto() (*api.Gateway, error) { + command, err := g.CLICommand() + if err != nil { + return nil, trace.Wrap(err) + } + + return &api.Gateway{ + Uri: g.URI().String(), + TargetUri: g.TargetURI(), + TargetName: g.TargetName(), + TargetUser: g.TargetUser(), + TargetSubresourceName: g.TargetSubresourceName(), + Protocol: g.Protocol(), + LocalAddress: g.LocalAddress(), + LocalPort: g.LocalPort(), + CliCommand: command, + }, nil +} + // Close terminates gateway connection. Fails if called on an already closed gateway. func (g *Gateway) Close() error { g.closeCancel()