From 3b2ea1c91524659f44812de75e14d326f33ddd0c Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Tue, 6 Jan 2026 20:06:28 +0000 Subject: [PATCH 1/4] Fix memory leak in access list reminder notifications Fixes a memory leak caused by variable shadowing where `nextKey` was redeclared in the pagination loop instead of being assigned. This caused the loop to always pass an empty pagination token, fetching the same page repeatedly and never terminating for tenants with more than 1000 pending access list review notifications. The fix renames the loop variable to `notificationsPageKey` to avoid shadowing and properly updates it with the next page token. Signed-off-by: Tiago Silva --- lib/auth/auth.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/auth/auth.go b/lib/auth/auth.go index e74f7d9dafca3..9ba92c7e9eb8b 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -6966,17 +6966,18 @@ func (a *Server) CreateAccessListReminderNotifications(ctx context.Context) { // Fetch all identifiers for this treshold prefix. var identifiers []*notificationsv1.UniqueNotificationIdentifier - var nextKey string + var notificationsPageKey string for { - identifiersResp, nextKey, err := a.ListUniqueNotificationIdentifiersForPrefix(ctx, threshold.prefix, 0, nextKey) + identifiersResp, nextKey, err := a.ListUniqueNotificationIdentifiersForPrefix(ctx, threshold.prefix, 0, notificationsPageKey) if err != nil { a.logger.WarnContext(ctx, "failed to list notification identifiers", "error", err, "prefix", threshold.prefix) - continue + break } identifiers = append(identifiers, identifiersResp...) 
if nextKey == "" { break } + notificationsPageKey = nextKey } // Create a map of identifiers for quick lookup From 99c6525d7e26ebdb44c32160d32489a215b5c327 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Tue, 6 Jan 2026 21:37:25 +0000 Subject: [PATCH 2/4] handle code review comments --- lib/auth/auth.go | 79 +++++++++++++++++++++----------------- lib/auth/auth_test.go | 89 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 130 insertions(+), 38 deletions(-) diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 9ba92c7e9eb8b..5604b0d0d8ae0 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -6866,8 +6866,31 @@ const ( accessListReminderSemaphoreMaxLeases = 1 ) +// createAccessListReminderNotificationsOptions defines the optional parameters for CreateAccessListReminderNotifications. +type createAccessListReminderNotificationsOptions struct { + createNotificationInterval time.Duration +} + +// CreateAccessListReminderNotificationsOptions is a functional option for CreateAccessListReminderNotifications. +type CreateAccessListReminderNotificationsOptions func(*createAccessListReminderNotificationsOptions) + +// WithCreateNotificationInterval sets the interval between creating notifications. +func WithCreateNotificationInterval(d time.Duration) CreateAccessListReminderNotificationsOptions { + return func(o *createAccessListReminderNotificationsOptions) { + o.createNotificationInterval = d + } +} + // CreateAccessListReminderNotifications checks if there are any access lists expiring soon and creates notifications to remind their owners if so. 
-func (a *Server) CreateAccessListReminderNotifications(ctx context.Context) { +func (a *Server) CreateAccessListReminderNotifications(ctx context.Context, opts ...CreateAccessListReminderNotificationsOptions) { + opt := &createAccessListReminderNotificationsOptions{ + // defaults to notificationsWriteInterval aka 40ms + createNotificationInterval: notificationsWriteInterval, + } + for _, o := range opts { + o(opt) + } + // Ensure only one auth server is running this check at a time. lease, err := services.AcquireSemaphoreLock(ctx, services.SemaphoreLockConfig{ Service: a, @@ -6900,35 +6923,21 @@ func (a *Server) CreateAccessListReminderNotifications(ctx context.Context) { // Fetch all access lists var accessLists []*accesslist.AccessList - var accessListsPageKey string - accessListsReadLimiter := time.NewTicker(accessListsPageReadInterval) - defer accessListsReadLimiter.Stop() - for { - select { - case <-accessListsReadLimiter.C: - case <-ctx.Done(): - return - } - response, nextKey, err := a.Cache.ListAccessLists(ctx, accessListsPageSize, accessListsPageKey) - if err != nil { - a.logger.WarnContext(ctx, "failed to list access lists for periodic reminder notification check", "error", err) - } - - for _, al := range response { - if !al.IsReviewable() { - continue - } - - // Only keep access lists that fall within our thresholds in memory - if al.Spec.Audit.NextAuditDate.Sub(now) <= 15*24*time.Hour { - accessLists = append(accessLists, al) - } + err = clientutils.IterateResources(ctx, a.Cache.ListAccessLists, func(al *accesslist.AccessList) error { + if !al.IsReviewable() { + return nil } - if nextKey == "" { - break + // Only keep access lists that fall within our thresholds in memory + if al.Spec.Audit.NextAuditDate.Sub(now) <= 15*24*time.Hour { + accessLists = append(accessLists, al) } - accessListsPageKey = nextKey + return nil + }) + if err != nil { + a.logger.WarnContext(ctx, "failed to list access lists for periodic reminder notification check", + "error", 
err) + return } reminderThresholds := []struct { @@ -6966,18 +6975,16 @@ func (a *Server) CreateAccessListReminderNotifications(ctx context.Context) { // Fetch all identifiers for this treshold prefix. var identifiers []*notificationsv1.UniqueNotificationIdentifier - var notificationsPageKey string - for { - identifiersResp, nextKey, err := a.ListUniqueNotificationIdentifiersForPrefix(ctx, threshold.prefix, 0, notificationsPageKey) + iterator := clientutils.Resources(ctx, func(ctx context.Context, pageSize int, pageKey string) ([]*notificationsv1.UniqueNotificationIdentifier, string, error) { + return a.ListUniqueNotificationIdentifiersForPrefix(ctx, threshold.prefix, pageSize, pageKey) + }) + + for identifiersResp, err := range iterator { if err != nil { a.logger.WarnContext(ctx, "failed to list notification identifiers", "error", err, "prefix", threshold.prefix) break } - identifiers = append(identifiers, identifiersResp...) - if nextKey == "" { - break - } - notificationsPageKey = nextKey + identifiers = append(identifiers, identifiersResp) } // Create a map of identifiers for quick lookup @@ -6993,7 +7000,7 @@ func (a *Server) CreateAccessListReminderNotifications(ctx context.Context) { // Check for access lists which haven't already been accounted for in a notification var needsNotification bool - writeLimiter := time.NewTicker(notificationsWriteInterval) + writeLimiter := time.NewTicker(opt.createNotificationInterval) for _, accessList := range relevantLists { select { case <-writeLimiter.C: diff --git a/lib/auth/auth_test.go b/lib/auth/auth_test.go index bced9f22d8ab2..0885d10fb3cd1 100644 --- a/lib/auth/auth_test.go +++ b/lib/auth/auth_test.go @@ -4717,7 +4717,7 @@ func TestCreateAccessListReminderNotifications(t *testing.T) { } // Run CreateAccessListReminderNotifications() - authServer.CreateAccessListReminderNotifications(ctx) + authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond)) 
reminderNotificationSubKind := func(n *notificationsv1.Notification) string { return n.GetSubKind() } expectedSubKinds := []string{ @@ -4735,7 +4735,7 @@ func TestCreateAccessListReminderNotifications(t *testing.T) { require.ElementsMatch(t, expectedSubKinds, slices.Map(resp.Notifications, reminderNotificationSubKind)) // Run CreateAccessListReminderNotifications() again to verify no duplicates are created - authServer.CreateAccessListReminderNotifications(ctx) + authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond)) // Check notifications again, counts should remain the same. resp, err = client.ListNotifications(ctx, &notificationsv1.ListNotificationsRequest{}) @@ -4801,6 +4801,91 @@ func createAccessList(t *testing.T, authServer *auth.Server, name string, opts . require.NoError(t, err) } +func TestCreateAccessListReminderNotifications_LargeOverdueSet(t *testing.T) { + ctx := t.Context() + + modulestest.SetTestModules(t, modulestest.Modules{ + TestBuildType: modules.BuildEnterprise, + TestFeatures: modules.Features{ + Entitlements: map[entitlements.EntitlementKind]modules.EntitlementInfo{ + entitlements.Identity: {Enabled: true}, + }, + }, + }) + + // Setup test auth server + testServer := newTestTLSServer(t) + authServer := testServer.Auth() + + testRole, err := types.NewRole("test", types.RoleSpecV6{ + Allow: types.RoleConditions{ + Logins: []string{"user"}, + ReviewRequests: &types.AccessReviewConditions{}, + }, + }) + require.NoError(t, err) + _, err = authServer.UpsertRole(ctx, testRole) + require.NoError(t, err) + + testUsername := "user1" + user, err := types.NewUser(testUsername) + require.NoError(t, err) + user.SetRoles([]string{"test"}) + _, err = authServer.UpsertUser(ctx, user) + require.NoError(t, err) + + // Create 2001overdue access lists + // All are overdue by 10 days, which should trigger "overdue by more than 7 days" notification + const numAccessLists = 2001 + overdueBy := -10 // 10 days overdue 
+ nextAuditDate := authServer.GetClock().Now().Add(time.Duration(overdueBy) * 24 * time.Hour) + + for i := range numAccessLists { + createAccessList(t, authServer, fmt.Sprintf("al-overdue-%d", i), + withOwners([]accesslist.Owner{{Name: testUsername}}), + withNextAuditDate(nextAuditDate), + ) + } + + require.EventuallyWithT(t, func(t *assert.CollectT) { + lists, err := testServer.Auth().Cache.GetAccessLists(ctx) + assert.NoError(t, err) + assert.Len(t, lists, numAccessLists, "should have created all %d overdue access lists", numAccessLists) + }, 3*time.Second, 100*time.Millisecond) + + // Run CreateAccessListReminderNotifications() + authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond)) + + identifiers := collectAllUniqueNotificationIdentifiers(t, authServer, types.NotificationIdentifierPrefixAccessListOverdue7d) + require.Len(t, identifiers, numAccessLists, + "should have created unique identifiers for all %d overdue access lists", numAccessLists) + + // Run CreateAccessListReminderNotifications() again to verify it can read multiple pages of identifiers without memory leak + authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond)) + + identifiers = collectAllUniqueNotificationIdentifiers(t, authServer, types.NotificationIdentifierPrefixAccessListOverdue7d) + require.Len(t, identifiers, numAccessLists, + "should have created unique identifiers for all %d overdue access lists", numAccessLists) + +} + +func collectAllUniqueNotificationIdentifiers(t *testing.T, authServer *auth.Server, prefix string) []*notificationsv1.UniqueNotificationIdentifier { + t.Helper() + var allIdentifiers []*notificationsv1.UniqueNotificationIdentifier + var nextKey string + + for { + identifiers, next, err := authServer.ListUniqueNotificationIdentifiersForPrefix(t.Context(), prefix, 100, nextKey) + require.NoError(t, err) + allIdentifiers = append(allIdentifiers, identifiers...) 
+ if next == "" { + break + } + nextKey = next + } + return allIdentifiers +} + func TestServer_GetAnonymizationKey(t *testing.T) { tests := []struct { name string From ffe87a3e6891037f09e6567d84d38965fea7ce44 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Tue, 6 Jan 2026 23:20:29 +0000 Subject: [PATCH 3/4] handle review feedback --- lib/auth/auth.go | 3 --- lib/auth/auth_test.go | 28 ++++++++++++++-------------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 5604b0d0d8ae0..4324e509b1122 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -191,9 +191,6 @@ const ( const ( notificationsPageReadInterval = 5 * time.Millisecond notificationsWriteInterval = 40 * time.Millisecond - - accessListsPageReadInterval = 5 * time.Millisecond - accessListsPageSize = 20 ) const ( diff --git a/lib/auth/auth_test.go b/lib/auth/auth_test.go index 0885d10fb3cd1..1780c305b36e3 100644 --- a/lib/auth/auth_test.go +++ b/lib/auth/auth_test.go @@ -4856,34 +4856,34 @@ func TestCreateAccessListReminderNotifications_LargeOverdueSet(t *testing.T) { // Run CreateAccessListReminderNotifications() authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond)) - identifiers := collectAllUniqueNotificationIdentifiers(t, authServer, types.NotificationIdentifierPrefixAccessListOverdue7d) + identifiers := collectAllUniqueNotificationIdentifiers(t, ctx, authServer, types.NotificationIdentifierPrefixAccessListOverdue7d) require.Len(t, identifiers, numAccessLists, "should have created unique identifiers for all %d overdue access lists", numAccessLists) // Run CreateAccessListReminderNotifications() again to verify it can read multiple pages of identifiers without memory leak authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond)) - identifiers = collectAllUniqueNotificationIdentifiers(t, authServer, 
types.NotificationIdentifierPrefixAccessListOverdue7d) + identifiers = collectAllUniqueNotificationIdentifiers(t, ctx, authServer, types.NotificationIdentifierPrefixAccessListOverdue7d) require.Len(t, identifiers, numAccessLists, "should have created unique identifiers for all %d overdue access lists", numAccessLists) - } -func collectAllUniqueNotificationIdentifiers(t *testing.T, authServer *auth.Server, prefix string) []*notificationsv1.UniqueNotificationIdentifier { +func collectAllUniqueNotificationIdentifiers(t *testing.T, ctx context.Context, authServer *auth.Server, prefix string) []*notificationsv1.UniqueNotificationIdentifier { t.Helper() - var allIdentifiers []*notificationsv1.UniqueNotificationIdentifier - var nextKey string - for { - identifiers, next, err := authServer.ListUniqueNotificationIdentifiersForPrefix(t.Context(), prefix, 100, nextKey) - require.NoError(t, err) - allIdentifiers = append(allIdentifiers, identifiers...) - if next == "" { - break + var identifiers []*notificationsv1.UniqueNotificationIdentifier + iterator := clientutils.Resources(ctx, func(ctx context.Context, pageSize int, pageKey string) ([]*notificationsv1.UniqueNotificationIdentifier, string, error) { + return authServer.ListUniqueNotificationIdentifiersForPrefix(ctx, prefix, pageSize, pageKey) + }) + + for identifiersResp, err := range iterator { + if err != nil { + require.NoError(t, err, "listing unique notification identifiers for prefix %q", prefix) } - nextKey = next + identifiers = append(identifiers, identifiersResp) } - return allIdentifiers + + return identifiers } func TestServer_GetAnonymizationKey(t *testing.T) { From 2e485958cd78bfba644ea5a14fec0b76d40bfaa8 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Wed, 7 Jan 2026 13:33:05 +0000 Subject: [PATCH 4/4] Re-introduce access list read rate limiting This PR re-introduces access list read rate limiting removed in #62649. The PR had merge enabled and was merged without addressing the feedback. 
Signed-off-by: Tiago Silva --- lib/auth/auth.go | 55 ++++++++++++++++++++++++++++++++----------- lib/auth/auth_test.go | 19 +++++---------- 2 files changed, 47 insertions(+), 27 deletions(-) diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 4324e509b1122..40fa7e67badac 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -191,6 +191,9 @@ const ( const ( notificationsPageReadInterval = 5 * time.Millisecond notificationsWriteInterval = 40 * time.Millisecond + + accessListsPageReadInterval = 5 * time.Millisecond + accessListsPageSize = 20 ) const ( @@ -6865,7 +6868,8 @@ const ( // createAccessListReminderNotificationsOptions defines the optional parameters for CreateAccessListReminderNotifications. type createAccessListReminderNotificationsOptions struct { - createNotificationInterval time.Duration + createNotificationInterval time.Duration + accessListsPageReadInterval time.Duration } // CreateAccessListReminderNotificationsOptions is a functional option for CreateAccessListReminderNotifications. @@ -6878,11 +6882,19 @@ func WithCreateNotificationInterval(d time.Duration) CreateAccessListReminderNot } } +// WithAccessListsPageReadInterval sets the interval between reading pages of access lists. +func WithAccessListsPageReadInterval(d time.Duration) CreateAccessListReminderNotificationsOptions { + return func(o *createAccessListReminderNotificationsOptions) { + o.accessListsPageReadInterval = d + } +} + // CreateAccessListReminderNotifications checks if there are any access lists expiring soon and creates notifications to remind their owners if so. 
func (a *Server) CreateAccessListReminderNotifications(ctx context.Context, opts ...CreateAccessListReminderNotificationsOptions) { opt := &createAccessListReminderNotificationsOptions{ // defaults to notificationsWriteInterval aka 40ms - createNotificationInterval: notificationsWriteInterval, + createNotificationInterval: notificationsWriteInterval, + accessListsPageReadInterval: accessListsPageReadInterval, } for _, o := range opts { o(opt) @@ -6920,21 +6932,36 @@ func (a *Server) CreateAccessListReminderNotifications(ctx context.Context, opts // Fetch all access lists var accessLists []*accesslist.AccessList - err = clientutils.IterateResources(ctx, a.Cache.ListAccessLists, func(al *accesslist.AccessList) error { - if !al.IsReviewable() { - return nil + var accessListsPageKey string + accessListsReadLimiter := time.NewTicker(opt.accessListsPageReadInterval) + defer accessListsReadLimiter.Stop() + for { + select { + case <-accessListsReadLimiter.C: + case <-ctx.Done(): + return } - // Only keep access lists that fall within our thresholds in memory - if al.Spec.Audit.NextAuditDate.Sub(now) <= 15*24*time.Hour { - accessLists = append(accessLists, al) + response, nextKey, err := a.Cache.ListAccessLists(ctx, accessListsPageSize, accessListsPageKey) + if err != nil { + a.logger.WarnContext(ctx, "failed to list access lists for periodic reminder notification check", "error", err) } - return nil - }) - if err != nil { - a.logger.WarnContext(ctx, "failed to list access lists for periodic reminder notification check", - "error", err) - return + + for _, al := range response { + if !al.IsReviewable() { + continue + } + + // Only keep access lists that fall within our thresholds in memory + if al.Spec.Audit.NextAuditDate.Sub(now) <= 15*24*time.Hour { + accessLists = append(accessLists, al) + } + } + + if nextKey == "" { + break + } + accessListsPageKey = nextKey } reminderThresholds := []struct { diff --git a/lib/auth/auth_test.go b/lib/auth/auth_test.go index 
1780c305b36e3..872607eaf1d48 100644 --- a/lib/auth/auth_test.go +++ b/lib/auth/auth_test.go @@ -4834,7 +4834,7 @@ func TestCreateAccessListReminderNotifications_LargeOverdueSet(t *testing.T) { _, err = authServer.UpsertUser(ctx, user) require.NoError(t, err) - // Create 2001overdue access lists + // Create 2001 overdue access lists // All are overdue by 10 days, which should trigger "overdue by more than 7 days" notification const numAccessLists = 2001 overdueBy := -10 // 10 days overdue @@ -4854,14 +4854,14 @@ func TestCreateAccessListReminderNotifications_LargeOverdueSet(t *testing.T) { }, 3*time.Second, 100*time.Millisecond) // Run CreateAccessListReminderNotifications() - authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond)) + authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond), auth.WithAccessListsPageReadInterval(time.Nanosecond)) identifiers := collectAllUniqueNotificationIdentifiers(t, ctx, authServer, types.NotificationIdentifierPrefixAccessListOverdue7d) require.Len(t, identifiers, numAccessLists, "should have created unique identifiers for all %d overdue access lists", numAccessLists) // Run CreateAccessListReminderNotifications() again to verify it can read multiple pages of identifiers without memory leak - authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond)) + authServer.CreateAccessListReminderNotifications(ctx, auth.WithCreateNotificationInterval(time.Nanosecond), auth.WithAccessListsPageReadInterval(time.Nanosecond)) identifiers = collectAllUniqueNotificationIdentifiers(t, ctx, authServer, types.NotificationIdentifierPrefixAccessListOverdue7d) require.Len(t, identifiers, numAccessLists, @@ -4871,17 +4871,10 @@ func TestCreateAccessListReminderNotifications_LargeOverdueSet(t *testing.T) { func collectAllUniqueNotificationIdentifiers(t *testing.T, ctx context.Context, authServer 
*auth.Server, prefix string) []*notificationsv1.UniqueNotificationIdentifier { t.Helper() - var identifiers []*notificationsv1.UniqueNotificationIdentifier - iterator := clientutils.Resources(ctx, func(ctx context.Context, pageSize int, pageKey string) ([]*notificationsv1.UniqueNotificationIdentifier, string, error) { + identifiers, err := stream.Collect(clientutils.Resources(ctx, func(ctx context.Context, pageSize int, pageKey string) ([]*notificationsv1.UniqueNotificationIdentifier, string, error) { return authServer.ListUniqueNotificationIdentifiersForPrefix(ctx, prefix, pageSize, pageKey) - }) - - for identifiersResp, err := range iterator { - if err != nil { - require.NoError(t, err, "listing unique notification identifiers for prefix %q", prefix) - } - identifiers = append(identifiers, identifiersResp) - } + })) + require.NoError(t, err, "listing unique notification identifiers for prefix %q", prefix) return identifiers }