diff --git a/batch.go b/batch.go index 5dd7d7a50..ff94e861d 100644 --- a/batch.go +++ b/batch.go @@ -31,7 +31,9 @@ type WriteBatch struct { db *DB throttle *y.Throttle err error - commitTs uint64 + + isManaged bool + commitTs uint64 } // NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes, @@ -43,14 +45,15 @@ func (db *DB) NewWriteBatch() *WriteBatch { if db.opt.managedTxns { panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead") } - return db.newWriteBatch() + return db.newWriteBatch(false) } -func (db *DB) newWriteBatch() *WriteBatch { +func (db *DB) newWriteBatch(isManaged bool) *WriteBatch { return &WriteBatch{ - db: db, - txn: db.newTransaction(true, true), - throttle: y.NewThrottle(16), + db: db, + isManaged: isManaged, + txn: db.newTransaction(true, isManaged), + throttle: y.NewThrottle(16), } } @@ -181,8 +184,7 @@ func (wb *WriteBatch) commit() error { return err } wb.txn.CommitWith(wb.callback) - wb.txn = wb.db.newTransaction(true, true) - wb.txn.readTs = 0 // We're not reading anything. + wb.txn = wb.db.newTransaction(true, wb.isManaged) wb.txn.commitTs = wb.commitTs return wb.err } diff --git a/managed_db.go b/managed_db.go index 9a51c289d..23c798845 100644 --- a/managed_db.go +++ b/managed_db.go @@ -47,7 +47,7 @@ func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch { panic("cannot use NewWriteBatchAt with managedDB=false. Use NewWriteBatch instead") } - wb := db.newWriteBatch() + wb := db.newWriteBatch(true) wb.commitTs = commitTs wb.txn.commitTs = commitTs return wb @@ -57,7 +57,7 @@ func (db *DB) NewManagedWriteBatch() *WriteBatch { panic("cannot use NewManagedWriteBatch with managedDB=false. 
Use NewWriteBatch instead") } - wb := db.newWriteBatch() + wb := db.newWriteBatch(true) return wb } diff --git a/txn.go b/txn.go index 0c4045a97..cb531350f 100644 --- a/txn.go +++ b/txn.go @@ -32,8 +32,6 @@ import ( ) type oracle struct { - // A 64-bit integer must be at the top for memory alignment. See issue #311. - refCount int64 isManaged bool // Does not change value, so no locking required. sync.Mutex // For nextTxnTs and commits. @@ -50,18 +48,23 @@ type oracle struct { discardTs uint64 // Used by ManagedDB. readMark *y.WaterMark // Used by DB. - // commits stores a key fingerprint and latest commit counter for it. - // refCount is used to clear out commits map to avoid a memory blowup. - commits map[uint64]uint64 + // committedTxns contains all committed writes (contains fingerprints + // of keys written and their latest commit counter). + committedTxns []committedTxn + lastCleanupTs uint64 // closer is used to stop watermarks. closer *y.Closer } +type committedTxn struct { + ts uint64 + writes map[uint64]struct{} +} + func newOracle(opt Options) *oracle { orc := &oracle{ isManaged: opt.managedTxns, - commits: make(map[uint64]uint64), // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open. // // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here. @@ -79,28 +82,6 @@ func (o *oracle) Stop() { o.closer.SignalAndWait() } -func (o *oracle) addRef() { - atomic.AddInt64(&o.refCount, 1) -} - -func (o *oracle) decrRef() { - if atomic.AddInt64(&o.refCount, -1) != 0 { - return - } - - // Clear out commits maps to release memory. - o.Lock() - defer o.Unlock() - // Avoids the race where something new is added to commitsMap - // after we check refCount and before we take Lock. - if atomic.LoadInt64(&o.refCount) != 0 { - return - } - if len(o.commits) >= 1000 { // If the map is still small, let it slide. 
- o.commits = make(map[uint64]uint64) - } -} - func (o *oracle) readTs() uint64 { if o.isManaged { panic("ReadTs should not be retrieved for managed DB") @@ -138,6 +119,7 @@ func (o *oracle) setDiscardTs(ts uint64) { o.Lock() defer o.Unlock() o.discardTs = ts + o.cleanupCommittedTransactions() } func (o *oracle) discardAtOrBelow() uint64 { @@ -154,13 +136,24 @@ func (o *oracle) hasConflict(txn *Txn) bool { if len(txn.reads) == 0 { return false } - for _, ro := range txn.reads { - // A commit at the read timestamp is expected. - // But, any commit after the read timestamp should cause a conflict. - if ts, has := o.commits[ro]; has && ts > txn.readTs { - return true + for _, committedTxn := range o.committedTxns { + // If the committedTxn.ts is less than or equal to txn.readTs that implies that the + // committedTxn finished before the current transaction started. + // We don't need to check for conflict in that case. + // This change assumes linearizability. Lack of linearizability could + // cause the read ts of a new txn to be lower than the commit ts of + // a txn before it (@mrjn). + if committedTxn.ts <= txn.readTs { + continue + } + + for _, ro := range txn.reads { + if _, has := committedTxn.writes[ro]; has { + return true + } } } + return false } @@ -174,6 +167,9 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { var ts uint64 if !o.isManaged { + o.doneRead(txn) + o.cleanupCommittedTransactions() + // This is the general case, when user doesn't specify the read and commit ts. ts = o.nextTxnTs o.nextTxnTs++ @@ -184,12 +180,62 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { ts = txn.commitTs } - for _, w := range txn.writes { - o.commits[w] = ts // Update the commitTs. + y.AssertTrue(ts >= o.lastCleanupTs) + + if !o.isManaged { + // We should ensure that txns are not added to o.committedTxns slice in + // managed mode. If the user doesn't set o.discardTs, the committedTxns + // slice would keep growing in managed mode.
+ o.committedTxns = append(o.committedTxns, committedTxn{ + ts: ts, + writes: txn.writes, + }) } + return ts } +func (o *oracle) doneRead(txn *Txn) { + if !txn.doneRead { + txn.doneRead = true + o.readMark.Done(txn.readTs) + } +} + +func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock + if o.isManaged { + // In managed mode, we do not store any committedTxns. It is expected + // that the system using badger in managed mode performs its own + // conflict detection. + return + } + // Same logic as discardAtOrBelow but unlocked + var maxReadTs uint64 + if o.isManaged { + maxReadTs = o.discardTs + } else { + maxReadTs = o.readMark.DoneUntil() + } + + y.AssertTrue(maxReadTs >= o.lastCleanupTs) + + // do not run clean up if the maxReadTs (read timestamp of the + // oldest transaction that is still in flight) has not increased + if maxReadTs == o.lastCleanupTs { + return + } + o.lastCleanupTs = maxReadTs + + tmp := o.committedTxns[:0] + for _, txn := range o.committedTxns { + if txn.ts <= maxReadTs { + continue + } + tmp = append(tmp, txn) + } + o.committedTxns = tmp +} + func (o *oracle) doneCommit(cts uint64) { if o.isManaged { // No need to update anything. @@ -203,16 +249,17 @@ type Txn struct { readTs uint64 commitTs uint64 - update bool // update is used to conditionally keep track of reads. - readsLock sync.Mutex // guards the reads slice. See addReadKey. - reads []uint64 // contains fingerprints of keys read. - writes []uint64 // contains fingerprints of keys written. + update bool // update is used to conditionally keep track of reads. + reads []uint64 // contains fingerprints of keys read. + writes map[uint64]struct{} // contains fingerprints of keys written. + readsLock sync.Mutex // guards the reads slice. See addReadKey. pendingWrites map[string]*Entry // cache stores any writes done by txn. duplicateWrites []*Entry // Used in managed mode to store duplicate entries.
db *DB discarded bool + doneRead bool size int64 count int64 @@ -337,7 +384,7 @@ func (txn *Txn) modify(e *Entry) error { return err } fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. - txn.writes = append(txn.writes, fp) + txn.writes[fp] = struct{}{} // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice. // Add the entry to duplicateWrites only if both the entries have different versions. For // same versions, we will overwrite the existing entry. @@ -464,10 +511,7 @@ func (txn *Txn) Discard() { } txn.discarded = true if !txn.db.orc.isManaged { - txn.db.orc.readMark.Done(txn.readTs) - } - if txn.update { - txn.db.orc.decrRef() + txn.db.orc.doneRead(txn) } } @@ -720,21 +764,9 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn { size: int64(len(txnKey) + 10), // Some buffer for the extra entry. } if update { + txn.writes = make(map[uint64]struct{}) txn.pendingWrites = make(map[string]*Entry) - txn.db.orc.addRef() - } - // It is important that the oracle addRef happens BEFORE we retrieve a read - // timestamp. Otherwise, it is possible that the oracle commit map would - // become nil after we get the read timestamp. - // The sequence of events can be: - // 1. This txn gets a read timestamp. - // 2. Another txn working on the same keyset commits them, and decrements - // the reference to oracle. - // 3. Oracle ref reaches zero, resetting commit map. - // 4. This txn increments the oracle reference. - // 5. Now this txn would go on to commit the keyset, and no conflicts - // would be detected. - // See issue: https://github.com/dgraph-io/badger/issues/574 + } if !isManaged { txn.readTs = db.orc.readTs() }