diff --git a/lib/cache/cache.go b/lib/cache/cache.go index d836dbb0d21ff..b78f5de1d7e39 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -61,11 +61,9 @@ var ( cacheCollectors = []prometheus.Collector{cacheEventsReceived, cacheStaleEventsReceived} ) -const cacheTargetAuth string = "auth" - // ForAuth sets up watch configuration for the auth server func ForAuth(cfg Config) Config { - cfg.target = cacheTargetAuth + cfg.target = "auth" cfg.Watches = []types.WatchKind{ {Kind: types.KindCertAuthority, LoadSecrets: true}, {Kind: types.KindClusterName}, @@ -556,6 +554,9 @@ type Config struct { // "relative expiration" checks which are used to compensate for real backends // that have suffer from overly lazy ttl'ing of resources. RelativeExpiryCheckInterval time.Duration + // RelativeExpiryLimit determines the maximum number of nodes that may be + // removed during relative expiration. + RelativeExpiryLimit int // EventsC is a channel for event notifications, // used in tests EventsC chan Event @@ -598,11 +599,12 @@ func (c *Config) CheckAndSetDefaults() error { c.CacheInitTimeout = time.Second * 20 } if c.RelativeExpiryCheckInterval == 0 { - // TODO(fspmarshall): change this to 1/2 offline threshold once that becomes - // a configurable value. This will likely be a dynamic configuration, and - // therefore require lazy initialization after the cache has become healthy. - c.RelativeExpiryCheckInterval = apidefaults.ServerAnnounceTTL / 2 + c.RelativeExpiryCheckInterval = apidefaults.ServerKeepAliveTTL() + 5*time.Second + } + if c.RelativeExpiryLimit == 0 { + c.RelativeExpiryLimit = 2000 } + if c.Component == "" { c.Component = teleport.ComponentCache } @@ -902,11 +904,19 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim retry.Reset() - relativeExpiryInterval := interval.New(interval.Config{ - Duration: c.Config.RelativeExpiryCheckInterval, - FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval), - Jitter: utils.NewSeventhJitter(), - }) + // only enable relative node expiry if the cache is configured + // to watch for types.KindNode + relativeExpiryInterval := interval.NewNoop() + for _, watch := range c.Config.Watches { + if watch.Kind == types.KindNode { + relativeExpiryInterval = interval.New(interval.Config{ + Duration: c.Config.RelativeExpiryCheckInterval, + FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval), + Jitter: utils.NewSeventhJitter(), + }) + break + } + } defer relativeExpiryInterval.Stop() c.notify(c.ctx, Event{Type: WatcherStarted}) @@ -958,7 +968,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim } } - err = c.processEvent(ctx, event) + err = c.processEvent(ctx, event, true) if err != nil { return trace.Wrap(err) } @@ -973,7 +983,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim // staleness by only removing items which are stale from within the cache's own "frame of // reference". // -// to better understand why we use this expiry strategy, its important to understand the two +// to better understand why we use this expiry strategy, it's important to understand the two // distinct scenarios that we're trying to accommodate: // // 1. Expiry events are being emitted very lazily by the real backend (*hours* after the time @@ -993,15 +1003,8 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim // relative to our current view of the world. 
// // *note*: this function is only sane to call when the cache and event stream are healthy, and -// cannot run concurrently with event processing. this function injects additional events into -// the outbound event stream. +// cannot run concurrently with event processing. func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error { - if c.target != cacheTargetAuth { - // if we are not the auth cache, we are a downstream cache and can rely upon the - // upstream auth cache to perform relative expiry and propagate the changes. - return nil - } - // TODO(fspmarshall): Start using dynamic value once it is implemented. gracePeriod := apidefaults.ServerAnnounceTTL @@ -1051,20 +1054,24 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error { continue } - // event stream processing is paused while this function runs. we perform the - // actual expiry by constructing a fake delete event for the resource which both - // updates this cache, and all downstream caches. - err = c.processEvent(ctx, types.Event{ + // remove the node locally without emitting an event, other caches will + // eventually remove the same node when they run their expiry logic. + if err := c.processEvent(ctx, types.Event{ Type: types.OpDelete, Resource: &types.ResourceHeader{ Kind: types.KindNode, Metadata: node.GetMetadata(), }, - }) - if err != nil { + }, false); err != nil { return trace.Wrap(err) } - removed++ + + // high churn rates can cause purging a very large number of nodes + // per interval, limit to a sane number such that we don't overwhelm + // things or get too far out of sync with other caches. + if removed++; removed >= c.Config.RelativeExpiryLimit { + break + } } if removed > 0 { @@ -1126,7 +1133,10 @@ func (c *Cache) fetch(ctx context.Context) (apply func(ctx context.Context) erro }, nil } -func (c *Cache) processEvent(ctx context.Context, event types.Event) error { +// processEvent hands the event off to the appropriate collection for processing. Any +// resources which were not registered are ignored. If processing completed successfully +// and emit is true the event will be emitted via the fanout. 
+func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) error { resourceKind := resourceKindFromResource(event.Resource) collection, ok := c.collections[resourceKind] if !ok { @@ -1137,7 +1147,9 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event) error { if err := collection.processEvent(ctx, event); err != nil { return trace.Wrap(err) } - c.eventsFanout.Emit(event) + if emit { + c.eventsFanout.Emit(event) + } return nil } diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 6d7486512834b..2de23eda18bcc 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -24,6 +24,14 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "github.com/gravitational/trace" + "github.com/jonboulle/clockwork" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/api/client/proto" apidefaults "github.com/gravitational/teleport/api/defaults" @@ -37,16 +45,10 @@ import ( "github.com/gravitational/teleport/lib/services/local" "github.com/gravitational/teleport/lib/services/suite" "github.com/gravitational/teleport/lib/utils" - "github.com/gravitational/trace" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/google/uuid" - "github.com/jonboulle/clockwork" - log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" ) +const eventBufferSize = 1024 + func TestMain(m *testing.M) { utils.InitLoggerForTests() os.Exit(m.Run()) @@ -507,6 +509,20 @@ func expectEvent(t *testing.T, eventsC <-chan Event, expectedEvent string) { } } +func unexpectedEvent(t *testing.T, eventsC <-chan Event, unexpectedEvent string) { + timeC := time.After(time.Second) + for { + select { + case event := <-eventsC: + if event.Type == unexpectedEvent { + t.Fatalf("Received unexpected event: %s", unexpectedEvent) + } + case <-timeC: + return + } + } +} + func expectNextEvent(t *testing.T, eventsC <-chan Event, expectedEvent string, skipEvents ...string) { timeC := time.After(5 * time.Second) for { @@ -2314,6 +2330,96 @@ func TestRelativeExpiry(t *testing.T) { require.True(t, len(nodes) > 0, "node_count=%d", len(nodes)) } +func TestRelativeExpiryLimit(t *testing.T) { + const ( + checkInterval = time.Second + nodeCount = 100 + expiryLimit = 10 + ) + + // make sure the event buffer is much larger than node count + // so that we can batch create nodes without waiting on each event + require.True(t, int(nodeCount*3) < eventBufferSize) + + ctx := context.Background() + + clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour)) + p := newTestPack(t, func(c Config) Config { + c.RelativeExpiryCheckInterval = checkInterval + c.RelativeExpiryLimit = expiryLimit + c.Clock = clock + return ForProxy(c) + }) + t.Cleanup(p.Close) + + // add servers that expire at a range of times + now := clock.Now() + for i := 0; i < nodeCount; i++ { + exp := now.Add(time.Minute * time.Duration(i)) + server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace) + server.SetExpiry(exp) + _, err := p.presenceS.UpsertNode(ctx, server) + require.NoError(t, err) + } + + // wait for nodes to reach cache (we batch insert first for performance reasons) + for i := 0; i < nodeCount; i++ { + expectEvent(t, p.eventsC, EventProcessed) + } + + nodes, err := p.cache.GetNodes(ctx, apidefaults.Namespace) + require.NoError(t, err) + 
require.Len(t, nodes, nodeCount) + + clock.Advance(time.Hour * 24) + for expired := nodeCount - expiryLimit; expired > 0; expired -= expiryLimit { + // get rid of events that were emitted before clock advanced + drainEvents(p.eventsC) + // wait for next relative expiry check to run + expectEvent(t, p.eventsC, RelativeExpiry) + + // verify that the limit is respected. + nodes, err = p.cache.GetNodes(ctx, apidefaults.Namespace) + require.NoError(t, err) + require.Len(t, nodes, expired) + + // advance clock to trigger next relative expiry check + clock.Advance(time.Hour * 24) + } +} + +func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) { + clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour)) + p := newTestPack(t, func(c Config) Config { + c.RelativeExpiryCheckInterval = time.Second + c.Clock = clock + c.Watches = []types.WatchKind{{Kind: types.KindNode}} + return c + }) + t.Cleanup(p.Close) + + p2 := newTestPack(t, func(c Config) Config { + c.RelativeExpiryCheckInterval = time.Second + c.Clock = clock + c.Watches = []types.WatchKind{ + {Kind: types.KindNamespace}, + {Kind: types.KindNamespace}, + {Kind: types.KindCertAuthority}, + } + return c + }) + t.Cleanup(p2.Close) + + for i := 0; i < 2; i++ { + clock.Advance(time.Hour * 24) + drainEvents(p.eventsC) + expectEvent(t, p.eventsC, RelativeExpiry) + + drainEvents(p2.eventsC) + unexpectedEvent(t, p2.eventsC, RelativeExpiry) + } +} + func TestCache_Backoff(t *testing.T) { clock := clockwork.NewFakeClock() p := newTestPack(t, func(c Config) Config { diff --git a/lib/utils/interval/interval.go b/lib/utils/interval/interval.go index 3c26328f704a8..774c6155cc0de 100644 --- a/lib/utils/interval/interval.go +++ b/lib/utils/interval/interval.go @@ -56,6 +56,14 @@ type Config struct { Jitter utils.Jitter } +// NewNoop creates a new interval that will never fire. +func NewNoop() *Interval { + return &Interval{ + ch: make(chan time.Time, 1), + done: make(chan struct{}), + } +} + // New creates a new interval instance. This function panics on non-positive // interval durations (equivalent to time.NewTicker). func New(cfg Config) *Interval { diff --git a/lib/utils/interval/interval_test.go b/lib/utils/interval/interval_test.go new file mode 100644 index 0000000000000..f12992468561c --- /dev/null +++ b/lib/utils/interval/interval_test.go @@ -0,0 +1,35 @@ +// Copyright 2022 Gravitational, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package interval + +import ( + "testing" +) + +func TestNewNoop(t *testing.T) { + i := NewNoop() + ch := i.Next() + select { + case <-ch: + t.Fatalf("noop should not emit anything") + default: + } + i.Stop() + select { + case <-ch: + t.Fatalf("noop should not emit anything") + default: + } +}
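
Usage sketch (not part of the patch above): the pattern added in fetchAndWatch — defaulting to a no-op interval and only constructing a real ticker when the cache watches types.KindNode — can be exercised in isolation. The sketch assumes only the interval package API introduced in this diff (NewNoop, New, Next, Stop) and the existing jitter helpers in lib/utils; the watchesNodes helper and the main function are hypothetical and exist purely for illustration.

package main

import (
	"fmt"
	"time"

	"github.com/gravitational/teleport/api/types"
	"github.com/gravitational/teleport/lib/utils"
	"github.com/gravitational/teleport/lib/utils/interval"
)

// watchesNodes is a hypothetical helper mirroring the check added in
// fetchAndWatch: relative expiry is only meaningful when the cache is
// configured to watch types.KindNode.
func watchesNodes(watches []types.WatchKind) bool {
	for _, w := range watches {
		if w.Kind == types.KindNode {
			return true
		}
	}
	return false
}

func main() {
	watches := []types.WatchKind{{Kind: types.KindNamespace}}

	// Default to an interval that never fires; only pay for a real
	// ticker when node expiry is actually relevant.
	ivl := interval.NewNoop()
	if watchesNodes(watches) {
		ivl = interval.New(interval.Config{
			Duration:      time.Minute,
			FirstDuration: utils.HalfJitter(time.Minute),
			Jitter:        utils.NewSeventhJitter(),
		})
	}
	defer ivl.Stop()

	select {
	case tick := <-ivl.Next():
		fmt.Println("relative expiry check at", tick)
	case <-time.After(100 * time.Millisecond):
		// With the namespace-only watch list above, the noop interval is
		// kept and this branch is taken: the channel never fires.
		fmt.Println("no tick: noop interval never fires")
	}
}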