From 2bb5a658d4cf3e52353a23dd75e575ae69a60252 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 26 May 2020 12:56:38 +0530 Subject: [PATCH 1/4] Support disabling conflict detection This commit adds support for disabling conflict detection by setting the `option.DetectConflicts=false`. When conflict detection is disabled badger will not store information required to detect the conflict. This reduces the amount of memory used by transactions running parallely and also allows transactions to be processed at a faster rate. --- options.go | 20 ++++++++++++++++++++ txn.go | 34 ++++++++++++++++++++-------------- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/options.go b/options.go index e97ba4770..f348a2ac1 100644 --- a/options.go +++ b/options.go @@ -93,6 +93,11 @@ type Options struct { // ChecksumVerificationMode decides when db should verify checksums for SSTable blocks. ChecksumVerificationMode options.ChecksumVerificationMode + // DetectConflicts determines whether the transactions would be checked for + // conflicts. The transactions can be processed at a higher rate when + // conflict detection is disabled. + DetectConflicts bool + // Transaction start and commit timestamps are managed by end-user. // This is only useful for databases built on top of Badger (like Dgraph). // Not recommended for most users. @@ -157,6 +162,7 @@ func DefaultOptions(path string) Options { LogRotatesToFlush: 2, EncryptionKey: []byte{}, EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days. + DetectConflicts: true, } } @@ -631,3 +637,17 @@ func (opt Options) WithLoadBloomsOnOpen(b bool) Options { opt.LoadBloomsOnOpen = b return opt } + +// WithDetectConflicts returns a new Options value with DetectConflicts set to the given value. +// +// Detect conflicts options determines if the transactions would be checked for +// conflicts before committing them. When this option is set to false +// (detectConflicts=false) badger can process transactions at a higher rate. +// Setting this options to false might be useful when the user application +// deals with conflict detection and resolution. +// +// The default value of Detect conflicts is True. +func (opt Options) WithDetectConflicts(b bool) Options { + opt.DetectConflicts = b + return opt +} diff --git a/txn.go b/txn.go index cb531350f..c6a2b8b5d 100644 --- a/txn.go +++ b/txn.go @@ -32,7 +32,8 @@ import ( ) type oracle struct { - isManaged bool // Does not change value, so no locking required. + isManaged bool // Does not change value, so no locking required. + detectConflicts bool // Determines if the txns should be checked for conflicts. sync.Mutex // For nextTxnTs and commits. // writeChLock lock is for ensuring that transactions go to the write @@ -182,10 +183,9 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { y.AssertTrue(ts >= o.lastCleanupTs) - if !o.isManaged { - // We should ensure that txns are not added to o.committedTxns slice in - // managed mode. If the user doesn't set o.discardTs, the commitTxns - // slice would keep growing in managed mode. + if o.detectConflicts { + // We should ensure that txns are not added to o.committedTxns slice when + // conflict detection is disabled otherwise this slice would keep growing. o.committedTxns = append(o.committedTxns, committedTxn{ ts: ts, writes: txn.writes, @@ -203,10 +203,9 @@ func (o *oracle) doneRead(txn *Txn) { } func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock - if o.isManaged { - // In managedMode, we do not store any committedTxns. It is expected - // that the system using badger in managedmode performs it's own - // conflict detection. + if !o.detectConflicts { + // When detectConflicts is set to false, we do not store any + // committedTxns and so there's nothing to clean up. return } // Same logic as discardAtOrBelow but unlocked @@ -383,8 +382,13 @@ func (txn *Txn) modify(e *Entry) error { if err := txn.checkSize(e); err != nil { return err } - fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. - txn.writes[fp] = struct{}{} + + // The txn.writes is used for conflict detection. If conflict detection + // is disabled, we don't need to store key hashes in this map. + if txn.db.opt.DetectConflicts { + fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. + txn.writes[fp] = struct{}{} + } // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice. // Add the entry to duplicateWrites only if both the entries have different versions. For // same versions, we will overwrite the existing entry. @@ -646,7 +650,7 @@ func (txn *Txn) commitPrecheck() error { // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM // tree won't be updated, so there's no need for any rollback. func (txn *Txn) Commit() error { - if len(txn.writes) == 0 { + if len(txn.pendingWrites) == 0 { return nil // Nothing to do. } // Precheck before discarding txn. @@ -697,7 +701,7 @@ func (txn *Txn) CommitWith(cb func(error)) { panic("Nil callback provided to CommitWith") } - if len(txn.writes) == 0 { + if len(txn.pendingWrites) == 0 { // Do not run these callbacks from here, because the CommitWith and the // callback might be acquiring the same locks. Instead run the callback // from another goroutine. @@ -764,7 +768,9 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn { size: int64(len(txnKey) + 10), // Some buffer for the extra entry. } if update { - txn.writes = make(map[uint64]struct{}) + if db.opt.DetectConflicts { + txn.writes = make(map[uint64]struct{}) + } txn.pendingWrites = make(map[string]*Entry) } if !isManaged { From ed03bc8c853bf105683684d2c4b9a2de57795762 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 15:38:43 +0530 Subject: [PATCH 2/4] fix bug --- txn.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/txn.go b/txn.go index c6a2b8b5d..2e206b583 100644 --- a/txn.go +++ b/txn.go @@ -65,7 +65,8 @@ type committedTxn struct { func newOracle(opt Options) *oracle { orc := &oracle{ - isManaged: opt.managedTxns, + isManaged: opt.managedTxns, + detectConflicts: opt.DetectConflicts, // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open. // // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here. From 2c6f5f4871a0a03445b3e930b1af4c4a164c4625 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 3 Jun 2020 18:49:44 +0530 Subject: [PATCH 3/4] Rename writes to conflictKeys --- txn.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/txn.go b/txn.go index 2e206b583..e625695bc 100644 --- a/txn.go +++ b/txn.go @@ -59,8 +59,9 @@ type oracle struct { } type committedTxn struct { - ts uint64 - writes map[uint64]struct{} + ts uint64 + // ConflictKeys Keeps track of the entries written at timestamp ts. + conflictKeys map[uint64]struct{} } func newOracle(opt Options) *oracle { @@ -150,7 +151,7 @@ func (o *oracle) hasConflict(txn *Txn) bool { } for _, ro := range txn.reads { - if _, has := committedTxn.writes[ro]; has { + if _, has := committedTxn.conflictKeys[ro]; has { return true } } @@ -188,8 +189,8 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { // We should ensure that txns are not added to o.committedTxns slice when // conflict detection is disabled otherwise this slice would keep growing. o.committedTxns = append(o.committedTxns, committedTxn{ - ts: ts, - writes: txn.writes, + ts: ts, + conflictKeys: txn.conflictKeys, }) } @@ -249,10 +250,11 @@ type Txn struct { readTs uint64 commitTs uint64 - update bool // update is used to conditionally keep track of reads. - reads []uint64 // contains fingerprints of keys read. - writes map[uint64]struct{} // contains fingerprints of keys written. - readsLock sync.Mutex // guards the reads slice. See addReadKey. + update bool // update is used to conditionally keep track of reads. + reads []uint64 // contains fingerprints of keys read. + // contains fingerprints of keys written. This is used for conflict detection. + conflictKeys map[uint64]struct{} + readsLock sync.Mutex // guards the reads slice. See addReadKey. pendingWrites map[string]*Entry // cache stores any writes done by txn. duplicateWrites []*Entry // Used in managed mode to store duplicate entries. @@ -651,6 +653,8 @@ func (txn *Txn) commitPrecheck() error { // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM // tree won't be updated, so there's no need for any rollback. func (txn *Txn) Commit() error { + // txn.conflictKeys can be zero if conflict detection is turned off. So we + // should check txn.pendingWrites. if len(txn.pendingWrites) == 0 { return nil // Nothing to do. } @@ -770,7 +774,7 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn { } if update { if db.opt.DetectConflicts { - txn.writes = make(map[uint64]struct{}) + txn.conflictKeys = make(map[uint64]struct{}) } txn.pendingWrites = make(map[string]*Entry) } From 1359fd9ebe8896ac75fb2cc8dbda36079396144d Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 3 Jun 2020 19:02:38 +0530 Subject: [PATCH 4/4] fixup --- txn.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/txn.go b/txn.go index e625695bc..3b16cb9a0 100644 --- a/txn.go +++ b/txn.go @@ -386,11 +386,11 @@ func (txn *Txn) modify(e *Entry) error { return err } - // The txn.writes is used for conflict detection. If conflict detection + // The txn.conflictKeys is used for conflict detection. If conflict detection // is disabled, we don't need to store key hashes in this map. if txn.db.opt.DetectConflicts { fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. - txn.writes[fp] = struct{}{} + txn.conflictKeys[fp] = struct{}{} } // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice. // Add the entry to duplicateWrites only if both the entries have different versions. For @@ -577,7 +577,7 @@ func (txn *Txn) commitAndSend() (func() error, error) { // down to here. So, keep this around for at least a couple of months. // var b strings.Builder // fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ", - // txn.readTs, commitTs, txn.reads, txn.writes) + // txn.readTs, commitTs, txn.reads, txn.conflictKeys) for _, e := range txn.pendingWrites { processEntry(e) }