Skip to content

Commit

Permalink
Support entry version in Write batch (#1310)
Browse files Browse the repository at this point in the history
This PR adds support for setting different versions for different
keys in write batch. The existing implementation of write batch
allows setting only a single version for all keys in the write batch.
With this PR, a user can do the following:
```
wb := db.NewManagedWriteBatch()
wb.SetEntryAt(e, ts)
wb.Flush()
```

Also, the existing behavior of `txn.Commit()` in un-managed
(normal) mode is to panic; the new behavior is to
return an error.
  • Loading branch information
Ibrahim Jarif authored Apr 21, 2020
1 parent f9332eb commit fd693e4
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 22 deletions.
17 changes: 17 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/dgraph-io/badger/v2/y"
"github.com/pkg/errors"
)

// WriteBatch holds the necessary info to perform batched writes.
Expand Down Expand Up @@ -88,6 +89,16 @@ func (wb *WriteBatch) callback(err error) {
wb.err = err
}

// SetEntryAt behaves like Txn.SetEntry, except that the caller also chooses the
// version (ts) recorded on the entry. It is available only in managed mode; in
// any other mode an error is returned and the entry is left untouched.
func (wb *WriteBatch) SetEntryAt(e *Entry, ts uint64) error {
	if wb.db.opt.managedTxns {
		// Stamp the caller-supplied version, then take the regular SetEntry path.
		e.version = ts
		return wb.SetEntry(e)
	}
	return errors.New("SetEntryAt can only be used in managed mode. Use SetEntry instead")
}

// SetEntry is the equivalent of Txn.SetEntry.
func (wb *WriteBatch) SetEntry(e *Entry) error {
wb.Lock()
Expand Down Expand Up @@ -115,6 +126,12 @@ func (wb *WriteBatch) Set(k, v []byte) error {
return wb.SetEntry(e)
}

// DeleteAt is the equivalent of Txn.Delete but also accepts the version (ts) at
// which the delete marker is written. Like SetEntryAt, DeleteAt can be used
// only in managed mode; otherwise it returns an error and nothing is added to
// the batch.
func (wb *WriteBatch) DeleteAt(k []byte, ts uint64) error {
	// Explicit versions are only meaningful when the user manages timestamps.
	// Apply the same guard as SetEntryAt so that a delete marker cannot be
	// written at an arbitrary version in normal mode.
	if !wb.db.opt.managedTxns {
		return errors.New("DeleteAt can only be used in managed mode. Use Delete instead")
	}
	e := Entry{Key: k, meta: bitDelete, version: ts}
	return wb.SetEntry(&e)
}

// Delete is equivalent of Txn.Delete.
func (wb *WriteBatch) Delete(k []byte) error {
wb.Lock()
Expand Down
3 changes: 3 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func TestWriteBatch(t *testing.T) {
wb := db.NewWriteBatch()
defer wb.Cancel()

// Sanity check for SetEntryAt.
require.Error(t, wb.SetEntryAt(&Entry{}, 12))

N, M := 50000, 1000
start := time.Now()

Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
txn = append(txn, te)

default:
// This entry is from a rewrite.
// This entry is from a rewrite or via SetEntryAt(..).
toLSM(nk, v)

// We shouldn't get this entry in the middle of a transaction.
Expand Down
8 changes: 8 additions & 0 deletions managed_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {
wb.txn.commitTs = commitTs
return wb
}
// NewManagedWriteBatch creates a new WriteBatch for use in managed mode. Unlike
// NewWriteBatchAt, it does not set a commit timestamp on the batch; versions
// are expected to be supplied per entry (for example via SetEntryAt).
//
// It panics if db.opt.managedTxns is false.
func (db *DB) NewManagedWriteBatch() *WriteBatch {
	if !db.opt.managedTxns {
		// Name this function (not NewWriteBatchAt) so the panic points at the real call site.
		panic("cannot use NewManagedWriteBatch with managedDB=false. Use NewWriteBatch instead")
	}

	return db.newWriteBatch()
}

// CommitAt commits the transaction, following the same logic as Commit(), but
// at the given commit timestamp. This will panic if not used with managed transactions.
Expand Down
47 changes: 47 additions & 0 deletions managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,53 @@ func TestWriteBatchManagedMode(t *testing.T) {
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
require.Equal(t, item.Version(), uint64(1))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, N, i)
return nil
})
require.NoError(t, err)
})
}
func TestWriteBatchManaged(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
opt := DefaultOptions("")
opt.managedTxns = true
opt.MaxTableSize = 1 << 15 // This would create multiple transactions in write batch.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewManagedWriteBatch()
defer wb.Cancel()

N, M := 50000, 1000
start := time.Now()

for i := 0; i < N; i++ {
require.NoError(t, wb.SetEntryAt(&Entry{Key: key(i), Value: val(i)}, 1))
}
for i := 0; i < M; i++ {
require.NoError(t, wb.DeleteAt(key(i), 2))
}
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))

err := db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()

i := M
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
require.Equal(t, item.Version(), uint64(1))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
Expand Down
1 change: 1 addition & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type Entry struct {
UserMeta byte
ExpiresAt uint64 // time.Unix
meta byte
version uint64

// Fields maintained internally.
offset uint32
Expand Down
81 changes: 61 additions & 20 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,21 @@ func (txn *Txn) commitAndSend() (func() error, error) {
defer orc.writeChLock.Unlock()

commitTs := orc.newCommitTs(txn)
if commitTs == 0 {
// The commitTs can be zero if the transaction is running in managed mode.
// Individual entries might have their own timestamps.
if commitTs == 0 && !txn.db.opt.managedTxns {
return nil, ErrConflict
}

keepTogether := true
for _, e := range txn.pendingWrites {
if e.version == 0 {
e.version = commitTs
} else {
keepTogether = false
}
}

// The following debug information is what led to determining the cause of
// bank txn violation bug, and it took a whole bunch of effort to narrow it
// down to here. So, keep this around for at least a couple of months.
Expand All @@ -478,21 +489,30 @@ func (txn *Txn) commitAndSend() (func() error, error) {
// txn.readTs, commitTs, txn.reads, txn.writes)
entries := make([]*Entry, 0, len(txn.pendingWrites)+1)
for _, e := range txn.pendingWrites {
// fmt.Fprintf(&b, "[%q : %q], ", e.Key, e.Value)

// Suffix the keys with commit ts, so the key versions are sorted in
// descending order of commit timestamp.
e.Key = y.KeyWithTs(e.Key, commitTs)
e.meta |= bitTxn
e.Key = y.KeyWithTs(e.Key, e.version)
// Add bitTxn only if these entries are part of a transaction. We
// support SetEntryAt(..) in managed mode which means a single
// transaction can have entries with different timestamps. If entries
// in a single transaction have different timestamps, we don't add the
// transaction markers.
if keepTogether {
e.meta |= bitTxn
}
entries = append(entries, e)
}
// log.Printf("%s\n", b.String())
e := &Entry{
Key: y.KeyWithTs(txnKey, commitTs),
Value: []byte(strconv.FormatUint(commitTs, 10)),
meta: bitFinTxn,

if keepTogether {
// CommitTs should not be zero if we're inserting transaction markers.
y.AssertTrue(commitTs != 0)
e := &Entry{
Key: y.KeyWithTs(txnKey, commitTs),
Value: []byte(strconv.FormatUint(commitTs, 10)),
meta: bitFinTxn,
}
entries = append(entries, e)
}
entries = append(entries, e)

req, err := txn.db.sendToWriteCh(entries)
if err != nil {
Expand All @@ -510,13 +530,26 @@ func (txn *Txn) commitAndSend() (func() error, error) {
return ret, nil
}

func (txn *Txn) commitPrecheck() {
if txn.commitTs == 0 && txn.db.opt.managedTxns {
panic("Commit cannot be called with managedDB=true. Use CommitAt.")
}
func (txn *Txn) commitPrecheck() error {
if txn.discarded {
panic("Trying to commit a discarded txn")
return errors.New("Trying to commit a discarded txn")
}
keepTogether := true
for _, e := range txn.pendingWrites {
if e.version != 0 {
keepTogether = false
}
}

// If keepTogether is true, transaction markers will be added, and in that
// case commitTs must never be zero. A zero commitTs can occur if someone
// uses txn.Commit instead of txn.CommitAt in managed mode. This can happen
// only in managed mode; in normal mode, keepTogether will always be true.
if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 {
return errors.New("CommitTs cannot be zero. Please use commitAt instead")
}
return nil
}

// Commit commits the transaction, following these steps:
Expand All @@ -538,7 +571,10 @@ func (txn *Txn) commitPrecheck() {
// If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
// tree won't be updated, so there's no need for any rollback.
func (txn *Txn) Commit() error {
txn.commitPrecheck() // Precheck before discarding txn.
// Precheck before discarding txn.
if err := txn.commitPrecheck(); err != nil {
return err
}
defer txn.Discard()

if len(txn.writes) == 0 {
Expand Down Expand Up @@ -583,13 +619,18 @@ func runTxnCallback(cb *txnCb) {
// so it is safe to increment sync.WaitGroup before calling CommitWith, and
// decrementing it in the callback; to block until all callbacks are run.
func (txn *Txn) CommitWith(cb func(error)) {
txn.commitPrecheck() // Precheck before discarding txn.
defer txn.Discard()

if cb == nil {
panic("Nil callback provided to CommitWith")
}

// Precheck before discarding txn.
if err := txn.commitPrecheck(); err != nil {
cb(err)
return
}

defer txn.Discard()

if len(txn.writes) == 0 {
// Do not run these callbacks from here, because the CommitWith and the
// callback might be acquiring the same locks. Instead run the callback
Expand Down
2 changes: 1 addition & 1 deletion txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func TestManagedDB(t *testing.T) {
for i := 0; i <= 3; i++ {
require.NoError(t, txn.SetEntry(NewEntry(key(i), val(i))))
}
require.Panics(t, func() { txn.Commit() })
require.Error(t, txn.Commit())
require.NoError(t, txn.CommitAt(3, nil))

// Read data at t=2.
Expand Down

0 comments on commit fd693e4

Please sign in to comment.