Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/kube/proxy/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (s *TLSServer) onCreate(ctx context.Context, cluster types.KubeCluster) err
return s.registerKubeCluster(ctx, cluster)
}

func (s *TLSServer) onUpdate(ctx context.Context, cluster types.KubeCluster) error {
func (s *TLSServer) onUpdate(ctx context.Context, cluster, _ types.KubeCluster) error {
return s.updateKubeCluster(ctx, cluster)
}

Expand Down
4 changes: 2 additions & 2 deletions lib/services/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type ReconcilerConfig[T Reconciled] struct {
// OnCreate is called when a new resource is detected.
OnCreate func(context.Context, T) error
// OnUpdate is called when an existing resource is updated.
OnUpdate func(context.Context, T) error
OnUpdate func(ctx context.Context, new, old T) error
// OnDelete is called when an existing resource is deleted.
OnDelete func(context.Context, T) error
// Log is the reconciler's logger.
Expand Down Expand Up @@ -180,7 +180,7 @@ func (r *Reconciler[T]) processNewResource(ctx context.Context, currentResources
if CompareResources(newT, registered) != Equal {
if r.cfg.Matcher(newT) {
r.log.Infof("%v %v updated, updating.", kind, name)
if err := r.cfg.OnUpdate(ctx, newT); err != nil {
if err := r.cfg.OnUpdate(ctx, newT, registered); err != nil {
return trace.Wrap(err, "failed to update %v %v", kind, name)
}
return nil
Expand Down
24 changes: 17 additions & 7 deletions lib/services/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ import (

// TestReconciler makes sure appropriate callbacks are called during reconciliation.
func TestReconciler(t *testing.T) {
type updateCall struct{ new, old testResource }
tests := []struct {
description string
selectors []ResourceMatcher
registeredResources []testResource
newResources []testResource
onCreateCalls []testResource
onUpdateCalls []testResource
onUpdateCalls []updateCall
onDeleteCalls []testResource
}{
{
Expand Down Expand Up @@ -91,7 +92,12 @@ func TestReconciler(t *testing.T) {
}},
registeredResources: []testResource{makeDynamicResource("res1", nil)},
newResources: []testResource{makeDynamicResource("res1", map[string]string{"env": "dev"})},
onUpdateCalls: []testResource{makeDynamicResource("res1", map[string]string{"env": "dev"})},
onUpdateCalls: []updateCall{
{
old: makeDynamicResource("res1", nil),
new: makeDynamicResource("res1", map[string]string{"env": "dev"}),
},
},
},
{
description: "non-matching updated resource should be removed",
Expand Down Expand Up @@ -125,8 +131,11 @@ func TestReconciler(t *testing.T) {
onCreateCalls: []testResource{
makeDynamicResource("res5", map[string]string{"env": "prod"}),
},
onUpdateCalls: []testResource{
makeDynamicResource("res2", map[string]string{"env": "prod", "a": "b"}),
onUpdateCalls: []updateCall{
{
new: makeDynamicResource("res2", map[string]string{"env": "prod", "a": "b"}),
old: makeDynamicResource("res2", map[string]string{"env": "prod"}),
},
},
onDeleteCalls: []testResource{
makeDynamicResource("res1", map[string]string{"env": "prod"}),
Expand All @@ -138,7 +147,8 @@ func TestReconciler(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
// Reconciler will record all callback calls in these lists.
var onCreateCalls, onUpdateCalls, onDeleteCalls []testResource
var onCreateCalls, onDeleteCalls []testResource
var onUpdateCalls []updateCall

reconciler, err := NewReconciler[testResource](ReconcilerConfig[testResource]{
Matcher: func(tr testResource) bool {
Expand All @@ -158,8 +168,8 @@ func TestReconciler(t *testing.T) {
onCreateCalls = append(onCreateCalls, tr)
return nil
},
OnUpdate: func(ctx context.Context, tr testResource) error {
onUpdateCalls = append(onUpdateCalls, tr)
OnUpdate: func(ctx context.Context, tr, old testResource) error {
onUpdateCalls = append(onUpdateCalls, updateCall{new: tr, old: old})
return nil
},
OnDelete: func(ctx context.Context, tr testResource) error {
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/app/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (s *Server) onCreate(ctx context.Context, app types.Application) error {
return s.registerApp(ctx, app)
}

func (s *Server) onUpdate(ctx context.Context, app types.Application) error {
func (s *Server) onUpdate(ctx context.Context, app, _ types.Application) error {
return s.updateApp(ctx, app)
}

Expand Down
2 changes: 1 addition & 1 deletion lib/srv/db/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (s *Server) onCreate(ctx context.Context, database types.Database) error {
}

// onUpdate is called by reconciler when an already proxied database is updated.
func (s *Server) onUpdate(ctx context.Context, database types.Database) error {
func (s *Server) onUpdate(ctx context.Context, database, _ types.Database) error {
// OnUpdate receives a "new" resource from s.monitoredDatabases. Make a
// copy here so that any attribute changes to the proxied database will not
// affect database objects tracked in s.monitoredDatabases.
Expand Down
6 changes: 5 additions & 1 deletion lib/srv/desktop/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *WindowsService) startDesktopDiscovery() error {
GetCurrentResources: func() map[string]types.WindowsDesktop { return s.lastDiscoveryResults },
GetNewResources: s.getDesktopsFromLDAP,
OnCreate: s.upsertDesktop,
OnUpdate: s.upsertDesktop,
OnUpdate: s.updateDesktop,
OnDelete: s.deleteDesktop,
Log: s.cfg.Log,
})
Expand Down Expand Up @@ -141,6 +141,10 @@ func (s *WindowsService) getDesktopsFromLDAP() map[string]types.WindowsDesktop {
return result
}

func (s *WindowsService) updateDesktop(ctx context.Context, desktop, _ types.WindowsDesktop) error {
return s.upsertDesktop(ctx, desktop)
}

func (s *WindowsService) upsertDesktop(ctx context.Context, d types.WindowsDesktop) error {
return s.cfg.AuthClient.UpsertWindowsDesktop(ctx, d)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database)
// the resource.
// TODO(tigrato): DELETE on 15.0.0
if trace.IsAlreadyExists(err) {
return trace.Wrap(s.onDatabaseUpdate(ctx, database))
return trace.Wrap(s.onDatabaseUpdate(ctx, database, nil))
}
if err != nil {
return trace.Wrap(err)
Expand All @@ -168,7 +168,7 @@ func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database)
return nil
}

func (s *Server) onDatabaseUpdate(ctx context.Context, database types.Database) error {
func (s *Server) onDatabaseUpdate(ctx context.Context, database, _ types.Database) error {
s.Log.Debugf("Updating database %s.", database.GetName())
return trace.Wrap(s.AccessPoint.UpdateDatabase(ctx, database))
}
Expand Down
4 changes: 2 additions & 2 deletions lib/srv/discovery/kube_services_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *Server) onAppCreate(ctx context.Context, app types.Application) error {
// discovery group label to ensure the user doesn't have to manually delete
// the resource.
if trace.IsAlreadyExists(err) {
return trace.Wrap(s.onAppUpdate(ctx, app))
return trace.Wrap(s.onAppUpdate(ctx, app, nil))
}
if err != nil {
return trace.Wrap(err)
Expand All @@ -141,7 +141,7 @@ func (s *Server) onAppCreate(ctx context.Context, app types.Application) error {
return nil
}

func (s *Server) onAppUpdate(ctx context.Context, app types.Application) error {
func (s *Server) onAppUpdate(ctx context.Context, app, _ types.Application) error {
s.Log.Debugf("Updating app %s.", app.GetName())
return trace.Wrap(s.AccessPoint.UpdateApp(ctx, app))
}
Expand Down
4 changes: 2 additions & 2 deletions lib/srv/discovery/kube_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster
// the resource.
// TODO(tigrato): DELETE on 15.0.0
if trace.IsAlreadyExists(err) {
return trace.Wrap(s.onKubeUpdate(ctx, kubeCluster))
return trace.Wrap(s.onKubeUpdate(ctx, kubeCluster, nil))
}
if err != nil {
return trace.Wrap(err)
Expand All @@ -139,7 +139,7 @@ func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster
return nil
}

func (s *Server) onKubeUpdate(ctx context.Context, kubeCluster types.KubeCluster) error {
func (s *Server) onKubeUpdate(ctx context.Context, kubeCluster, _ types.KubeCluster) error {
s.Log.Debugf("Updating kube_cluster %s.", kubeCluster.GetName())
return trace.Wrap(s.AccessPoint.UpdateKubernetesCluster(ctx, kubeCluster))
}
Expand Down