Skip to content

Commit

Permalink
fix: Improve the way migrations handle transactions (sourcenetwork#1737)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves sourcenetwork#1649 sourcenetwork#1592

## Description

Improves the way migrations handle transactions, as well as fixing a
couple of concurrency issues:

- Adds locks around the various registry properties, these maps can be
accessed concurrently and need to be protected.
- Removes the transaction continuity issue in the client.LenRegistry
interface, where db.LensRegistry() returns an object that does not
respect the transactionality of the parent store, and takes `txn`s as
input parameters to some of its functions. It does this by following the
same pattern as `db.db`. (sourcenetwork#1649)
- Fixes the bugs in the lens package where migrations set were not
visible/accessible until after commit. They are now visible within the
transaction scope. (sourcenetwork#1592)

It still does not provide transaction snapshot isolation, I see that
issue as relatively high effort low reward at the moment.
  • Loading branch information
AndrewSisley authored Aug 3, 2023
1 parent de10564 commit a832f04
Show file tree
Hide file tree
Showing 18 changed files with 529 additions and 124 deletions.
16 changes: 2 additions & 14 deletions api/http/handlerfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,26 +298,14 @@ func setMigrationHandler(rw http.ResponseWriter, req *http.Request) {
return
}

txn, err := db.NewTxn(req.Context(), false)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
}

var cfg client.LensConfig
err = json.Unmarshal(cfgStr, &cfg)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
}

err = db.LensRegistry().SetMigration(req.Context(), txn, cfg)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
}

err = txn.Commit(req.Context())
err = db.LensRegistry().SetMigration(req.Context(), cfg)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
Expand All @@ -338,7 +326,7 @@ func getMigrationHandler(rw http.ResponseWriter, req *http.Request) {
return
}

cfgs := db.LensRegistry().Config()
cfgs, err := db.LensRegistry().Config(req.Context())
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
Expand Down
26 changes: 20 additions & 6 deletions client/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type LensConfig struct {
// LensRegistry exposes several useful thread-safe migration related functions which may
// be used to manage migrations.
type LensRegistry interface {
// WithTxn returns a new LensRegistry scoped to the given transaction.
//
// WARNING: Currently this does not provide snapshot isolation, if other transactions are commited
// after this has been created, the results of those commits will be visible within this scope.
WithTxn(datastore.Txn) LensRegistry

// SetMigration sets the migration for the given source-destination schema version IDs. Is equivilent to
// calling `Store.SetMigration(ctx, cfg)`.
//
Expand All @@ -55,29 +61,37 @@ type LensRegistry interface {
//
// Migrations will only run if there is a complete path from the document schema version to the latest local
// schema version.
SetMigration(context.Context, datastore.Txn, LensConfig) error
SetMigration(context.Context, LensConfig) error

// ReloadLenses clears any cached migrations, loads their configurations from the database and re-initializes
// them. It is run on database start if the database already existed.
ReloadLenses(ctx context.Context, txn datastore.Txn) error
ReloadLenses(context.Context) error

// MigrateUp returns an enumerable that feeds the given source through the Lens migration for the given
// schema version id if one is found, if there is no matching migration the given source will be returned.
MigrateUp(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error)
MigrateUp(
context.Context,
enumerable.Enumerable[map[string]any],
string,
) (enumerable.Enumerable[map[string]any], error)

// MigrateDown returns an enumerable that feeds the given source through the Lens migration for the schema
// version that precedes the given schema version id in reverse, if one is found, if there is no matching
// migration the given source will be returned.
//
// This downgrades any documents in the source enumerable if/when enumerated.
MigrateDown(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error)
MigrateDown(
context.Context,
enumerable.Enumerable[map[string]any],
string,
) (enumerable.Enumerable[map[string]any], error)

// Config returns a slice of the configurations of the currently loaded migrations.
//
// Modifying the slice does not affect the loaded configurations.
Config() []LensConfig
Config(context.Context) ([]LensConfig, error)

// HasMigration returns true if there is a migration registered for the given schema version id, otherwise
// will return false.
HasMigration(string) bool
HasMigration(context.Context, string) (bool, error)
}
4 changes: 3 additions & 1 deletion datastore/concurrent_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type concurrentTxn struct {
}

// NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) {
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
var rootTxn ds.Txn
var err error

Expand All @@ -54,6 +54,8 @@ func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readon
return &txn{
rootConcurentTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
}, nil
Expand Down
8 changes: 4 additions & 4 deletions datastore/concurrent_txt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestNewConcurrentTxnFrom(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewConcurrentTxnFrom(ctx, rootstore, false)
txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -44,15 +44,15 @@ func TestNewConcurrentTxnFromWithStoreClosed(t *testing.T) {
err = rootstore.Close()
require.NoError(t, err)

_, err = NewConcurrentTxnFrom(ctx, rootstore, false)
_, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

func TestNewConcurrentTxnFromNonIterable(t *testing.T) {
ctx := context.Background()
rootstore := memory.NewDatastore(ctx)

txn, err := NewConcurrentTxnFrom(ctx, rootstore, false)
txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -66,7 +66,7 @@ func TestNewConcurrentTxnFromNonIterableWithStoreClosed(t *testing.T) {
err := rootstore.Close()
require.NoError(t, err)

_, err = NewConcurrentTxnFrom(ctx, rootstore, false)
_, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

Expand Down
74 changes: 74 additions & 0 deletions datastore/mocks/txn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 32 additions & 8 deletions datastore/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
type Txn interface {
MultiStore

// ID returns the unique immutable identifier for this transaction.
ID() uint64

// Commit finalizes a transaction, attempting to commit it to the Datastore.
// May return an error if the transaction has gone stale. The presence of an
// error is an indication that the data was not committed to the Datastore.
Expand All @@ -32,22 +35,31 @@ type Txn interface {
// state of the Datastore, making it safe to defer.
Discard(ctx context.Context)

// OnSuccess registers a function to be called when the transaction is committed.
OnSuccess(fn func())

// OnError registers a function to be called when the transaction is rolled back.
OnError(fn func())

// OnDiscard registers a function to be called when the transaction is discarded.
OnDiscard(fn func())
}

type txn struct {
t ds.Txn
MultiStore

id uint64

successFns []func()
errorFns []func()
discardFns []func()
}

var _ Txn = (*txn)(nil)

// NewTxnFrom returns a new Txn from the rootstore.
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) {
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
// check if our datastore natively supports iterable transaction, transactions or batching
if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok {
rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly)
Expand All @@ -58,6 +70,8 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (
return &txn{
rootTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
}, nil
Expand All @@ -73,24 +87,32 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (
return &txn{
rootTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
}, nil
}

// ID returns the unique immutable identifier for this transaction.
func (t *txn) ID() uint64 {
return t.id
}

// Commit finalizes a transaction, attempting to commit it to the Datastore.
func (t *txn) Commit(ctx context.Context) error {
if err := t.t.Commit(ctx); err != nil {
t.runErrorFns(ctx)
runFns(t.errorFns)
return err
}
t.runSuccessFns(ctx)
runFns(t.successFns)
return nil
}

// Discard throws away changes recorded in a transaction without committing.
func (t *txn) Discard(ctx context.Context) {
t.t.Discard(ctx)
runFns(t.discardFns)
}

// OnSuccess registers a function to be called when the transaction is committed.
Expand All @@ -109,14 +131,16 @@ func (txn *txn) OnError(fn func()) {
txn.errorFns = append(txn.errorFns, fn)
}

func (txn *txn) runErrorFns(ctx context.Context) {
for _, fn := range txn.errorFns {
fn()
// OnDiscard registers a function to be called when the transaction is discarded.
func (txn *txn) OnDiscard(fn func()) {
if fn == nil {
return
}
txn.discardFns = append(txn.discardFns, fn)
}

func (txn *txn) runSuccessFns(ctx context.Context) {
for _, fn := range txn.successFns {
func runFns(fns []func()) {
for _, fn := range fns {
fn()
}
}
Expand Down
8 changes: 4 additions & 4 deletions datastore/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewTxnFrom(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, false)
txn, err := NewTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -43,7 +43,7 @@ func TestNewTxnFromWithStoreClosed(t *testing.T) {
err = rootstore.Close()
require.NoError(t, err)

_, err = NewTxnFrom(ctx, rootstore, false)
_, err = NewTxnFrom(ctx, rootstore, 0, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

Expand All @@ -53,7 +53,7 @@ func TestOnSuccess(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, false)
txn, err := NewTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

txn.OnSuccess(nil)
Expand All @@ -74,7 +74,7 @@ func TestOnError(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, false)
txn, err := NewTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

txn.OnError(nil)
Expand Down
Loading

0 comments on commit a832f04

Please sign in to comment.