Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 0 additions & 106 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
dbobjectv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/dbobject/v1"
identitycenterv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/identitycenter/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
provisioningv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/provisioning/v1"
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
"github.com/gravitational/teleport/api/internalutils/stream"
Expand Down Expand Up @@ -503,15 +501,12 @@ type Cache struct {
dynamicAccessCache services.DynamicAccessExt
presenceCache services.Presence
restrictionsCache services.Restrictions
databaseObjectsCache *local.DatabaseObjectService
dynamicWindowsDesktopsCache services.DynamicWindowsDesktops
userGroupsCache services.UserGroups
discoveryConfigsCache services.DiscoveryConfigs
headlessAuthenticationsCache services.HeadlessAuthenticationService
secReportsCache services.SecReports
eventsFanout *services.FanoutV2
lowVolumeEventsFanout *utils.RoundRobin[*services.FanoutV2]
kubeWaitingContsCache *local.KubeWaitingContainerService
staticHostUsersCache *local.StaticHostUserService
provisioningStatesCache *local.ProvisioningStateService
identityCenterCache *local.IdentityCenterService
Expand Down Expand Up @@ -926,24 +921,12 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}

databaseObjectsCache, err := local.NewDatabaseObjectService(config.Backend)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

fanout := services.NewFanoutV2(services.FanoutV2Config{})
lowVolumeFanouts := make([]*services.FanoutV2, 0, config.FanoutShards)
for i := 0; i < config.FanoutShards; i++ {
lowVolumeFanouts = append(lowVolumeFanouts, services.NewFanoutV2(services.FanoutV2Config{}))
}

kubeWaitingContsCache, err := local.NewKubeWaitingContainerService(config.Backend)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

staticHostUserCache, err := local.NewStaticHostUserService(config.Backend)
if err != nil {
cancel()
Expand All @@ -956,12 +939,6 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}

dynamicDesktopsService, err := local.NewDynamicWindowsDesktopService(config.Backend)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

identityCenterCache, err := local.NewIdentityCenterService(local.IdentityCenterServiceConfig{
Backend: config.Backend})
if err != nil {
Expand Down Expand Up @@ -1000,15 +977,12 @@ func New(config Config) (*Cache, error) {
dynamicAccessCache: local.NewDynamicAccessService(config.Backend),
presenceCache: local.NewPresenceService(config.Backend),
restrictionsCache: local.NewRestrictionsService(config.Backend),
dynamicWindowsDesktopsCache: dynamicDesktopsService,
userGroupsCache: userGroupsCache,
discoveryConfigsCache: discoveryConfigsCache,
headlessAuthenticationsCache: identityService,
secReportsCache: secReportsCache,
databaseObjectsCache: databaseObjectsCache,
eventsFanout: fanout,
lowVolumeEventsFanout: utils.NewRoundRobin(lowVolumeFanouts),
kubeWaitingContsCache: kubeWaitingContsCache,
staticHostUsersCache: staticHostUserCache,
provisioningStatesCache: provisioningStatesCache,
identityCenterCache: identityCenterCache,
Expand Down Expand Up @@ -1766,36 +1740,6 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event) error {
return nil
}

// ListKubernetesWaitingContainers lists Kubernetes ephemeral
// containers that are waiting to be created until moderated
// session conditions are met.
func (c *Cache) ListKubernetesWaitingContainers(ctx context.Context, pageSize int, pageToken string) ([]*kubewaitingcontainerpb.KubernetesWaitingContainer, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListKubernetesWaitingContainers")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.kubeWaitingContainers)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
return rg.reader.ListKubernetesWaitingContainers(ctx, pageSize, pageToken)
}

// GetKubernetesWaitingContainer returns a Kubernetes ephemeral
// container that are waiting to be created until moderated
// session conditions are met.
func (c *Cache) GetKubernetesWaitingContainer(ctx context.Context, req *kubewaitingcontainerpb.GetKubernetesWaitingContainerRequest) (*kubewaitingcontainerpb.KubernetesWaitingContainer, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetKubernetesWaitingContainer")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.kubeWaitingContainers)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
return rg.reader.GetKubernetesWaitingContainer(ctx, req)
}

// ListStaticHostUsers lists static host users.
func (c *Cache) ListStaticHostUsers(ctx context.Context, pageSize int, pageToken string) ([]*userprovisioningpb.StaticHostUser, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListStaticHostUsers")
Expand All @@ -1822,30 +1766,6 @@ func (c *Cache) GetStaticHostUser(ctx context.Context, name string) (*userprovis
return rg.reader.GetStaticHostUser(ctx, name)
}

func (c *Cache) GetDatabaseObject(ctx context.Context, name string) (*dbobjectv1.DatabaseObject, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetDatabaseObject")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.databaseObjects)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
return rg.reader.GetDatabaseObject(ctx, name)
}

func (c *Cache) ListDatabaseObjects(ctx context.Context, size int, pageToken string) ([]*dbobjectv1.DatabaseObject, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListDatabaseObjects")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.databaseObjects)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
return rg.reader.ListDatabaseObjects(ctx, size, pageToken)
}

// GetNetworkRestrictions gets the network restrictions.
func (c *Cache) GetNetworkRestrictions(ctx context.Context) (types.NetworkRestrictions, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetNetworkRestrictions")
Expand All @@ -1860,32 +1780,6 @@ func (c *Cache) GetNetworkRestrictions(ctx context.Context) (types.NetworkRestri
return rg.reader.GetNetworkRestrictions(ctx)
}

// GetDynamicWindowsDesktop returns registered dynamic Windows desktop by name.
func (c *Cache) GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetDynamicWindowsDesktop")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.dynamicWindowsDesktops)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
return rg.reader.GetDynamicWindowsDesktop(ctx, name)
}

// ListDynamicWindowsDesktops returns all registered dynamic Windows desktop.
func (c *Cache) ListDynamicWindowsDesktops(ctx context.Context, pageSize int, nextPage string) ([]types.DynamicWindowsDesktop, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListDynamicWindowsDesktops")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.dynamicWindowsDesktops)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
return rg.reader.ListDynamicWindowsDesktops(ctx, pageSize, nextPage)
}

// ListDiscoveryConfigs returns a paginated list of all DiscoveryConfig resources.
func (c *Cache) ListDiscoveryConfigs(ctx context.Context, pageSize int, nextKey string) ([]*discoveryconfig.DiscoveryConfig, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListDiscoveryConfigs")
Expand Down
51 changes: 0 additions & 51 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,57 +1369,6 @@ func TestSecurityReportState(t *testing.T) {
})
}

func TestDatabaseObjects(t *testing.T) {
t.Parallel()

p := newTestPack(t, ForAuth)
t.Cleanup(p.Close)

testResources153(t, p, testFuncs153[*dbobjectv1.DatabaseObject]{
newResource: func(name string) (*dbobjectv1.DatabaseObject, error) {
return newDatabaseObject(t, name), nil
},
create: func(ctx context.Context, item *dbobjectv1.DatabaseObject) error {
_, err := p.databaseObjects.CreateDatabaseObject(ctx, item)
return trace.Wrap(err)
},
list: func(ctx context.Context) ([]*dbobjectv1.DatabaseObject, error) {
items, _, err := p.databaseObjects.ListDatabaseObjects(ctx, 0, "")
return items, trace.Wrap(err)
},
cacheList: func(ctx context.Context) ([]*dbobjectv1.DatabaseObject, error) {
items, _, err := p.databaseObjects.ListDatabaseObjects(ctx, 0, "")
return items, trace.Wrap(err)
},
deleteAll: func(ctx context.Context) error {
token := ""
var objects []*dbobjectv1.DatabaseObject

for {
resp, nextToken, err := p.databaseObjects.ListDatabaseObjects(ctx, 0, token)
if err != nil {
return err
}

objects = append(objects, resp...)

if nextToken == "" {
break
}
token = nextToken
}

for _, object := range objects {
err := p.databaseObjects.DeleteDatabaseObject(ctx, object.GetMetadata().GetName())
if err != nil {
return err
}
}
return nil
},
})
}

// TestStaticHostUsers tests that CRUD operations on static host user resources are
// replicated from the backend to the cache.
func TestStaticHostUsers(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion lib/cache/cert_authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func TestNodeCAFiltering(t *testing.T) {
DynamicAccess: p.cache.dynamicAccessCache,
Presence: p.cache.presenceCache,
Restrictions: p.cache.restrictionsCache,
DynamicWindowsDesktops: p.cache.dynamicWindowsDesktopsCache,
SAMLIdPServiceProviders: p.samlIDPServiceProviders,
UserGroups: p.userGroups,
StaticHostUsers: p.staticHostUsers,
Expand Down
38 changes: 38 additions & 0 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
autoupdatev1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1"
clusterconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/clusterconfig/v1"
crownjewelv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/crownjewel/v1"
dbobjectv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/dbobject/v1"
healthcheckconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/healthcheckconfig/v1"
identitycenterv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/identitycenter/v1"
kubewaitingcontainerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
Expand Down Expand Up @@ -75,8 +77,10 @@ type collections struct {
dbServices *collection[types.DatabaseService, databaseServiceIndex]
kubeServers *collection[types.KubeServer, kubeServerIndex]
kubeClusters *collection[types.KubeCluster, kubeClusterIndex]
kubeWaitingContainers *collection[*kubewaitingcontainerv1.KubernetesWaitingContainer, kubeWaitingContainerIndex]
windowsDesktops *collection[types.WindowsDesktop, windowsDesktopIndex]
windowsDesktopServices *collection[types.WindowsDesktopService, windowsDesktopServiceIndex]
dynamicWindowsDesktops *collection[types.DynamicWindowsDesktop, dynamicWindowsDesktopIndex]
userGroups *collection[types.UserGroup, userGroupIndex]
identityCenterAccounts *collection[*identitycenterv1.Account, identityCenterAccountIndex]
identityCenterAccountAssignments *collection[*identitycenterv1.AccountAssignment, identityCenterAccountAssignmentIndex]
Expand Down Expand Up @@ -117,6 +121,8 @@ type collections struct {
remoteClusters *collection[types.RemoteCluster, remoteClusterIndex]
userTasks *collection[*usertasksv1.UserTask, userTaskIndex]
userLoginStates *collection[*userloginstate.UserLoginState, userLoginStateIndex]
gitServers *collection[types.Server, gitServerIndex]
databaseObjects *collection[*dbobjectv1.DatabaseObject, databaseObjectIndex]
}

// setupCollections ensures that the appropriate [collection] is
Expand Down Expand Up @@ -235,6 +241,14 @@ func setupCollections(c Config) (*collections, error) {

out.dbServices = collect
out.byKind[resourceKind] = out.dbServices
case types.KindDatabaseObject:
collect, err := newDatabaseObjectCollection(c.DatabaseObjects, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.databaseObjects = collect
out.byKind[resourceKind] = out.databaseObjects
case types.KindKubeServer:
collect, err := newKubernetesServerCollection(c.Presence, watch)
if err != nil {
Expand All @@ -251,6 +265,14 @@ func setupCollections(c Config) (*collections, error) {

out.kubeClusters = collect
out.byKind[resourceKind] = out.kubeClusters
case types.KindKubeWaitingContainer:
collect, err := newKubernetesWaitingContainerCollection(c.KubeWaitingContainers, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.kubeWaitingContainers = collect
out.byKind[resourceKind] = out.kubeWaitingContainers
case types.KindWindowsDesktop:
collect, err := newWindowsDesktopCollection(c.WindowsDesktops, watch)
if err != nil {
Expand All @@ -267,6 +289,14 @@ func setupCollections(c Config) (*collections, error) {

out.windowsDesktopServices = collect
out.byKind[resourceKind] = out.windowsDesktopServices
case types.KindDynamicWindowsDesktop:
collect, err := newDynamicWindowsDesktopCollection(c.DynamicWindowsDesktops, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.dynamicWindowsDesktops = collect
out.byKind[resourceKind] = out.dynamicWindowsDesktops
case types.KindUserGroup:
collect, err := newUserGroupCollection(c.UserGroups, watch)
if err != nil {
Expand Down Expand Up @@ -591,6 +621,14 @@ func setupCollections(c Config) (*collections, error) {

out.userLoginStates = collect
out.byKind[resourceKind] = out.userLoginStates
case types.KindGitServer:
collect, err := newGitServerCollection(c.GitServers, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.gitServers = collect
out.byKind[resourceKind] = out.gitServers
}
}

Expand Down
Loading
Loading