Skip to content

Conversation

@arielshaqed
Copy link
Contributor

Infrastructure change for use elsewhere. Zero cost if you don't use it, so should be okay.

@arielshaqed arielshaqed added area/integrations exclude-changelog PR description should not be included in next release changelog area/KV Improvements to the KV store implementation minor-change Used for PRs that don't require issue attached labels Oct 16, 2025
@arielshaqed arielshaqed marked this pull request as draft October 16, 2025 15:28
@arielshaqed
Copy link
Contributor Author

Draft because I don't yet know we'll decide to use it.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Adds optimistic transactional support to the local KV store and wraps Transactioner-capable stores with metrics. It also introduces a transaction-focused Operations interface and accompanying tests.

  • Introduce Transactioner API (Operations, TransactionOpts with backoff, Transactioner/TransactionerStore) and ErrConflict
  • Implement local-store transactions (txn-aware Get/Set/Delete/Scan) with retry/backoff; add metrics wrapper for Transactioner
  • Add transaction tests and driver harness; add gomock types for Operations/Transactioner

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
pkg/kv/store.go Adds Operations, TransactionOpts, Transactioner/TransactionerStore, and ErrConflict to enable transactional APIs.
pkg/kv/local/store.go Implements transaction-aware helpers, Transact with retry/backoff, and uses txn-aware versions for CRUD/Scan.
pkg/kv/metrics.go Adds StoreTransactionerMetricsWrapper and updates storeMetrics to return a wrapper preserving Transactioner.
pkg/kv/local/store_test.go Refactors test setup and adds DriverTransactionsTest invocation.
pkg/kv/kvtest/transactions.go New transaction tests (simple and retry-on-conflict).
pkg/kv/kvtest/store.go Adds DriverTransactionsTest helper.
pkg/kv/mock/store.go Adds gomock types for Operations and Transactioner (but signatures need correction).

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +267 to +284
func (s *Store) Transact(ctx context.Context, fn func(operations kv.Operations) error, opts kv.TransactionOpts) error {
for {
err := s.db.Update(func(txn *badger.Txn) error {
return fn(s.newBadgerOperations(txn))
})
if err == nil || !errors.Is(err, badger.ErrConflict) {
// TODO(ariels): Wrap err in a kv-ish error.
return err
}
if opts.Backoff != nil {
duration := opts.Backoff.NextBackOff()
if duration == backoff.Stop {
break
}
time.Sleep(duration)
}
}
return kv.ErrConflict
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When opts.Backoff is nil, this loop retries indefinitely on badger.ErrConflict causing a potential busy-wait. Either attempt once or return kv.ErrConflict immediately if no backoff is configured.

Copilot uses AI. Check for mistakes.
Comment on lines 48 to 50
return kv.TransactionOpts{
Backoff: backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(5 * time.Second)),
}
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backoff.NewExponentialBackOff does not take functional options in v4; this will not compile. Initialize then set MaxElapsedTime instead, e.g.: b := backoff.NewExponentialBackOff(); b.MaxElapsedTime = 5 * time.Second; return kv.TransactionOpts{Backoff: b}.

Suggested change
return kv.TransactionOpts{
Backoff: backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(5 * time.Second)),
}
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 5 * time.Second
return kv.TransactionOpts{Backoff: b}

Copilot uses AI. Check for mistakes.
Comment on lines +95 to +113
err error
)

go func() {
// This transaction runs between the 2 iterations of the main transaction, and
// always succeeds.
err = tx.Transact(ctx, func(op kv.Operations) error {
// Wait for the main transaction to set its value.
<-ch1
err = op.Set(ctx, partition(t), key1, value2)
if err != nil {
return err
}
return nil
}, tryOnce())
if err != nil {
t.Errorf("Middle transaction failed: %s", err)
}
// Now release the main transaction, so it sees that key1 changed and tries again.
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shared variable err is written in the goroutine and also used in the outer scope, causing a data race. Use a separate variable inside the goroutine and report it via a channel or waitgroup, e.g., errCh := make(chan error, 1); defer close(errCh); errCh <- localErr; and check after the goroutine completes.

Copilot uses AI. Check for mistakes.
Comment on lines +115 to +118
func (m *MockOperations) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWithPredicate, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Get", ctx, partitionKey, key)
ret0, _ := ret[0].(*kv.ValueWithPredicate)
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MockOperations.Get signature does not match the Operations interface, which returns ([]byte, error). Update the mock to return ([]byte, error) and adjust the type assertions accordingly.

Suggested change
func (m *MockOperations) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWithPredicate, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Get", ctx, partitionKey, key)
ret0, _ := ret[0].(*kv.ValueWithPredicate)
func (m *MockOperations) Get(ctx context.Context, partitionKey, key []byte) ([]byte, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Get", ctx, partitionKey, key)
ret0, _ := ret[0].([]byte)

Copilot uses AI. Check for mistakes.
Comment on lines +158 to +171
// SetIf mocks base method.
func (m *MockOperations) SetIf(ctx context.Context, partitionKey, key, value []byte, valuePredicate kv.Predicate) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetIf", ctx, partitionKey, key, value, valuePredicate)
ret0, _ := ret[0].(error)
return ret0
}

// SetIf indicates an expected call of SetIf.
func (mr *MockOperationsMockRecorder) SetIf(ctx, partitionKey, key, value, valuePredicate interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIf", reflect.TypeOf((*MockOperations)(nil).SetIf), ctx, partitionKey, key, value, valuePredicate)
}

Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Operations does not define SetIf; this mock adds a method that is not part of the interface and will not compile when used as kv.Operations. Remove SetIf from MockOperations.

Suggested change
// SetIf mocks base method.
func (m *MockOperations) SetIf(ctx context.Context, partitionKey, key, value []byte, valuePredicate kv.Predicate) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetIf", ctx, partitionKey, key, value, valuePredicate)
ret0, _ := ret[0].(error)
return ret0
}
// SetIf indicates an expected call of SetIf.
func (mr *MockOperationsMockRecorder) SetIf(ctx, partitionKey, key, value, valuePredicate interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIf", reflect.TypeOf((*MockOperations)(nil).SetIf), ctx, partitionKey, key, value, valuePredicate)
}

Copilot uses AI. Check for mistakes.
}

func (s *StoreTransactionerMetricsWrapper) Transact(ctx context.Context, fn func(operations Operations) error, opts TransactionOpts) error {
// TODO(ariels): Wrap the passed-in Operations struct to measure _its_ latenceis.
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'latenceis' to 'latencies'.

Suggested change
// TODO(ariels): Wrap the passed-in Operations struct to measure _its_ latenceis.
// TODO(ariels): Wrap the passed-in Operations struct to measure _its_ latencies.

Copilot uses AI. Check for mistakes.
// Transactioner is implemented by some Stores.
type Transactioner interface {
// Transact runs fn as a single transaction. Typically it uses
// optimistic concurrency: It runs fn and trie to commit it
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'trie' to 'tries' in the comment.

Suggested change
// optimistic concurrency: It runs fn and trie to commit it
// optimistic concurrency: It runs fn and tries to commit it

Copilot uses AI. Check for mistakes.
ErrMissingValue = errors.New("missing value")
ErrNotFound = errors.New("not found")
ErrPredicateFailed = errors.New("predicate failed")
ErrConflict = fmt.Errorf("%w (giving up)", ErrPredicateFailed)
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ErrConflict wraps ErrPredicateFailed, making errors.Is(err, ErrPredicateFailed) true for transaction conflicts. This conflates two distinct error domains and can mislead callers. Consider defining a dedicated sentinel error (e.g., errors.New("transaction conflict")) without wrapping ErrPredicateFailed.

Suggested change
ErrConflict = fmt.Errorf("%w (giving up)", ErrPredicateFailed)
ErrConflict = errors.New("transaction conflict (giving up)")

Copilot uses AI. Check for mistakes.
if duration == backoff.Stop {
break
}
time.Sleep(duration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use select with time.After and checking the ctx done to prevent blocking of context cancellation.

Comment on lines +123 to +127
func (s *Store) Set(ctx context.Context, partitionKey, key, value []byte) error {
return s.db.Update(func(txn *badger.Txn) error {
return setFromTxn(ctx, s.logger, txn, partitionKey, key, value)
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General note - I was sure that all previous operations (Get/Set/...) will use the new Transaction method to perform the operation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/integrations area/KV Improvements to the KV store implementation exclude-changelog PR description should not be included in next release changelog minor-change Used for PRs that don't require issue attached

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants