Restore relative node expiry to original behavior #38258

Merged (1 commit) on Feb 15, 2024
Restore relative node expiry to original behavior
When relative node expiry was originally implemented, it ran
only on Auth and propagated delete events downstream. However,
that was quickly changed in #12002 because the event system
could not keep up in clusters with high churn, which resulted
in buffer overflow errors in downstream caches. The event
system has since been overhauled to use FanoutV2, which no
longer has trouble with these bursts of events. Not propagating
delete events also prevents any node watchers on the local
cache from ever expiring the server, since they never receive
a delete event.
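
To make the watcher point concrete, here is a minimal sketch (not
part of this PR) of a downstream consumer that watches nodes through
a cache. The helper name watchNodes and its wiring are illustrative;
types.Events, types.Watch, and the watcher channels are the standard
Teleport API, and the usual github.com/gravitational/teleport/api/types
and trace imports are assumed. Without delete events, the OpDelete
branch never fires and stale servers are never dropped.

// watchNodes is an illustrative consumer of node events; it is not
// part of this change. If the cache silently prunes expired nodes
// without emitting OpDelete, the delete branch below never runs and
// stale servers linger in the consumer's view forever.
func watchNodes(ctx context.Context, events types.Events) error {
	watcher, err := events.NewWatcher(ctx, types.Watch{
		Kinds: []types.WatchKind{{Kind: types.KindNode}},
	})
	if err != nil {
		return trace.Wrap(err)
	}
	defer watcher.Close()

	for {
		select {
		case event := <-watcher.Events():
			switch event.Type {
			case types.OpPut:
				// server added or refreshed: update the local view
			case types.OpDelete:
				// server deleted or expired: drop it from the local view
			}
		case <-watcher.Done():
			return trace.Wrap(watcher.Error())
		}
	}
}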

This reverts some of the changes from #12002 so that relative
expiry once again runs only on the auth cache and emits delete
events. Each relative expiry interval is still limited to
removing at most a fixed number of nodes, however.
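
For reference, the limit mentioned above caps how many expired nodes
a single expiry pass may remove; anything past the cap is left for
the next interval. The sketch below is simplified and is not the
literal body of performRelativeNodeExpiry: the helper name
expireStaleNodes and the retentionThreshold parameter are made up
for illustration, while RelativeExpiryLimit and the emitted OpDelete
event mirror what the diff below touches.

// expireStaleNodes is an illustrative, simplified version of the
// relative expiry pass; the real logic lives in performRelativeNodeExpiry.
func (c *Cache) expireStaleNodes(ctx context.Context, nodes []types.Server, retentionThreshold time.Time) error {
	removed := 0
	for _, node := range nodes {
		if removed >= c.Config.RelativeExpiryLimit {
			// Defer the remaining stale nodes to the next interval so a
			// single pass never floods downstream caches with deletes.
			break
		}
		if node.Expiry().IsZero() || node.Expiry().After(retentionThreshold) {
			continue // no expiry set, or still considered alive
		}
		// Emit a delete event so downstream caches and watchers expire
		// the node too, restoring the original propagation behavior.
		if err := c.processEvent(ctx, types.Event{
			Type: types.OpDelete,
			Resource: &types.ResourceHeader{
				Kind:     types.KindNode,
				Metadata: node.GetMetadata(),
			},
		}); err != nil {
			return trace.Wrap(err)
		}
		removed++
	}
	return nil
}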

Closes #37527
rosstimothy committed Feb 15, 2024

Verified: this commit was signed with the committer's verified signature.
commit 78caa9a39a40b0878347f39444d5421d2054d6ed
47 changes: 23 additions & 24 deletions lib/cache/cache.go
@@ -114,6 +114,7 @@ func makeAllKnownCAsFilter() types.CertAuthorityFilter {
 // ForAuth sets up watch configuration for the auth server
 func ForAuth(cfg Config) Config {
 	cfg.target = "auth"
+	cfg.EnableRelativeExpiry = true
 	cfg.Watches = []types.WatchKind{
 		{Kind: types.KindCertAuthority, LoadSecrets: true},
 		{Kind: types.KindClusterName},
@@ -741,6 +742,9 @@ type Config struct {
 	// healthy even if some of the requested resource kinds aren't
 	// supported by the event source.
 	DisablePartialHealth bool
+	// EnableRelativeExpiry turns on purging expired items from the cache even
+	// if delete events have not been received from the backend.
+	EnableRelativeExpiry bool
 }
 
 // CheckAndSetDefaults checks parameters and sets default values
@@ -1282,18 +1286,14 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer

 	retry.Reset()
 
-	// only enable relative node expiry if the cache is configured
-	// to watch for types.KindNode
+	// Only enable relative node expiry for the auth cache.
 	relativeExpiryInterval := interval.NewNoop()
-	for _, watch := range c.Config.Watches {
-		if watch.Kind == types.KindNode {
-			relativeExpiryInterval = interval.New(interval.Config{
-				Duration:      c.Config.RelativeExpiryCheckInterval,
-				FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
-				Jitter:        retryutils.NewSeventhJitter(),
-			})
-			break
-		}
+	if c.EnableRelativeExpiry {
+		relativeExpiryInterval = interval.New(interval.Config{
+			Duration:      c.Config.RelativeExpiryCheckInterval,
+			FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval),
+			Jitter:        retryutils.NewSeventhJitter(),
+		})
 	}
 	defer relativeExpiryInterval.Stop()

@@ -1346,8 +1346,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
 				}
 			}
 
-			err = c.processEvent(ctx, event, true)
-			if err != nil {
+			if err := c.processEvent(ctx, event); err != nil {
 				return trace.Wrap(err)
 			}
 			c.notify(c.ctx, Event{Event: event, Type: EventProcessed})
@@ -1440,7 +1439,7 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
 				Kind:     types.KindNode,
 				Metadata: node.GetMetadata(),
 			},
-		}, false); err != nil {
+		}); err != nil {
 			return trace.Wrap(err)
 		}

@@ -1607,9 +1606,9 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types
 }
 
 // processEvent hands the event off to the appropriate collection for processing. Any
-// resources which were not registered are ignored. If processing completed successfully
-// and emit is true the event will be emitted via the fanout.
-func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) error {
+// resources which were not registered are ignored. If processing completed successfully,
+// the event will be emitted via the fanout.
+func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
 	resourceKind := resourceKindFromResource(event.Resource)
 	collection, ok := c.collections.byKind[resourceKind]
 	if !ok {
@@ -1619,14 +1618,14 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool)
 	if err := collection.processEvent(ctx, event); err != nil {
 		return trace.Wrap(err)
 	}
-	if emit {
-		c.eventsFanout.Emit(event)
-		if !isHighVolumeResource(resourceKind.kind) {
-			c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
-				f.Emit(event)
-			})
-		}
-	}
+
+	c.eventsFanout.Emit(event)
+	if !isHighVolumeResource(resourceKind.kind) {
+		c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) {
+			f.Emit(event)
+		})
+	}
 
 	return nil
 }

12 changes: 8 additions & 4 deletions lib/cache/cache_test.go
@@ -2552,6 +2552,7 @@ func TestRelativeExpiry(t *testing.T) {
 }
 
 func TestRelativeExpiryLimit(t *testing.T) {
+	t.Parallel()
 	const (
 		checkInterval = time.Second
 		nodeCount     = 100
@@ -2569,7 +2570,7 @@ func TestRelativeExpiryLimit(t *testing.T) {
 		c.RelativeExpiryCheckInterval = checkInterval
 		c.RelativeExpiryLimit = expiryLimit
 		c.Clock = clock
-		return ForProxy(c)
+		return ForAuth(c)
 	})
 	t.Cleanup(p.Close)

@@ -2609,7 +2610,8 @@ func TestRelativeExpiryLimit(t *testing.T) {
 	}
 }
 
-func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
+func TestRelativeExpiryOnlyForAuth(t *testing.T) {
+	t.Parallel()
 	clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour))
 	p := newTestPack(t, func(c Config) Config {
 		c.RelativeExpiryCheckInterval = time.Second
@@ -2622,9 +2624,10 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
 	p2 := newTestPack(t, func(c Config) Config {
 		c.RelativeExpiryCheckInterval = time.Second
 		c.Clock = clock
+		c.target = "llama"
 		c.Watches = []types.WatchKind{
 			{Kind: types.KindNamespace},
-			{Kind: types.KindNamespace},
+			{Kind: types.KindNode},
 			{Kind: types.KindCertAuthority},
 		}
 		return c
@@ -2634,8 +2637,9 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) {
 	for i := 0; i < 2; i++ {
 		clock.Advance(time.Hour * 24)
 		drainEvents(p.eventsC)
-		expectEvent(t, p.eventsC, RelativeExpiry)
+		unexpectedEvent(t, p.eventsC, RelativeExpiry)
 
+		clock.Advance(time.Hour * 24)
 		drainEvents(p2.eventsC)
 		unexpectedEvent(t, p2.eventsC, RelativeExpiry)
 	}