Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 34 additions & 9 deletions pkg/server/plugin/datastore/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (ds *Plugin) CreateBundle(ctx context.Context, req *datastore.CreateBundleR
// UpdateBundle updates an existing bundle with the given CAs. Overwrites any
// existing certificates.
func (ds *Plugin) UpdateBundle(ctx context.Context, req *datastore.UpdateBundleRequest) (resp *datastore.UpdateBundleResponse, err error) {
if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) {
if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) {
resp, err = updateBundle(tx, req)
return err
}); err != nil {
Expand All @@ -161,7 +161,7 @@ func (ds *Plugin) SetBundle(ctx context.Context, req *datastore.SetBundleRequest

// AppendBundle append bundle contents to the existing bundle (by trust domain). If no existing one is present, create it.
func (ds *Plugin) AppendBundle(ctx context.Context, req *datastore.AppendBundleRequest) (resp *datastore.AppendBundleResponse, err error) {
if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) {
if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) {
resp, err = appendBundle(tx, req)
return err
}); err != nil {
Expand Down Expand Up @@ -216,7 +216,7 @@ func (ds *Plugin) ListBundles(ctx context.Context, req *datastore.ListBundlesReq

// PruneBundle removes expired certs and keys from a bundle
func (ds *Plugin) PruneBundle(ctx context.Context, req *datastore.PruneBundleRequest) (resp *datastore.PruneBundleResponse, err error) {
if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) {
if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) {
resp, err = pruneBundle(tx, req, ds.log)
return err
}); err != nil {
Expand Down Expand Up @@ -282,7 +282,7 @@ func (ds *Plugin) ListAttestedNodes(ctx context.Context,
// UpdateAttestedNode updates the given node's cert serial and expiration.
func (ds *Plugin) UpdateAttestedNode(ctx context.Context,
req *datastore.UpdateAttestedNodeRequest) (resp *datastore.UpdateAttestedNodeResponse, err error) {
if err = ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) {
if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) {
resp, err = updateAttestedNode(tx, req)
return err
}); err != nil {
Expand Down Expand Up @@ -384,7 +384,7 @@ func (ds *Plugin) ListRegistrationEntries(ctx context.Context,
// UpdateRegistrationEntry updates an existing registration entry
func (ds *Plugin) UpdateRegistrationEntry(ctx context.Context,
req *datastore.UpdateRegistrationEntryRequest) (resp *datastore.UpdateRegistrationEntryResponse, err error) {
if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) {
if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) {
resp, err = updateRegistrationEntry(tx, req)
return err
}); err != nil {
Expand Down Expand Up @@ -561,14 +561,38 @@ func (*Plugin) GetPluginInfo(context.Context, *spi.GetPluginInfoRequest) (*spi.G
return &pluginInfo, nil
}

func (ds *Plugin) withWriteRepeatableReadTx(ctx context.Context, op func(tx *gorm.DB) error) error {
return ds.withTx(ctx, op, false, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
}

// withReadModifyWriteTx wraps the operation in a transaction appropriate for
// operations that will read one or more rows, change one or more columns in
// those rows, and then set them back. This requires a stronger level of
// consistency that prevents two transactions from doing read-modify-write
// concurrently.
func (ds *Plugin) withReadModifyWriteTx(ctx context.Context, op func(tx *gorm.DB) error) error {
isolationLevel := sql.LevelRepeatableRead
if ds.db.databaseType == MySQL {
// MySQL REPEATABLE READ is weaker than that of PostgreSQL. Namely,
// PostgreSQL, beyond providing the minimum consistency guarantees
// mandated for REPEATABLE READ in the standard, automatically fails
// concurrent transactions that try to update the same target row.
//
// MySQL SERIALIZABLE is the same as REPEATABLE READ except that it
// automatically converts `SELECT` to `SELECT ... LOCK FOR SHARE MODE`
// which "sets a shared lock that permits other transactions to read
// the examined rows but not to update or delete them", which is what
// we want.
isolationLevel = sql.LevelSerializable
}
return ds.withTx(ctx, op, false, &sql.TxOptions{Isolation: isolationLevel})
}

// withWriteTx wraps the operation in a transaction appropriate for operations
// that unconditionally create/update rows, without reading them first. If two
// transactions try and update at the same time, last writer wins.
func (ds *Plugin) withWriteTx(ctx context.Context, op func(tx *gorm.DB) error) error {
return ds.withTx(ctx, op, false, nil)
}

// withWriteTx wraps the operation in a transaction appropriate for operations
// that only read rows.
func (ds *Plugin) withReadTx(ctx context.Context, op func(tx *gorm.DB) error) error {
return ds.withTx(ctx, op, true, nil)
}
Expand Down Expand Up @@ -1514,6 +1538,7 @@ FROM attested_node_entries N

return builder.String(), args, nil
}

func updateAttestedNode(tx *gorm.DB, req *datastore.UpdateAttestedNodeRequest) (*datastore.UpdateAttestedNodeResponse, error) {
var model AttestedNode
if err := tx.Find(&model, "spiffe_id = ?", req.SpiffeId).Error; err != nil {
Expand Down