Skip to content

Commit

Permalink
remove the need for transaction ID
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Aug 3, 2023
1 parent 02a5e1e commit b059e91
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 81 deletions.
3 changes: 1 addition & 2 deletions datastore/concurrent_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type concurrentTxn struct {
}

// NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) {
var rootTxn ds.Txn
var err error

Expand All @@ -54,7 +54,6 @@ func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uin
return &txn{
rootConcurentTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
Expand Down
8 changes: 4 additions & 4 deletions datastore/concurrent_txt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestNewConcurrentTxnFrom(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false)
txn, err := NewConcurrentTxnFrom(ctx, rootstore, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -44,15 +44,15 @@ func TestNewConcurrentTxnFromWithStoreClosed(t *testing.T) {
err = rootstore.Close()
require.NoError(t, err)

_, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false)
_, err = NewConcurrentTxnFrom(ctx, rootstore, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

func TestNewConcurrentTxnFromNonIterable(t *testing.T) {
ctx := context.Background()
rootstore := memory.NewDatastore(ctx)

txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false)
txn, err := NewConcurrentTxnFrom(ctx, rootstore, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -66,7 +66,7 @@ func TestNewConcurrentTxnFromNonIterableWithStoreClosed(t *testing.T) {
err := rootstore.Close()
require.NoError(t, err)

_, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false)
_, err = NewConcurrentTxnFrom(ctx, rootstore, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

Expand Down
14 changes: 1 addition & 13 deletions datastore/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ import (
type Txn interface {
MultiStore

// ID returns the unique immutable identifier for this transaction.
ID() uint64

// Commit finalizes a transaction, attempting to commit it to the Datastore.
// May return an error if the transaction has gone stale. The presence of an
// error is an indication that the data was not committed to the Datastore.
Expand All @@ -49,8 +46,6 @@ type txn struct {
t ds.Txn
MultiStore

id uint64

successFns []func()
errorFns []func()
discardFns []func()
Expand All @@ -59,7 +54,7 @@ type txn struct {
var _ Txn = (*txn)(nil)

// NewTxnFrom returns a new Txn from the rootstore.
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) {
// check if our datastore natively supports iterable transaction, transactions or batching
if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok {
rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly)
Expand All @@ -70,7 +65,6 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, reado
return &txn{
rootTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
Expand All @@ -87,18 +81,12 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, reado
return &txn{
rootTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
}, nil
}

// ID returns the unique immutable identifier for this transaction.
func (t *txn) ID() uint64 {
return t.id
}

// Commit finalizes a transaction, attempting to commit it to the Datastore.
func (t *txn) Commit(ctx context.Context) error {
if err := t.t.Commit(ctx); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions datastore/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewTxnFrom(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, 0, false)
txn, err := NewTxnFrom(ctx, rootstore, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -43,7 +43,7 @@ func TestNewTxnFromWithStoreClosed(t *testing.T) {
err = rootstore.Close()
require.NoError(t, err)

_, err = NewTxnFrom(ctx, rootstore, 0, false)
_, err = NewTxnFrom(ctx, rootstore, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

Expand All @@ -53,7 +53,7 @@ func TestOnSuccess(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, 0, false)
txn, err := NewTxnFrom(ctx, rootstore, false)
require.NoError(t, err)

txn.OnSuccess(nil)
Expand All @@ -74,7 +74,7 @@ func TestOnError(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, 0, false)
txn, err := NewTxnFrom(ctx, rootstore, false)
require.NoError(t, err)

txn.OnError(nil)
Expand Down
15 changes: 2 additions & 13 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package db
import (
"context"
"sync"
"sync/atomic"

blockstore "github.com/ipfs/boxo/blockstore"
ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -51,13 +50,6 @@ const (
// DB is the main interface for interacting with the
// DefraDB storage system.
type db struct {
// The ID of the last transaction created.
//
// Warning: we currently rely on this prop being 64-bit aligned in memory
// relative to the start of the `db` struct (for atomic.Add calls). The
// easiest way to ensure this alignment is to declare it at the top.
previousTxnID uint64

glock sync.RWMutex

rootstore datastore.RootStore
Expand Down Expand Up @@ -158,15 +150,12 @@ func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option

// NewTxn creates a new transaction.
func (db *db) NewTxn(ctx context.Context, readonly bool) (datastore.Txn, error) {
txnId := atomic.AddUint64(&db.previousTxnID, 1)

return datastore.NewTxnFrom(ctx, db.rootstore, txnId, readonly)
return datastore.NewTxnFrom(ctx, db.rootstore, readonly)
}

// NewConcurrentTxn creates a new transaction that supports concurrent API calls.
func (db *db) NewConcurrentTxn(ctx context.Context, readonly bool) (datastore.Txn, error) {
txnId := atomic.AddUint64(&db.previousTxnID, 1)
return datastore.NewConcurrentTxnFrom(ctx, db.rootstore, txnId, readonly)
return datastore.NewConcurrentTxnFrom(ctx, db.rootstore, readonly)
}

// WithTxn returns a new [client.Store] that respects the given transaction.
Expand Down
2 changes: 0 additions & 2 deletions db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ func (vf *VersionedFetcher) Init(
vf.store, err = datastore.NewTxnFrom(
ctx,
vf.root,
// We can take the parent txn id here
txn.ID(),
false,
) // were going to discard and nuke this later
if err != nil {
Expand Down
39 changes: 1 addition & 38 deletions lens/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ type lensRegistry struct {
// lens configurations by source schema version ID
configs map[string]client.LensConfig
configLock sync.RWMutex

// Writable transaction contexts by transaction ID.
//
// Read-only transaction contexts are not tracked.
txnCtxs map[uint64]*txnContext
txnLock sync.RWMutex
}

// txnContext contains uncommitted transaction state tracked by the registry,
Expand Down Expand Up @@ -109,29 +103,16 @@ func NewRegistry(lensPoolSize immutable.Option[int], db TxnSource) client.LensRe
lensPoolsBySchemaVersionID: map[string]*lensPool{},
reversedPoolsBySchemaVersionID: map[string]*lensPool{},
configs: map[string]client.LensConfig{},
txnCtxs: map[uint64]*txnContext{},
},
}
}

func (r *lensRegistry) getCtx(txn datastore.Txn, readonly bool) *txnContext {
r.txnLock.RLock()
if txnCtx, ok := r.txnCtxs[txn.ID()]; ok {
r.txnLock.RUnlock()
return txnCtx
}
r.txnLock.RUnlock()

txnCtx := newTxnCtx(txn)
if readonly {
return txnCtx
}

r.txnLock.Lock()
r.txnCtxs[txn.ID()] = txnCtx
r.txnLock.Unlock()

txnCtx.txn.OnSuccess(func() {
txn.OnSuccess(func() {
r.poolLock.Lock()
for schemaVersionID, locker := range txnCtx.lensPoolsBySchemaVersionID {
r.lensPoolsBySchemaVersionID[schemaVersionID] = locker
Expand All @@ -146,24 +127,6 @@ func (r *lensRegistry) getCtx(txn datastore.Txn, readonly bool) *txnContext {
r.configs[schemaVersionID] = cfg
}
r.configLock.Unlock()

r.txnLock.Lock()
delete(r.txnCtxs, txn.ID())
r.txnLock.Unlock()
})

txn.OnError(func() {
r.txnLock.Lock()
delete(r.txnCtxs, txn.ID())
r.txnLock.Unlock()
})

txn.OnDiscard(func() {
// Delete it to help reduce the build up of memory, the txnCtx will be re-contructed if the
// txn is reused after discard.
r.txnLock.Lock()
delete(r.txnCtxs, txn.ID())
r.txnLock.Unlock()
})

return txnCtx
Expand Down
13 changes: 8 additions & 5 deletions lens/txn_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type implicitTxnLensRegistry struct {
type explicitTxnLensRegistry struct {
registry *lensRegistry
txn datastore.Txn
txnCtx *txnContext
}

var _ client.LensRegistry = (*implicitTxnLensRegistry)(nil)
Expand All @@ -36,13 +37,15 @@ func (r *implicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry
return &explicitTxnLensRegistry{
registry: r.registry,
txn: txn,
txnCtx: r.registry.getCtx(txn, false),
}
}

func (r *explicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry {
return &explicitTxnLensRegistry{
registry: r.registry,
txn: txn,
txnCtx: r.registry.getCtx(txn, false),
}
}

Expand All @@ -63,7 +66,7 @@ func (r *implicitTxnLensRegistry) SetMigration(ctx context.Context, cfg client.L
}

func (r *explicitTxnLensRegistry) SetMigration(ctx context.Context, cfg client.LensConfig) error {
return r.registry.setMigration(ctx, r.registry.getCtx(r.txn, false), cfg)
return r.registry.setMigration(ctx, r.txnCtx, cfg)
}

func (r *implicitTxnLensRegistry) ReloadLenses(ctx context.Context) error {
Expand All @@ -83,7 +86,7 @@ func (r *implicitTxnLensRegistry) ReloadLenses(ctx context.Context) error {
}

func (r *explicitTxnLensRegistry) ReloadLenses(ctx context.Context) error {
return r.registry.reloadLenses(ctx, r.registry.getCtx(r.txn, true))
return r.registry.reloadLenses(ctx, r.txnCtx)
}

func (r *implicitTxnLensRegistry) MigrateUp(
Expand All @@ -106,7 +109,7 @@ func (r *explicitTxnLensRegistry) MigrateUp(
src enumerable.Enumerable[LensDoc],
schemaVersionID string,
) (enumerable.Enumerable[map[string]any], error) {
return r.registry.migrateUp(r.registry.getCtx(r.txn, true), src, schemaVersionID)
return r.registry.migrateUp(r.txnCtx, src, schemaVersionID)
}

func (r *implicitTxnLensRegistry) MigrateDown(
Expand Down Expand Up @@ -144,7 +147,7 @@ func (r *implicitTxnLensRegistry) Config(ctx context.Context) ([]client.LensConf
}

func (r *explicitTxnLensRegistry) Config(ctx context.Context) ([]client.LensConfig, error) {
return r.registry.config(r.registry.getCtx(r.txn, true)), nil
return r.registry.config(r.txnCtx), nil
}

func (r *implicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) {
Expand All @@ -159,5 +162,5 @@ func (r *implicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersio
}

func (r *explicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) {
return r.registry.hasMigration(r.registry.getCtx(r.txn, true), schemaVersionID), nil
return r.registry.hasMigration(r.txnCtx, schemaVersionID), nil
}

0 comments on commit b059e91

Please sign in to comment.