Prevent relative expiry from emitting more events than can be processed #12002

Merged
merged 5 commits into from Apr 25, 2022
Changes from 1 commit
46 changes: 20 additions & 26 deletions lib/cache/cache.go
@@ -61,11 +61,9 @@ var (
cacheCollectors = []prometheus.Collector{cacheEventsReceived, cacheStaleEventsReceived}
)

const cacheTargetAuth string = "auth"

// ForAuth sets up watch configuration for the auth server
func ForAuth(cfg Config) Config {
cfg.target = cacheTargetAuth
cfg.target = "auth"
cfg.Watches = []types.WatchKind{
{Kind: types.KindCertAuthority, LoadSecrets: true},
{Kind: types.KindClusterName},
@@ -556,6 +554,9 @@ type Config struct {
// "relative expiration" checks which are used to compensate for real backends
// that have suffer from overly lazy ttl'ing of resources.
RelativeExpiryCheckInterval time.Duration
// RelativeExpiryLimit determines the maximum number of nodes that may be
// removed during relative expiration.
RelativeExpiryLimit int
// EventsC is a channel for event notifications,
// used in tests
EventsC chan Event
Expand Down Expand Up @@ -598,11 +599,12 @@ func (c *Config) CheckAndSetDefaults() error {
c.CacheInitTimeout = time.Second * 20
}
if c.RelativeExpiryCheckInterval == 0 {
// TODO(fspmarshall): change this to 1/2 offline threshold once that becomes
// a configurable value. This will likely be a dynamic configuration, and
// therefore require lazy initialization after the cache has become healthy.
c.RelativeExpiryCheckInterval = apidefaults.ServerAnnounceTTL / 2
c.RelativeExpiryCheckInterval = apidefaults.ServerKeepAliveTTL() + 5*time.Second
}
if c.RelativeExpiryLimit == 0 {
c.RelativeExpiryLimit = 2000
}

if c.Component == "" {
c.Component = teleport.ComponentCache
}
Expand Down Expand Up @@ -973,7 +975,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
// staleness by only removing items which are stale from within the cache's own "frame of
// reference".
//
// to better understand why we use this expiry strategy, its important to understand the two
// to better understand why we use this expiry strategy, it's important to understand the two
// distinct scenarios that we're trying to accommodate:
//
// 1. Expiry events are being emitted very lazily by the real backend (*hours* after the time
@@ -996,12 +998,6 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry utils.Retry, timer *tim
// cannot run concurrently with event processing. this function injects additional events into
// the outbound event stream.
func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
if c.target != cacheTargetAuth {
// if we are not the auth cache, we are a downstream cache and can rely upon the
// upstream auth cache to perform relative expiry and propagate the changes.
return nil
}

// TODO(fspmarshall): Start using dynamic value once it is implemented.
gracePeriod := apidefaults.ServerAnnounceTTL

Expand Down Expand Up @@ -1051,20 +1047,18 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
continue
}

// event stream processing is paused while this function runs. we perform the
// actual expiry by constructing a fake delete event for the resource which both
// updates this cache, and all downstream caches.
err = c.processEvent(ctx, types.Event{
Type: types.OpDelete,
Resource: &types.ResourceHeader{
Kind: types.KindNode,
Metadata: node.GetMetadata(),
},
})
if err != nil {
// remove the node locally without emitting an event, other caches will
// eventually remove the same node when they run their expiry logic.
if err := c.presenceCache.DeleteNode(ctx, apidefaults.Namespace, node.GetName()); err != nil {
return trace.Wrap(err)
}
removed++

// high churn rates can cause purging a very large number of nodes
// per interval, limit to a sane number such that we don't overwhelm
// things or get too far out of sync with other caches.
if removed++; removed >= c.Config.RelativeExpiryLimit {
break
}
}

if removed > 0 {
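
The substantive change in lib/cache/cache.go is twofold: stale nodes are now removed directly from the local presence cache via c.presenceCache.DeleteNode instead of being pushed through processEvent as synthetic OpDelete events, and each relative-expiry pass removes at most RelativeExpiryLimit nodes before breaking out of the loop. Since no delete events are emitted, downstream caches expire the same nodes on their own schedule rather than being flooded by one upstream sweep. Below is a minimal, self-contained sketch of that bounded-purge pattern; the boundedExpirer type and its deleteLocally/purge names are illustrative stand-ins, not Teleport APIs.

```go
package main

import (
	"context"
	"fmt"
)

type node struct{ name string }

// boundedExpirer illustrates the two ideas from the diff: deletions are local
// (no events are emitted for other caches to process) and each pass is capped.
type boundedExpirer struct {
	limit int
	// deleteLocally stands in for presenceCache.DeleteNode: it removes the
	// node from this cache only, without notifying downstream caches.
	deleteLocally func(ctx context.Context, name string) error
}

// purge removes at most e.limit nodes per pass; anything left over is handled
// on a later pass (or expired independently by other caches).
func (e *boundedExpirer) purge(ctx context.Context, stale []node) (int, error) {
	removed := 0
	for _, n := range stale {
		if err := e.deleteLocally(ctx, n.name); err != nil {
			return removed, err
		}
		if removed++; removed >= e.limit {
			break
		}
	}
	return removed, nil
}

func main() {
	e := &boundedExpirer{
		limit: 3,
		deleteLocally: func(ctx context.Context, name string) error {
			fmt.Println("deleted", name)
			return nil
		},
	}
	stale := []node{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}}
	removed, _ := e.purge(context.Background(), stale)
	fmt.Println("removed this pass:", removed) // prints 3
}
```

With the default RelativeExpiryLimit of 2000, a single check can therefore never remove more than 2000 nodes, no matter how many have gone stale; the rest are picked up on subsequent intervals.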
75 changes: 67 additions & 8 deletions lib/cache/cache_test.go
@@ -24,6 +24,14 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
@@ -37,14 +45,6 @@ import (
"github.com/gravitational/teleport/lib/services/local"
"github.com/gravitational/teleport/lib/services/suite"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

const eventBufferSize = 1024
@@ -2323,6 +2323,65 @@ func TestRelativeExpiry(t *testing.T) {
require.True(t, len(nodes) > 0, "node_count=%d", len(nodes))
}

func TestRelativeExpiryLimit(t *testing.T) {
const (
checkInterval = time.Second
nodeCount = 100
expiryLimit = 10
)

// make sure the event buffer is much larger than node count
// so that we can batch create nodes without waiting on each event
require.True(t, int(nodeCount*3) < eventBufferSize)

ctx := context.Background()

clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour))
p := newTestPack(t, func(c Config) Config {
c.RelativeExpiryCheckInterval = checkInterval
c.RelativeExpiryLimit = expiryLimit
c.Clock = clock
return ForProxy(c)
})
t.Cleanup(p.Close)

// add servers that expire at a range of times
now := clock.Now()
for i := 0; i < nodeCount; i++ {
exp := now.Add(time.Minute * time.Duration(i))
server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
server.SetExpiry(exp)
_, err := p.presenceS.UpsertNode(ctx, server)
require.NoError(t, err)
}

// wait for nodes to reach cache (we batch insert first for performance reasons)
for i := 0; i < nodeCount; i++ {
expectEvent(t, p.eventsC, EventProcessed)
}

nodes, err := p.cache.GetNodes(ctx, apidefaults.Namespace)
require.NoError(t, err)
require.Len(t, nodes, nodeCount)

clock.Advance(time.Hour * 24)
for expired := nodeCount - expiryLimit; expired > 0; expired -= expiryLimit {
// get rid of events that were emitted before clock advanced
drainEvents(p.eventsC)
// wait for next relative expiry check to run
expectEvent(t, p.eventsC, RelativeExpiry)

// verify that the limit is respected.
nodes, err = p.cache.GetNodes(ctx, apidefaults.Namespace)
require.NoError(t, err)
require.Len(t, nodes, expired)

// advance clock to trigger next relative expiry check
clock.Advance(time.Hour * 24)
}

}

func TestCache_Backoff(t *testing.T) {
clock := clockwork.NewFakeClock()
p := newTestPack(t, func(c Config) Config {
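
TestRelativeExpiryLimit relies on two helpers from the existing test pack, expectEvent and drainEvents, which are defined elsewhere in cache_test.go and not shown in this diff. A rough sketch of what such helpers typically look like follows, assuming eventsC is a buffered channel of Event values whose Type field carries constants such as EventProcessed and RelativeExpiry; the signatures and the 30-second timeout are assumptions for illustration, not the actual implementation.

```go
package cachetestsketch

import (
	"testing"
	"time"
)

// Event mirrors the shape of the cache package's Event notification for the
// purposes of this sketch.
type Event struct {
	Type string
}

// expectEvent consumes events until one of the expected type arrives, or
// fails the test after a timeout.
func expectEvent(t *testing.T, eventsC <-chan Event, expectedType string) {
	t.Helper()
	timeout := time.After(30 * time.Second)
	for {
		select {
		case event := <-eventsC:
			if event.Type == expectedType {
				return
			}
		case <-timeout:
			t.Fatalf("timeout waiting for event %q", expectedType)
		}
	}
}

// drainEvents discards anything already buffered on eventsC, so that a
// subsequent expectEvent only observes events emitted after this point.
func drainEvents(eventsC <-chan Event) {
	for {
		select {
		case <-eventsC:
		default:
			return
		}
	}
}
```

In the test loop above, calling drainEvents before expectEvent(t, p.eventsC, RelativeExpiry) ensures the node-count assertion runs against a fresh expiry pass rather than against events left over from the initial batch of upserts or the previous clock advance.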