From 62b7a10a949e77b33d43cd7438979833c32cc865 Mon Sep 17 00:00:00 2001 From: muXxer Date: Mon, 18 May 2020 15:00:18 +0200 Subject: [PATCH] Clean up transaction oracle as we go (#1275) In the existing implementation of oracle, if you happen to always have at least one write transaction open the memory usage of the transaction oracle is unbounded. It is actually relatively easy to hit when batch importing data. If you have more than one WriteBatch active during the import the transaction oracle will never be cleaned up. This commit fixes it. The core idea is to avoid increasing contention on purely read transactions; so only clean up the transaction oracle when write transactions are committed even if technically we could free memory sooner; Split the big `oracle.commit` map into one map per previously committed transaction; (this allows Go to release memory sooner than when performing deletes on a single map); Take advantage of the fact that we have acquired the oracle lock in oracle.newCommitTs to do the cleanup I am assuming here that the number of committed-but-still-tracked transactions is small, which makes an implementation based on a simple slice reasonable. If that's not the case we will need some form of a sorted data-structure (i.e. a b-tree) here. Co-authored-by: Damien Tournoud --- batch.go | 18 +++--- managed_db.go | 4 +- txn.go | 148 ++++++++++++++++++++++++++++++-------------------- 3 files changed, 102 insertions(+), 68 deletions(-) diff --git a/batch.go b/batch.go index 5dd7d7a50..ff94e861d 100644 --- a/batch.go +++ b/batch.go @@ -31,7 +31,9 @@ type WriteBatch struct { db *DB throttle *y.Throttle err error - commitTs uint64 + + isManaged bool + commitTs uint64 } // NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes, @@ -43,14 +45,15 @@ func (db *DB) NewWriteBatch() *WriteBatch { if db.opt.managedTxns { panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead") } - return db.newWriteBatch() + return db.newWriteBatch(false) } -func (db *DB) newWriteBatch() *WriteBatch { +func (db *DB) newWriteBatch(isManaged bool) *WriteBatch { return &WriteBatch{ - db: db, - txn: db.newTransaction(true, true), - throttle: y.NewThrottle(16), + db: db, + isManaged: isManaged, + txn: db.newTransaction(true, isManaged), + throttle: y.NewThrottle(16), } } @@ -181,8 +184,7 @@ func (wb *WriteBatch) commit() error { return err } wb.txn.CommitWith(wb.callback) - wb.txn = wb.db.newTransaction(true, true) - wb.txn.readTs = 0 // We're not reading anything. + wb.txn = wb.db.newTransaction(true, wb.isManaged) wb.txn.commitTs = wb.commitTs return wb.err } diff --git a/managed_db.go b/managed_db.go index 9a51c289d..23c798845 100644 --- a/managed_db.go +++ b/managed_db.go @@ -47,7 +47,7 @@ func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch { panic("cannot use NewWriteBatchAt with managedDB=false. Use NewWriteBatch instead") } - wb := db.newWriteBatch() + wb := db.newWriteBatch(true) wb.commitTs = commitTs wb.txn.commitTs = commitTs return wb @@ -57,7 +57,7 @@ func (db *DB) NewManagedWriteBatch() *WriteBatch { panic("cannot use NewManagedWriteBatch with managedDB=false. Use NewWriteBatch instead") } - wb := db.newWriteBatch() + wb := db.newWriteBatch(true) return wb } diff --git a/txn.go b/txn.go index 0c4045a97..cb531350f 100644 --- a/txn.go +++ b/txn.go @@ -32,8 +32,6 @@ import ( ) type oracle struct { - // A 64-bit integer must be at the top for memory alignment. See issue #311. - refCount int64 isManaged bool // Does not change value, so no locking required. sync.Mutex // For nextTxnTs and commits. @@ -50,18 +48,23 @@ type oracle struct { discardTs uint64 // Used by ManagedDB. readMark *y.WaterMark // Used by DB. - // commits stores a key fingerprint and latest commit counter for it. - // refCount is used to clear out commits map to avoid a memory blowup. - commits map[uint64]uint64 + // committedTxns contains all committed writes (contains fingerprints + // of keys written and their latest commit counter). + committedTxns []committedTxn + lastCleanupTs uint64 // closer is used to stop watermarks. closer *y.Closer } +type committedTxn struct { + ts uint64 + writes map[uint64]struct{} +} + func newOracle(opt Options) *oracle { orc := &oracle{ isManaged: opt.managedTxns, - commits: make(map[uint64]uint64), // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open. // // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here. @@ -79,28 +82,6 @@ func (o *oracle) Stop() { o.closer.SignalAndWait() } -func (o *oracle) addRef() { - atomic.AddInt64(&o.refCount, 1) -} - -func (o *oracle) decrRef() { - if atomic.AddInt64(&o.refCount, -1) != 0 { - return - } - - // Clear out commits maps to release memory. - o.Lock() - defer o.Unlock() - // Avoids the race where something new is added to commitsMap - // after we check refCount and before we take Lock. - if atomic.LoadInt64(&o.refCount) != 0 { - return - } - if len(o.commits) >= 1000 { // If the map is still small, let it slide. - o.commits = make(map[uint64]uint64) - } -} - func (o *oracle) readTs() uint64 { if o.isManaged { panic("ReadTs should not be retrieved for managed DB") @@ -138,6 +119,7 @@ func (o *oracle) setDiscardTs(ts uint64) { o.Lock() defer o.Unlock() o.discardTs = ts + o.cleanupCommittedTransactions() } func (o *oracle) discardAtOrBelow() uint64 { @@ -154,13 +136,24 @@ func (o *oracle) hasConflict(txn *Txn) bool { if len(txn.reads) == 0 { return false } - for _, ro := range txn.reads { - // A commit at the read timestamp is expected. - // But, any commit after the read timestamp should cause a conflict. - if ts, has := o.commits[ro]; has && ts > txn.readTs { - return true + for _, committedTxn := range o.committedTxns { + // If the committedTxn.ts is less than txn.readTs that implies that the + // committedTxn finished before the current transaction started. + // We don't need to check for conflict in that case. + // This change assumes linearizability. Lack of linearizability could + // cause the read ts of a new txn to be lower than the commit ts of + // a txn before it (@mrjn). + if committedTxn.ts <= txn.readTs { + continue + } + + for _, ro := range txn.reads { + if _, has := committedTxn.writes[ro]; has { + return true + } } } + return false } @@ -174,6 +167,9 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { var ts uint64 if !o.isManaged { + o.doneRead(txn) + o.cleanupCommittedTransactions() + // This is the general case, when user doesn't specify the read and commit ts. ts = o.nextTxnTs o.nextTxnTs++ @@ -184,12 +180,62 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { ts = txn.commitTs } - for _, w := range txn.writes { - o.commits[w] = ts // Update the commitTs. + y.AssertTrue(ts >= o.lastCleanupTs) + + if !o.isManaged { + // We should ensure that txns are not added to o.committedTxns slice in + // managed mode. If the user doesn't set o.discardTs, the commitTxns + // slice would keep growing in managed mode. + o.committedTxns = append(o.committedTxns, committedTxn{ + ts: ts, + writes: txn.writes, + }) } + return ts } +func (o *oracle) doneRead(txn *Txn) { + if !txn.doneRead { + txn.doneRead = true + o.readMark.Done(txn.readTs) + } +} + +func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock + if o.isManaged { + // In managedMode, we do not store any committedTxns. It is expected + // that the system using badger in managedmode performs it's own + // conflict detection. + return + } + // Same logic as discardAtOrBelow but unlocked + var maxReadTs uint64 + if o.isManaged { + maxReadTs = o.discardTs + } else { + maxReadTs = o.readMark.DoneUntil() + } + + y.AssertTrue(maxReadTs >= o.lastCleanupTs) + + // do not run clean up if the maxReadTs (read timestamp of the + // oldest transaction that is still in flight) has not increased + if maxReadTs == o.lastCleanupTs { + return + } + o.lastCleanupTs = maxReadTs + + tmp := o.committedTxns[:0] + for _, txn := range o.committedTxns { + if txn.ts <= maxReadTs { + continue + } + tmp = append(tmp, txn) + } + o.committedTxns = tmp +} + func (o *oracle) doneCommit(cts uint64) { if o.isManaged { // No need to update anything. @@ -203,16 +249,17 @@ type Txn struct { readTs uint64 commitTs uint64 - update bool // update is used to conditionally keep track of reads. - readsLock sync.Mutex // guards the reads slice. See addReadKey. - reads []uint64 // contains fingerprints of keys read. - writes []uint64 // contains fingerprints of keys written. + update bool // update is used to conditionally keep track of reads. + reads []uint64 // contains fingerprints of keys read. + writes map[uint64]struct{} // contains fingerprints of keys written. + readsLock sync.Mutex // guards the reads slice. See addReadKey. pendingWrites map[string]*Entry // cache stores any writes done by txn. duplicateWrites []*Entry // Used in managed mode to store duplicate entries. db *DB discarded bool + doneRead bool size int64 count int64 @@ -337,7 +384,7 @@ func (txn *Txn) modify(e *Entry) error { return err } fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. - txn.writes = append(txn.writes, fp) + txn.writes[fp] = struct{}{} // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice. // Add the entry to duplicateWrites only if both the entries have different versions. For // same versions, we will overwrite the existing entry. @@ -464,10 +511,7 @@ func (txn *Txn) Discard() { } txn.discarded = true if !txn.db.orc.isManaged { - txn.db.orc.readMark.Done(txn.readTs) - } - if txn.update { - txn.db.orc.decrRef() + txn.db.orc.doneRead(txn) } } @@ -720,21 +764,9 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn { size: int64(len(txnKey) + 10), // Some buffer for the extra entry. } if update { + txn.writes = make(map[uint64]struct{}) txn.pendingWrites = make(map[string]*Entry) - txn.db.orc.addRef() - } - // It is important that the oracle addRef happens BEFORE we retrieve a read - // timestamp. Otherwise, it is possible that the oracle commit map would - // become nil after we get the read timestamp. - // The sequence of events can be: - // 1. This txn gets a read timestamp. - // 2. Another txn working on the same keyset commits them, and decrements - // the reference to oracle. - // 3. Oracle ref reaches zero, resetting commit map. - // 4. This txn increments the oracle reference. - // 5. Now this txn would go on to commit the keyset, and no conflicts - // would be detected. - // See issue: https://github.com/dgraph-io/badger/issues/574 + } if !isManaged { txn.readTs = db.orc.readTs() }