Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
420 changes: 258 additions & 162 deletions lib/cache/cache.go

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2990,11 +2990,11 @@ func testResources[T types.Resource](t *testing.T, p *testPack, funcs testFuncs[
require.Empty(t, cmp.Diff([]T{r}, out, cmpOpts...))

// Wait until the information has been replicated to the cache.
require.Eventually(t, func() bool {
require.EventuallyWithT(t, func(t *assert.CollectT) {
// Make sure the cache has a single resource in it.
out, err = funcs.cacheList(ctx)
assert.NoError(t, err)
return len(cmp.Diff([]T{r}, out, cmpOpts...)) == 0
assert.Empty(t, cmp.Diff([]T{r}, out, cmpOpts...))
}, time.Second*2, time.Millisecond*250)

// cacheGet is optional as not every resource implements it
Expand All @@ -3020,11 +3020,11 @@ func testResources[T types.Resource](t *testing.T, p *testPack, funcs testFuncs[
require.Empty(t, cmp.Diff([]T{r}, out, cmpOpts...))

// Check that information has been replicated to the cache.
require.Eventually(t, func() bool {
require.EventuallyWithT(t, func(t *assert.CollectT) {
// Make sure the cache has a single resource in it.
out, err = funcs.cacheList(ctx)
assert.NoError(t, err)
return len(cmp.Diff([]T{r}, out, cmpOpts...)) == 0
assert.Empty(t, cmp.Diff([]T{r}, out, cmpOpts...))
}, time.Second*2, time.Millisecond*250)

// Remove all service providers from the backend.
Expand Down Expand Up @@ -3060,20 +3060,20 @@ func testResources153[T types.Resource153](t *testing.T, p *testPack, funcs test
}

assertCacheContents := func(expected []T) {
require.EventuallyWithT(t, func(collect *assert.CollectT) {
require.EventuallyWithT(t, func(t *assert.CollectT) {
out, err := funcs.cacheList(ctx)
assert.NoError(collect, err)
assert.NoError(t, err)

// If the cache is expected to be empty, then test explicitly for
// *that* rather than do an equality test. An equality test here
// would be overly-pedantic about a service returning `nil` rather
// than an empty slice.
if len(expected) == 0 {
assert.Empty(collect, out)
assert.Empty(t, out)
return
}

assert.Empty(collect, cmp.Diff(expected, out, cmpOpts...))
assert.Empty(t, cmp.Diff(expected, out, cmpOpts...))
}, 2*time.Second, 10*time.Millisecond)
}

Expand Down
165 changes: 165 additions & 0 deletions lib/cache/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Teleport
// Copyright (C) 2025 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cache

import (
"context"
"reflect"

"github.com/gravitational/trace"

"github.com/gravitational/teleport/api/types"
)

// collection is responsible for managing a cached resource.
type collection[T any, I comparable] struct {
// fetcher is called by fetch to retrieve and seed the
// store with all known resources from upstream.
fetcher func(ctx context.Context, loadSecrets bool) ([]T, error)
// store persists all resources in memory.
store *store[T, I]
// watch contains the kind of resource being monitored.
watch types.WatchKind
// headerTransform is used when handling delete events in [onDelete]. Since
// [types.OpDelete] events only contain information about the resource key,
// most event handlers only emit a [types.ResourceHeader] which has enough
// information to identify a resource. Some resources do emit a half
// populated [T], or have enough information from the key to emit a full [T].
//
// If this optional transformation is supplied it will be called when
// processing delete events before attempting to delete the resource
// from the store.
headerTransform func(hdr *types.ResourceHeader) T
// filter is an optional function used to prevent some resources
// from being persisted in the store.
filter func(T) bool
// singleton indicates if the resource should only ever have a single item.
// TODO(tross|fspmarshall|espadolini) investigate if special singleton
// behavior can be removed.
singleton bool
}

func (c collection[_, _]) watchKind() types.WatchKind {
return c.watch
}

// onDelete attempts to remove the provided resource from the store.
// An error is returned if the resource is of an unexpected type, or
// the resource is a [types.ResourceHeader] and no headerTransform was
// specified.
//
// This is a no-op if the configured filter does not return true.
func (c *collection[T, _]) onDelete(r types.Resource) error {
switch t := r.(type) {
case interface{ UnwrapT() T }:
tt := t.UnwrapT()
if c.filter != nil && !c.filter(tt) {
return nil
}

return trace.Wrap(c.store.delete(tt))
case *types.ResourceHeader:
if c.headerTransform == nil {
return trace.BadParameter("unable to convert types.ResourceHeader to %v (no transform specified, this is a bug)", reflect.TypeFor[T]())
}

tt := c.headerTransform(t)
if c.filter != nil && !c.filter(tt) {
return nil
}

return trace.Wrap(c.store.delete(tt))
case T:
if c.filter != nil && !c.filter(t) {
return nil
}

return trace.Wrap(c.store.delete(t))
default:
return trace.BadParameter("unexpected type %T (expected %v)", r, reflect.TypeFor[T]())
}
}

// onUpdate attempts to place the resource into the local store.
// An error is returned if the resource is of an unexpected type
//
// This is a no-op if the configured filter does not return true.
func (c *collection[T, _]) onPut(r types.Resource) error {
switch t := r.(type) {
case interface{ UnwrapT() T }:
tt := t.UnwrapT()
if c.filter != nil && !c.filter(tt) {
return nil
}

c.store.put(tt)
return nil
case T:
if c.filter != nil && !c.filter(t) {
return nil
}

c.store.put(t)
return nil
default:
return trace.BadParameter("unexpected type %T (expected %v)", r, reflect.TypeFor[T]())
}
}

// fetch populates the store with items received by the configured fetcher.
func (c collection[T, _]) fetch(ctx context.Context, cacheOK bool) (apply func(context.Context) error, err error) {
// Singleton objects will only get deleted or updated, not both
// TODO(tross|fspmarshall|espadolini) investigate if special singleton
// behavior can be removed.
deleteSingleton := false

var resources []T
if cacheOK {
resources, err = c.fetcher(ctx, c.watch.LoadSecrets)
if err != nil {
if !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
deleteSingleton = true
}
}

return func(ctx context.Context) error {
// Always perform the delete if this is not a singleton, otherwise
// only perform the delete if the singleton wasn't found
// or the resource kind isn't cached in the current generation.
if !c.singleton || deleteSingleton || !cacheOK {
if err := c.store.clear(); err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
}
}
}
// If this is a singleton and we performed a deletion, return here
// because we only want to update or delete a singleton, not both.
// Also don't continue if the resource kind isn't cached in the current generation.
if c.singleton && deleteSingleton || !cacheOK {
return nil
}
for _, resource := range resources {
if err := c.store.put(resource); err != nil {
return trace.Wrap(err)
}
}
return nil
}, nil
}
Loading
Loading