Skip to content

Commit 5c3fd96

Browse files
rosstimothygithub-actions
authored and
github-actions
committed
Restore relative node expiry to original behavior
When relative node expiry was originally implemented it only ran on Auth and propagated delete events downstream. However, that was quickly changed in #12002 because the event system could not keep up in clusters with high churn and would result in buffer overflow errors in downstream caches. The event system has been overhauled to use FanoutV2, which no longer suffers from the burst of events causing problems. The change to not propagate delete events also breaks any node watchers on the local cache from ever expiring the server since they never receive a delete event. This reverts some of the changes from #12002 such that relative expiry now only runs on the auth cache and emits delete events. Each relative expiry interval is however still limited to only remove up to a fixed number of nodes. Closes #37527
1 parent c34fc09 commit 5c3fd96

File tree

2 files changed

+31
-28
lines changed

2 files changed

+31
-28
lines changed

lib/cache/cache.go

+23-24
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ func makeAllKnownCAsFilter() types.CertAuthorityFilter {
114114
// ForAuth sets up watch configuration for the auth server
115115
func ForAuth(cfg Config) Config {
116116
cfg.target = "auth"
117+
cfg.EnableRelativeExpiry = true
117118
cfg.Watches = []types.WatchKind{
118119
{Kind: types.KindCertAuthority, LoadSecrets: true},
119120
{Kind: types.KindClusterName},
@@ -741,6 +742,9 @@ type Config struct {
741742
// healthy even if some of the requested resource kinds aren't
742743
// supported by the event source.
743744
DisablePartialHealth bool
745+
// EnableRelativeExpiry turns on purging expired items from the cache even
746+
// if delete events have not been received from the backend.
747+
EnableRelativeExpiry bool
744748
}
745749

746750
// CheckAndSetDefaults checks parameters and sets default values
@@ -1282,18 +1286,14 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
12821286

12831287
retry.Reset()
12841288

1285-
// only enable relative node expiry if the cache is configured
1286-
// to watch for types.KindNode
1289+
// Only enable relative node expiry for the auth cache.
12871290
relativeExpiryInterval := interval.NewNoop()
1288-
for _, watch := range c.Config.Watches {
1289-
if watch.Kind == types.KindNode {
1290-
relativeExpiryInterval = interval.New(interval.Config{
1291-
Duration: c.Config.RelativeExpiryCheckInterval,
1292-
FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
1293-
Jitter: retryutils.NewSeventhJitter(),
1294-
})
1295-
break
1296-
}
1291+
if c.EnableRelativeExpiry {
1292+
relativeExpiryInterval = interval.New(interval.Config{
1293+
Duration: c.Config.RelativeExpiryCheckInterval,
1294+
FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
1295+
Jitter: retryutils.NewSeventhJitter(),
1296+
})
12971297
}
12981298
defer relativeExpiryInterval.Stop()
12991299

@@ -1346,8 +1346,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
13461346
}
13471347
}
13481348

1349-
err = c.processEvent(ctx, event, true)
1350-
if err != nil {
1349+
if err := c.processEvent(ctx, event); err != nil {
13511350
return trace.Wrap(err)
13521351
}
13531352
c.notify(c.ctx, Event{Event: event, Type: EventProcessed})
@@ -1440,7 +1439,7 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
14401439
Kind: types.KindNode,
14411440
Metadata: node.GetMetadata(),
14421441
},
1443-
}, false); err != nil {
1442+
}); err != nil {
14441443
return trace.Wrap(err)
14451444
}
14461445

@@ -1607,9 +1606,9 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types
16071606
}
16081607

16091608
// processEvent hands the event off to the appropriate collection for processing. Any
1610-
// resources which were not registered are ignored. If processing completed successfully
1611-
// and emit is true the event will be emitted via the fanout.
1612-
func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) error {
1609+
// resources which were not registered are ignored. If processing completed successfully,
1610+
// the event will be emitted via the fanout.
1611+
func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
16131612
resourceKind := resourceKindFromResource(event.Resource)
16141613
collection, ok := c.collections.byKind[resourceKind]
16151614
if !ok {
@@ -1619,14 +1618,14 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool)
16191618
if err := collection.processEvent(ctx, event); err != nil {
16201619
return trace.Wrap(err)
16211620
}
1622-
if emit {
1623-
c.eventsFanout.Emit(event)
1624-
if !isHighVolumeResource(resourceKind.kind) {
1625-
c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
1626-
f.Emit(event)
1627-
})
1628-
}
1621+
1622+
c.eventsFanout.Emit(event)
1623+
if !isHighVolumeResource(resourceKind.kind) {
1624+
c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
1625+
f.Emit(event)
1626+
})
16291627
}
1628+
16301629
return nil
16311630
}
16321631

lib/cache/cache_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -2552,6 +2552,7 @@ func TestRelativeExpiry(t *testing.T) {
25522552
}
25532553

25542554
func TestRelativeExpiryLimit(t *testing.T) {
2555+
t.Parallel()
25552556
const (
25562557
checkInterval = time.Second
25572558
nodeCount = 100
@@ -2569,7 +2570,7 @@ func TestRelativeExpiryLimit(t *testing.T) {
25692570
c.RelativeExpiryCheckInterval = checkInterval
25702571
c.RelativeExpiryLimit = expiryLimit
25712572
c.Clock = clock
2572-
return ForProxy(c)
2573+
return ForAuth(c)
25732574
})
25742575
t.Cleanup(p.Close)
25752576

@@ -2609,7 +2610,8 @@ func TestRelativeExpiryLimit(t *testing.T) {
26092610
}
26102611
}
26112612

2612-
func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
2613+
func TestRelativeExpiryOnlyForAuth(t *testing.T) {
2614+
t.Parallel()
26132615
clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour))
26142616
p := newTestPack(t, func(c Config) Config {
26152617
c.RelativeExpiryCheckInterval = time.Second
@@ -2622,9 +2624,10 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
26222624
p2 := newTestPack(t, func(c Config) Config {
26232625
c.RelativeExpiryCheckInterval = time.Second
26242626
c.Clock = clock
2627+
c.target = "llama"
26252628
c.Watches = []types.WatchKind{
26262629
{Kind: types.KindNamespace},
2627-
{Kind: types.KindNamespace},
2630+
{Kind: types.KindNode},
26282631
{Kind: types.KindCertAuthority},
26292632
}
26302633
return c
@@ -2634,8 +2637,9 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
26342637
for i := 0; i < 2; i++ {
26352638
clock.Advance(time.Hour * 24)
26362639
drainEvents(p.eventsC)
2637-
expectEvent(t, p.eventsC, RelativeExpiry)
2640+
unexpectedEvent(t, p.eventsC, RelativeExpiry)
26382641

2642+
clock.Advance(time.Hour * 24)
26392643
drainEvents(p2.eventsC)
26402644
unexpectedEvent(t, p2.eventsC, RelativeExpiry)
26412645
}

0 commit comments

Comments
 (0)