Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
dbobjectv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/dbobject/v1"
identitycenterv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/identitycenter/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
provisioningv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/provisioning/v1"
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
Expand Down Expand Up @@ -535,7 +534,6 @@ type Cache struct {
eventsFanout *services.FanoutV2
lowVolumeEventsFanout *utils.RoundRobin[*services.FanoutV2]
kubeWaitingContsCache *local.KubeWaitingContainerService
notificationsCache services.Notifications
accessMontoringRuleCache services.AccessMonitoringRules
staticHostUsersCache *local.StaticHostUserService
provisioningStatesCache *local.ProvisioningStateService
Expand Down Expand Up @@ -994,12 +992,6 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}

notificationsCache, err := local.NewNotificationsService(config.Backend, config.Clock)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

accessMonitoringRuleCache, err := local.NewAccessMonitoringRulesService(config.Backend)
if err != nil {
cancel()
Expand Down Expand Up @@ -1106,7 +1098,6 @@ func New(config Config) (*Cache, error) {
userLoginStateCache: userLoginStatesCache,
accessListCache: accessListCache,
databaseObjectsCache: databaseObjectsCache,
notificationsCache: notificationsCache,
eventsFanout: fanout,
lowVolumeEventsFanout: utils.NewRoundRobin(lowVolumeFanouts),
kubeWaitingContsCache: kubeWaitingContsCache,
Expand Down Expand Up @@ -2981,36 +2972,6 @@ func (c *Cache) ListAccessListReviews(ctx context.Context, accessList string, pa
return rg.reader.ListAccessListReviews(ctx, accessList, pageSize, pageToken)
}

// ListUserNotifications returns a paginated list of user-specific notifications for all users.
func (c *Cache) ListUserNotifications(ctx context.Context, pageSize int, startKey string) ([]*notificationsv1.Notification, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListUserNotifications")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.userNotifications)
if err != nil {
return nil, "", trace.Wrap(err)
}

defer rg.Release()

out, nextKey, err := rg.reader.ListUserNotifications(ctx, pageSize, startKey)
return out, nextKey, trace.Wrap(err)
}

// ListGlobalNotifications returns a paginated list of global notifications.
func (c *Cache) ListGlobalNotifications(ctx context.Context, pageSize int, startKey string) ([]*notificationsv1.GlobalNotification, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListGlobalNotifications")
defer span.End()

rg, err := readLegacyCollectionCache(c, c.legacyCacheCollections.globalNotifications)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
out, nextKey, err := rg.reader.ListGlobalNotifications(ctx, pageSize, startKey)
return out, nextKey, trace.Wrap(err)
}

// ListAccessMonitoringRules returns a paginated list of access monitoring rules.
func (c *Cache) ListAccessMonitoringRules(ctx context.Context, pageSize int, nextToken string) ([]*accessmonitoringrulesv1.AccessMonitoringRule, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListAccessMonitoringRules")
Expand Down
56 changes: 0 additions & 56 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,34 +2035,6 @@ func TestAccessListReviews(t *testing.T) {
})
}

// TestUserNotifications tests that CRUD operations on user notification resources are
// replicated from the backend to the cache.
func TestUserNotifications(t *testing.T) {
t.Parallel()

p := newTestPack(t, ForAuth)
t.Cleanup(p.Close)

testResources153(t, p, testFuncs153[*notificationsv1.Notification]{
newResource: func(name string) (*notificationsv1.Notification, error) {
return newUserNotification(t, name), nil
},
create: func(ctx context.Context, item *notificationsv1.Notification) error {
_, err := p.notifications.CreateUserNotification(ctx, item)
return trace.Wrap(err)
},
list: func(ctx context.Context) ([]*notificationsv1.Notification, error) {
items, _, err := p.notifications.ListUserNotifications(ctx, 0, "")
return items, trace.Wrap(err)
},
cacheList: func(ctx context.Context) ([]*notificationsv1.Notification, error) {
items, _, err := p.cache.ListUserNotifications(ctx, 0, "")
return items, trace.Wrap(err)
},
deleteAll: p.notifications.DeleteAllUserNotifications,
})
}

// TestCrownJewel tests that CRUD operations on user notification resources are
// replicated from the backend to the cache.
func TestCrownJewel(t *testing.T) {
Expand Down Expand Up @@ -2250,34 +2222,6 @@ func TestAutoUpdateAgentRollout(t *testing.T) {
})
}

// TestGlobalNotifications tests that CRUD operations on global notification resources are
// replicated from the backend to the cache.
func TestGlobalNotifications(t *testing.T) {
t.Parallel()

p := newTestPack(t, ForAuth)
t.Cleanup(p.Close)

testResources153(t, p, testFuncs153[*notificationsv1.GlobalNotification]{
newResource: func(name string) (*notificationsv1.GlobalNotification, error) {
return newGlobalNotification(t, name), nil
},
create: func(ctx context.Context, item *notificationsv1.GlobalNotification) error {
_, err := p.notifications.CreateGlobalNotification(ctx, item)
return trace.Wrap(err)
},
list: func(ctx context.Context) ([]*notificationsv1.GlobalNotification, error) {
items, _, err := p.notifications.ListGlobalNotifications(ctx, 0, "")
return items, trace.Wrap(err)
},
cacheList: func(ctx context.Context) ([]*notificationsv1.GlobalNotification, error) {
items, _, err := p.cache.ListGlobalNotifications(ctx, 0, "")
return items, trace.Wrap(err)
},
deleteAll: p.notifications.DeleteAllGlobalNotifications,
})
}

// TestStaticHostUsers tests that CRUD operations on static host user resources are
// replicated from the backend to the cache.
func TestStaticHostUsers(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
healthcheckconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/healthcheckconfig/v1"
identitycenterv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/identitycenter/v1"
machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
workloadidentityv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/workloadidentity/v1"
"github.com/gravitational/teleport/api/types"
)
Expand Down Expand Up @@ -75,6 +76,8 @@ type collections struct {
reverseTunnels *collection[types.ReverseTunnel]
spiffeFederations *collection[*machineidv1.SPIFFEFederation]
workloadIdentity *collection[*workloadidentityv1.WorkloadIdentity]
userNotifications *collection[*notificationsv1.Notification]
globalNotifications *collection[*notificationsv1.GlobalNotification]
}

// setupCollections ensures that the appropriate [collection] is
Expand Down Expand Up @@ -273,6 +276,22 @@ func setupCollections(c Config) (*collections, error) {

out.workloadIdentity = collect
out.byKind[resourceKind] = out.workloadIdentity
case types.KindNotification:
collect, err := newUserNotificationCollection(c.Notifications, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.userNotifications = collect
out.byKind[resourceKind] = out.userNotifications
case types.KindGlobalNotification:
collect, err := newGlobalNotificationCollection(c.Notifications, watch)
if err != nil {
return nil, trace.Wrap(err)
}

out.globalNotifications = collect
out.byKind[resourceKind] = out.globalNotifications
}
}

Expand Down
141 changes: 0 additions & 141 deletions lib/cache/legacy_collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
dbobjectv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/dbobject/v1"
identitycenterv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/identitycenter/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
provisioningv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/provisioning/v1"
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
Expand Down Expand Up @@ -143,9 +142,7 @@ type legacyCollections struct {
webSessions collectionReader[webSessionGetter]
webTokens collectionReader[webTokenGetter]
dynamicWindowsDesktops collectionReader[dynamicWindowsDesktopsGetter]
userNotifications collectionReader[notificationGetter]
accessGraphSettings collectionReader[accessGraphSettingsGetter]
globalNotifications collectionReader[notificationGetter]
accessMonitoringRules collectionReader[accessMonitoringRuleGetter]
autoUpdateConfigs collectionReader[autoUpdateConfigGetter]
autoUpdateVersions collectionReader[autoUpdateVersionGetter]
Expand Down Expand Up @@ -467,24 +464,6 @@ func setupLegacyCollections(c *Cache, watches []types.WatchKind) (*legacyCollect
watch: watch,
}
collections.byKind[resourceKind] = collections.staticHostUsers
case types.KindNotification:
if c.Notifications == nil {
return nil, trace.BadParameter("missing parameter Notifications")
}
collections.userNotifications = &genericCollection[*notificationsv1.Notification, notificationGetter, userNotificationExecutor]{
cache: c,
watch: watch,
}
collections.byKind[resourceKind] = collections.userNotifications
case types.KindGlobalNotification:
if c.Notifications == nil {
return nil, trace.BadParameter("missing parameter Notifications")
}
collections.globalNotifications = &genericCollection[*notificationsv1.GlobalNotification, notificationGetter, globalNotificationExecutor]{
cache: c,
watch: watch,
}
collections.byKind[resourceKind] = collections.globalNotifications
case types.KindAccessMonitoringRule:
if c.AccessMonitoringRules == nil {
return nil, trace.BadParameter("missing parameter AccessMonitoringRule")
Expand Down Expand Up @@ -2416,126 +2395,6 @@ type accessListReviewsGetter interface {
ListAccessListReviews(ctx context.Context, accessList string, pageSize int, pageToken string) (reviews []*accesslist.Review, nextToken string, err error)
}

type notificationGetter interface {
ListUserNotifications(ctx context.Context, pageSize int, startKey string) ([]*notificationsv1.Notification, string, error)
ListGlobalNotifications(ctx context.Context, pageSize int, startKey string) ([]*notificationsv1.GlobalNotification, string, error)
}

type userNotificationExecutor struct{}

func (userNotificationExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*notificationsv1.Notification, error) {
var notifications []*notificationsv1.Notification
var startKey string
for {
notifs, nextKey, err := cache.notificationsCache.ListUserNotifications(ctx, 0, startKey)
if err != nil {
return nil, trace.Wrap(err)
}
notifications = append(notifications, notifs...)

if nextKey == "" {
break
}
startKey = nextKey
}

return notifications, nil
}

func (userNotificationExecutor) upsert(ctx context.Context, cache *Cache, notification *notificationsv1.Notification) error {
_, err := cache.notificationsCache.UpsertUserNotification(ctx, notification)
if err != nil {
return trace.Wrap(err)
}

return nil
}

func (userNotificationExecutor) deleteAll(ctx context.Context, cache *Cache) error {
return cache.notificationsCache.DeleteAllUserNotifications(ctx)
}

func (userNotificationExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
r, ok := resource.(types.Resource153UnwrapperT[*notificationsv1.Notification])
if !ok {
return trace.BadParameter("unknown resource type, expected types.Resource153Unwrapper, got %T", resource)
}
notification := r.UnwrapT()
username := notification.GetSpec().GetUsername()
notificationId := notification.GetMetadata().GetName()

err := cache.notificationsCache.DeleteUserNotification(ctx, username, notificationId)
return trace.Wrap(err)
}

func (userNotificationExecutor) isSingleton() bool { return false }

func (userNotificationExecutor) getReader(cache *Cache, cacheOK bool) notificationGetter {
if cacheOK {
return cache.notificationsCache
}
return cache.Config.Notifications
}

var _ executor[*notificationsv1.Notification, notificationGetter] = userNotificationExecutor{}

type globalNotificationExecutor struct{}

func (globalNotificationExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*notificationsv1.GlobalNotification, error) {
var notifications []*notificationsv1.GlobalNotification
var startKey string
for {
notifs, nextKey, err := cache.notificationsCache.ListGlobalNotifications(ctx, 0, startKey)
if err != nil {
return nil, trace.Wrap(err)
}

notifications = append(notifications, notifs...)

if nextKey == "" {
break
}
startKey = nextKey
}

return notifications, nil
}

func (globalNotificationExecutor) upsert(ctx context.Context, cache *Cache, notification *notificationsv1.GlobalNotification) error {
if _, err := cache.notificationsCache.UpsertGlobalNotification(ctx, notification); err != nil {
return trace.Wrap(err)
}

return nil
}

func (globalNotificationExecutor) deleteAll(ctx context.Context, cache *Cache) error {
return cache.notificationsCache.DeleteAllGlobalNotifications(ctx)
}

func (globalNotificationExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
r, ok := resource.(types.Resource153UnwrapperT[*notificationsv1.GlobalNotification])
if !ok {
return trace.BadParameter("unknown resource type, expected types.Resource153Unwrapper, got %T", resource)
}
globalNotification := r.UnwrapT()
notificationId := globalNotification.GetMetadata().GetName()

err := cache.notificationsCache.DeleteGlobalNotification(ctx, notificationId)
return trace.Wrap(err)
}

func (globalNotificationExecutor) isSingleton() bool { return false }

func (globalNotificationExecutor) getReader(cache *Cache, cacheOK bool) notificationGetter {
if cacheOK {
return cache.notificationsCache
}
return cache.Config.Notifications
}

var _ executor[*notificationsv1.GlobalNotification, notificationGetter] = globalNotificationExecutor{}

type accessMonitoringRulesExecutor struct{}

func (accessMonitoringRulesExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*accessmonitoringrulesv1.AccessMonitoringRule, error) {
Expand Down
Loading
Loading