diff --git a/pkg/server/plugin/datastore/sql/sql.go b/pkg/server/plugin/datastore/sql/sql.go index dfe0b4aa9e..d5d7edf771 100644 --- a/pkg/server/plugin/datastore/sql/sql.go +++ b/pkg/server/plugin/datastore/sql/sql.go @@ -139,7 +139,7 @@ func (ds *Plugin) CreateBundle(ctx context.Context, req *datastore.CreateBundleR // UpdateBundle updates an existing bundle with the given CAs. Overwrites any // existing certificates. func (ds *Plugin) UpdateBundle(ctx context.Context, req *datastore.UpdateBundleRequest) (resp *datastore.UpdateBundleResponse, err error) { - if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = updateBundle(tx, req) return err }); err != nil { @@ -161,7 +161,7 @@ func (ds *Plugin) SetBundle(ctx context.Context, req *datastore.SetBundleRequest // AppendBundle append bundle contents to the existing bundle (by trust domain). If no existing one is present, create it. func (ds *Plugin) AppendBundle(ctx context.Context, req *datastore.AppendBundleRequest) (resp *datastore.AppendBundleResponse, err error) { - if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = appendBundle(tx, req) return err }); err != nil { @@ -216,7 +216,7 @@ func (ds *Plugin) ListBundles(ctx context.Context, req *datastore.ListBundlesReq // PruneBundle removes expired certs and keys from a bundle func (ds *Plugin) PruneBundle(ctx context.Context, req *datastore.PruneBundleRequest) (resp *datastore.PruneBundleResponse, err error) { - if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = pruneBundle(tx, req, ds.log) return err }); err != nil { @@ -282,7 +282,7 @@ func (ds *Plugin) ListAttestedNodes(ctx context.Context, // UpdateAttestedNode updates the given node's cert serial and expiration. func (ds *Plugin) UpdateAttestedNode(ctx context.Context, req *datastore.UpdateAttestedNodeRequest) (resp *datastore.UpdateAttestedNodeResponse, err error) { - if err = ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = updateAttestedNode(tx, req) return err }); err != nil { @@ -384,7 +384,7 @@ func (ds *Plugin) ListRegistrationEntries(ctx context.Context, // UpdateRegistrationEntry updates an existing registration entry func (ds *Plugin) UpdateRegistrationEntry(ctx context.Context, req *datastore.UpdateRegistrationEntryRequest) (resp *datastore.UpdateRegistrationEntryResponse, err error) { - if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = updateRegistrationEntry(tx, req) return err }); err != nil { @@ -561,14 +561,38 @@ func (*Plugin) GetPluginInfo(context.Context, *spi.GetPluginInfoRequest) (*spi.G return &pluginInfo, nil } -func (ds *Plugin) withWriteRepeatableReadTx(ctx context.Context, op func(tx *gorm.DB) error) error { - return ds.withTx(ctx, op, false, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) -} - +// withReadModifyWriteTx wraps the operation in a transaction appropriate for +// operations that will read one or more rows, change one or more columns in +// those rows, and then set them back. This requires a stronger level of +// consistency that prevents two transactions from doing read-modify-write +// concurrently. +func (ds *Plugin) withReadModifyWriteTx(ctx context.Context, op func(tx *gorm.DB) error) error { + isolationLevel := sql.LevelRepeatableRead + if ds.db.databaseType == MySQL { + // MySQL REPEATABLE READ is weaker than that of PostgreSQL. Namely, + // PostgreSQL, beyond providing the minimum consistency guarantees + // mandated for REPEATABLE READ in the standard, automatically fails + // concurrent transactions that try to update the same target row. + // + // MySQL SERIALIZABLE is the same as REPEATABLE READ except that it + // automatically converts `SELECT` to `SELECT ... LOCK FOR SHARE MODE` + // which "sets a shared lock that permits other transactions to read + // the examined rows but not to update or delete them", which is what + // we want. + isolationLevel = sql.LevelSerializable + } + return ds.withTx(ctx, op, false, &sql.TxOptions{Isolation: isolationLevel}) +} + +// withWriteTx wraps the operation in a transaction appropriate for operations +// that unconditionally create/update rows, without reading them first. If two +// transactions try and update at the same time, last writer wins. func (ds *Plugin) withWriteTx(ctx context.Context, op func(tx *gorm.DB) error) error { return ds.withTx(ctx, op, false, nil) } +// withWriteTx wraps the operation in a transaction appropriate for operations +// that only read rows. func (ds *Plugin) withReadTx(ctx context.Context, op func(tx *gorm.DB) error) error { return ds.withTx(ctx, op, true, nil) } @@ -1514,6 +1538,7 @@ FROM attested_node_entries N return builder.String(), args, nil } + func updateAttestedNode(tx *gorm.DB, req *datastore.UpdateAttestedNodeRequest) (*datastore.UpdateAttestedNodeResponse, error) { var model AttestedNode if err := tx.Find(&model, "spiffe_id = ?", req.SpiffeId).Error; err != nil {