Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2746,6 +2746,14 @@ func (c *Client) DynamicDesktopClient() *dynamicwindows.Client {
return dynamicwindows.NewClient(dynamicwindowsv1.NewDynamicWindowsServiceClient(c.conn))
}

func (c *Client) ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) {
return c.DynamicDesktopClient().ListDynamicWindowsDesktops(ctx, pageSize, pageToken)
}

func (c *Client) GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) {
return c.DynamicDesktopClient().GetDynamicWindowsDesktop(ctx, name)
}

// ClusterConfigClient returns an unadorned Cluster Configuration client, using the underlying
// Auth gRPC connection.
func (c *Client) ClusterConfigClient() clusterconfigpb.ClusterConfigServiceClient {
Expand Down
16 changes: 11 additions & 5 deletions lib/auth/authclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,14 +701,20 @@ type ReadWindowsDesktopAccessPoint interface {
// GetRoles returns a list of roles
GetRoles(ctx context.Context) ([]types.Role, error)

// GetWindowsDesktops returns windows desktop hosts.
GetWindowsDesktops(ctx context.Context, filter types.WindowsDesktopFilter) ([]types.WindowsDesktop, error)
// ListWindowsDesktops returns Windows desktop hosts.
ListWindowsDesktops(ctx context.Context, req types.ListWindowsDesktopsRequest) (*types.ListWindowsDesktopsResponse, error)

// GetWindowsDesktopServices returns windows desktop hosts.
GetWindowsDesktopServices(ctx context.Context) ([]types.WindowsDesktopService, error)
// ListWindowsDesktopServices returns Windows desktop services.
ListWindowsDesktopServices(ctx context.Context, req types.ListWindowsDesktopServicesRequest) (*types.ListWindowsDesktopServicesResponse, error)

// GetWindowsDesktopService returns a windows desktop host by name.
// GetWindowsDesktopService returns a Windows desktop service by name.
GetWindowsDesktopService(ctx context.Context, name string) (types.WindowsDesktopService, error)

// GetDynamicWindowsDesktop gets a dynamic Windows desktop by name.
GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error)

// ListDynamicWindowsDesktops returns dynamic Windows desktops.
ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error)
}

// WindowsDesktopAccessPoint is an API interface implemented by a certificate authority (CA) to be
Expand Down
2 changes: 2 additions & 0 deletions lib/auth/authclient/clt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,8 @@ type ClientI interface {
types.WebTokensGetter

DynamicDesktopClient() *dynamicwindows.Client
GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error)
ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error)

// TrustClient returns a client to the Trust service.
TrustClient() trustpb.TrustServiceClient
Expand Down
4 changes: 2 additions & 2 deletions lib/services/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,8 @@ type GenericWatcherConfig[T any, R any] struct {
ResourceDiffer func(old, new T) bool
// ResourceKey defines how the resources should be keyed.
ResourceKey func(resource T) string
// ResourcesC is a channel used to report the current resourxe set. It receives
// a fresh list at startup and subsequently a list of all known resourxes
// ResourcesC is a channel used to report the current resource set. It receives
// a fresh list at startup and subsequently a list of all known resources
// whenever an addition or deletion is detected.
ResourcesC chan []T
// CloneFunc defines how a resource is cloned. All resources provided via
Expand Down
101 changes: 54 additions & 47 deletions lib/srv/desktop/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/service/servicecfg"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/readonly"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/winpki"
)
Expand Down Expand Up @@ -360,79 +359,83 @@ func (s *WindowsService) ldapEntryToWindowsDesktop(
}

// startDynamicReconciler starts resource watcher and reconciler that registers/unregisters Windows desktops
// according to the up-to-date list of dynamic Windows desktops resources.
func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services.GenericWatcher[types.DynamicWindowsDesktop, readonly.DynamicWindowsDesktop], error) {
// according to the up-to-date list of dynamic Windows desktop resources. The reconciler runs until the
// provided context expires.
func (s *WindowsService) startDynamicReconciler(ctx context.Context) error {
if len(s.cfg.ResourceMatchers) == 0 {
s.cfg.Logger.DebugContext(ctx, "Not starting dynamic desktop resource watcher.")
return nil, nil
}
s.cfg.Logger.DebugContext(ctx, "Starting dynamic desktop resource watcher.")
dynamicDesktopClient := s.cfg.AuthClient.DynamicDesktopClient()
watcher, err := services.NewDynamicWindowsDesktopWatcher(ctx, services.DynamicWindowsDesktopWatcherConfig{
DynamicWindowsDesktopGetter: dynamicDesktopClient,
ResourceWatcherConfig: services.ResourceWatcherConfig{
Component: teleport.ComponentWindowsDesktop,
Client: s.cfg.AccessPoint,
},
})

if err != nil {
return nil, trace.Wrap(err)
return nil
}
// errCh is used to indicate whether the background reconciliation routine
// starts successfully
errCh := make(chan error)

currentResources := make(map[string]types.WindowsDesktop)
var newResources map[string]types.WindowsDesktop

reconciler, err := services.NewReconciler(services.ReconcilerConfig[types.WindowsDesktop]{
Matcher: func(desktop types.WindowsDesktop) bool {
return services.MatchResourceLabels(s.cfg.ResourceMatchers, desktop.GetAllLabels())
},
GetCurrentResources: func() map[string]types.WindowsDesktop {
maps.DeleteFunc(currentResources, func(_ string, v types.WindowsDesktop) bool {
d, err := s.cfg.AuthClient.GetWindowsDesktops(ctx, types.WindowsDesktopFilter{
HostID: v.GetHostID(),
Name: v.GetName(),
})
return err != nil || len(d) == 0
})
Comment on lines -391 to -397
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the problematic code. The rest of the changes in the PR are subtle improvements.

return currentResources
},
GetNewResources: func() map[string]types.WindowsDesktop {
return newResources
},
OnCreate: s.upsertDesktop,
OnUpdate: s.updateDesktop,
OnDelete: s.deleteDesktop,
})
if err != nil {
return nil, trace.Wrap(err)
}
go func() {
defer s.cfg.Logger.DebugContext(ctx, "DynamicWindowsDesktop resource watcher done.")
s.cfg.Logger.DebugContext(ctx, "Starting dynamic desktop resource watcher.")
watcher, err := services.NewDynamicWindowsDesktopWatcher(ctx, services.DynamicWindowsDesktopWatcherConfig{
DynamicWindowsDesktopGetter: s.cfg.AccessPoint,
ResourceWatcherConfig: services.ResourceWatcherConfig{
Component: teleport.ComponentWindowsDesktop,
Client: s.cfg.AccessPoint,
},
})
if err != nil {
errCh <- trace.Wrap(err)
return
}

defer watcher.Close()
defer s.cfg.Logger.DebugContext(ctx, "DynamicWindowsDesktop resource watcher done.")

currentResources := make(map[string]types.WindowsDesktop)
var newResources map[string]types.WindowsDesktop

reconciler, err := services.NewReconciler(services.ReconcilerConfig[types.WindowsDesktop]{
Matcher: func(desktop types.WindowsDesktop) bool {
return services.MatchResourceLabels(s.cfg.ResourceMatchers, desktop.GetAllLabels())
},
GetCurrentResources: func() map[string]types.WindowsDesktop { return currentResources },
GetNewResources: func() map[string]types.WindowsDesktop { return newResources },
OnCreate: s.upsertDesktop,
OnUpdate: s.updateDesktop,
OnDelete: s.deleteDesktop,
})
if err != nil {
errCh <- trace.Wrap(err)
return
}

// If we got here, the reconciler is running.
errCh <- nil

tickDuration := 5 * time.Minute
expiryDuration := tickDuration + 2*time.Minute

tick := s.cfg.Clock.NewTicker(tickDuration)
defer tick.Stop()

for {
select {
case desktops := <-watcher.ResourcesC:
start := s.cfg.Clock.Now()
newResources = make(map[string]types.WindowsDesktop)
for _, dynamicDesktop := range desktops {
desktop, err := s.toWindowsDesktop(dynamicDesktop)
desktop.SetExpiry(s.cfg.Clock.Now().Add(expiryDuration))
if err != nil {
s.cfg.Logger.WarnContext(ctx, "Can't create desktop resource", "error", err)
continue
}
desktop.SetExpiry(s.cfg.Clock.Now().Add(expiryDuration))
newResources[dynamicDesktop.GetName()] = desktop
}
if err := reconciler.Reconcile(ctx); err != nil {
s.cfg.Logger.WarnContext(ctx, "Reconciliation failed, will retry", "error", err)
continue
}
currentResources = newResources
s.cfg.Logger.DebugContext(ctx, "completed dynamic desktop reconciliation", "duration", s.cfg.Clock.Since(start), "count", len(newResources))
case <-tick.Chan():
start := s.cfg.Clock.Now()
newResources = make(map[string]types.WindowsDesktop)
for k, v := range currentResources {
newResources[k] = v.Copy()
Expand All @@ -443,22 +446,26 @@ func (s *WindowsService) startDynamicReconciler(ctx context.Context) (*services.
continue
}
currentResources = newResources
s.cfg.Logger.DebugContext(ctx, "completed dynamic desktop reconciliation", "duration", s.cfg.Clock.Since(start), "count", len(newResources))
case <-watcher.Done():
return
case <-ctx.Done():
return
}
}
}()
return watcher, nil

return trace.Wrap(<-errCh, "could not start dynamic desktop reconciler")
}

func (s *WindowsService) toWindowsDesktop(dynamicDesktop types.DynamicWindowsDesktop) (*types.WindowsDesktopV3, error) {
width, height := dynamicDesktop.GetScreenSize()

desktopLabels := dynamicDesktop.GetAllLabels()
labels := make(map[string]string, len(desktopLabels)+1)
maps.Copy(labels, desktopLabels)
labels[types.OriginLabel] = types.OriginDynamic

return types.NewWindowsDesktopV3(dynamicDesktop.GetName(), labels, types.WindowsDesktopSpecV3{
Addr: dynamicDesktop.GetAddr(),
Domain: dynamicDesktop.GetDomain(),
Expand Down
22 changes: 8 additions & 14 deletions lib/srv/desktop/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,9 @@ func TestDynamicWindowsDiscovery(t *testing.T) {
},
},
}
reconciler, err := s.startDynamicReconciler(ctx)
require.NoError(t, err)

require.NoError(t, s.startDynamicReconciler(t.Context()))
t.Cleanup(func() {
reconciler.Close()
require.NoError(t, authServer.AuthServer.DeleteAllWindowsDesktops(ctx))
var key string
for {
Expand Down Expand Up @@ -327,21 +326,15 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) {
Dir: t.TempDir(),
})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, authServer.Close())
})
t.Cleanup(func() { require.NoError(t, authServer.Close()) })

tlsServer, err := authServer.NewTestTLSServer()
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, tlsServer.Close())
})
t.Cleanup(func() { require.NoError(t, tlsServer.Close()) })

client, err := tlsServer.NewClient(authtest.TestServerID(types.RoleWindowsDesktop, "test-host-id"))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close())
})
t.Cleanup(func() { require.NoError(t, client.Close()) })

dynamicWindowsClient := client.DynamicDesktopClient()

Expand Down Expand Up @@ -371,8 +364,8 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) {
},
},
}
_, err = s.startDynamicReconciler(ctx)
require.NoError(t, err)

require.NoError(t, s.startDynamicReconciler(t.Context()))

desktop, err := types.NewDynamicWindowsDesktopV1("test", map[string]string{
"foo": "bar",
Expand All @@ -393,6 +386,7 @@ func TestDynamicWindowsDiscoveryExpiry(t *testing.T) {

err = client.DeleteWindowsDesktop(ctx, s.cfg.Heartbeat.HostUUID, "test")
require.NoError(t, err)

desktops, err := client.GetWindowsDesktops(ctx, types.WindowsDesktopFilter{})
require.NoError(t, err)
require.Empty(t, desktops)
Expand Down
34 changes: 29 additions & 5 deletions lib/srv/desktop/windows_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ import (
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/api/utils/clientutils"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/authz"
"github.com/gravitational/teleport/lib/defaults"
libevents "github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/recorder"
"github.com/gravitational/teleport/lib/itertools/stream"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/reversetunnelclient"
"github.com/gravitational/teleport/lib/service/servicecfg"
Expand Down Expand Up @@ -396,7 +398,7 @@ func NewWindowsService(cfg WindowsServiceConfig) (*WindowsService, error) {
return nil, trace.Wrap(err)
}

if _, err := s.startDynamicReconciler(ctx); err != nil {
if err := s.startDynamicReconciler(ctx); err != nil {
return nil, trace.Wrap(err)
}

Expand Down Expand Up @@ -632,8 +634,19 @@ func (s *WindowsService) handleConnection(proxyConn *tls.Conn) {
desktopName := strings.TrimSuffix(proxyConn.ConnectionState().ServerName, SNISuffix)
log = log.With("desktop_name", desktopName)

desktops, err := s.cfg.AccessPoint.GetWindowsDesktops(ctx,
types.WindowsDesktopFilter{HostID: s.cfg.Heartbeat.HostUUID, Name: desktopName})
desktops, err := stream.Collect(clientutils.Resources(ctx,
func(ctx context.Context, pageSize int, pageToken string) ([]types.WindowsDesktop, string, error) {
resp, err := s.cfg.AccessPoint.ListWindowsDesktops(ctx, types.ListWindowsDesktopsRequest{
WindowsDesktopFilter: types.WindowsDesktopFilter{HostID: s.cfg.Heartbeat.HostUUID, Name: desktopName},
Limit: pageSize,
StartKey: pageToken,
})
if err != nil {
return nil, "", trace.Wrap(err)
}

return resp.Desktops, resp.NextKey, nil
}))
if err != nil {
log.WarnContext(ctx, "Failed to fetch desktop by name", "error", err)
sendTDPError("Teleport failed to find the requested desktop in its database.")
Expand Down Expand Up @@ -1107,8 +1120,19 @@ func (s *WindowsService) staticHostHeartbeatInfo(host servicecfg.WindowsHost,
// a very large number of desktops in the cluster, this may use up a lot of CPU
// time.
func (s *WindowsService) nameForStaticHost(addr string) (string, error) {
desktops, err := s.cfg.AccessPoint.GetWindowsDesktops(s.closeCtx,
types.WindowsDesktopFilter{})
desktops, err := stream.Collect(clientutils.Resources(s.closeCtx,
func(ctx context.Context, pageSize int, pageToken string) ([]types.WindowsDesktop, string, error) {
resp, err := s.cfg.AccessPoint.ListWindowsDesktops(ctx, types.ListWindowsDesktopsRequest{
Limit: pageSize,
StartKey: pageToken,
SearchKeywords: []string{addr},
})
if err != nil {
return nil, "", trace.Wrap(err)
}

return resp.Desktops, resp.NextKey, nil
}))
if err != nil {
return "", trace.Wrap(err)
}
Expand Down
Loading