diff --git a/server/client.go b/server/client.go
index 022fb00dd81..138164eb391 100644
--- a/server/client.go
+++ b/server/client.go
@@ -3820,24 +3820,34 @@ func (c *client) pruneDenyCache() {
 // prunePubPermsCache will prune the cache via randomly
 // deleting items. Doing so pruneSize items at a time.
 func (c *client) prunePubPermsCache() {
-	// There is a case where we can invoke this from multiple go routines,
-	// (in deliverMsg() if sub.client is a LEAF), so we make sure to prune
-	// from only one go routine at a time.
-	if !atomic.CompareAndSwapInt32(&c.perms.prun, 0, 1) {
-		return
-	}
-	const maxPruneAtOnce = 1000
-	r := 0
-	c.perms.pcache.Range(func(k, _ any) bool {
-		c.perms.pcache.Delete(k)
-		if r++; (r > pruneSize && atomic.LoadInt32(&c.perms.pcsz) < int32(maxPermCacheSize)) ||
-			(r > maxPruneAtOnce) {
-			return false
+	// With parallel additions to the cache, it is possible that this function
+	// would not be able to reduce the cache to its max size in one go. We
+	// will try a few times but will release/reacquire the "lock" at each
+	// attempt to give a chance to another go routine to take over and not
+	// have this go routine do too many attempts.
+	for i := 0; i < 5; i++ {
+		// There is a case where we can invoke this from multiple go routines,
+		// (in deliverMsg() if sub.client is a LEAF), so we make sure to prune
+		// from only one go routine at a time.
+		if !atomic.CompareAndSwapInt32(&c.perms.prun, 0, 1) {
+			return
 		}
-		return true
-	})
-	atomic.AddInt32(&c.perms.pcsz, -int32(r))
-	atomic.StoreInt32(&c.perms.prun, 0)
+		const maxPruneAtOnce = 1000
+		r := 0
+		c.perms.pcache.Range(func(k, _ any) bool {
+			c.perms.pcache.Delete(k)
+			if r++; (r > pruneSize && atomic.LoadInt32(&c.perms.pcsz) < int32(maxPermCacheSize)) ||
+				(r > maxPruneAtOnce) {
+				return false
+			}
+			return true
+		})
+		n := atomic.AddInt32(&c.perms.pcsz, -int32(r))
+		atomic.StoreInt32(&c.perms.prun, 0)
+		if n <= int32(maxPermCacheSize) {
+			return
+		}
+	}
 }
 
 // pubAllowed checks on publish permissioning.
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index 071bbf960aa..4d20b3a8428 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -1466,20 +1466,19 @@ func TestLeafNodePubAllowedPruning(t *testing.T) {
 	}
 	wg.Wait()
 
-	checkFor(t, 2*time.Second, time.Millisecond, func() (rerr error) {
-		defer func() {
-			if rerr != nil {
-				c.pubAllowed(nats.NewInbox())
-			}
-		}()
-		if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize {
-			return fmt.Errorf("Expected size to be less than %v, got %v", maxPermCacheSize, n)
-		}
-		if n := atomic.LoadInt32(&c.perms.prun); n != 0 {
-			t.Fatalf("c.perms.prun should be 0, was %v", n)
-		}
-		return nil
-	})
+	// The cache prune function does try for a bit to make sure the cache
+	// is below the maxPermCacheSize value, but depending on the machine
+	// this runs on, it may be that it is still a bit over. If so, run
+	// pubAllowed one more time and we must get below.
+	if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize {
+		c.pubAllowed(nats.NewInbox())
+	}
+	if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize {
+		t.Fatalf("Expected size to be less than %v, got %v", maxPermCacheSize, n)
+	}
+	if n := atomic.LoadInt32(&c.perms.prun); n != 0 {
+		t.Fatalf("c.perms.prun should be 0, was %v", n)
+	}
 }
 
 func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) {
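
For context, below is a minimal standalone sketch (not the server code itself) of the pattern this patch introduces in prunePubPermsCache: an atomic flag used as a try-lock so only one goroutine prunes at a time, with a small bounded number of passes that release and reacquire the flag between attempts until the cache is back at or under its limit. All identifiers in the sketch (cache, size, pruneBusy, maxSize, pruneBatch, addToCache) are illustrative stand-ins, not the actual fields in server/client.go.

// prune_sketch.go - illustrative only; names and sizes are assumptions.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	maxSize    = 100 // target upper bound for the cache (hypothetical)
	pruneBatch = 25  // entries removed per pass (hypothetical)
)

var (
	cache     sync.Map // key -> cached permission result
	size      int32    // approximate number of entries in cache
	pruneBusy int32    // 0 = free, 1 = a goroutine is currently pruning
)

func addToCache(key string, allowed bool) {
	cache.Store(key, allowed)
	if atomic.AddInt32(&size, 1) > maxSize {
		prune()
	}
}

func prune() {
	// Bounded retries: with concurrent additions one pass may not be enough,
	// but we never spin forever and we yield the "lock" between passes so
	// another goroutine can take over.
	for i := 0; i < 5; i++ {
		if !atomic.CompareAndSwapInt32(&pruneBusy, 0, 1) {
			return // another goroutine is already pruning
		}
		removed := 0
		cache.Range(func(k, _ any) bool {
			cache.Delete(k)
			removed++
			return removed < pruneBatch // stop after one batch
		})
		n := atomic.AddInt32(&size, -int32(removed))
		atomic.StoreInt32(&pruneBusy, 0)
		if n <= maxSize {
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for i := 0; i < 500; i++ {
				addToCache(fmt.Sprintf("subj.%d.%d", g, i), true)
			}
		}(g)
	}
	wg.Wait()
	fmt.Println("approximate cache size:", atomic.LoadInt32(&size))
}

The design choice mirrors the patch: capping the loop (here at 5 passes, as in the diff) keeps a single goroutine from being stuck pruning indefinitely under a heavy write load, while dropping the flag between passes gives other goroutines a chance to pick up the work, which is also why the updated test tolerates a cache that is still slightly over the limit and simply triggers one more pubAllowed call before asserting.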