Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e6a7324
Add remote client cache
gzdunek Feb 14, 2024
9cd3883
Add an integration test
gzdunek Feb 14, 2024
928bae2
Close all clients when stopping the service
gzdunek Feb 14, 2024
dee8134
Move RemoteClientCache to the place where it is used
gzdunek Feb 15, 2024
056dfc7
Do not check client cert in `Get`
gzdunek Feb 15, 2024
6f8afb3
Fix code style issues
gzdunek Feb 15, 2024
b80514a
Prevent potential race condition when removing a cached client
gzdunek Feb 15, 2024
c991f18
Test concurrent calls to `Get`
gzdunek Feb 15, 2024
78f9738
Add TODO
gzdunek Feb 15, 2024
25ba4b1
`remoteclientcache` -> `clientcache`
gzdunek Feb 19, 2024
5ce7e0b
Reduce the `err` scope
gzdunek Feb 19, 2024
1729850
Move `Config` closer to `New` and docs
gzdunek Feb 19, 2024
cec6c4f
Merge branch 'master' into gzdunek/cache-remote-client
gzdunek Feb 19, 2024
ed9330f
Fix lint
gzdunek Feb 19, 2024
0d06504
Improve logging and error handling
gzdunek Feb 20, 2024
0628885
Add missing comments
gzdunek Feb 20, 2024
b199000
`Close` -> `Clear`
gzdunek Feb 20, 2024
0d99f36
Improve the test
gzdunek Feb 20, 2024
fd51352
Remove mentions about "remote" client
gzdunek Feb 20, 2024
ad44529
Pass `cfg` directly to `Cache`
gzdunek Feb 20, 2024
f875899
`InvalidateForRootCluster` -> `ClearForRootCluster`
gzdunek Feb 20, 2024
6160f51
Add docs for the interface
gzdunek Feb 21, 2024
f92512d
`ClearForRootCluster` -> `ClearForRoot`
gzdunek Feb 21, 2024
6ae3ad7
Add config validation
gzdunek Feb 21, 2024
43d522c
Log multiple fields at once
gzdunek Feb 21, 2024
ac22cbc
Improve setting logger
gzdunek Feb 23, 2024
97c48cb
Use cached remote clients in Connect (#38202)
gzdunek Feb 28, 2024
742d3b2
Merge branch 'master' into gzdunek/cache-remote-client
gzdunek Feb 28, 2024
182a9c6
Replace checking error string with `client.ErrClientCredentialsHaveEx…
gzdunek Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions integration/teleterm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand Down Expand Up @@ -116,6 +117,12 @@ func TestTeleterm(t *testing.T) {
t.Parallel()
testDeleteConnectMyComputerNode(t, pack)
})

t.Run("TestClientCache", func(t *testing.T) {
t.Parallel()

testClientCache(t, pack, creds)
})
}

func testAddingRootCluster(t *testing.T, pack *dbhelpers.DatabasePack, creds *helpers.UserCreds) {
Expand Down Expand Up @@ -335,6 +342,79 @@ func testHeadlessWatcher(t *testing.T, pack *dbhelpers.DatabasePack, creds *help
)
}

// testClientCache verifies the daemon's client cache: concurrent Get calls
// coalesce to a single client, the cached client is reused on subsequent
// calls, and the cache hands out a fresh client after an explicit clear or
// after the cached client is closed.
func testClientCache(t *testing.T, pack *dbhelpers.DatabasePack, creds *helpers.UserCreds) {
	ctx := context.Background()

	tc := mustLogin(t, pack.Root.User.GetName(), pack, creds)

	storageFakeClock := clockwork.NewFakeClockAt(time.Now())

	storage, err := clusters.NewStorage(clusters.Config{
		Dir:                tc.KeysDir,
		Clock:              storageFakeClock,
		InsecureSkipVerify: tc.InsecureSkipVerify,
	})
	require.NoError(t, err)

	cluster, _, err := storage.Add(ctx, tc.WebProxyAddr)
	require.NoError(t, err)

	daemonService, err := daemon.New(daemon.Config{
		Storage: storage,
		CreateTshdEventsClientCredsFunc: func() (grpc.DialOption, error) {
			return grpc.WithTransportCredentials(insecure.NewCredentials()), nil
		},
		KubeconfigsDir: t.TempDir(),
		AgentsDir:      t.TempDir(),
	})
	require.NoError(t, err)
	t.Cleanup(func() {
		daemonService.Stop()
	})

	// Check if parallel calls trying to get a client will return the same one.
	eg, egCtx := errgroup.WithContext(ctx)
	blocker := make(chan struct{})
	const concurrentCalls = 5
	concurrentCallsForClient := make([]*client.ProxyClient, concurrentCalls)
	for i := range concurrentCallsForClient {
		// Take the address of the slot up front; naming it "clientDest"
		// (rather than "client") avoids shadowing the imported client package.
		clientDest := &concurrentCallsForClient[i]
		eg.Go(func() error {
			<-blocker
			c, err := daemonService.GetCachedClient(egCtx, cluster.URI)
			*clientDest = c
			return err
		})
	}
	// Release all goroutines at once so the GetCachedClient calls race
	// against each other and exercise the cache's single-flight behavior.
	close(blocker)
	require.NoError(t, eg.Wait())
	// All concurrent calls must have observed the very same client instance.
	require.Subset(t, concurrentCallsForClient[:1], concurrentCallsForClient[1:])

	// Since we have a client in the cache, it should be returned.
	secondCallForClient, err := daemonService.GetCachedClient(ctx, cluster.URI)
	require.NoError(t, err)
	require.Equal(t, concurrentCallsForClient[0], secondCallForClient)

	// Let's remove the client from the cache.
	// The call to GetCachedClient will
	// connect to proxy and return a new client.
	err = daemonService.ClearCachedClientsForRoot(cluster.URI)
	require.NoError(t, err)
	thirdCallForClient, err := daemonService.GetCachedClient(ctx, cluster.URI)
	require.NoError(t, err)
	require.NotEqual(t, secondCallForClient, thirdCallForClient)

	// After closing the client (from our or a remote side)
	// it will be removed from the cache.
	// The call to GetCachedClient will connect to proxy and return a new client.
	err = thirdCallForClient.Close()
	require.NoError(t, err)
	fourthCallForClient, err := daemonService.GetCachedClient(ctx, cluster.URI)
	require.NoError(t, err)
	require.NotEqual(t, thirdCallForClient, fourthCallForClient)
}

func testCreateConnectMyComputerRole(t *testing.T, pack *dbhelpers.DatabasePack) {
systemUser, err := user.Current()
require.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion lib/teleterm/apiserver/handler/handler_apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ func (s *Handler) GetApps(ctx context.Context, req *api.GetAppsRequest) (*api.Ge
return nil, trace.Wrap(err)
}

resp, err := cluster.GetApps(ctx, req)
proxyClient, err := s.DaemonService.GetCachedClient(ctx, cluster.URI)
if err != nil {
return nil, trace.Wrap(err)
}

resp, err := cluster.GetApps(ctx, proxyClient.CurrentCluster(), req)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
8 changes: 8 additions & 0 deletions lib/teleterm/apiserver/handler/handler_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (s *Handler) Login(ctx context.Context, req *api.LoginRequest) (*api.EmptyR
// added by daemon.Service.ResolveClusterURI.
clusterClient.MFAPromptConstructor = nil

if err = s.DaemonService.ClearCachedClientsForRoot(cluster.URI); err != nil {
return nil, trace.Wrap(err)
}

if req.Params == nil {
return nil, trace.BadParameter("missing login parameters")
}
Expand Down Expand Up @@ -84,6 +88,10 @@ func (s *Handler) LoginPasswordless(stream api.TerminalService_LoginPasswordless
// daemon.Service.ResolveClusterURI.
clusterClient.MFAPromptConstructor = nil

if err := s.DaemonService.ClearCachedClientsForRoot(cluster.URI); err != nil {
return trace.Wrap(err)
}

// Start the prompt flow.
if err := cluster.PasswordlessLogin(stream.Context(), stream); err != nil {
return trace.Wrap(err)
Expand Down
16 changes: 13 additions & 3 deletions lib/teleterm/apiserver/handler/handler_databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@ import (
"github.com/gravitational/teleport/lib/teleterm/clusters"
)

// GetDatabases gets databses with filters and returns paginated results
// GetDatabases gets databases with filters and returns paginated results
func (s *Handler) GetDatabases(ctx context.Context, req *api.GetDatabasesRequest) (*api.GetDatabasesResponse, error) {
cluster, _, err := s.DaemonService.ResolveCluster(req.ClusterUri)
if err != nil {
return nil, trace.Wrap(err)
}

resp, err := cluster.GetDatabases(ctx, req)
proxyClient, err := s.DaemonService.GetCachedClient(ctx, cluster.URI)
if err != nil {
return nil, trace.Wrap(err)
}

resp, err := cluster.GetDatabases(ctx, proxyClient.CurrentCluster(), req)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -63,7 +68,12 @@ func (s *Handler) ListDatabaseUsers(ctx context.Context, req *api.ListDatabaseUs
return nil, trace.Wrap(err)
}

dbUsers, err := cluster.GetAllowedDatabaseUsers(ctx, req.DbUri)
proxyClient, err := s.DaemonService.GetCachedClient(ctx, cluster.URI)
if err != nil {
return nil, trace.Wrap(err)
}

dbUsers, err := cluster.GetAllowedDatabaseUsers(ctx, proxyClient.CurrentCluster(), req.DbUri)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
11 changes: 11 additions & 0 deletions lib/teleterm/apiserver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ package apiserver

import (
"context"
"errors"

"github.com/gravitational/trace"
"github.com/gravitational/trace/trail"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"

"github.com/gravitational/teleport/api/client"
)

// withErrorHandling is gRPC middleware that maps internal errors to proper gRPC error codes
Expand All @@ -38,6 +41,14 @@ func withErrorHandling(log logrus.FieldLogger) grpc.UnaryServerInterceptor {
resp, err := handler(ctx, req)
if err != nil {
log.WithError(err).Error("Request failed.")
// A stop gap solution that allows us to show a relogin modal when we
// receive an error from the server saying that the cert is expired.
// Read more: https://github.com/gravitational/teleport/pull/38202#discussion_r1497181659
// TODO(gzdunek): fix when addressing https://github.com/gravitational/teleport/issues/32550
if errors.Is(err, client.ErrClientCredentialsHaveExpired) {
return resp, trail.ToGRPC(err)
}

// do not return a full error stack on access denied errors
if trace.IsAccessDenied(err) {
return resp, trail.ToGRPC(trace.AccessDenied("access denied"))
Expand Down
37 changes: 5 additions & 32 deletions lib/teleterm/clusters/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *Cluster) Connected() bool {
// GetWithDetails makes requests to the auth server to return details of the current
// Cluster that cannot be found on the disk only, including details about the user
// and enabled enterprise features. This method requires a valid cert.
func (c *Cluster) GetWithDetails(ctx context.Context) (*ClusterWithDetails, error) {
func (c *Cluster) GetWithDetails(ctx context.Context, authClient auth.ClientI) (*ClusterWithDetails, error) {
var (
authPingResponse proto.PingResponse
caps *types.AccessCapabilities
Expand All @@ -97,20 +97,8 @@ func (c *Cluster) GetWithDetails(ctx context.Context) (*ClusterWithDetails, erro
return nil, trace.Wrap(err)
}

//TODO(gzdunek): These calls should be done in parallel.
err = AddMetadataToRetryableError(ctx, func() error {
//nolint:staticcheck // SA1019. TODO(tross) update to use ClusterClient
proxyClient, err := c.clusterClient.ConnectToProxy(ctx)
if err != nil {
return trace.Wrap(err)
}
defer proxyClient.Close()

authClient, err := proxyClient.ConnectToCluster(ctx, c.clusterClient.SiteName)
if err != nil {
return trace.Wrap(err)
}
defer authClient.Close()

authPingResponse, err = authClient.Ping(ctx)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -218,12 +206,10 @@ func (c *Cluster) GetRoles(ctx context.Context) ([]*types.Role, error) {
}

// GetRequestableRoles returns the requestable roles for the currently logged-in user
func (c *Cluster) GetRequestableRoles(ctx context.Context, req *api.GetRequestableRolesRequest) (*types.AccessCapabilities, error) {
func (c *Cluster) GetRequestableRoles(ctx context.Context, req *api.GetRequestableRolesRequest, authClient auth.ClientI) (*types.AccessCapabilities, error) {
var (
authClient auth.ClientI
proxyClient *client.ProxyClient
err error
response *types.AccessCapabilities
err error
response *types.AccessCapabilities
)

resourceIds := make([]types.ResourceID, 0, len(req.GetResourceIds()))
Expand All @@ -237,19 +223,6 @@ func (c *Cluster) GetRequestableRoles(ctx context.Context, req *api.GetRequestab
}

err = AddMetadataToRetryableError(ctx, func() error {
//nolint:staticcheck // SA1019. TODO(tross) update to use ClusterClient
proxyClient, err = c.clusterClient.ConnectToProxy(ctx)
if err != nil {
return trace.Wrap(err)
}
defer proxyClient.Close()

authClient, err = proxyClient.ConnectToCluster(ctx, c.clusterClient.SiteName)
if err != nil {
return trace.Wrap(err)
}
defer authClient.Close()

response, err = authClient.GetAccessCapabilities(ctx, types.AccessCapabilitiesRequest{
ResourceIDs: resourceIds,
RequestableRoles: true,
Expand Down
Loading