diff --git a/api/types/installer.go b/api/types/installer.go index 1ac1d8487cd1a..fa341f7372dcd 100644 --- a/api/types/installer.go +++ b/api/types/installer.go @@ -22,7 +22,7 @@ import ( "github.com/gravitational/trace" ) -// Installer is an installer script rseource +// Installer is an installer script resource type Installer interface { Resource diff --git a/lib/auth/auth_test.go b/lib/auth/auth_test.go index 2118821922515..e13c48499b7ef 100644 --- a/lib/auth/auth_test.go +++ b/lib/auth/auth_test.go @@ -34,6 +34,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" "github.com/gravitational/license" "github.com/gravitational/trace" @@ -233,7 +234,7 @@ func TestSessions(t *testing.T) { out, err := s.a.GetWebSessionInfo(ctx, user, ws.GetName()) require.NoError(t, err) ws.SetPriv(nil) - require.Equal(t, ws, out) + require.Empty(t, cmp.Diff(ws, out, cmpopts.IgnoreFields(types.Metadata{}, "ID", "Revision"))) err = s.a.WebSessions().Delete(ctx, types.DeleteWebSessionRequest{ User: user, diff --git a/lib/backend/backend.go b/lib/backend/backend.go index 4d2dbf461a58b..51f6a975357a9 100644 --- a/lib/backend/backend.go +++ b/lib/backend/backend.go @@ -433,6 +433,12 @@ func CreateRevision() string { return uuid.NewString() } +// BlankRevision is a placeholder revision to be used by backends when +// the revision of the item in the backend is empty. This can happen +// to any existing resources that were last written before support for +// revisions was added. +var BlankRevision = uuid.Nil.String() + // NewLease creates a lease for the provided [Item]. func NewLease(item Item) *Lease { return &Lease{ diff --git a/lib/backend/dynamo/dynamodbbk.go b/lib/backend/dynamo/dynamodbbk.go index fcef63c45a995..9ed68ea31faf7 100644 --- a/lib/backend/dynamo/dynamodbbk.go +++ b/lib/backend/dynamo/dynamodbbk.go @@ -412,6 +412,9 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte, if r.Expires != nil { values[i].Expires = time.Unix(*r.Expires, 0).UTC() } + if values[i].Revision == "" { + values[i].Revision = backend.BlankRevision + } } return &backend.GetResult{Items: values}, nil } @@ -503,6 +506,9 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) { if r.Expires != nil { item.Expires = time.Unix(*r.Expires, 0) } + if item.Revision == "" { + item.Revision = backend.BlankRevision + } return item, nil } @@ -572,6 +578,14 @@ func (b *Backend) Delete(ctx context.Context, key []byte) error { // ConditionalUpdate updates the matching item in Dynamo if the provided revision matches // the revision of the item in Dynamo. func (b *Backend) ConditionalUpdate(ctx context.Context, item backend.Item) (*backend.Lease, error) { + if item.Revision == "" { + return nil, trace.Wrap(backend.ErrIncorrectRevision) + } + + if item.Revision == backend.BlankRevision { + item.Revision = "" + } + rev, err := b.create(ctx, item, modeConditionalUpdate) if err != nil { return nil, trace.Wrap(err) @@ -584,6 +598,10 @@ func (b *Backend) ConditionalUpdate(ctx context.Context, item backend.Item) (*ba // ConditionalDelete deletes item by key if the provided revision matches // the revision of the item in Dynamo. func (b *Backend) ConditionalDelete(ctx context.Context, key []byte, rev string) error { + if rev == "" { + return trace.Wrap(backend.ErrIncorrectRevision) + } + av, err := dynamodbattribute.MarshalMap(keyLookup{ HashKey: hashKey, FullPath: prependPrefix(key), @@ -591,6 +609,9 @@ func (b *Backend) ConditionalDelete(ctx context.Context, key []byte, rev string) if err != nil { return trace.Wrap(err) } + if rev == backend.BlankRevision { + rev = "" + } input := dynamodb.DeleteItemInput{Key: av, TableName: aws.String(b.TableName)} input.SetConditionExpression("Revision = :rev") input.SetExpressionAttributeValues(map[string]*dynamodb.AttributeValue{":rev": {S: aws.String(rev)}}) diff --git a/lib/backend/dynamo/shards.go b/lib/backend/dynamo/shards.go index 0eccf2476ec76..e8f8f9d669f76 100644 --- a/lib/backend/dynamo/shards.go +++ b/lib/backend/dynamo/shards.go @@ -318,10 +318,11 @@ func toEvent(rec *dynamodbstreams.Record) (*backend.Event, error) { return &backend.Event{ Type: op, Item: backend.Item{ - Key: trimPrefix(r.FullPath), - Value: r.Value, - Expires: expires, - ID: r.ID, + Key: trimPrefix(r.FullPath), + Value: r.Value, + Expires: expires, + ID: r.ID, + Revision: r.Revision, }, }, nil case types.OpDelete: diff --git a/lib/backend/lite/lite.go b/lib/backend/lite/lite.go index e148a507cdd6d..30183ad92382b 100644 --- a/lib/backend/lite/lite.go +++ b/lib/backend/lite/lite.go @@ -588,6 +588,10 @@ func (l *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) { if err != nil { return nil, trace.Wrap(err) } + + if item.Revision == "" { + item.Revision = backend.BlankRevision + } return &item, nil } @@ -646,6 +650,9 @@ func (l *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte, return trace.Wrap(err) } i.Expires = expires.Time + if i.Revision == "" { + i.Revision = backend.BlankRevision + } result.Items = append(result.Items, i) } return nil @@ -791,8 +798,12 @@ func (l *Backend) DeleteRange(ctx context.Context, startKey, endKey []byte) erro } func (l *Backend) ConditionalUpdate(ctx context.Context, i backend.Item) (*backend.Lease, error) { - if i.Key == nil { - return nil, trace.BadParameter("missing parameter key") + if i.Key == nil || i.Revision == "" { + return nil, trace.Wrap(backend.ErrIncorrectRevision) + } + + if i.Revision == backend.BlankRevision { + i.Revision = "" } rev := backend.CreateRevision() @@ -822,7 +833,7 @@ func (l *Backend) ConditionalUpdate(ctx context.Context, i backend.Item) (*backe } defer stmt.Close() - if _, err := stmt.ExecContext(ctx, types.OpPut, now, string(i.Key), id(now), expires(i.Expires), i.Value, i.Revision); err != nil { + if _, err := stmt.ExecContext(ctx, types.OpPut, now, string(i.Key), id(now), expires(i.Expires), i.Value, rev); err != nil { return trace.Wrap(err) } } @@ -837,8 +848,12 @@ func (l *Backend) ConditionalUpdate(ctx context.Context, i backend.Item) (*backe } func (l *Backend) ConditionalDelete(ctx context.Context, key []byte, revision string) error { - if len(key) == 0 { - return trace.BadParameter("missing parameter key") + if len(key) == 0 || revision == "" { + return trace.Wrap(backend.ErrIncorrectRevision) + } + + if revision == backend.BlankRevision { + revision = "" } return l.inTransaction(ctx, func(tx *sql.Tx) error { diff --git a/lib/backend/memory/memory.go b/lib/backend/memory/memory.go index cf6125cd10793..b2ce5ef2ba58b 100644 --- a/lib/backend/memory/memory.go +++ b/lib/backend/memory/memory.go @@ -386,9 +386,10 @@ func (m *Memory) CompareAndSwap(ctx context.Context, expected backend.Item, repl } func (m *Memory) ConditionalDelete(ctx context.Context, key []byte, rev string) error { - if len(key) == 0 { - return trace.BadParameter("missing parameter key") + if len(key) == 0 || (rev == "" && !m.Mirror) { + return trace.Wrap(backend.ErrIncorrectRevision) } + m.Lock() defer m.Unlock() m.removeExpired() @@ -412,9 +413,10 @@ func (m *Memory) ConditionalDelete(ctx context.Context, key []byte, rev string) } func (m *Memory) ConditionalUpdate(ctx context.Context, i backend.Item) (*backend.Lease, error) { - if len(i.Key) == 0 { - return nil, trace.BadParameter("missing parameter key") + if len(i.Key) == 0 || (i.Revision == "" && !m.Mirror) { + return nil, trace.Wrap(backend.ErrIncorrectRevision) } + m.Lock() defer m.Unlock() m.removeExpired() diff --git a/lib/services/discoveryconfig.go b/lib/services/discoveryconfig.go index 4d40e2d506497..00bdde2b59a89 100644 --- a/lib/services/discoveryconfig.go +++ b/lib/services/discoveryconfig.go @@ -64,6 +64,7 @@ func MarshalDiscoveryConfig(discoveryConfig *discoveryconfig.DiscoveryConfig, op if !cfg.PreserveResourceID { copy := *discoveryConfig copy.SetResourceID(0) + copy.SetRevision("") discoveryConfig = © } return utils.FastMarshal(discoveryConfig) @@ -88,6 +89,9 @@ func UnmarshalDiscoveryConfig(data []byte, opts ...MarshalOption) (*discoverycon if cfg.ID != 0 { discoveryConfig.SetResourceID(cfg.ID) } + if cfg.Revision != "" { + discoveryConfig.SetRevision(cfg.Revision) + } if !cfg.Expires.IsZero() { discoveryConfig.SetExpiry(cfg.Expires) } diff --git a/lib/services/local/access.go b/lib/services/local/access.go index 9b735c15697d1..1b4ea12454968 100644 --- a/lib/services/local/access.go +++ b/lib/services/local/access.go @@ -104,16 +104,18 @@ func (s *AccessService) UpsertRole(ctx context.Context, role types.Role) error { return trace.Wrap(err) } + rev := role.GetRevision() value, err := services.MarshalRole(role) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(rolesPrefix, role.GetName(), paramsPrefix), - Value: value, - Expires: role.Expiry(), - ID: role.GetResourceID(), + Key: backend.Key(rolesPrefix, role.GetName(), paramsPrefix), + Value: value, + Expires: role.Expiry(), + ID: role.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) @@ -203,15 +205,17 @@ func (s *AccessService) GetLocks(ctx context.Context, inForceOnly bool, targets // UpsertLock upserts a lock. func (s *AccessService) UpsertLock(ctx context.Context, lock types.Lock) error { + rev := lock.GetRevision() value, err := services.MarshalLock(lock) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(locksPrefix, lock.GetName()), - Value: value, - Expires: lock.Expiry(), - ID: lock.GetResourceID(), + Key: backend.Key(locksPrefix, lock.GetName()), + Value: value, + Expires: lock.Expiry(), + ID: lock.GetResourceID(), + Revision: rev, } if _, err = s.Put(ctx, item); err != nil { @@ -260,15 +264,17 @@ func (s *AccessService) ReplaceRemoteLocks(ctx context.Context, clusterName stri if !strings.HasPrefix(lock.GetName(), clusterName) { lock.SetName(clusterName + "/" + lock.GetName()) } + rev := lock.GetRevision() value, err := services.MarshalLock(lock) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(locksPrefix, lock.GetName()), - Value: value, - Expires: lock.Expiry(), - ID: lock.GetResourceID(), + Key: backend.Key(locksPrefix, lock.GetName()), + Value: value, + Expires: lock.Expiry(), + ID: lock.GetResourceID(), + Revision: rev, } newRemoteLocksToStore[string(item.Key)] = item } diff --git a/lib/services/local/apps.go b/lib/services/local/apps.go index bbced4d535b63..2e9be88e2633d 100644 --- a/lib/services/local/apps.go +++ b/lib/services/local/apps.go @@ -99,15 +99,17 @@ func (s *AppService) UpdateApp(ctx context.Context, app types.Application) error if err := app.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := app.GetRevision() value, err := services.MarshalApp(app) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(appPrefix, app.GetName()), - Value: value, - Expires: app.Expiry(), - ID: app.GetResourceID(), + Key: backend.Key(appPrefix, app.GetName()), + Value: value, + Expires: app.Expiry(), + ID: app.GetResourceID(), + Revision: rev, } _, err = s.Update(ctx, item) if err != nil { diff --git a/lib/services/local/configuration.go b/lib/services/local/configuration.go index b57c461bf1860..67d42eda487c1 100644 --- a/lib/services/local/configuration.go +++ b/lib/services/local/configuration.go @@ -86,15 +86,17 @@ func (s *ClusterConfigurationService) DeleteClusterName() error { // SetClusterName sets the name of the cluster in the backend. SetClusterName // can only be called once on a cluster after which it will return trace.AlreadyExists. func (s *ClusterConfigurationService) SetClusterName(c types.ClusterName) error { + rev := c.GetRevision() value, err := services.MarshalClusterName(c) if err != nil { return trace.Wrap(err) } _, err = s.Create(context.TODO(), backend.Item{ - Key: backend.Key(clusterConfigPrefix, namePrefix), - Value: value, - Expires: c.Expiry(), + Key: backend.Key(clusterConfigPrefix, namePrefix), + Value: value, + Expires: c.Expiry(), + Revision: rev, }) if err != nil { return trace.Wrap(err) @@ -105,16 +107,18 @@ func (s *ClusterConfigurationService) SetClusterName(c types.ClusterName) error // UpsertClusterName sets the name of the cluster in the backend. func (s *ClusterConfigurationService) UpsertClusterName(c types.ClusterName) error { + rev := c.GetRevision() value, err := services.MarshalClusterName(c) if err != nil { return trace.Wrap(err) } _, err = s.Put(context.TODO(), backend.Item{ - Key: backend.Key(clusterConfigPrefix, namePrefix), - Value: value, - Expires: c.Expiry(), - ID: c.GetResourceID(), + Key: backend.Key(clusterConfigPrefix, namePrefix), + Value: value, + Expires: c.Expiry(), + ID: c.GetResourceID(), + Revision: rev, }) if err != nil { return trace.Wrap(err) @@ -138,15 +142,17 @@ func (s *ClusterConfigurationService) GetStaticTokens() (types.StaticTokens, err // SetStaticTokens sets the list of static tokens used to provision nodes. func (s *ClusterConfigurationService) SetStaticTokens(c types.StaticTokens) error { + rev := c.GetRevision() value, err := services.MarshalStaticTokens(c) if err != nil { return trace.Wrap(err) } _, err = s.Put(context.TODO(), backend.Item{ - Key: backend.Key(clusterConfigPrefix, staticTokensPrefix), - Value: value, - Expires: c.Expiry(), - ID: c.GetResourceID(), + Key: backend.Key(clusterConfigPrefix, staticTokensPrefix), + Value: value, + Expires: c.Expiry(), + ID: c.GetResourceID(), + Revision: rev, }) if err != nil { return trace.Wrap(err) @@ -185,6 +191,7 @@ func (s *ClusterConfigurationService) GetAuthPreference(ctx context.Context) (ty // on the backend. func (s *ClusterConfigurationService) SetAuthPreference(ctx context.Context, preferences types.AuthPreference) error { // Perform the modules-provided checks. + rev := preferences.GetRevision() if err := modules.ValidateResource(preferences); err != nil { return trace.Wrap(err) } @@ -195,9 +202,10 @@ func (s *ClusterConfigurationService) SetAuthPreference(ctx context.Context, pre } item := backend.Item{ - Key: backend.Key(authPrefix, preferencePrefix, generalPrefix), - Value: value, - ID: preferences.GetResourceID(), + Key: backend.Key(authPrefix, preferencePrefix, generalPrefix), + Value: value, + ID: preferences.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) @@ -234,15 +242,17 @@ func (s *ClusterConfigurationService) GetClusterAuditConfig(ctx context.Context, // SetClusterAuditConfig sets the cluster audit config on the backend. func (s *ClusterConfigurationService) SetClusterAuditConfig(ctx context.Context, auditConfig types.ClusterAuditConfig) error { + rev := auditConfig.GetRevision() value, err := services.MarshalClusterAuditConfig(auditConfig) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(clusterConfigPrefix, auditPrefix), - Value: value, - ID: auditConfig.GetResourceID(), + Key: backend.Key(clusterConfigPrefix, auditPrefix), + Value: value, + ID: auditConfig.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) @@ -284,15 +294,17 @@ func (s *ClusterConfigurationService) SetClusterNetworkingConfig(ctx context.Con return trace.Wrap(err) } + rev := netConfig.GetRevision() value, err := services.MarshalClusterNetworkingConfig(netConfig) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(clusterConfigPrefix, networkingPrefix), - Value: value, - ID: netConfig.GetResourceID(), + Key: backend.Key(clusterConfigPrefix, networkingPrefix), + Value: value, + ID: netConfig.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) @@ -333,15 +345,17 @@ func (s *ClusterConfigurationService) SetSessionRecordingConfig(ctx context.Cont return trace.Wrap(err) } + rev := recConfig.GetRevision() value, err := services.MarshalSessionRecordingConfig(recConfig) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(clusterConfigPrefix, sessionRecordingPrefix), - Value: value, - ID: recConfig.GetResourceID(), + Key: backend.Key(clusterConfigPrefix, sessionRecordingPrefix), + Value: value, + ID: recConfig.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) @@ -391,14 +405,16 @@ func (s *ClusterConfigurationService) GetUIConfig(ctx context.Context) (types.UI } func (s *ClusterConfigurationService) SetUIConfig(ctx context.Context, uic types.UIConfig) error { + rev := uic.GetRevision() value, err := services.MarshalUIConfig(uic) if err != nil { return trace.Wrap(err) } _, err = s.Put(ctx, backend.Item{ - Key: backend.Key(clusterConfigPrefix, uiPrefix), - Value: value, + Key: backend.Key(clusterConfigPrefix, uiPrefix), + Value: value, + Revision: rev, }) return trace.Wrap(err) } @@ -413,20 +429,22 @@ func (s *ClusterConfigurationService) GetInstaller(ctx context.Context, name str if err != nil { return nil, trace.Wrap(err) } - return services.UnmarshalInstaller(item.Value) + return services.UnmarshalInstaller(item.Value, services.WithRevision(item.Revision)) } // SetInstaller sets the script of the cluster in the backend func (s *ClusterConfigurationService) SetInstaller(ctx context.Context, ins types.Installer) error { + rev := ins.GetRevision() value, err := services.MarshalInstaller(ins) if err != nil { return trace.Wrap(err) } _, err = s.Put(ctx, backend.Item{ - Key: backend.Key(clusterConfigPrefix, scriptsPrefix, installerPrefix, ins.GetName()), - Value: value, - Expires: ins.Expiry(), + Key: backend.Key(clusterConfigPrefix, scriptsPrefix, installerPrefix, ins.GetName()), + Value: value, + Expires: ins.Expiry(), + Revision: rev, }) return trace.Wrap(err) } diff --git a/lib/services/local/connection_diagnostic.go b/lib/services/local/connection_diagnostic.go index 03b4cdfd26171..af498d3d3a036 100644 --- a/lib/services/local/connection_diagnostic.go +++ b/lib/services/local/connection_diagnostic.go @@ -67,15 +67,17 @@ func (s *ConnectionDiagnosticService) UpdateConnectionDiagnostic(ctx context.Con if err := connectionDiagnostic.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := connectionDiagnostic.GetRevision() value, err := services.MarshalConnectionDiagnostic(connectionDiagnostic) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(connectionDiagnosticPrefix, connectionDiagnostic.GetName()), - Value: value, - Expires: connectionDiagnostic.Expiry(), - ID: connectionDiagnostic.GetResourceID(), + Key: backend.Key(connectionDiagnosticPrefix, connectionDiagnostic.GetName()), + Value: value, + Expires: connectionDiagnostic.Expiry(), + ID: connectionDiagnostic.GetResourceID(), + Revision: rev, } _, err = s.Update(ctx, item) @@ -107,10 +109,11 @@ func (s *ConnectionDiagnosticService) AppendDiagnosticTrace(ctx context.Context, } newItem := backend.Item{ - Key: backend.Key(connectionDiagnosticPrefix, connectionDiagnostic.GetName()), - Value: value, - Expires: connectionDiagnostic.Expiry(), - ID: connectionDiagnostic.GetResourceID(), + Key: backend.Key(connectionDiagnosticPrefix, connectionDiagnostic.GetName()), + Value: value, + Expires: connectionDiagnostic.Expiry(), + ID: connectionDiagnostic.GetResourceID(), + Revision: existing.Revision, } _, err = s.CompareAndSwap(ctx, *existing, newItem) diff --git a/lib/services/local/databases.go b/lib/services/local/databases.go index b79c5dc7ad073..e8eac5d50d515 100644 --- a/lib/services/local/databases.go +++ b/lib/services/local/databases.go @@ -99,15 +99,17 @@ func (s *DatabaseService) UpdateDatabase(ctx context.Context, database types.Dat if err := database.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := database.GetRevision() value, err := services.MarshalDatabase(database) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(databasesPrefix, database.GetName()), - Value: value, - Expires: database.Expiry(), - ID: database.GetResourceID(), + Key: backend.Key(databasesPrefix, database.GetName()), + Value: value, + Expires: database.Expiry(), + ID: database.GetResourceID(), + Revision: rev, } _, err = s.Update(ctx, item) if err != nil { diff --git a/lib/services/local/databaseservice.go b/lib/services/local/databaseservice.go index 1534c1e251db6..58b77df401bea 100644 --- a/lib/services/local/databaseservice.go +++ b/lib/services/local/databaseservice.go @@ -42,15 +42,17 @@ func (s *DatabaseServicesService) UpsertDatabaseService(ctx context.Context, ser if err := service.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := service.GetRevision() value, err := services.MarshalDatabaseService(service) if err != nil { return nil, trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(databaseServicePrefix, service.GetName()), - Value: value, - Expires: service.Expiry(), - ID: service.GetResourceID(), + Key: backend.Key(databaseServicePrefix, service.GetName()), + Value: value, + Expires: service.Expiry(), + ID: service.GetResourceID(), + Revision: rev, } lease, err := s.Put(ctx, item) if err != nil { diff --git a/lib/services/local/desktops.go b/lib/services/local/desktops.go index feb5b2c684a43..ffb1e3c209859 100644 --- a/lib/services/local/desktops.go +++ b/lib/services/local/desktops.go @@ -91,15 +91,17 @@ func (s *WindowsDesktopService) UpdateWindowsDesktop(ctx context.Context, deskto if err := desktop.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := desktop.GetRevision() value, err := services.MarshalWindowsDesktop(desktop) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(windowsDesktopsPrefix, desktop.GetHostID(), desktop.GetName()), - Value: value, - Expires: desktop.Expiry(), - ID: desktop.GetResourceID(), + Key: backend.Key(windowsDesktopsPrefix, desktop.GetHostID(), desktop.GetName()), + Value: value, + Expires: desktop.Expiry(), + ID: desktop.GetResourceID(), + Revision: rev, } _, err = s.Update(ctx, item) if err != nil { @@ -113,15 +115,17 @@ func (s *WindowsDesktopService) UpsertWindowsDesktop(ctx context.Context, deskto if err := desktop.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := desktop.GetRevision() value, err := services.MarshalWindowsDesktop(desktop) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(windowsDesktopsPrefix, desktop.GetHostID(), desktop.GetName()), - Value: value, - Expires: desktop.Expiry(), - ID: desktop.GetResourceID(), + Key: backend.Key(windowsDesktopsPrefix, desktop.GetHostID(), desktop.GetName()), + Value: value, + Expires: desktop.Expiry(), + ID: desktop.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) if err != nil { diff --git a/lib/services/local/discoveryconfig_test.go b/lib/services/local/discoveryconfig_test.go index 3f1a9edb53d04..ba065f9d1ab93 100644 --- a/lib/services/local/discoveryconfig_test.go +++ b/lib/services/local/discoveryconfig_test.go @@ -58,7 +58,7 @@ func TestDiscoveryConfigCRUD(t *testing.T) { require.Empty(t, out) cmpOpts := []cmp.Option{ - cmpopts.IgnoreFields(header.Metadata{}, "ID"), + cmpopts.IgnoreFields(header.Metadata{}, "ID", "Revision"), } // Create both discovery configs. diff --git a/lib/services/local/dynamic_access.go b/lib/services/local/dynamic_access.go index 98cdf57da3207..9f189eee99437 100644 --- a/lib/services/local/dynamic_access.go +++ b/lib/services/local/dynamic_access.go @@ -481,15 +481,17 @@ func (s *DynamicAccessService) updateAccessRequestPluginData(ctx context.Context } func itemFromAccessRequest(req types.AccessRequest) (backend.Item, error) { + rev := req.GetRevision() value, err := services.MarshalAccessRequest(req) if err != nil { return backend.Item{}, trace.Wrap(err) } return backend.Item{ - Key: accessRequestKey(req.GetName()), - Value: value, - Expires: req.Expiry(), - ID: req.GetResourceID(), + Key: accessRequestKey(req.GetName()), + Value: value, + Expires: req.Expiry(), + ID: req.GetResourceID(), + Revision: rev, }, nil } @@ -499,10 +501,11 @@ func itemFromAccessListPromotions(req types.AccessRequest, suggestedItems *types return backend.Item{}, trace.Wrap(err) } return backend.Item{ - Key: AccessRequestAllowedPromotionKey(req.GetName()), - Value: value, - Expires: req.Expiry(), // expire the promotion at the same time as the access request - ID: req.GetResourceID(), + Key: AccessRequestAllowedPromotionKey(req.GetName()), + Value: value, + Expires: req.Expiry(), // expire the promotion at the same time as the access request + ID: req.GetResourceID(), + Revision: req.GetRevision(), }, nil } @@ -524,6 +527,7 @@ func itemToAccessRequest(item backend.Item, opts ...services.MarshalOption) (typ } func itemFromPluginData(data types.PluginData) (backend.Item, error) { + rev := data.GetRevision() value, err := services.MarshalPluginData(data) if err != nil { return backend.Item{}, trace.Wrap(err) @@ -534,10 +538,11 @@ func itemFromPluginData(data types.PluginData) (backend.Item, error) { return backend.Item{}, trace.BadParameter("plugin data size limit exceeded") } return backend.Item{ - Key: pluginDataKey(data.GetSubKind(), data.GetName()), - Value: value, - Expires: data.Expiry(), - ID: data.GetResourceID(), + Key: pluginDataKey(data.GetSubKind(), data.GetName()), + Value: value, + Expires: data.Expiry(), + ID: data.GetResourceID(), + Revision: rev, }, nil } diff --git a/lib/services/local/generic/generic.go b/lib/services/local/generic/generic.go index 94f2c92115ff6..5ab09f0d4c826 100644 --- a/lib/services/local/generic/generic.go +++ b/lib/services/local/generic/generic.go @@ -126,7 +126,7 @@ func (s *Service[T]) GetResources(ctx context.Context) ([]T, error) { out := make([]T, 0, len(result.Items)) for _, item := range result.Items { - resource, err := s.unmarshalFunc(item.Value) + resource, err := s.unmarshalFunc(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -156,7 +156,7 @@ func (s *Service[T]) ListResources(ctx context.Context, pageSize int, pageToken out := make([]T, 0, len(result.Items)) for _, item := range result.Items { - resource, err := s.unmarshalFunc(item.Value) + resource, err := s.unmarshalFunc(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, "", trace.Wrap(err) } @@ -282,15 +282,17 @@ func (s *Service[T]) MakeBackendItem(resource T, name string) (backend.Item, err if err := resource.CheckAndSetDefaults(); err != nil { return backend.Item{}, trace.Wrap(err) } + rev := resource.GetRevision() value, err := s.marshalFunc(resource) if err != nil { return backend.Item{}, trace.Wrap(err) } item := backend.Item{ - Key: s.MakeKey(name), - Value: value, - Expires: resource.Expiry(), - ID: resource.GetResourceID(), + Key: s.MakeKey(name), + Value: value, + Expires: resource.Expiry(), + ID: resource.GetResourceID(), + Revision: rev, } return item, nil diff --git a/lib/services/local/inventory.go b/lib/services/local/inventory.go index e7919429bd61e..667926e984d52 100644 --- a/lib/services/local/inventory.go +++ b/lib/services/local/inventory.go @@ -102,15 +102,17 @@ func (s *PresenceService) UpsertInstance(ctx context.Context, instance types.Ins return trace.BadParameter("unexpected type %T, expected %T", instance, v1) } + rev := instance.GetRevision() value, err := utils.FastMarshal(v1) if err != nil { return trace.Errorf("failed to marshal Instance: %v", err) } item := backend.Item{ - Key: backend.Key(instancePrefix, instance.GetName()), - Value: value, - Expires: instance.Expiry(), + Key: backend.Key(instancePrefix, instance.GetName()), + Value: value, + Expires: instance.Expiry(), + Revision: rev, } _, err = s.Backend.Put(ctx, item) diff --git a/lib/services/local/kube.go b/lib/services/local/kube.go index bc604d70bcdf0..0975a00da92f2 100644 --- a/lib/services/local/kube.go +++ b/lib/services/local/kube.go @@ -99,15 +99,17 @@ func (s *KubernetesService) UpdateKubernetesCluster(ctx context.Context, cluster if err := cluster.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := cluster.GetRevision() value, err := services.MarshalKubeCluster(cluster) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(kubernetesPrefix, cluster.GetName()), - Value: value, - Expires: cluster.Expiry(), - ID: cluster.GetResourceID(), + Key: backend.Key(kubernetesPrefix, cluster.GetName()), + Value: value, + Expires: cluster.Expiry(), + ID: cluster.GetResourceID(), + Revision: rev, } _, err = s.Update(ctx, item) if err != nil { diff --git a/lib/services/local/plugins.go b/lib/services/local/plugins.go index 00335fe71fc97..5133919baa77d 100644 --- a/lib/services/local/plugins.go +++ b/lib/services/local/plugins.go @@ -211,16 +211,18 @@ func (s *PluginsService) updateAndSwap(ctx context.Context, name string, modify return trace.Wrap(err) } + rev := newPlugin.GetRevision() value, err := services.MarshalPlugin(newPlugin) if err != nil { return trace.Wrap(err) } _, err = s.backend.CompareAndSwap(ctx, *item, backend.Item{ - Key: backend.Key(pluginsPrefix, plugin.GetName()), - Value: value, - Expires: plugin.Expiry(), - ID: plugin.GetResourceID(), + Key: backend.Key(pluginsPrefix, plugin.GetName()), + Value: value, + Expires: plugin.Expiry(), + ID: plugin.GetResourceID(), + Revision: rev, }) return trace.Wrap(err) diff --git a/lib/services/local/presence.go b/lib/services/local/presence.go index 3a94c7117bc77..c4ff5dda57f57 100644 --- a/lib/services/local/presence.go +++ b/lib/services/local/presence.go @@ -95,15 +95,17 @@ func (s *PresenceService) UpsertNamespace(n types.Namespace) error { if err := n.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := n.GetRevision() value, err := services.MarshalNamespace(n) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(namespacesPrefix, n.Metadata.Name, paramsPrefix), - Value: value, - Expires: n.Metadata.Expiry(), - ID: n.Metadata.ID, + Key: backend.Key(namespacesPrefix, n.Metadata.Name, paramsPrefix), + Value: value, + Expires: n.Metadata.Expiry(), + ID: n.Metadata.ID, + Revision: rev, } _, err = s.Put(context.TODO(), item) @@ -192,15 +194,17 @@ func (s *PresenceService) UpsertServerInfo(ctx context.Context, si types.ServerI if err := si.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := si.GetRevision() value, err := services.MarshalServerInfo(si) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: serverInfoKey(si.GetSubKind(), si.GetName()), - Value: value, - Expires: si.Expiry(), - ID: si.GetResourceID(), + Key: serverInfoKey(si.GetSubKind(), si.GetName()), + Value: value, + Expires: si.Expiry(), + ID: si.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) @@ -257,15 +261,17 @@ func (s *PresenceService) getServers(ctx context.Context, kind, prefix string) ( } func (s *PresenceService) upsertServer(ctx context.Context, prefix string, server types.Server) error { + rev := server.GetRevision() value, err := services.MarshalServer(server) if err != nil { return trace.Wrap(err) } _, err = s.Put(ctx, backend.Item{ - Key: backend.Key(prefix, server.GetName()), - Value: value, - Expires: server.Expiry(), - ID: server.GetResourceID(), + Key: backend.Key(prefix, server.GetName()), + Value: value, + Expires: server.Expiry(), + ID: server.GetResourceID(), + Revision: rev, }) return trace.Wrap(err) } @@ -346,15 +352,17 @@ func (s *PresenceService) UpsertNode(ctx context.Context, server types.Server) ( if n := server.GetNamespace(); n != apidefaults.Namespace { return nil, trace.BadParameter("cannot place node in namespace %q, custom namespaces are deprecated", n) } + rev := server.GetRevision() value, err := services.MarshalServer(server) if err != nil { return nil, trace.Wrap(err) } lease, err := s.Put(ctx, backend.Item{ - Key: backend.Key(nodesPrefix, server.GetNamespace(), server.GetName()), - Value: value, - Expires: server.Expiry(), - ID: server.GetResourceID(), + Key: backend.Key(nodesPrefix, server.GetNamespace(), server.GetName()), + Value: value, + Expires: server.Expiry(), + ID: server.GetResourceID(), + Revision: rev, }) if err != nil { return nil, trace.Wrap(err) @@ -426,15 +434,17 @@ func (s *PresenceService) UpsertReverseTunnel(tunnel types.ReverseTunnel) error if err := services.ValidateReverseTunnel(tunnel); err != nil { return trace.Wrap(err) } + rev := tunnel.GetRevision() value, err := services.MarshalReverseTunnel(tunnel) if err != nil { return trace.Wrap(err) } _, err = s.Put(context.TODO(), backend.Item{ - Key: backend.Key(reverseTunnelsPrefix, tunnel.GetName()), - Value: value, - Expires: tunnel.Expiry(), - ID: tunnel.GetResourceID(), + Key: backend.Key(reverseTunnelsPrefix, tunnel.GetName()), + Value: value, + Expires: tunnel.Expiry(), + ID: tunnel.GetResourceID(), + Revision: rev, }) return trace.Wrap(err) } @@ -484,15 +494,17 @@ func (s *PresenceService) UpsertTrustedCluster(ctx context.Context, trustedClust if err := services.ValidateTrustedCluster(trustedCluster); err != nil { return nil, trace.Wrap(err) } + rev := trustedCluster.GetRevision() value, err := services.MarshalTrustedCluster(trustedCluster) if err != nil { return nil, trace.Wrap(err) } _, err = s.Put(ctx, backend.Item{ - Key: backend.Key(trustedClustersPrefix, trustedCluster.GetName()), - Value: value, - Expires: trustedCluster.Expiry(), - ID: trustedCluster.GetResourceID(), + Key: backend.Key(trustedClustersPrefix, trustedCluster.GetName()), + Value: value, + Expires: trustedCluster.Expiry(), + ID: trustedCluster.GetResourceID(), + Revision: rev, }) if err != nil { return nil, trace.Wrap(err) @@ -553,15 +565,17 @@ func (s *PresenceService) UpsertTunnelConnection(conn types.TunnelConnection) er return trace.Wrap(err) } + rev := conn.GetRevision() value, err := services.MarshalTunnelConnection(conn) if err != nil { return trace.Wrap(err) } _, err = s.Put(context.TODO(), backend.Item{ - Key: backend.Key(tunnelConnectionsPrefix, conn.GetClusterName(), conn.GetName()), - Value: value, - Expires: conn.Expiry(), - ID: conn.GetResourceID(), + Key: backend.Key(tunnelConnectionsPrefix, conn.GetClusterName(), conn.GetName()), + Value: value, + Expires: conn.Expiry(), + ID: conn.GetResourceID(), + Revision: rev, }) if err != nil { return trace.Wrap(err) @@ -694,14 +708,16 @@ func (s *PresenceService) UpdateRemoteCluster(ctx context.Context, rc types.Remo update.SetConnectionStatus(rc.GetConnectionStatus()) update.SetMetadata(rc.GetMetadata()) + rev := update.GetRevision() updateValue, err := services.MarshalRemoteCluster(update) if err != nil { return trace.Wrap(err) } updateItem := backend.Item{ - Key: backend.Key(remoteClustersPrefix, update.GetName()), - Value: updateValue, - Expires: update.Expiry(), + Key: backend.Key(remoteClustersPrefix, update.GetName()), + Value: updateValue, + Expires: update.Expiry(), + Revision: rev, } _, err = s.CompareAndSwap(ctx, *existingItem, updateItem) @@ -862,14 +878,16 @@ func (s *PresenceService) initSemaphore(ctx context.Context, key []byte, leaseID if err != nil { return nil, trace.Wrap(err) } + rev := sem.GetRevision() value, err := services.MarshalSemaphore(sem) if err != nil { return nil, trace.Wrap(err) } item := backend.Item{ - Key: key, - Value: value, - Expires: sem.Expiry(), + Key: key, + Value: value, + Expires: sem.Expiry(), + Revision: rev, } _, err = s.Create(ctx, item) if err != nil { @@ -897,15 +915,17 @@ func (s *PresenceService) acquireSemaphore(ctx context.Context, key []byte, leas return nil, trace.Wrap(err) } + rev := sem.GetRevision() newValue, err := services.MarshalSemaphore(sem) if err != nil { return nil, trace.Wrap(err) } newItem := backend.Item{ - Key: key, - Value: newValue, - Expires: sem.Expiry(), + Key: key, + Value: newValue, + Expires: sem.Expiry(), + Revision: rev, } if _, err := s.CompareAndSwap(ctx, *item, newItem); err != nil { @@ -945,15 +965,17 @@ func (s *PresenceService) KeepAliveSemaphoreLease(ctx context.Context, lease typ return trace.Wrap(err) } + rev := sem.GetRevision() newValue, err := services.MarshalSemaphore(sem) if err != nil { return trace.Wrap(err) } newItem := backend.Item{ - Key: key, - Value: newValue, - Expires: sem.Expiry(), + Key: key, + Value: newValue, + Expires: sem.Expiry(), + Revision: rev, } _, err = s.CompareAndSwap(ctx, *item, newItem) @@ -1005,15 +1027,17 @@ func (s *PresenceService) CancelSemaphoreLease(ctx context.Context, lease types. return trace.Wrap(err) } + rev := sem.GetRevision() newValue, err := services.MarshalSemaphore(sem) if err != nil { return trace.Wrap(err) } newItem := backend.Item{ - Key: key, - Value: newValue, - Expires: sem.Expiry(), + Key: key, + Value: newValue, + Expires: sem.Expiry(), + Revision: rev, } _, err = s.CompareAndSwap(ctx, *item, newItem) @@ -1064,7 +1088,7 @@ func (s *PresenceService) GetSemaphores(ctx context.Context, filter types.Semaph sems := make([]types.Semaphore, 0, len(items)) for _, item := range items { - sem, err := services.UnmarshalSemaphore(item.Value) + sem, err := services.UnmarshalSemaphore(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -1089,6 +1113,7 @@ func (s *PresenceService) UpsertKubernetesServer(ctx context.Context, server typ if err := server.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := server.GetRevision() value, err := services.MarshalKubeServer(server) if err != nil { return nil, trace.Wrap(err) @@ -1101,9 +1126,10 @@ func (s *PresenceService) UpsertKubernetesServer(ctx context.Context, server typ Key: backend.Key(kubeServersPrefix, server.GetHostID(), server.GetName()), - Value: value, - Expires: server.Expiry(), - ID: server.GetResourceID(), + Value: value, + Expires: server.Expiry(), + ID: server.GetResourceID(), + Revision: rev, }) if err != nil { return nil, trace.Wrap(err) @@ -1197,6 +1223,7 @@ func (s *PresenceService) UpsertDatabaseServer(ctx context.Context, server types if err := server.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := server.GetRevision() value, err := services.MarshalDatabaseServer(server) if err != nil { return nil, trace.Wrap(err) @@ -1209,9 +1236,10 @@ func (s *PresenceService) UpsertDatabaseServer(ctx context.Context, server types server.GetNamespace(), server.GetHostID(), server.GetName()), - Value: value, - Expires: server.Expiry(), - ID: server.GetResourceID(), + Value: value, + Expires: server.Expiry(), + ID: server.GetResourceID(), + Revision: rev, }) if err != nil { return nil, trace.Wrap(err) @@ -1291,6 +1319,7 @@ func (s *PresenceService) UpsertApplicationServer(ctx context.Context, server ty if err := server.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := server.GetRevision() value, err := services.MarshalAppServer(server) if err != nil { return nil, trace.Wrap(err) @@ -1304,9 +1333,10 @@ func (s *PresenceService) UpsertApplicationServer(ctx context.Context, server ty server.GetNamespace(), server.GetHostID(), server.GetName()), - Value: value, - Expires: server.Expiry(), - ID: server.GetResourceID(), + Value: value, + Expires: server.Expiry(), + ID: server.GetResourceID(), + Revision: rev, }) if err != nil { return nil, trace.Wrap(err) @@ -1417,15 +1447,17 @@ func (s *PresenceService) UpsertWindowsDesktopService(ctx context.Context, srv t if err := srv.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := srv.GetRevision() value, err := services.MarshalWindowsDesktopService(srv) if err != nil { return nil, trace.Wrap(err) } lease, err := s.Put(ctx, backend.Item{ - Key: backend.Key(windowsDesktopServicesPrefix, srv.GetName()), - Value: value, - Expires: srv.Expiry(), - ID: srv.GetResourceID(), + Key: backend.Key(windowsDesktopServicesPrefix, srv.GetName()), + Value: value, + Expires: srv.Expiry(), + ID: srv.GetResourceID(), + Revision: rev, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/services/local/provisioning.go b/lib/services/local/provisioning.go index 7e6a63601b194..3d4731efbac6f 100644 --- a/lib/services/local/provisioning.go +++ b/lib/services/local/provisioning.go @@ -69,15 +69,17 @@ func (s *ProvisioningService) tokenToItem(p types.ProvisionToken) (*backend.Item if err := p.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := p.GetRevision() data, err := services.MarshalProvisionToken(p) if err != nil { return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(tokensPrefix, p.GetName()), - Value: data, - Expires: p.Expiry(), - ID: p.GetResourceID(), + Key: backend.Key(tokensPrefix, p.GetName()), + Value: data, + Expires: p.Expiry(), + ID: p.GetResourceID(), + Revision: rev, } return item, nil } diff --git a/lib/services/local/resource.go b/lib/services/local/resource.go index c38d17b207579..049cf271baccd 100644 --- a/lib/services/local/resource.go +++ b/lib/services/local/resource.go @@ -121,15 +121,17 @@ func itemFromUser(user types.User) (*backend.Item, error) { if err := services.ValidateUser(user); err != nil { return nil, trace.Wrap(err) } + rev := user.GetRevision() value, err := services.MarshalUser(user) if err != nil { return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(webPrefix, usersPrefix, user.GetName(), paramsPrefix), - Value: value, - Expires: user.Expiry(), - ID: user.GetResourceID(), + Key: backend.Key(webPrefix, usersPrefix, user.GetName(), paramsPrefix), + Value: value, + Expires: user.Expiry(), + ID: user.GetResourceID(), + Revision: rev, } return item, nil } @@ -158,15 +160,17 @@ func itemFromCertAuthority(ca types.CertAuthority) (*backend.Item, error) { if err := services.ValidateCertAuthority(ca); err != nil { return nil, trace.Wrap(err) } + rev := ca.GetRevision() value, err := services.MarshalCertAuthority(ca) if err != nil { return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(authoritiesPrefix, string(ca.GetType()), ca.GetName()), - Value: value, - Expires: ca.Expiry(), - ID: ca.GetResourceID(), + Key: backend.Key(authoritiesPrefix, string(ca.GetType()), ca.GetName()), + Value: value, + Expires: ca.Expiry(), + ID: ca.GetResourceID(), + Revision: rev, } return item, nil } @@ -177,15 +181,17 @@ func itemFromProvisionToken(p types.ProvisionToken) (*backend.Item, error) { if err := p.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := p.GetRevision() value, err := services.MarshalProvisionToken(p) if err != nil { return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(tokensPrefix, p.GetName()), - Value: value, - Expires: p.Expiry(), - ID: p.GetResourceID(), + Key: backend.Key(tokensPrefix, p.GetName()), + Value: value, + Expires: p.Expiry(), + ID: p.GetResourceID(), + Revision: rev, } return item, nil } @@ -196,15 +202,17 @@ func itemFromTrustedCluster(tc types.TrustedCluster) (*backend.Item, error) { if err := tc.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := tc.GetRevision() value, err := services.MarshalTrustedCluster(tc) if err != nil { return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(trustedClustersPrefix, tc.GetName()), - Value: value, - Expires: tc.Expiry(), - ID: tc.GetResourceID(), + Key: backend.Key(trustedClustersPrefix, tc.GetName()), + Value: value, + Expires: tc.Expiry(), + ID: tc.GetResourceID(), + Revision: rev, } return item, nil } @@ -215,15 +223,17 @@ func itemFromGithubConnector(gc types.GithubConnector) (*backend.Item, error) { if err := gc.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := gc.GetRevision() value, err := services.MarshalGithubConnector(gc) if err != nil { return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(webPrefix, connectorsPrefix, githubPrefix, connectorsPrefix, gc.GetName()), - Value: value, - Expires: gc.Expiry(), - ID: gc.GetResourceID(), + Key: backend.Key(webPrefix, connectorsPrefix, githubPrefix, connectorsPrefix, gc.GetName()), + Value: value, + Expires: gc.Expiry(), + ID: gc.GetResourceID(), + Revision: rev, } return item, nil } @@ -231,16 +241,18 @@ func itemFromGithubConnector(gc types.GithubConnector) (*backend.Item, error) { // itemFromRole attempts to encode the supplied role as an // instance of `backend.Item` suitable for storage. func itemFromRole(role types.Role) (*backend.Item, error) { + rev := role.GetRevision() value, err := services.MarshalRole(role) if err != nil { return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(rolesPrefix, role.GetName(), paramsPrefix), - Value: value, - Expires: role.Expiry(), - ID: role.GetResourceID(), + Key: backend.Key(rolesPrefix, role.GetName(), paramsPrefix), + Value: value, + Expires: role.Expiry(), + ID: role.GetResourceID(), + Revision: rev, } return item, nil } @@ -248,15 +260,17 @@ func itemFromRole(role types.Role) (*backend.Item, error) { // itemFromOIDCConnector attempts to encode the supplied connector as an // instance of `backend.Item` suitable for storage. func itemFromOIDCConnector(connector types.OIDCConnector) (*backend.Item, error) { + rev := connector.GetRevision() value, err := services.MarshalOIDCConnector(connector) if err != nil { return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(webPrefix, connectorsPrefix, oidcPrefix, connectorsPrefix, connector.GetName()), - Value: value, - Expires: connector.Expiry(), - ID: connector.GetResourceID(), + Key: backend.Key(webPrefix, connectorsPrefix, oidcPrefix, connectorsPrefix, connector.GetName()), + Value: value, + Expires: connector.Expiry(), + ID: connector.GetResourceID(), + Revision: rev, } return item, nil } @@ -264,6 +278,7 @@ func itemFromOIDCConnector(connector types.OIDCConnector) (*backend.Item, error) // itemFromSAMLConnector attempts to encode the supplied connector as an // instance of `backend.Item` suitable for storage. func itemFromSAMLConnector(connector types.SAMLConnector) (*backend.Item, error) { + rev := connector.GetRevision() if err := services.ValidateSAMLConnector(connector, nil); err != nil { return nil, trace.Wrap(err) } @@ -272,10 +287,11 @@ func itemFromSAMLConnector(connector types.SAMLConnector) (*backend.Item, error) return nil, trace.Wrap(err) } item := &backend.Item{ - Key: backend.Key(webPrefix, connectorsPrefix, samlPrefix, connectorsPrefix, connector.GetName()), - Value: value, - Expires: connector.Expiry(), - ID: connector.GetResourceID(), + Key: backend.Key(webPrefix, connectorsPrefix, samlPrefix, connectorsPrefix, connector.GetName()), + Value: value, + Expires: connector.Expiry(), + ID: connector.GetResourceID(), + Revision: rev, } return item, nil } @@ -359,15 +375,17 @@ func itemFromLock(l types.Lock) (*backend.Item, error) { if err := l.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + rev := l.GetRevision() value, err := services.MarshalLock(l) if err != nil { return nil, trace.Wrap(err) } return &backend.Item{ - Key: backend.Key(locksPrefix, l.GetName()), - Value: value, - Expires: l.Expiry(), - ID: l.GetResourceID(), + Key: backend.Key(locksPrefix, l.GetName()), + Value: value, + Expires: l.Expiry(), + ID: l.GetResourceID(), + Revision: rev, }, nil } diff --git a/lib/services/local/restrictions.go b/lib/services/local/restrictions.go index f5145036794c9..32da2f8cd476a 100644 --- a/lib/services/local/restrictions.go +++ b/lib/services/local/restrictions.go @@ -41,16 +41,18 @@ func (s *RestrictionsService) SetNetworkRestrictions(ctx context.Context, nr typ if err := nr.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := nr.GetRevision() value, err := services.MarshalNetworkRestrictions(nr) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(restrictionsPrefix, network), - Value: value, - Expires: nr.Expiry(), - ID: nr.GetResourceID(), + Key: backend.Key(restrictionsPrefix, network), + Value: value, + Expires: nr.Expiry(), + ID: nr.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) diff --git a/lib/services/local/session.go b/lib/services/local/session.go index 2fe93752d0d2d..ac2a612ecd863 100644 --- a/lib/services/local/session.go +++ b/lib/services/local/session.go @@ -60,7 +60,7 @@ func (s *IdentityService) getSession(ctx context.Context, keyParts ...string) (t if err != nil { return nil, trace.Wrap(err) } - session, err := services.UnmarshalWebSession(item.Value) + session, err := services.UnmarshalWebSession(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -86,7 +86,7 @@ func (s *IdentityService) GetSnowflakeSessions(ctx context.Context) ([]types.Web out := make([]types.WebSession, len(result.Items)) for i, item := range result.Items { - session, err := services.UnmarshalWebSession(item.Value) + session, err := services.UnmarshalWebSession(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -124,7 +124,7 @@ func (s *IdentityService) listSessions(ctx context.Context, pageSize int, pageTo out = make([]types.WebSession, 0, len(result.Items)) for _, item := range result.Items { - session, err := services.UnmarshalWebSession(item.Value) + session, err := services.UnmarshalWebSession(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, "", trace.Wrap(err) } @@ -138,7 +138,7 @@ func (s *IdentityService) listSessions(ctx context.Context, pageSize int, pageTo break } - session, err := services.UnmarshalWebSession(item.Value) + session, err := services.UnmarshalWebSession(item.Value, services.WithRevision(item.Revision)) if err != nil { return false, trace.Wrap(err) } @@ -181,14 +181,16 @@ func (s *IdentityService) UpsertSAMLIdPSession(ctx context.Context, session type // upsertSession creates a web session. func (s *IdentityService) upsertSession(ctx context.Context, session types.WebSession, keyPrefix ...string) error { + rev := session.GetRevision() value, err := services.MarshalWebSession(session) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(append(keyPrefix, session.GetName())...), - Value: value, - Expires: session.GetExpiryTime(), + Key: backend.Key(append(keyPrefix, session.GetName())...), + Value: value, + Expires: session.GetExpiryTime(), + Revision: rev, } if _, err = s.Put(ctx, item); err != nil { @@ -316,7 +318,7 @@ func (r *webSessions) Get(ctx context.Context, req types.GetWebSessionRequest) ( if err != nil { return nil, trace.Wrap(err) } - session, err := services.UnmarshalWebSession(item.Value) + session, err := services.UnmarshalWebSession(item.Value, services.WithRevision(item.Revision)) if err != nil && !trace.IsNotFound(err) { return nil, trace.Wrap(err) } @@ -332,7 +334,7 @@ func (r *webSessions) List(ctx context.Context) (out []types.WebSession, err err return nil, trace.Wrap(err) } for _, item := range result.Items { - session, err := services.UnmarshalWebSession(item.Value) + session, err := services.UnmarshalWebSession(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -349,15 +351,17 @@ func (r *webSessions) List(ctx context.Context) (out []types.WebSession, err err // Upsert updates the existing or inserts a new web session. func (r *webSessions) Upsert(ctx context.Context, session types.WebSession) error { + rev := session.GetRevision() value, err := services.MarshalWebSession(session) if err != nil { return trace.Wrap(err) } sessionMetadata := session.GetMetadata() item := backend.Item{ - Key: webSessionKey(session.GetName()), - Value: value, - Expires: backend.EarliestExpiry(session.GetBearerTokenExpiryTime(), sessionMetadata.Expiry()), + Key: webSessionKey(session.GetName()), + Value: value, + Expires: backend.EarliestExpiry(session.GetBearerTokenExpiryTime(), sessionMetadata.Expiry()), + Revision: rev, } _, err = r.backend.Put(ctx, item) if err != nil { @@ -397,7 +401,7 @@ func (r *webSessions) listLegacySessions(ctx context.Context) ([]types.WebSessio if suffix != sessionsPrefix { continue } - session, err := services.UnmarshalWebSession(item.Value) + session, err := services.UnmarshalWebSession(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -425,7 +429,7 @@ func (r *webTokens) Get(ctx context.Context, req types.GetWebTokenRequest) (type if err != nil { return nil, trace.Wrap(err) } - token, err := services.UnmarshalWebToken(item.Value) + token, err := services.UnmarshalWebToken(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -440,7 +444,7 @@ func (r *webTokens) List(ctx context.Context) (out []types.WebToken, err error) return nil, trace.Wrap(err) } for _, item := range result.Items { - token, err := services.UnmarshalWebToken(item.Value) + token, err := services.UnmarshalWebToken(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -451,15 +455,17 @@ func (r *webTokens) List(ctx context.Context) (out []types.WebToken, err error) // Upsert updates the existing or inserts a new web token. func (r *webTokens) Upsert(ctx context.Context, token types.WebToken) error { + rev := token.GetRevision() bytes, err := services.MarshalWebToken(token, services.WithVersion(types.V3)) if err != nil { return trace.Wrap(err) } metadata := token.GetMetadata() item := backend.Item{ - Key: webTokenKey(token.GetToken()), - Value: bytes, - Expires: metadata.Expiry(), + Key: webTokenKey(token.GetToken()), + Value: bytes, + Expires: metadata.Expiry(), + Revision: rev, } _, err = r.backend.Put(ctx, item) if err != nil { diff --git a/lib/services/local/sessiontracker.go b/lib/services/local/sessiontracker.go index 2ca5c47e50a69..68d5963877dc7 100644 --- a/lib/services/local/sessiontracker.go +++ b/lib/services/local/sessiontracker.go @@ -72,8 +72,7 @@ func (s *sessionTracker) UpdatePresence(ctx context.Context, sessionID, user str return trace.Wrap(err) } - err = session.UpdatePresence(user, s.bk.Clock().Now().UTC()) - if err != nil { + if err := session.UpdatePresence(user, s.bk.Clock().Now().UTC()); err != nil { return trace.Wrap(err) } @@ -83,9 +82,10 @@ func (s *sessionTracker) UpdatePresence(ctx context.Context, sessionID, user str } item := backend.Item{ - Key: backend.Key(sessionPrefix, sessionID), - Value: sessionJSON, - Expires: session.Expiry(), + Key: backend.Key(sessionPrefix, sessionID), + Value: sessionJSON, + Expires: session.Expiry(), + Revision: sessionItem.Revision, } _, err = s.bk.CompareAndSwap(ctx, *sessionItem, item) if trace.IsCompareFailed(err) { @@ -261,9 +261,10 @@ func (s *sessionTracker) UpdateSessionTracker(ctx context.Context, req *proto.Up } item := backend.Item{ - Key: backend.Key(sessionPrefix, req.SessionID), - Value: sessionJSON, - Expires: expiry, + Key: backend.Key(sessionPrefix, req.SessionID), + Value: sessionJSON, + Expires: expiry, + Revision: sessionItem.Revision, } _, err = s.bk.CompareAndSwap(ctx, *sessionItem, item) if trace.IsCompareFailed(err) { diff --git a/lib/services/local/status.go b/lib/services/local/status.go index ca4874a157027..452f6b56f9494 100644 --- a/lib/services/local/status.go +++ b/lib/services/local/status.go @@ -126,15 +126,17 @@ func (s *StatusService) UpsertClusterAlert(ctx context.Context, alert types.Clus alert.Metadata.SetExpiry(alert.Spec.Created.Add(time.Hour * 24)) } + rev := alert.GetRevision() val, err := utils.FastMarshal(&alert) if err != nil { return trace.Wrap(err) } _, err = s.Backend.Put(ctx, backend.Item{ - Key: backend.Key(clusterAlertPrefix, alert.Metadata.Name), - Value: val, - Expires: alert.Metadata.Expiry(), + Key: backend.Key(clusterAlertPrefix, alert.Metadata.Name), + Value: val, + Expires: alert.Metadata.Expiry(), + Revision: rev, }) return trace.Wrap(err) } diff --git a/lib/services/local/trust.go b/lib/services/local/trust.go index 19fe4404af3fd..596a09b1c82a8 100644 --- a/lib/services/local/trust.go +++ b/lib/services/local/trust.go @@ -89,15 +89,17 @@ func (s *CA) UpsertCertAuthority(ctx context.Context, ca types.CertAuthority) er } } + rev := ca.GetRevision() value, err := services.MarshalCertAuthority(ca) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(authoritiesPrefix, string(ca.GetType()), ca.GetName()), - Value: value, - Expires: ca.Expiry(), - ID: ca.GetResourceID(), + Key: backend.Key(authoritiesPrefix, string(ca.GetType()), ca.GetName()), + Value: value, + Expires: ca.Expiry(), + ID: ca.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) @@ -130,14 +132,16 @@ func (s *CA) CompareAndSwapCertAuthority(new, expected types.CertAuthority) erro return trace.CompareFailed("cluster %v settings have been updated, try again", new.GetName()) } + rev := new.GetRevision() newValue, err := services.MarshalCertAuthority(new) if err != nil { return trace.Wrap(err) } newItem := backend.Item{ - Key: key, - Value: newValue, - Expires: new.Expiry(), + Key: key, + Value: newValue, + Expires: new.Expiry(), + Revision: rev, } _, err = s.CompareAndSwap(context.TODO(), *actualItem, newItem) @@ -216,15 +220,17 @@ func (s *CA) DeactivateCertAuthority(id types.CertAuthID) error { return trace.Wrap(err) } + rev := certAuthority.GetRevision() value, err := services.MarshalCertAuthority(certAuthority) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(authoritiesPrefix, deactivatedPrefix, string(id.Type), id.DomainName), - Value: value, - Expires: certAuthority.Expiry(), - ID: certAuthority.GetResourceID(), + Key: backend.Key(authoritiesPrefix, deactivatedPrefix, string(id.Type), id.DomainName), + Value: value, + Expires: certAuthority.Expiry(), + ID: certAuthority.GetResourceID(), + Revision: rev, } _, err = s.Put(context.TODO(), item) @@ -317,14 +323,16 @@ func (s *CA) UpdateUserCARoleMap(ctx context.Context, name string, roleMap types actual.SetRoleMap(roleMap) + rev := actual.GetRevision() newValue, err := services.MarshalCertAuthority(actual) if err != nil { return trace.Wrap(err) } newItem := backend.Item{ - Key: key, - Value: newValue, - Expires: actual.Expiry(), + Key: key, + Value: newValue, + Expires: actual.Expiry(), + Revision: rev, } _, err = s.CompareAndSwap(ctx, *actualItem, newItem) if err != nil { diff --git a/lib/services/local/users.go b/lib/services/local/users.go index 1ba30984ac1a9..653b6aefe94a6 100644 --- a/lib/services/local/users.go +++ b/lib/services/local/users.go @@ -200,6 +200,7 @@ func (s *IdentityService) UpdateUserWithContext(ctx context.Context, user types. return nil, trace.Wrap(err) } + rev := user.GetRevision() value, err := services.MarshalUser(user.WithoutSecrets().(types.User)) if err != nil { return nil, trace.Wrap(err) @@ -209,7 +210,7 @@ func (s *IdentityService) UpdateUserWithContext(ctx context.Context, user types. Value: value, Expires: user.Expiry(), ID: user.GetResourceID(), - Revision: user.GetRevision(), + Revision: rev, } lease, err := s.Update(ctx, item) if err != nil { @@ -272,6 +273,7 @@ func (s *IdentityService) UpsertUserWithContext(ctx context.Context, user types. if err := services.ValidateUser(user); err != nil { return nil, trace.Wrap(err) } + rev := user.GetRevision() value, err := services.MarshalUser(user.WithoutSecrets().(types.User)) if err != nil { return nil, trace.Wrap(err) @@ -281,7 +283,7 @@ func (s *IdentityService) UpsertUserWithContext(ctx context.Context, user types. Value: value, Expires: user.Expiry(), ID: user.GetResourceID(), - Revision: user.GetRevision(), + Revision: rev, } lease, err := s.Put(ctx, item) if err != nil { @@ -308,6 +310,7 @@ func (s *IdentityService) CompareAndSwapUser(ctx context.Context, new, existing if !ok { return trace.BadParameter("Invalid user type %T", new) } + rev := new.GetRevision() newValue, err := services.MarshalUser(newRaw) if err != nil { return trace.Wrap(err) @@ -317,7 +320,7 @@ func (s *IdentityService) CompareAndSwapUser(ctx context.Context, new, existing Value: newValue, Expires: new.Expiry(), ID: new.GetResourceID(), - Revision: new.GetRevision(), + Revision: rev, } existingRaw, ok := existing.WithoutSecrets().(types.User) @@ -937,13 +940,15 @@ func (s *IdentityService) UpsertMFADevice(ctx context.Context, user string, d *t } } + rev := d.GetRevision() value, err := json.Marshal(d) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(webPrefix, usersPrefix, user, mfaDevicePrefix, d.Id), - Value: value, + Key: backend.Key(webPrefix, usersPrefix, user, mfaDevicePrefix, d.Id), + Value: value, + Revision: rev, } if _, err := s.Put(ctx, item); err != nil { @@ -1004,15 +1009,17 @@ func (s *IdentityService) GetMFADevices(ctx context.Context, user string, withSe // UpsertOIDCConnector upserts OIDC Connector func (s *IdentityService) UpsertOIDCConnector(ctx context.Context, connector types.OIDCConnector) error { + rev := connector.GetRevision() value, err := services.MarshalOIDCConnector(connector) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(webPrefix, connectorsPrefix, oidcPrefix, connectorsPrefix, connector.GetName()), - Value: value, - Expires: connector.Expiry(), - ID: connector.GetResourceID(), + Key: backend.Key(webPrefix, connectorsPrefix, oidcPrefix, connectorsPrefix, connector.GetName()), + Value: value, + Expires: connector.Expiry(), + ID: connector.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) if err != nil { @@ -1119,14 +1126,16 @@ func (s *IdentityService) UpsertSAMLConnector(ctx context.Context, connector typ if err := services.ValidateSAMLConnector(connector, nil); err != nil { return trace.Wrap(err) } + rev := connector.GetRevision() value, err := services.MarshalSAMLConnector(connector) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(webPrefix, connectorsPrefix, samlPrefix, connectorsPrefix, connector.GetName()), - Value: value, - Expires: connector.Expiry(), + Key: backend.Key(webPrefix, connectorsPrefix, samlPrefix, connectorsPrefix, connector.GetName()), + Value: value, + Expires: connector.Expiry(), + Revision: rev, } _, err = s.Put(ctx, item) if err != nil { @@ -1296,15 +1305,17 @@ func (s *IdentityService) UpsertGithubConnector(ctx context.Context, connector t if err := connector.CheckAndSetDefaults(); err != nil { return trace.Wrap(err) } + rev := connector.GetRevision() value, err := services.MarshalGithubConnector(connector) if err != nil { return trace.Wrap(err) } item := backend.Item{ - Key: backend.Key(webPrefix, connectorsPrefix, githubPrefix, connectorsPrefix, connector.GetName()), - Value: value, - Expires: connector.Expiry(), - ID: connector.GetResourceID(), + Key: backend.Key(webPrefix, connectorsPrefix, githubPrefix, connectorsPrefix, connector.GetName()), + Value: value, + Expires: connector.Expiry(), + ID: connector.GetResourceID(), + Revision: rev, } _, err = s.Put(ctx, item) if err != nil { diff --git a/lib/services/plugin_data.go b/lib/services/plugin_data.go index 70e6bf2149897..eb391dce4b7a1 100644 --- a/lib/services/plugin_data.go +++ b/lib/services/plugin_data.go @@ -41,6 +41,7 @@ func MarshalPluginData(pluginData types.PluginData, opts ...MarshalOption) ([]by // to prevent unexpected data races cp := *pluginData cp.SetResourceID(0) + cp.SetRevision("") pluginData = &cp } return utils.FastMarshal(pluginData) diff --git a/lib/services/session.go b/lib/services/session.go index aedb735526a01..c39da78a2381e 100644 --- a/lib/services/session.go +++ b/lib/services/session.go @@ -142,6 +142,9 @@ func UnmarshalWebToken(bytes []byte, opts ...MarshalOption) (types.WebToken, err if config.ID != 0 { token.SetResourceID(config.ID) } + if config.Revision != "" { + token.SetRevision(config.Revision) + } if !config.Expires.IsZero() { token.Metadata.SetExpiry(config.Expires) } diff --git a/lib/services/suite/suite.go b/lib/services/suite/suite.go index 9fc62972516b2..380c0e3691444 100644 --- a/lib/services/suite/suite.go +++ b/lib/services/suite/suite.go @@ -559,7 +559,7 @@ func (s *ServicesTestSuite) WebSessionCRUD(t *testing.T) { out, err := s.WebS.WebSessions().Get(ctx, req) require.NoError(t, err) - require.Empty(t, cmp.Diff(out, ws)) + require.Empty(t, cmp.Diff(out, ws, cmpopts.IgnoreFields(types.Metadata{}, "ID", "Revision"))) ws1, err := types.NewWebSession("sid1", types.KindWebSession, types.WebSessionSpecV2{ @@ -575,7 +575,7 @@ func (s *ServicesTestSuite) WebSessionCRUD(t *testing.T) { out2, err := s.WebS.WebSessions().Get(ctx, req) require.NoError(t, err) - require.Empty(t, cmp.Diff(out2, ws1)) + require.Empty(t, cmp.Diff(out2, ws1, cmpopts.IgnoreFields(types.Metadata{}, "ID", "Revision"))) require.NoError(t, s.WebS.WebSessions().Delete(ctx, types.DeleteWebSessionRequest{ User: req.User, diff --git a/lib/services/user_login_state.go b/lib/services/user_login_state.go index 55356848f6ba6..940613be67856 100644 --- a/lib/services/user_login_state.go +++ b/lib/services/user_login_state.go @@ -61,8 +61,13 @@ func MarshalUserLoginState(userLoginState *userloginstate.UserLoginState, opts . if !cfg.PreserveResourceID { prevID := userLoginState.GetResourceID() - defer func() { userLoginState.SetResourceID(prevID) }() + prevRev := userLoginState.GetRevision() + defer func() { + userLoginState.SetResourceID(prevID) + userLoginState.SetRevision(prevRev) + }() userLoginState.SetResourceID(0) + userLoginState.SetRevision("") } return utils.FastMarshal(userLoginState) }