Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 0 additions & 80 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2409,86 +2409,6 @@ func (c *Cache) GetProxies() ([]types.Server, error) {
return rg.reader.GetProxies()
}

type remoteClustersCacheKey struct {
name string
}

// GetRemoteClusters returns a list of remote clusters
func (c *Cache) GetRemoteClusters(ctx context.Context) ([]types.RemoteCluster, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetRemoteClusters")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.remoteClusters)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
if !rg.IsCacheRead() {
cachedRemotes, err := utils.FnCacheGet(ctx, c.fnCache, remoteClustersCacheKey{}, func(ctx context.Context) ([]types.RemoteCluster, error) {
remotes, err := rg.reader.GetRemoteClusters(ctx)
return remotes, err
})
if err != nil || cachedRemotes == nil {
return nil, trace.Wrap(err)
}

remotes := make([]types.RemoteCluster, 0, len(cachedRemotes))
for _, remote := range cachedRemotes {
remotes = append(remotes, remote.Clone())
}
return remotes, nil
}
return rg.reader.GetRemoteClusters(ctx)
}

// GetRemoteCluster returns a remote cluster by name
func (c *Cache) GetRemoteCluster(ctx context.Context, clusterName string) (types.RemoteCluster, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetRemoteCluster")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.remoteClusters)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
if !rg.IsCacheRead() {
cachedRemote, err := utils.FnCacheGet(ctx, c.fnCache, remoteClustersCacheKey{clusterName}, func(ctx context.Context) (types.RemoteCluster, error) {
remote, err := rg.reader.GetRemoteCluster(ctx, clusterName)
return remote, err
})
if err != nil {
return nil, trace.Wrap(err)
}

return cachedRemote.Clone(), nil
}
rc, err := rg.reader.GetRemoteCluster(ctx, clusterName)
if trace.IsNotFound(err) && rg.IsCacheRead() {
// release read lock early
rg.Release()
// fallback is sane because this method is never used
// in construction of derivative caches.
if rc, err := c.Config.Trust.GetRemoteCluster(ctx, clusterName); err == nil {
return rc, nil
}
}
return rc, trace.Wrap(err)
}

// ListRemoteClusters returns a page of remote clusters.
func (c *Cache) ListRemoteClusters(ctx context.Context, pageSize int, nextToken string) ([]types.RemoteCluster, string, error) {
_, span := c.Tracer.Start(ctx, "cache/ListRemoteClusters")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.remoteClusters)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
remoteClusters, token, err := rg.reader.ListRemoteClusters(ctx, pageSize, nextToken)
return remoteClusters, token, trace.Wrap(err)
}

// GetUser is a part of auth.Cache implementation.
func (c *Cache) GetUser(ctx context.Context, name string, withSecrets bool) (types.User, error) {
_, span := c.Tracer.Start(ctx, "cache/GetUser")
Expand Down
34 changes: 0 additions & 34 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,40 +1852,6 @@ func TestAuthServers(t *testing.T) {
})
}

// TestRemoteClusters tests remote clusters caching
func TestRemoteClusters(t *testing.T) {
t.Parallel()

p := newTestPack(t, ForProxy)
t.Cleanup(p.Close)

testResources(t, p, testFuncs[types.RemoteCluster]{
newResource: func(name string) (types.RemoteCluster, error) {
return types.NewRemoteCluster(name)
},
create: func(ctx context.Context, rc types.RemoteCluster) error {
_, err := p.trustS.CreateRemoteCluster(ctx, rc)
return err
},
list: func(ctx context.Context) ([]types.RemoteCluster, error) {
return p.trustS.GetRemoteClusters(ctx)
},
cacheGet: func(ctx context.Context, name string) (types.RemoteCluster, error) {
return p.cache.GetRemoteCluster(ctx, name)
},
cacheList: func(ctx context.Context) ([]types.RemoteCluster, error) {
return p.cache.GetRemoteClusters(ctx)
},
update: func(ctx context.Context, rc types.RemoteCluster) error {
_, err := p.trustS.UpdateRemoteCluster(ctx, rc)
return err
},
deleteAll: func(ctx context.Context) error {
return p.trustS.DeleteAllRemoteClusters(ctx)
},
})
}

// TestKubernetes tests that CRUD operations on kubernetes clusters resources are
// replicated from the backend to the cache.
func TestKubernetes(t *testing.T) {
Expand Down
12 changes: 10 additions & 2 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type collectionHandler interface {
type collections struct {
byKind map[resourceKind]collectionHandler

botInstances *collection[*machineidv1.BotInstance, botInstanceIndex]
botInstances *collection[*machineidv1.BotInstance, botInstanceIndex]
remoteClusters *collection[types.RemoteCluster, remoteClusterIndex]
}

// isKnownUncollectedKind is true if a resource kind is not stored in
Expand Down Expand Up @@ -87,13 +88,20 @@ func setupCollections(c Config, legacyCollections map[resourceKind]legacyCollect

out.botInstances = collect
out.byKind[resourceKind] = out.botInstances
case types.KindRemoteCluster:
collect, err := newRemoteClusterCollection(c.Trust, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.remoteClusters = collect
out.byKind[resourceKind] = out.remoteClusters
default:
_, legacyOk := legacyCollections[resourceKind]
if _, ok := out.byKind[resourceKind]; !ok && !legacyOk {
return nil, trace.BadParameter("resource %q is not supported", watch.Kind)
}
}

}

return out, nil
Expand Down
9 changes: 0 additions & 9 deletions lib/cache/legacy_collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,15 +356,6 @@ func setupLegacyCollections(c *Cache, watches []types.WatchKind) (*legacyCollect
watch: watch,
}
collections.byKind[resourceKind] = collections.tunnelConnections
case types.KindRemoteCluster:
if c.Presence == nil {
return nil, trace.BadParameter("missing parameter Presence")
}
collections.remoteClusters = &genericCollection[types.RemoteCluster, remoteClusterGetter, remoteClusterExecutor]{
cache: c,
watch: watch,
}
collections.byKind[resourceKind] = collections.remoteClusters
case types.KindAppServer:
if c.Presence == nil {
return nil, trace.BadParameter("missing parameter Presence")
Expand Down
166 changes: 166 additions & 0 deletions lib/cache/remote_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Teleport
// Copyright (C) 2025 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cache

import (
"context"

"github.com/gravitational/trace"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/clientutils"
"github.com/gravitational/teleport/lib/itertools/stream"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
)

type remoteClusterIndex string

const remoteClusterNameIndex remoteClusterIndex = "name"

func newRemoteClusterCollection(upstream services.Trust, w types.WatchKind) (*collection[types.RemoteCluster, remoteClusterIndex], error) {
if upstream == nil {
return nil, trace.BadParameter("missing parameter Trust")
}

return &collection[types.RemoteCluster, remoteClusterIndex]{
store: newStore(
types.KindRemoteCluster,
types.RemoteCluster.Clone,
map[remoteClusterIndex]func(types.RemoteCluster) string{
remoteClusterNameIndex: types.RemoteCluster.GetName,
}),
fetcher: func(ctx context.Context, loadSecrets bool) ([]types.RemoteCluster, error) {
out, err := stream.Collect(clientutils.Resources(ctx, upstream.ListRemoteClusters))
return out, trace.Wrap(err)
},
headerTransform: func(hdr *types.ResourceHeader) types.RemoteCluster {
return &types.RemoteClusterV3{
Kind: hdr.Kind,
Version: hdr.Version,
Metadata: types.Metadata{
Name: hdr.Metadata.Name,
},
}
},
watch: w,
}, nil
}

type remoteClustersCacheKey struct {
name string
}

// GetRemoteClusters returns a list of remote clusters
func (c *Cache) GetRemoteClusters(ctx context.Context) ([]types.RemoteCluster, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetRemoteClusters")
defer span.End()

rg, err := acquireReadGuard(c, c.collections.remoteClusters)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()

if rg.ReadCache() {
remotes := make([]types.RemoteCluster, 0, rg.store.len())
for rc := range rg.store.resources(remoteClusterNameIndex, "", "") {
remotes = append(remotes, rc.Clone())
}

return remotes, nil
}

cachedRemotes, err := utils.FnCacheGet(ctx, c.fnCache, remoteClustersCacheKey{}, func(ctx context.Context) ([]types.RemoteCluster, error) {
var out []types.RemoteCluster
var startKey string

for {
clusters, next, err := c.Config.Trust.ListRemoteClusters(ctx, 0, startKey)
if err != nil {
return nil, trace.Wrap(err)
}

out = append(out, clusters...)
startKey = next
if next == "" {
break
}
}

return out, nil
})
if err != nil || cachedRemotes == nil {
return nil, trace.Wrap(err)
}

remotes := make([]types.RemoteCluster, 0, len(cachedRemotes))
for _, remote := range cachedRemotes {
remotes = append(remotes, remote.Clone())
}
return remotes, nil
}

// GetRemoteCluster returns a remote cluster by name
func (c *Cache) GetRemoteCluster(ctx context.Context, clusterName string) (types.RemoteCluster, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetRemoteCluster")
defer span.End()

var upstreamRead bool
getter := genericGetter[types.RemoteCluster, remoteClusterIndex]{
cache: c,
collection: c.collections.remoteClusters,
index: remoteClusterNameIndex,
upstreamGet: func(ctx context.Context, clusterName string) (types.RemoteCluster, error) {
upstreamRead = true
cachedRemote, err := utils.FnCacheGet(ctx, c.fnCache, remoteClustersCacheKey{clusterName}, func(ctx context.Context) (types.RemoteCluster, error) {
remote, err := c.Config.Trust.GetRemoteCluster(ctx, clusterName)
return remote, err
})
if err != nil {
return nil, trace.Wrap(err)
}

return cachedRemote.Clone(), nil
},
}
out, err := getter.get(ctx, clusterName)
if trace.IsNotFound(err) && !upstreamRead {
// fallback is sane because this method is never used
// in construction of derivative caches.
if rc, err := c.Config.Trust.GetRemoteCluster(ctx, clusterName); err == nil {
return rc, nil
}
}
return out, trace.Wrap(err)
}

// ListRemoteClusters returns a page of remote clusters.
func (c *Cache) ListRemoteClusters(ctx context.Context, pageSize int, nextToken string) ([]types.RemoteCluster, string, error) {
_, span := c.Tracer.Start(ctx, "cache/ListRemoteClusters")
defer span.End()

lister := genericLister[types.RemoteCluster, remoteClusterIndex]{
cache: c,
collection: c.collections.remoteClusters,
index: remoteClusterNameIndex,
upstreamList: c.Config.Trust.ListRemoteClusters,
nextToken: types.RemoteCluster.GetName,
}
out, next, err := lister.list(ctx, pageSize, nextToken)
return out, next, trace.Wrap(err)
}
Loading
Loading