Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 0 additions & 106 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1784,112 +1784,6 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
return nil
}

type remoteClustersCacheKey struct {
name string
}

// GetRemoteClusters returns a list of remote clusters
func (c *Cache) GetRemoteClusters(ctx context.Context) ([]types.RemoteCluster, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetRemoteClusters")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.remoteClusters)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
if !rg.IsCacheRead() {
cachedRemotes, err := utils.FnCacheGet(ctx, c.fnCache, remoteClustersCacheKey{}, func(ctx context.Context) ([]types.RemoteCluster, error) {
remotes, err := rg.reader.GetRemoteClusters(ctx)
return remotes, err
})
if err != nil || cachedRemotes == nil {
return nil, trace.Wrap(err)
}

remotes := make([]types.RemoteCluster, 0, len(cachedRemotes))
for _, remote := range cachedRemotes {
remotes = append(remotes, remote.Clone())
}
return remotes, nil
}
return rg.reader.GetRemoteClusters(ctx)
}

// GetRemoteCluster returns a remote cluster by name
func (c *Cache) GetRemoteCluster(ctx context.Context, clusterName string) (types.RemoteCluster, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetRemoteCluster")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.remoteClusters)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
if !rg.IsCacheRead() {
cachedRemote, err := utils.FnCacheGet(ctx, c.fnCache, remoteClustersCacheKey{clusterName}, func(ctx context.Context) (types.RemoteCluster, error) {
remote, err := rg.reader.GetRemoteCluster(ctx, clusterName)
return remote, err
})
if err != nil {
return nil, trace.Wrap(err)
}

return cachedRemote.Clone(), nil
}
rc, err := rg.reader.GetRemoteCluster(ctx, clusterName)
if trace.IsNotFound(err) && rg.IsCacheRead() {
// release read lock early
rg.Release()
// fallback is sane because this method is never used
// in construction of derivative caches.
if rc, err := c.Config.Trust.GetRemoteCluster(ctx, clusterName); err == nil {
return rc, nil
}
}
return rc, trace.Wrap(err)
}

// ListRemoteClusters returns a page of remote clusters.
func (c *Cache) ListRemoteClusters(ctx context.Context, pageSize int, nextToken string) ([]types.RemoteCluster, string, error) {
_, span := c.Tracer.Start(ctx, "cache/ListRemoteClusters")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.remoteClusters)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
remoteClusters, token, err := rg.reader.ListRemoteClusters(ctx, pageSize, nextToken)
return remoteClusters, token, trace.Wrap(err)
}

// GetTunnelConnections is a part of auth.Cache implementation
func (c *Cache) GetTunnelConnections(clusterName string, opts ...services.MarshalOption) ([]types.TunnelConnection, error) {
_, span := c.Tracer.Start(context.TODO(), "cache/GetTunnelConnections")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.tunnelConnections)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
return rg.reader.GetTunnelConnections(clusterName, opts...)
}

// GetAllTunnelConnections is a part of auth.Cache implementation
func (c *Cache) GetAllTunnelConnections(opts ...services.MarshalOption) (conns []types.TunnelConnection, err error) {
_, span := c.Tracer.Start(context.TODO(), "cache/GetAllTunnelConnections")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.tunnelConnections)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
return rg.reader.GetAllTunnelConnections(opts...)
}

// ListKubernetesWaitingContainers lists Kubernetes ephemeral
// containers that are waiting to be created until moderated
// session conditions are met.
Expand Down
64 changes: 0 additions & 64 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,70 +1209,6 @@ func TestRecovery(t *testing.T) {
require.Empty(t, cmp.Diff(ca2, out, cmpopts.IgnoreFields(types.Metadata{}, "Revision")))
}

// TestTunnelConnections tests tunnel connections caching
func TestTunnelConnections(t *testing.T) {
t.Parallel()

p := newTestPack(t, ForProxy)
t.Cleanup(p.Close)

clusterName := "example.com"
testResources(t, p, testFuncs[types.TunnelConnection]{
newResource: func(name string) (types.TunnelConnection, error) {
return types.NewTunnelConnection(name, types.TunnelConnectionSpecV2{
ClusterName: clusterName,
ProxyName: "p1",
LastHeartbeat: time.Now().UTC(),
})
},
create: modifyNoContext(p.trustS.UpsertTunnelConnection),
list: func(ctx context.Context) ([]types.TunnelConnection, error) {
return p.trustS.GetTunnelConnections(clusterName)
},
cacheList: func(ctx context.Context) ([]types.TunnelConnection, error) {
return p.cache.GetTunnelConnections(clusterName)
},
update: modifyNoContext(p.trustS.UpsertTunnelConnection),
deleteAll: func(ctx context.Context) error {
return p.trustS.DeleteAllTunnelConnections()
},
})
}

// TestRemoteClusters tests remote clusters caching
func TestRemoteClusters(t *testing.T) {
t.Parallel()

p := newTestPack(t, ForProxy)
t.Cleanup(p.Close)

testResources(t, p, testFuncs[types.RemoteCluster]{
newResource: func(name string) (types.RemoteCluster, error) {
return types.NewRemoteCluster(name)
},
create: func(ctx context.Context, rc types.RemoteCluster) error {
_, err := p.trustS.CreateRemoteCluster(ctx, rc)
return err
},
list: func(ctx context.Context) ([]types.RemoteCluster, error) {
return p.trustS.GetRemoteClusters(ctx)
},
cacheGet: func(ctx context.Context, name string) (types.RemoteCluster, error) {
return p.cache.GetRemoteCluster(ctx, name)
},
cacheList: func(ctx context.Context) ([]types.RemoteCluster, error) {
return p.cache.GetRemoteClusters(ctx)
},
update: func(ctx context.Context, rc types.RemoteCluster) error {
_, err := p.trustS.UpdateRemoteCluster(ctx, rc)
return err
},
deleteAll: func(ctx context.Context) error {
return p.trustS.DeleteAllRemoteClusters(ctx)
},
})
}

func mustCreateDatabase(t *testing.T, name, protocol, uri string) *types.DatabaseV3 {
database, err := types.NewDatabaseV3(
types.Metadata{
Expand Down
18 changes: 18 additions & 0 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ type collections struct {
uiConfigs *collection[types.UIConfig, webUIConfigIndex]
installers *collection[types.Installer, installerIndex]
locks *collection[types.Lock, lockIndex]
tunnelConnections *collection[types.TunnelConnection, tunnelConnectionIndex]
remoteClusters *collection[types.RemoteCluster, remoteClusterIndex]
}

// setupCollections ensures that the appropriate [collection] is
Expand Down Expand Up @@ -552,6 +554,22 @@ func setupCollections(c Config) (*collections, error) {

out.locks = collect
out.byKind[resourceKind] = out.locks
case types.KindTunnelConnection:
collect, err := newTunnelConnectionCollection(c.Trust, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.tunnelConnections = collect
out.byKind[resourceKind] = out.tunnelConnections
case types.KindRemoteCluster:
collect, err := newRemoteClusterCollection(c.Trust, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.remoteClusters = collect
out.byKind[resourceKind] = out.remoteClusters
}
}

Expand Down
97 changes: 0 additions & 97 deletions lib/cache/legacy_collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,12 @@ type legacyCollections struct {
auditQueries collectionReader[services.SecurityAuditQueryGetter]
secReports collectionReader[services.SecurityReportGetter]
secReportsStates collectionReader[services.SecurityReportStateGetter]
tunnelConnections collectionReader[tunnelConnectionGetter]
databaseObjects collectionReader[services.DatabaseObjectsGetter]
discoveryConfigs collectionReader[services.DiscoveryConfigsGetter]
userTasks collectionReader[userTasksGetter]
kubeWaitingContainers collectionReader[kubernetesWaitingContainerGetter]
staticHostUsers collectionReader[staticHostUserGetter]
networkRestrictions collectionReader[networkRestrictionGetter]
remoteClusters collectionReader[remoteClusterGetter]
userLoginStates collectionReader[services.UserLoginStatesGetter]
dynamicWindowsDesktops collectionReader[dynamicWindowsDesktopsGetter]
provisioningStates collectionReader[provisioningStateGetter]
Expand All @@ -122,24 +120,6 @@ func setupLegacyCollections(c *Cache, watches []types.WatchKind) (*legacyCollect
for _, watch := range watches {
resourceKind := resourceKindFromWatchKind(watch)
switch watch.Kind {
case types.KindTunnelConnection:
if c.Presence == nil {
return nil, trace.BadParameter("missing parameter Presence")
}
collections.tunnelConnections = &genericCollection[types.TunnelConnection, tunnelConnectionGetter, tunnelConnectionExecutor]{
cache: c,
watch: watch,
}
collections.byKind[resourceKind] = collections.tunnelConnections
case types.KindRemoteCluster:
if c.Presence == nil {
return nil, trace.BadParameter("missing parameter Presence")
}
collections.remoteClusters = &genericCollection[types.RemoteCluster, remoteClusterGetter, remoteClusterExecutor]{
cache: c,
watch: watch,
}
collections.byKind[resourceKind] = collections.remoteClusters
case types.KindAccessRequest:
if c.DynamicAccess == nil {
return nil, trace.BadParameter("missing parameter DynamicAccess")
Expand Down Expand Up @@ -344,83 +324,6 @@ func (accessRequestExecutor) getReader(_ *Cache, _ bool) noReader {

var _ executor[types.AccessRequest, noReader] = accessRequestExecutor{}

type tunnelConnectionExecutor struct{}

func (tunnelConnectionExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]types.TunnelConnection, error) {
return cache.Trust.GetAllTunnelConnections()
}

func (tunnelConnectionExecutor) upsert(ctx context.Context, cache *Cache, resource types.TunnelConnection) error {
return cache.trustCache.UpsertTunnelConnection(resource)
}

func (tunnelConnectionExecutor) deleteAll(ctx context.Context, cache *Cache) error {
return cache.trustCache.DeleteAllTunnelConnections()
}

func (tunnelConnectionExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
return cache.trustCache.DeleteTunnelConnection(resource.GetSubKind(), resource.GetName())
}

func (tunnelConnectionExecutor) isSingleton() bool { return false }

func (tunnelConnectionExecutor) getReader(cache *Cache, cacheOK bool) tunnelConnectionGetter {
if cacheOK {
return cache.trustCache
}
return cache.Config.Trust
}

type tunnelConnectionGetter interface {
GetAllTunnelConnections(opts ...services.MarshalOption) (conns []types.TunnelConnection, err error)
GetTunnelConnections(clusterName string, opts ...services.MarshalOption) ([]types.TunnelConnection, error)
}

var _ executor[types.TunnelConnection, tunnelConnectionGetter] = tunnelConnectionExecutor{}

type remoteClusterExecutor struct{}

func (remoteClusterExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]types.RemoteCluster, error) {
return cache.Trust.GetRemoteClusters(ctx)
}

func (remoteClusterExecutor) upsert(ctx context.Context, cache *Cache, resource types.RemoteCluster) error {
err := cache.trustCache.DeleteRemoteCluster(ctx, resource.GetName())
if err != nil {
if !trace.IsNotFound(err) {
cache.Logger.WarnContext(ctx, "Failed to delete remote cluster", "cluster", resource.GetName(), "error", err)
return trace.Wrap(err)
}
}
_, err = cache.trustCache.CreateRemoteCluster(ctx, resource)
return trace.Wrap(err)
}

func (remoteClusterExecutor) deleteAll(ctx context.Context, cache *Cache) error {
return cache.trustCache.DeleteAllRemoteClusters(ctx)
}

func (remoteClusterExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
return cache.trustCache.DeleteRemoteCluster(ctx, resource.GetName())
}

func (remoteClusterExecutor) isSingleton() bool { return false }

func (remoteClusterExecutor) getReader(cache *Cache, cacheOK bool) remoteClusterGetter {
if cacheOK {
return cache.trustCache
}
return cache.Config.Trust
}

type remoteClusterGetter interface {
GetRemoteClusters(ctx context.Context) ([]types.RemoteCluster, error)
GetRemoteCluster(ctx context.Context, clusterName string) (types.RemoteCluster, error)
ListRemoteClusters(ctx context.Context, pageSize int, pageToken string) ([]types.RemoteCluster, string, error)
}

var _ executor[types.RemoteCluster, remoteClusterGetter] = remoteClusterExecutor{}

type userExecutor struct{}

func (userExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]types.User, error) {
Expand Down
Loading
Loading