diff --git a/api/client/client.go b/api/client/client.go index 9cd1fdc66e53b..ba1184f744965 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -2746,6 +2746,14 @@ func (c *Client) DynamicDesktopClient() *dynamicwindows.Client { return dynamicwindows.NewClient(dynamicwindowsv1.NewDynamicWindowsServiceClient(c.conn)) } +func (c *Client) ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) { + return c.DynamicDesktopClient().ListDynamicWindowsDesktops(ctx, pageSize, pageToken) +} + +func (c *Client) GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) { + return c.DynamicDesktopClient().GetDynamicWindowsDesktop(ctx, name) +} + // ClusterConfigClient returns an unadorned Cluster Configuration client, using the underlying // Auth gRPC connection. func (c *Client) ClusterConfigClient() clusterconfigpb.ClusterConfigServiceClient { diff --git a/lib/auth/authclient/api.go b/lib/auth/authclient/api.go index 22a064de749ad..65e4bb6abc7d5 100644 --- a/lib/auth/authclient/api.go +++ b/lib/auth/authclient/api.go @@ -701,14 +701,20 @@ type ReadWindowsDesktopAccessPoint interface { // GetRoles returns a list of roles GetRoles(ctx context.Context) ([]types.Role, error) - // GetWindowsDesktops returns windows desktop hosts. - GetWindowsDesktops(ctx context.Context, filter types.WindowsDesktopFilter) ([]types.WindowsDesktop, error) + // ListWindowsDesktops returns Windows desktop hosts. + ListWindowsDesktops(ctx context.Context, req types.ListWindowsDesktopsRequest) (*types.ListWindowsDesktopsResponse, error) - // GetWindowsDesktopServices returns windows desktop hosts. - GetWindowsDesktopServices(ctx context.Context) ([]types.WindowsDesktopService, error) + // ListWindowsDesktopServices returns Windows desktop services. + ListWindowsDesktopServices(ctx context.Context, req types.ListWindowsDesktopServicesRequest) (*types.ListWindowsDesktopServicesResponse, error) - // GetWindowsDesktopService returns a windows desktop host by name. + // GetWindowsDesktopService returns a Windows desktop service by name. GetWindowsDesktopService(ctx context.Context, name string) (types.WindowsDesktopService, error) + + // GetDynamicWindowsDesktop gets a dynamic Windows desktop by name. + GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) + + // ListDynamicWindowsDesktops returns dynamic Windows desktops. + ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) } // WindowsDesktopAccessPoint is an API interface implemented by a certificate authority (CA) to be diff --git a/lib/auth/authclient/clt.go b/lib/auth/authclient/clt.go index e938e91cd97c7..dbae70cbb4d0a 100644 --- a/lib/auth/authclient/clt.go +++ b/lib/auth/authclient/clt.go @@ -1482,6 +1482,8 @@ type ClientI interface { types.WebTokensGetter DynamicDesktopClient() *dynamicwindows.Client + GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) + ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) // TrustClient returns a client to the Trust service. TrustClient() trustpb.TrustServiceClient diff --git a/lib/services/watcher.go b/lib/services/watcher.go index cd0a5f27800d2..6c2655235fad7 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -583,8 +583,8 @@ type GenericWatcherConfig[T any, R any] struct { ResourceDiffer func(old, new T) bool // ResourceKey defines how the resources should be keyed. ResourceKey func(resource T) string - // ResourcesC is a channel used to report the current resourxe set. It receives - // a fresh list at startup and subsequently a list of all known resourxes + // ResourcesC is a channel used to report the current resource set. It receives + // a fresh list at startup and subsequently a list of all known resources // whenever an addition or deletion is detected. ResourcesC chan []T // CloneFunc defines how a resource is cloned. All resources provided via diff --git a/lib/srv/desktop/discovery.go b/lib/srv/desktop/discovery.go index 1d65113cc36fb..7311947ce2772 100644 --- a/lib/srv/desktop/discovery.go +++ b/lib/srv/desktop/discovery.go @@ -37,7 +37,6 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/services" - "github.com/gravitational/teleport/lib/services/readonly" "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/winpki" ) @@ -360,71 +359,73 @@ func (s *WindowsService) ldapEntryToWindowsDesktop( } // startDynamicReconciler starts resource watcher and reconciler that registers/unregisters Windows desktops -// according to the up-to-date list of dynamic Windows desktops resources. -func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services.GenericWatcher[types.DynamicWindowsDesktop, readonly.DynamicWindowsDesktop], error) { +// according to the up-to-date list of dynamic Windows desktop resources. The reconciler runs until the +// provided context expires. +func (s *WindowsService) startDynamicReconciler(ctx context.Context) error { if len(s.cfg.ResourceMatchers) == 0 { s.cfg.Logger.DebugContext(ctx, "Not starting dynamic desktop resource watcher.") - return nil, nil - } - s.cfg.Logger.DebugContext(ctx, "Starting dynamic desktop resource watcher.") - dynamicDesktopClient := s.cfg.AuthClient.DynamicDesktopClient() - watcher, err := services.NewDynamicWindowsDesktopWatcher(ctx, services.DynamicWindowsDesktopWatcherConfig{ - DynamicWindowsDesktopGetter: dynamicDesktopClient, - ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: teleport.ComponentWindowsDesktop, - Client: s.cfg.AccessPoint, - }, - }) - - if err != nil { - return nil, trace.Wrap(err) + return nil } + // errCh is used to indicate whether the background reconciliation routine + // starts successfully + errCh := make(chan error) - currentResources := make(map[string]types.WindowsDesktop) - var newResources map[string]types.WindowsDesktop - - reconciler, err := services.NewReconciler(services.ReconcilerConfig[types.WindowsDesktop]{ - Matcher: func(desktop types.WindowsDesktop) bool { - return services.MatchResourceLabels(s.cfg.ResourceMatchers, desktop.GetAllLabels()) - }, - GetCurrentResources: func() map[string]types.WindowsDesktop { - maps.DeleteFunc(currentResources, func(_ string, v types.WindowsDesktop) bool { - d, err := s.cfg.AuthClient.GetWindowsDesktops(ctx, types.WindowsDesktopFilter{ - HostID: v.GetHostID(), - Name: v.GetName(), - }) - return err != nil || len(d) == 0 - }) - return currentResources - }, - GetNewResources: func() map[string]types.WindowsDesktop { - return newResources - }, - OnCreate: s.upsertDesktop, - OnUpdate: s.updateDesktop, - OnDelete: s.deleteDesktop, - }) - if err != nil { - return nil, trace.Wrap(err) - } go func() { - defer s.cfg.Logger.DebugContext(ctx, "DynamicWindowsDesktop resource watcher done.") + s.cfg.Logger.DebugContext(ctx, "Starting dynamic desktop resource watcher.") + watcher, err := services.NewDynamicWindowsDesktopWatcher(ctx, services.DynamicWindowsDesktopWatcherConfig{ + DynamicWindowsDesktopGetter: s.cfg.AccessPoint, + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: teleport.ComponentWindowsDesktop, + Client: s.cfg.AccessPoint, + }, + }) + if err != nil { + errCh <- trace.Wrap(err) + return + } + defer watcher.Close() + defer s.cfg.Logger.DebugContext(ctx, "DynamicWindowsDesktop resource watcher done.") + + currentResources := make(map[string]types.WindowsDesktop) + var newResources map[string]types.WindowsDesktop + + reconciler, err := services.NewReconciler(services.ReconcilerConfig[types.WindowsDesktop]{ + Matcher: func(desktop types.WindowsDesktop) bool { + return services.MatchResourceLabels(s.cfg.ResourceMatchers, desktop.GetAllLabels()) + }, + GetCurrentResources: func() map[string]types.WindowsDesktop { return currentResources }, + GetNewResources: func() map[string]types.WindowsDesktop { return newResources }, + OnCreate: s.upsertDesktop, + OnUpdate: s.updateDesktop, + OnDelete: s.deleteDesktop, + }) + if err != nil { + errCh <- trace.Wrap(err) + return + } + + // If we got here, the reconciler is running. + errCh <- nil + tickDuration := 5 * time.Minute expiryDuration := tickDuration + 2*time.Minute + tick := s.cfg.Clock.NewTicker(tickDuration) defer tick.Stop() + for { select { case desktops := <-watcher.ResourcesC: + start := s.cfg.Clock.Now() newResources = make(map[string]types.WindowsDesktop) for _, dynamicDesktop := range desktops { desktop, err := s.toWindowsDesktop(dynamicDesktop) - desktop.SetExpiry(s.cfg.Clock.Now().Add(expiryDuration)) if err != nil { s.cfg.Logger.WarnContext(ctx, "Can't create desktop resource", "error", err) continue } + desktop.SetExpiry(s.cfg.Clock.Now().Add(expiryDuration)) newResources[dynamicDesktop.GetName()] = desktop } if err := reconciler.Reconcile(ctx); err != nil { @@ -432,7 +433,9 @@ func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services. continue } currentResources = newResources + s.cfg.Logger.DebugContext(ctx, "completed dynamic desktop reconciliation", "duration", s.cfg.Clock.Since(start), "count", len(newResources)) case <-tick.Chan(): + start := s.cfg.Clock.Now() newResources = make(map[string]types.WindowsDesktop) for k, v := range currentResources { newResources[k] = v.Copy() @@ -443,6 +446,7 @@ func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services. continue } currentResources = newResources + s.cfg.Logger.DebugContext(ctx, "completed dynamic desktop reconciliation", "duration", s.cfg.Clock.Since(start), "count", len(newResources)) case <-watcher.Done(): return case <-ctx.Done(): @@ -450,15 +454,18 @@ func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services. } } }() - return watcher, nil + + return trace.Wrap(<-errCh, "could not start dynamic desktop reconciler") } func (s *WindowsService) toWindowsDesktop(dynamicDesktop types.DynamicWindowsDesktop) (*types.WindowsDesktopV3, error) { width, height := dynamicDesktop.GetScreenSize() + desktopLabels := dynamicDesktop.GetAllLabels() labels := make(map[string]string, len(desktopLabels)+1) maps.Copy(labels, desktopLabels) labels[types.OriginLabel] = types.OriginDynamic + return types.NewWindowsDesktopV3(dynamicDesktop.GetName(), labels, types.WindowsDesktopSpecV3{ Addr: dynamicDesktop.GetAddr(), Domain: dynamicDesktop.GetDomain(), diff --git a/lib/srv/desktop/discovery_test.go b/lib/srv/desktop/discovery_test.go index fdc2e22ccb592..b475a77f3d564 100644 --- a/lib/srv/desktop/discovery_test.go +++ b/lib/srv/desktop/discovery_test.go @@ -250,10 +250,9 @@ func TestDynamicWindowsDiscovery(t *testing.T) { }, }, } - reconciler, err := s.startDynamicReconciler(ctx) - require.NoError(t, err) + + require.NoError(t, s.startDynamicReconciler(t.Context())) t.Cleanup(func() { - reconciler.Close() require.NoError(t, authServer.AuthServer.DeleteAllWindowsDesktops(ctx)) var key string for { @@ -327,21 +326,15 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) { Dir: t.TempDir(), }) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, authServer.Close()) - }) + t.Cleanup(func() { require.NoError(t, authServer.Close()) }) tlsServer, err := authServer.NewTestTLSServer() require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, tlsServer.Close()) - }) + t.Cleanup(func() { require.NoError(t, tlsServer.Close()) }) client, err := tlsServer.NewClient(authtest.TestServerID(types.RoleWindowsDesktop, "test-host-id")) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, client.Close()) - }) + t.Cleanup(func() { require.NoError(t, client.Close()) }) dynamicWindowsClient := client.DynamicDesktopClient() @@ -371,8 +364,8 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) { }, }, } - _, err = s.startDynamicReconciler(ctx) - require.NoError(t, err) + + require.NoError(t, s.startDynamicReconciler(t.Context())) desktop, err := types.NewDynamicWindowsDesktopV1("test", map[string]string{ "foo": "bar", @@ -393,6 +386,7 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) { err = client.DeleteWindowsDesktop(ctx, s.cfg.Heartbeat.HostUUID, "test") require.NoError(t, err) + desktops, err := client.GetWindowsDesktops(ctx, types.WindowsDesktopFilter{}) require.NoError(t, err) require.Empty(t, desktops) diff --git a/lib/srv/desktop/windows_server.go b/lib/srv/desktop/windows_server.go index 72addc8d49e92..489683bfb23a6 100644 --- a/lib/srv/desktop/windows_server.go +++ b/lib/srv/desktop/windows_server.go @@ -41,11 +41,13 @@ import ( apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/api/utils/clientutils" "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/authz" "github.com/gravitational/teleport/lib/defaults" libevents "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/events/recorder" + "github.com/gravitational/teleport/lib/itertools/stream" "github.com/gravitational/teleport/lib/limiter" "github.com/gravitational/teleport/lib/reversetunnelclient" "github.com/gravitational/teleport/lib/service/servicecfg" @@ -396,7 +398,7 @@ func NewWindowsService(cfg WindowsServiceConfig) (*WindowsService, error) { return nil, trace.Wrap(err) } - if _, err := s.startDynamicReconciler(ctx); err != nil { + if err := s.startDynamicReconciler(ctx); err != nil { return nil, trace.Wrap(err) } @@ -632,8 +634,19 @@ func (s *WindowsService) handleConnection(proxyConn *tls.Conn) { desktopName := strings.TrimSuffix(proxyConn.ConnectionState().ServerName, SNISuffix) log = log.With("desktop_name", desktopName) - desktops, err := s.cfg.AccessPoint.GetWindowsDesktops(ctx, - types.WindowsDesktopFilter{HostID: s.cfg.Heartbeat.HostUUID, Name: desktopName}) + desktops, err := stream.Collect(clientutils.Resources(ctx, + func(ctx context.Context, pageSize int, pageToken string) ([]types.WindowsDesktop, string, error) { + resp, err := s.cfg.AccessPoint.ListWindowsDesktops(ctx, types.ListWindowsDesktopsRequest{ + WindowsDesktopFilter: types.WindowsDesktopFilter{HostID: s.cfg.Heartbeat.HostUUID, Name: desktopName}, + Limit: pageSize, + StartKey: pageToken, + }) + if err != nil { + return nil, "", trace.Wrap(err) + } + + return resp.Desktops, resp.NextKey, nil + })) if err != nil { log.WarnContext(ctx, "Failed to fetch desktop by name", "error", err) sendTDPError("Teleport failed to find the requested desktop in its database.") @@ -1107,8 +1120,19 @@ func (s *WindowsService) staticHostHeartbeatInfo(host servicecfg.WindowsHost, // a very large number of desktops in the cluster, this may use up a lot of CPU // time. func (s *WindowsService) nameForStaticHost(addr string) (string, error) { - desktops, err := s.cfg.AccessPoint.GetWindowsDesktops(s.closeCtx, - types.WindowsDesktopFilter{}) + desktops, err := stream.Collect(clientutils.Resources(s.closeCtx, + func(ctx context.Context, pageSize int, pageToken string) ([]types.WindowsDesktop, string, error) { + resp, err := s.cfg.AccessPoint.ListWindowsDesktops(ctx, types.ListWindowsDesktopsRequest{ + Limit: pageSize, + StartKey: pageToken, + SearchKeywords: []string{addr}, + }) + if err != nil { + return nil, "", trace.Wrap(err) + } + + return resp.Desktops, resp.NextKey, nil + })) if err != nil { return "", trace.Wrap(err) }