diff --git a/pkg/kv/kvtest/store.go b/pkg/kv/kvtest/store.go index 12a9f701a3d..85a5d719014 100644 --- a/pkg/kv/kvtest/store.go +++ b/pkg/kv/kvtest/store.go @@ -67,6 +67,11 @@ func DriverTest(t *testing.T, ms MakeStore) { t.Run("SecondaryIterator", func(t *testing.T) { testSecondaryIterator(t, ms) }) } +func DriverTransactionsTest(t *testing.T, ms MakeStore) { + t.Helper() + t.Run("Transactions", func(t *testing.T) { testTransactions(t, ms) }) +} + func testDriverOpen(t *testing.T, ms MakeStore) { ctx := context.Background() store1 := ms(t, ctx) diff --git a/pkg/kv/kvtest/transactions.go b/pkg/kv/kvtest/transactions.go new file mode 100644 index 00000000000..b654d56464f --- /dev/null +++ b/pkg/kv/kvtest/transactions.go @@ -0,0 +1,151 @@ +package kvtest + +import ( + "bytes" + "context" + "errors" + "testing" + "time" + + "github.com/treeverse/lakefs/pkg/kv" + + "github.com/cenkalti/backoff/v4" +) + +func testTransactions(t *testing.T, ms MakeStore) { + ctx := context.Background() + store := ms(t, ctx) + + txnStore, ok := store.(kv.TransactionerStore) + if !ok { + t.Fatalf("Store %s is not a Transactioner", store) + } + + t.Run("simple", func(t *testing.T) { testSimpleTransaction(t, ctx, txnStore) }) + t.Run("raceRetry", func(t *testing.T) { testRaceRetry(t, ctx, txnStore) }) + // TODO(ariels): Test Scan. + + // TODO(ariels): Test retries (failures) also when racing against the Store API. (This + // is not required to test "local", which uses the same underlying API to perform _all_ + // modifications!) +} + +var ( + key1 = []byte("key1") + value1 = []byte("the first value") + value2 = []byte("a second value") +) + +func partition(t testing.TB) []byte { + return []byte(t.Name()) +} + +func tryOnce() kv.TransactionOpts { + return kv.TransactionOpts{Backoff: backoff.WithMaxRetries(&backoff.ZeroBackOff{}, 0)} +} + +func tryMany() kv.TransactionOpts { + const maxDuration = 5 * time.Second + return kv.TransactionOpts{ + Backoff: backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(maxDuration)), + } +} + +// testSimpleTransaction tests a single transaction works, with no races. +func testSimpleTransaction(t testing.TB, ctx context.Context, tx kv.Transactioner) { + // Load some data. + err := tx.Transact(ctx, func(op kv.Operations) error { + err := op.Set(ctx, partition(t), key1, value1) + if err != nil { + return err + } + + value, err := op.Get(ctx, partition(t), key1) + if err != nil { + return err + } + if !bytes.Equal(value, value1) { + t.Errorf("Got %s not %s on key %s", string(value), string(value1), string(key1)) + } + return nil + }, tryOnce()) + if err != nil { + t.Fatalf("Transaction failed: %s", err) + } + + // Verify it again, on another transaction. + err = tx.Transact(ctx, func(op kv.Operations) error { + value, err := op.Get(ctx, partition(t), key1) + if err != nil { + return err + } + if !bytes.Equal(value, value1) { + t.Errorf("Got %s not %s on key %s", string(value), string(value1), string(key1)) + } + return nil + }, tryOnce()) + if err != nil { + t.Fatalf("Transaction failed: %s", err) + } +} + +func testRaceRetry(t testing.TB, ctx context.Context, tx kv.Transactioner) { + const ( + expectedIterations = 2 + ) + var ( + ch1 = make(chan struct{}) + ch2 = make(chan struct{}) + err error + ) + + go func() { + // This transaction runs between the 2 iterations of the main transaction, and + // always succeeds. + err = tx.Transact(ctx, func(op kv.Operations) error { + // Wait for the main transaction to set its value. + <-ch1 + err = op.Set(ctx, partition(t), key1, value2) + if err != nil { + return err + } + return nil + }, tryOnce()) + if err != nil { + t.Errorf("Middle transaction failed: %s", err) + } + // Now release the main transaction, so it sees that key1 changed and tries again. + close(ch2) + }() + + iteration := 0 + // The main transaction. The first time it sets a value and sleeps before reading it, + // giving time for the auxiliary transaction above to change the value and fail it. The + // second time it does not need to wait, and should succeed. + err = tx.Transact(ctx, func(op kv.Operations) error { + // Create a dependency by reading the value. + _, err = op.Get(ctx, partition(t), key1) + if err != nil && !errors.Is(err, kv.ErrNotFound) { + return err + } + err = op.Set(ctx, partition(t), key1, value1) + if err != nil { + return err + } + if iteration == 0 { + // Release the auxiliary transaction. + close(ch1) + // Wait for it to finish. + <-ch2 + } + iteration++ + return nil + }, tryMany()) + + if err != nil { + t.Error(err) + } + if iteration != expectedIterations { + t.Errorf("Main transaction ran %d != 2 times", iteration) + } +} diff --git a/pkg/kv/local/store.go b/pkg/kv/local/store.go index db1bb983ab7..749142c79d9 100644 --- a/pkg/kv/local/store.go +++ b/pkg/kv/local/store.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "errors" + "fmt" "time" + "github.com/cenkalti/backoff/v4" "github.com/dgraph-io/badger/v4" "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/logging" @@ -34,11 +36,15 @@ type Store struct { path string } -func (s *Store) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWithPredicate, error) { - k := composeKey(partitionKey, key) - start := time.Now() - log := s.logger.WithField("key", string(k)).WithField("op", "get").WithContext(ctx) - log.Trace("performing operation") +var _ kv.TransactionerStore = (*Store)(nil) + +func getFromTxn(ctx context.Context, log logging.Logger, txn *badger.Txn, partitionKey, key []byte) ([]byte, error) { + log = log.WithContext(ctx).WithFields(logging.Fields{ + "partition": string(partitionKey), + "key": string(key), + "op": "get", + }) + if len(partitionKey) == 0 { log.WithError(kv.ErrMissingPartitionKey).Warn("got empty partition key") return nil, kv.ErrMissingPartitionKey @@ -48,24 +54,34 @@ func (s *Store) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWit return nil, kv.ErrMissingKey } + start := time.Now() + k := composeKey(partitionKey, key) + log.Trace("performing operation") + item, err := txn.Get(k) + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, kv.ErrNotFound + } + if err != nil { + log.WithError(err).Error("error getting key") + return nil, err + } + value, err := item.ValueCopy(nil) + if err != nil { + err = fmt.Errorf("extract key: %w", err) + log.WithError(err).Error("operation failed") + return nil, err + } + log.WithField("took", time.Since(start)).WithError(err).WithField("size", len(value)).Trace("operation complete") + return value, nil +} + +func (s *Store) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWithPredicate, error) { var value []byte err := s.db.View(func(txn *badger.Txn) error { - item, err := txn.Get(k) - if errors.Is(err, badger.ErrKeyNotFound) { - return kv.ErrNotFound - } - if err != nil { - log.WithError(err).Error("error getting key") - return err - } - value, err = item.ValueCopy(nil) - if err != nil { - log.WithError(err).Error("error getting value for key") - return err - } - return nil + var err error + value, err = getFromTxn(ctx, s.logger, txn, partitionKey, key) + return err }) - log.WithField("took", time.Since(start)).WithError(err).WithField("size", len(value)).Trace("operation complete") if err != nil { return nil, err } @@ -75,10 +91,13 @@ func (s *Store) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWit }, nil } -func (s *Store) Set(ctx context.Context, partitionKey, key, value []byte) error { +func setFromTxn(ctx context.Context, log logging.Logger, txn *badger.Txn, partitionKey, key, value []byte) error { k := composeKey(partitionKey, key) start := time.Now() - log := s.logger.WithField("key", string(k)).WithField("op", "set").WithContext(ctx) + log = log.WithFields(logging.Fields{ + "key": string(k), + "op": "set", + }).WithContext(ctx) log.Trace("performing operation") if len(partitionKey) == 0 { log.WithError(kv.ErrMissingPartitionKey).Warn("got empty partition key") @@ -92,9 +111,7 @@ func (s *Store) Set(ctx context.Context, partitionKey, key, value []byte) error log.WithError(kv.ErrMissingValue).Warn("got nil value") return kv.ErrMissingValue } - err := s.db.Update(func(txn *badger.Txn) error { - return txn.Set(k, value) - }) + err := txn.Set(k, value) if err != nil { log.WithError(err).Error("error setting value") return err @@ -103,6 +120,12 @@ func (s *Store) Set(ctx context.Context, partitionKey, key, value []byte) error return nil } +func (s *Store) Set(ctx context.Context, partitionKey, key, value []byte) error { + return s.db.Update(func(txn *badger.Txn) error { + return setFromTxn(ctx, s.logger, txn, partitionKey, key, value) + }) +} + func (s *Store) SetIf(ctx context.Context, partitionKey, key, value []byte, valuePredicate kv.Predicate) error { k := composeKey(partitionKey, key) start := time.Now() @@ -161,13 +184,13 @@ func (s *Store) SetIf(ctx context.Context, partitionKey, key, value []byte, valu return err } -func (s *Store) Delete(ctx context.Context, partitionKey, key []byte) error { +func deleteFromTxn(ctx context.Context, log logging.Logger, txn *badger.Txn, partitionKey, key []byte) error { k := composeKey(partitionKey, key) start := time.Now() - log := s.logger. - WithField("key", string(k)). - WithField("op", "delete"). - WithContext(ctx) + log = log.WithFields(logging.Fields{ + "key": string(k), + "op": "delete", + }).WithContext(ctx) log.Trace("performing operation") if len(partitionKey) == 0 { log.WithError(kv.ErrMissingPartitionKey).Warn("got empty partition key") @@ -177,9 +200,7 @@ func (s *Store) Delete(ctx context.Context, partitionKey, key []byte) error { log.WithError(kv.ErrMissingKey).Warn("got empty key") return kv.ErrMissingKey } - err := s.db.Update(func(txn *badger.Txn) error { - return txn.Delete(k) - }) + err := txn.Delete(k) took := time.Since(start) log = log.WithField("took", took) if err != nil { @@ -190,8 +211,14 @@ func (s *Store) Delete(ctx context.Context, partitionKey, key []byte) error { return nil } -func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOptions) (kv.EntriesIterator, error) { - log := s.logger.WithFields(logging.Fields{ +func (s *Store) Delete(ctx context.Context, partitionKey, key []byte) error { + return s.db.Update(func(txn *badger.Txn) error { + return deleteFromTxn(ctx, s.logger, txn, partitionKey, key) + }) +} + +func scanFromTxn(ctx context.Context, log logging.Logger, txn *badger.Txn, partitionKey []byte, prefetchSize int, options kv.ScanOptions) (kv.EntriesIterator, error) { + log = log.WithFields(logging.Fields{ "partition_key": string(partitionKey), "start_key": string(options.KeyStart), "op": "scan", @@ -203,9 +230,8 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp } prefix := partitionRange(partitionKey) - txn := s.db.NewTransaction(false) opts := badger.DefaultIteratorOptions - opts.PrefetchSize = s.prefetchSize + opts.PrefetchSize = prefetchSize if options.BatchSize != 0 && opts.PrefetchSize != 0 && options.BatchSize < opts.PrefetchSize { opts.PrefetchSize = options.BatchSize } @@ -223,6 +249,11 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp }, nil } +func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOptions) (kv.EntriesIterator, error) { + txn := s.db.NewTransaction(false) + return scanFromTxn(ctx, s.logger, txn, partitionKey, s.prefetchSize, options) +} + func (s *Store) Close() { driverLock.Lock() defer driverLock.Unlock() @@ -232,3 +263,53 @@ func (s *Store) Close() { delete(dbMap, s.path) } } + +func (s *Store) Transact(ctx context.Context, fn func(operations kv.Operations) error, opts kv.TransactionOpts) error { + for { + err := s.db.Update(func(txn *badger.Txn) error { + return fn(s.newBadgerOperations(txn)) + }) + if err == nil || !errors.Is(err, badger.ErrConflict) { + // TODO(ariels): Wrap err in a kv-ish error. + return err + } + if opts.Backoff != nil { + duration := opts.Backoff.NextBackOff() + if duration == backoff.Stop { + break + } + time.Sleep(duration) + } + } + return kv.ErrConflict +} + +func (s *Store) newBadgerOperations(txn *badger.Txn) *operations { + return &operations{ + s.logger.WithField("txn", time.Now().String()), + txn, + s.prefetchSize, + } +} + +type operations struct { + logger logging.Logger + txn *badger.Txn + prefetchSize int +} + +func (op *operations) Get(ctx context.Context, partitionKey, key []byte) ([]byte, error) { + return getFromTxn(ctx, op.logger, op.txn, partitionKey, key) +} + +func (op *operations) Set(ctx context.Context, partitionKey, key, value []byte) error { + return setFromTxn(ctx, op.logger, op.txn, partitionKey, key, value) +} + +func (op *operations) Delete(ctx context.Context, partitionKey, key []byte) error { + return deleteFromTxn(ctx, op.logger, op.txn, partitionKey, key) +} + +func (op *operations) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOptions) (kv.EntriesIterator, error) { + return scanFromTxn(ctx, op.logger, op.txn, partitionKey, op.prefetchSize, options) +} diff --git a/pkg/kv/local/store_test.go b/pkg/kv/local/store_test.go index 79299032a35..b527902d406 100644 --- a/pkg/kv/local/store_test.go +++ b/pkg/kv/local/store_test.go @@ -10,20 +10,23 @@ import ( "github.com/treeverse/lakefs/pkg/kv/local" ) -func TestLocalKV(t *testing.T) { - kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { - t.Helper() - store, err := kv.Open(ctx, kvparams.Config{ - Type: local.DriverName, - Local: &kvparams.Local{ - Path: t.TempDir(), - EnableLogging: true, - }, - }) - if err != nil { - t.Fatalf("failed to open kv '%s' store: %s", local.DriverName, err) - } - t.Cleanup(store.Close) - return store +func makeStore(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + store, err := kv.Open(ctx, kvparams.Config{ + Type: local.DriverName, + Local: &kvparams.Local{ + Path: t.TempDir(), + EnableLogging: true, + }, }) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", local.DriverName, err) + } + t.Cleanup(store.Close) + return store +} + +func TestLocalKV(t *testing.T) { + kvtest.DriverTest(t, makeStore) + kvtest.DriverTransactionsTest(t, makeStore) } diff --git a/pkg/kv/metrics.go b/pkg/kv/metrics.go index c620cc399d2..a7d6d54e9a9 100644 --- a/pkg/kv/metrics.go +++ b/pkg/kv/metrics.go @@ -95,6 +95,23 @@ func (s *StoreMetricsWrapper) Close() { s.Store.Close() } -func storeMetrics(store Store, storeType string) *StoreMetricsWrapper { +// StoreTransactionerMetricsWrapper wraps any StoreTransactioner with metrics +type StoreTransactionerMetricsWrapper struct { + StoreMetricsWrapper + Transactioner +} + +func (s *StoreTransactionerMetricsWrapper) Transact(ctx context.Context, fn func(operations Operations) error, opts TransactionOpts) error { + // TODO(ariels): Wrap the passed-in Operations struct to measure _its_ latenceis. + return s.Transactioner.Transact(ctx, fn, opts) +} + +func storeMetrics(store Store, storeType string) Store { + if transactionStore, ok := store.(TransactionerStore); ok { + return &StoreTransactionerMetricsWrapper{ + StoreMetricsWrapper: StoreMetricsWrapper{Store: transactionStore, StoreType: storeType}, + Transactioner: transactionStore, + } + } return &StoreMetricsWrapper{Store: store, StoreType: storeType} } diff --git a/pkg/kv/mock/store.go b/pkg/kv/mock/store.go index b92f3f5de56..607c3c848dc 100644 --- a/pkg/kv/mock/store.go +++ b/pkg/kv/mock/store.go @@ -74,6 +74,101 @@ func (m *MockPredicate) EXPECT() *MockPredicateMockRecorder { return m.recorder } +// MockOperations is a mock of Operations interface. +type MockOperations struct { + ctrl *gomock.Controller + recorder *MockOperationsMockRecorder +} + +// MockOperationsMockRecorder is the mock recorder for MockOperations. +type MockOperationsMockRecorder struct { + mock *MockOperations +} + +// NewMockOperations creates a new mock instance. +func NewMockOperations(ctrl *gomock.Controller) *MockOperations { + mock := &MockOperations{ctrl: ctrl} + mock.recorder = &MockOperationsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockOperations) EXPECT() *MockOperationsMockRecorder { + return m.recorder +} + +// Delete mocks base method. +func (m *MockOperations) Delete(ctx context.Context, partitionKey, key []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", ctx, partitionKey, key) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockOperationsMockRecorder) Delete(ctx, partitionKey, key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockOperations)(nil).Delete), ctx, partitionKey, key) +} + +// Get mocks base method. +func (m *MockOperations) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWithPredicate, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", ctx, partitionKey, key) + ret0, _ := ret[0].(*kv.ValueWithPredicate) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockOperationsMockRecorder) Get(ctx, partitionKey, key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockOperations)(nil).Get), ctx, partitionKey, key) +} + +// Scan mocks base method. +func (m *MockOperations) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOptions) (kv.EntriesIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Scan", ctx, partitionKey, options) + ret0, _ := ret[0].(kv.EntriesIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Scan indicates an expected call of Scan. +func (mr *MockOperationsMockRecorder) Scan(ctx, partitionKey, options interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scan", reflect.TypeOf((*MockOperations)(nil).Scan), ctx, partitionKey, options) +} + +// Set mocks base method. +func (m *MockOperations) Set(ctx context.Context, partitionKey, key, value []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Set", ctx, partitionKey, key, value) + ret0, _ := ret[0].(error) + return ret0 +} + +// Set indicates an expected call of Set. +func (mr *MockOperationsMockRecorder) Set(ctx, partitionKey, key, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockOperations)(nil).Set), ctx, partitionKey, key, value) +} + +// SetIf mocks base method. +func (m *MockOperations) SetIf(ctx context.Context, partitionKey, key, value []byte, valuePredicate kv.Predicate) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetIf", ctx, partitionKey, key, value, valuePredicate) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetIf indicates an expected call of SetIf. +func (mr *MockOperationsMockRecorder) SetIf(ctx, partitionKey, key, value, valuePredicate interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIf", reflect.TypeOf((*MockOperations)(nil).SetIf), ctx, partitionKey, key, value, valuePredicate) +} + // MockStore is a mock of Store interface. type MockStore struct { ctrl *gomock.Controller @@ -181,6 +276,43 @@ func (mr *MockStoreMockRecorder) SetIf(ctx, partitionKey, key, value, valuePredi return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIf", reflect.TypeOf((*MockStore)(nil).SetIf), ctx, partitionKey, key, value, valuePredicate) } +// MockTransactioner is a mock of Transactioner interface. +type MockTransactioner struct { + ctrl *gomock.Controller + recorder *MockTransactionerMockRecorder +} + +// MockTransactionerMockRecorder is the mock recorder for MockTransactioner. +type MockTransactionerMockRecorder struct { + mock *MockTransactioner +} + +// NewMockTransactioner creates a new mock instance. +func NewMockTransactioner(ctrl *gomock.Controller) *MockTransactioner { + mock := &MockTransactioner{ctrl: ctrl} + mock.recorder = &MockTransactionerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTransactioner) EXPECT() *MockTransactionerMockRecorder { + return m.recorder +} + +// Transact mocks base method. +func (m *MockTransactioner) Transact(ctx context.Context, fn func(kv.Operations) error, opts kv.TransactionOpts) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Transact", ctx, fn, opts) + ret0, _ := ret[0].(error) + return ret0 +} + +// Transact indicates an expected call of Transact. +func (mr *MockTransactionerMockRecorder) Transact(ctx, fn, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Transact", reflect.TypeOf((*MockTransactioner)(nil).Transact), ctx, fn, opts) +} + // MockEntriesIterator is a mock of EntriesIterator interface. type MockEntriesIterator struct { ctrl *gomock.Controller diff --git a/pkg/kv/store.go b/pkg/kv/store.go index e5fb576db26..1b830a4bc58 100644 --- a/pkg/kv/store.go +++ b/pkg/kv/store.go @@ -10,6 +10,7 @@ import ( "strings" "sync" + "github.com/cenkalti/backoff/v4" "github.com/treeverse/lakefs/pkg/kv/kvparams" ) @@ -37,6 +38,7 @@ var ( ErrMissingValue = errors.New("missing value") ErrNotFound = errors.New("not found") ErrPredicateFailed = errors.New("predicate failed") + ErrConflict = fmt.Errorf("%w (giving up)", ErrPredicateFailed) ErrSetupFailed = errors.New("setup failed") ErrUnknownDriver = errors.New("unknown driver") ErrTableNotActive = errors.New("table not active") @@ -85,6 +87,14 @@ type ScanOptions struct { BatchSize int } +// Operations are a simple(r) KV CRUD, exposed to transactions. In particular, there are no predicates in transactions. +type Operations interface { + Get(ctx context.Context, partitionKey, key []byte) ([]byte, error) + Set(ctx context.Context, partitionKey, key, value []byte) error + Delete(ctx context.Context, partitionKey, key []byte) error + Scan(ctx context.Context, partitionKey []byte, options ScanOptions) (EntriesIterator, error) +} + type Store interface { // Get returns a result containing the Value and Predicate for the given key, or ErrNotFound if key doesn't exist // Predicate can be used for SetIf operation @@ -110,6 +120,25 @@ type Store interface { Close() } +type TransactionOpts struct { + // Backoff could become an exponential backoff. + Backoff backoff.BackOff +} + +// Transactioner is implemented by some Stores. +type Transactioner interface { + // Transact runs fn as a single transaction. Typically it uses + // optimistic concurrency: It runs fn and trie to commit it + // atomically. If any keys read by fn change while it is running, + // it may retry according to opts. + Transact(ctx context.Context, fn func(operations Operations) error, opts TransactionOpts) error +} + +type TransactionerStore interface { + Store + Transactioner +} + // EntriesIterator used to enumerate over Scan results type EntriesIterator interface { // Next should be called first before access Entry.