From 08110564c9ad642e1da24c67680da50d366ea03e Mon Sep 17 00:00:00 2001 From: Zac Bergquist Date: Tue, 19 Aug 2025 11:16:27 -0600 Subject: [PATCH] Improve the dynamic Windows desktop reconciler The reconciler runs in the Windows desktop service, watching for dynamic_windows_desktop resources that match its configured labels, and creating corresponding windows_desktop resources for any matches. This change makes several improvements to the reconciler: - emit a debug-level log entry showing how long the reconciliation took - move more state into local variables in the reconciler goroutine - ensure that we hit the local process cache instead of the backend - remove a suspicious delete operation that seems to be unnecessary at best and actively harmful at worst Co-authored-by: Tim Ross --- api/client/client.go | 8 +++ lib/auth/authclient/api.go | 16 +++-- lib/auth/authclient/clt.go | 2 + lib/services/watcher.go | 4 +- lib/srv/desktop/discovery.go | 101 ++++++++++++++++-------------- lib/srv/desktop/discovery_test.go | 24 +++---- lib/srv/desktop/windows_server.go | 34 ++++++++-- 7 files changed, 116 insertions(+), 73 deletions(-) diff --git a/api/client/client.go b/api/client/client.go index 9f69e4557bf60..52245ed3d48d1 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -2787,6 +2787,14 @@ func (c *Client) DynamicDesktopClient() *dynamicwindows.Client { return dynamicwindows.NewClient(dynamicwindowsv1.NewDynamicWindowsServiceClient(c.conn)) } +func (c *Client) ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) { + return c.DynamicDesktopClient().ListDynamicWindowsDesktops(ctx, pageSize, pageToken) +} + +func (c *Client) GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) { + return c.DynamicDesktopClient().GetDynamicWindowsDesktop(ctx, name) +} + // ClusterConfigClient returns an unadorned Cluster Configuration client, using the underlying // Auth gRPC connection. func (c *Client) ClusterConfigClient() clusterconfigpb.ClusterConfigServiceClient { diff --git a/lib/auth/authclient/api.go b/lib/auth/authclient/api.go index cd1db002d3a53..8614e0c6f1ff0 100644 --- a/lib/auth/authclient/api.go +++ b/lib/auth/authclient/api.go @@ -705,14 +705,20 @@ type ReadWindowsDesktopAccessPoint interface { // GetNamespace returns namespace by name GetNamespace(name string) (*types.Namespace, error) - // GetWindowsDesktops returns windows desktop hosts. - GetWindowsDesktops(ctx context.Context, filter types.WindowsDesktopFilter) ([]types.WindowsDesktop, error) + // ListWindowsDesktops returns Windows desktop hosts. + ListWindowsDesktops(ctx context.Context, req types.ListWindowsDesktopsRequest) (*types.ListWindowsDesktopsResponse, error) - // GetWindowsDesktopServices returns windows desktop hosts. - GetWindowsDesktopServices(ctx context.Context) ([]types.WindowsDesktopService, error) + // ListWindowsDesktopServices returns Windows desktop services. + ListWindowsDesktopServices(ctx context.Context, req types.ListWindowsDesktopServicesRequest) (*types.ListWindowsDesktopServicesResponse, error) - // GetWindowsDesktopService returns a windows desktop host by name. + // GetWindowsDesktopService returns a Windows desktop service by name. GetWindowsDesktopService(ctx context.Context, name string) (types.WindowsDesktopService, error) + + // GetDynamicWindowsDesktop gets a dynamic Windows desktop by name. + GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) + + // ListDynamicWindowsDesktops returns dynamic Windows desktops. + ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) } // WindowsDesktopAccessPoint is an API interface implemented by a certificate authority (CA) to be diff --git a/lib/auth/authclient/clt.go b/lib/auth/authclient/clt.go index 4c4ec8e24933c..267e0d4f76af4 100644 --- a/lib/auth/authclient/clt.go +++ b/lib/auth/authclient/clt.go @@ -1640,6 +1640,8 @@ type ClientI interface { types.WebTokensGetter DynamicDesktopClient() *dynamicwindows.Client + GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) + ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) // TrustClient returns a client to the Trust service. TrustClient() trustpb.TrustServiceClient diff --git a/lib/services/watcher.go b/lib/services/watcher.go index 730dfda754c02..4caad86a41925 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -594,8 +594,8 @@ type GenericWatcherConfig[T any, R any] struct { ResourceDiffer func(old, new T) bool // ResourceKey defines how the resources should be keyed. ResourceKey func(resource T) string - // ResourcesC is a channel used to report the current resourxe set. It receives - // a fresh list at startup and subsequently a list of all known resourxes + // ResourcesC is a channel used to report the current resource set. It receives + // a fresh list at startup and subsequently a list of all known resources // whenever an addition or deletion is detected. ResourcesC chan []T // CloneFunc defines how a resource is cloned. All resources provided via diff --git a/lib/srv/desktop/discovery.go b/lib/srv/desktop/discovery.go index e7b5747533126..03d1b45c114b2 100644 --- a/lib/srv/desktop/discovery.go +++ b/lib/srv/desktop/discovery.go @@ -38,7 +38,6 @@ import ( "github.com/gravitational/teleport/lib/auth/windows" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/services" - "github.com/gravitational/teleport/lib/services/readonly" "github.com/gravitational/teleport/lib/utils" ) @@ -371,71 +370,73 @@ func (s *WindowsService) ldapEntryToWindowsDesktop( } // startDynamicReconciler starts resource watcher and reconciler that registers/unregisters Windows desktops -// according to the up-to-date list of dynamic Windows desktops resources. -func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services.GenericWatcher[types.DynamicWindowsDesktop, readonly.DynamicWindowsDesktop], error) { +// according to the up-to-date list of dynamic Windows desktop resources. The reconciler runs until the +// provided context expires. +func (s *WindowsService) startDynamicReconciler(ctx context.Context) error { if len(s.cfg.ResourceMatchers) == 0 { s.cfg.Logger.DebugContext(ctx, "Not starting dynamic desktop resource watcher.") - return nil, nil - } - s.cfg.Logger.DebugContext(ctx, "Starting dynamic desktop resource watcher.") - dynamicDesktopClient := s.cfg.AuthClient.DynamicDesktopClient() - watcher, err := services.NewDynamicWindowsDesktopWatcher(ctx, services.DynamicWindowsDesktopWatcherConfig{ - DynamicWindowsDesktopGetter: dynamicDesktopClient, - ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: teleport.ComponentWindowsDesktop, - Client: s.cfg.AccessPoint, - }, - }) - - if err != nil { - return nil, trace.Wrap(err) + return nil } + // errCh is used to indicate whether the background reconciliation routine + // starts successfully + errCh := make(chan error) - currentResources := make(map[string]types.WindowsDesktop) - var newResources map[string]types.WindowsDesktop - - reconciler, err := services.NewReconciler(services.ReconcilerConfig[types.WindowsDesktop]{ - Matcher: func(desktop types.WindowsDesktop) bool { - return services.MatchResourceLabels(s.cfg.ResourceMatchers, desktop.GetAllLabels()) - }, - GetCurrentResources: func() map[string]types.WindowsDesktop { - maps.DeleteFunc(currentResources, func(_ string, v types.WindowsDesktop) bool { - d, err := s.cfg.AuthClient.GetWindowsDesktops(ctx, types.WindowsDesktopFilter{ - HostID: v.GetHostID(), - Name: v.GetName(), - }) - return err != nil || len(d) == 0 - }) - return currentResources - }, - GetNewResources: func() map[string]types.WindowsDesktop { - return newResources - }, - OnCreate: s.upsertDesktop, - OnUpdate: s.updateDesktop, - OnDelete: s.deleteDesktop, - }) - if err != nil { - return nil, trace.Wrap(err) - } go func() { - defer s.cfg.Logger.DebugContext(ctx, "DynamicWindowsDesktop resource watcher done.") + s.cfg.Logger.DebugContext(ctx, "Starting dynamic desktop resource watcher.") + watcher, err := services.NewDynamicWindowsDesktopWatcher(ctx, services.DynamicWindowsDesktopWatcherConfig{ + DynamicWindowsDesktopGetter: s.cfg.AccessPoint, + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: teleport.ComponentWindowsDesktop, + Client: s.cfg.AccessPoint, + }, + }) + if err != nil { + errCh <- trace.Wrap(err) + return + } + defer watcher.Close() + defer s.cfg.Logger.DebugContext(ctx, "DynamicWindowsDesktop resource watcher done.") + + currentResources := make(map[string]types.WindowsDesktop) + var newResources map[string]types.WindowsDesktop + + reconciler, err := services.NewReconciler(services.ReconcilerConfig[types.WindowsDesktop]{ + Matcher: func(desktop types.WindowsDesktop) bool { + return services.MatchResourceLabels(s.cfg.ResourceMatchers, desktop.GetAllLabels()) + }, + GetCurrentResources: func() map[string]types.WindowsDesktop { return currentResources }, + GetNewResources: func() map[string]types.WindowsDesktop { return newResources }, + OnCreate: s.upsertDesktop, + OnUpdate: s.updateDesktop, + OnDelete: s.deleteDesktop, + }) + if err != nil { + errCh <- trace.Wrap(err) + return + } + + // If we got here, the reconciler is running. + errCh <- nil + tickDuration := 5 * time.Minute expiryDuration := tickDuration + 2*time.Minute + tick := s.cfg.Clock.NewTicker(tickDuration) defer tick.Stop() + for { select { case desktops := <-watcher.ResourcesC: + start := s.cfg.Clock.Now() newResources = make(map[string]types.WindowsDesktop) for _, dynamicDesktop := range desktops { desktop, err := s.toWindowsDesktop(dynamicDesktop) - desktop.SetExpiry(s.cfg.Clock.Now().Add(expiryDuration)) if err != nil { s.cfg.Logger.WarnContext(ctx, "Can't create desktop resource", "error", err) continue } + desktop.SetExpiry(s.cfg.Clock.Now().Add(expiryDuration)) newResources[dynamicDesktop.GetName()] = desktop } if err := reconciler.Reconcile(ctx); err != nil { @@ -443,7 +444,9 @@ func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services. continue } currentResources = newResources + s.cfg.Logger.DebugContext(ctx, "completed dynamic desktop reconciliation", "duration", s.cfg.Clock.Since(start), "count", len(newResources)) case <-tick.Chan(): + start := s.cfg.Clock.Now() newResources = make(map[string]types.WindowsDesktop) for k, v := range currentResources { newResources[k] = v.Copy() @@ -454,6 +457,7 @@ func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services. continue } currentResources = newResources + s.cfg.Logger.DebugContext(ctx, "completed dynamic desktop reconciliation", "duration", s.cfg.Clock.Since(start), "count", len(newResources)) case <-watcher.Done(): return case <-ctx.Done(): @@ -461,15 +465,18 @@ func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services. } } }() - return watcher, nil + + return trace.Wrap(<-errCh, "could not start dynamic desktop reconciler") } func (s *WindowsService) toWindowsDesktop(dynamicDesktop types.DynamicWindowsDesktop) (*types.WindowsDesktopV3, error) { width, height := dynamicDesktop.GetScreenSize() + desktopLabels := dynamicDesktop.GetAllLabels() labels := make(map[string]string, len(desktopLabels)+1) maps.Copy(labels, desktopLabels) labels[types.OriginLabel] = types.OriginDynamic + return types.NewWindowsDesktopV3(dynamicDesktop.GetName(), labels, types.WindowsDesktopSpecV3{ Addr: dynamicDesktop.GetAddr(), Domain: dynamicDesktop.GetDomain(), diff --git a/lib/srv/desktop/discovery_test.go b/lib/srv/desktop/discovery_test.go index bad16d7d4655f..39d06e45438a1 100644 --- a/lib/srv/desktop/discovery_test.go +++ b/lib/srv/desktop/discovery_test.go @@ -252,10 +252,10 @@ func TestDynamicWindowsDiscovery(t *testing.T) { }, }, } - reconciler, err := s.startDynamicReconciler(ctx) - require.NoError(t, err) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + require.NoError(t, s.startDynamicReconciler(ctx)) t.Cleanup(func() { - reconciler.Close() require.NoError(t, authServer.AuthServer.DeleteAllWindowsDesktops(ctx)) require.NoError(t, authServer.AuthServer.DeleteAllDynamicWindowsDesktops(ctx)) }) @@ -318,21 +318,15 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) { Dir: t.TempDir(), }) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, authServer.Close()) - }) + t.Cleanup(func() { require.NoError(t, authServer.Close()) }) tlsServer, err := authServer.NewTestTLSServer() require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, tlsServer.Close()) - }) + t.Cleanup(func() { require.NoError(t, tlsServer.Close()) }) client, err := tlsServer.NewClient(authtest.TestServerID(types.RoleWindowsDesktop, "test-host-id")) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, client.Close()) - }) + t.Cleanup(func() { require.NoError(t, client.Close()) }) dynamicWindowsClient := client.DynamicDesktopClient() @@ -362,8 +356,9 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) { }, }, } - _, err = s.startDynamicReconciler(ctx) - require.NoError(t, err) + ctx, cancel = context.WithCancel(ctx) + defer cancel() + require.NoError(t, s.startDynamicReconciler(ctx)) desktop, err := types.NewDynamicWindowsDesktopV1("test", map[string]string{ "foo": "bar", @@ -384,6 +379,7 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) { err = client.DeleteWindowsDesktop(ctx, s.cfg.Heartbeat.HostUUID, "test") require.NoError(t, err) + desktops, err := client.GetWindowsDesktops(ctx, types.WindowsDesktopFilter{}) require.NoError(t, err) require.Empty(t, desktops) diff --git a/lib/srv/desktop/windows_server.go b/lib/srv/desktop/windows_server.go index df5e84fe3d386..e36bc85a79707 100644 --- a/lib/srv/desktop/windows_server.go +++ b/lib/srv/desktop/windows_server.go @@ -42,6 +42,7 @@ import ( apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/api/utils/clientutils" "github.com/gravitational/teleport/lib/auth" "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/auth/windows" @@ -49,6 +50,7 @@ import ( "github.com/gravitational/teleport/lib/defaults" libevents "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/events/recorder" + "github.com/gravitational/teleport/lib/itertools/stream" "github.com/gravitational/teleport/lib/limiter" "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/reversetunnel" @@ -409,7 +411,7 @@ func NewWindowsService(cfg WindowsServiceConfig) (*WindowsService, error) { return nil, trace.Wrap(err) } - if _, err := s.startDynamicReconciler(ctx); err != nil { + if err := s.startDynamicReconciler(ctx); err != nil { return nil, trace.Wrap(err) } @@ -801,8 +803,19 @@ func (s *WindowsService) handleConnection(proxyConn *tls.Conn) { desktopName := strings.TrimSuffix(proxyConn.ConnectionState().ServerName, SNISuffix) log = log.With("desktop_name", desktopName) - desktops, err := s.cfg.AccessPoint.GetWindowsDesktops(ctx, - types.WindowsDesktopFilter{HostID: s.cfg.Heartbeat.HostUUID, Name: desktopName}) + desktops, err := stream.Collect(clientutils.Resources(ctx, + func(ctx context.Context, pageSize int, pageToken string) ([]types.WindowsDesktop, string, error) { + resp, err := s.cfg.AccessPoint.ListWindowsDesktops(ctx, types.ListWindowsDesktopsRequest{ + WindowsDesktopFilter: types.WindowsDesktopFilter{HostID: s.cfg.Heartbeat.HostUUID, Name: desktopName}, + Limit: pageSize, + StartKey: pageToken, + }) + if err != nil { + return nil, "", trace.Wrap(err) + } + + return resp.Desktops, resp.NextKey, nil + })) if err != nil { log.WarnContext(ctx, "Failed to fetch desktop by name", "error", err) sendTDPError("Teleport failed to find the requested desktop in its database.") @@ -1278,8 +1291,19 @@ func (s *WindowsService) staticHostHeartbeatInfo(host servicecfg.WindowsHost, // a very large number of desktops in the cluster, this may use up a lot of CPU // time. func (s *WindowsService) nameForStaticHost(addr string) (string, error) { - desktops, err := s.cfg.AccessPoint.GetWindowsDesktops(s.closeCtx, - types.WindowsDesktopFilter{}) + desktops, err := stream.Collect(clientutils.Resources(s.closeCtx, + func(ctx context.Context, pageSize int, pageToken string) ([]types.WindowsDesktop, string, error) { + resp, err := s.cfg.AccessPoint.ListWindowsDesktops(ctx, types.ListWindowsDesktopsRequest{ + Limit: pageSize, + StartKey: pageToken, + SearchKeywords: []string{addr}, + }) + if err != nil { + return nil, "", trace.Wrap(err) + } + + return resp.Desktops, resp.NextKey, nil + })) if err != nil { return "", trace.Wrap(err) }