Skip to content

Commit

Permalink
Clean up transaction oracle as we go (#1275)
Browse files Browse the repository at this point in the history
In the existing implementation of oracle, if you happen to always have at least
one write transaction open the memory usage of the transaction oracle is
unbounded. It is actually relatively easy to hit when batch importing data. If
you have more than one WriteBatch active during the import the transaction
oracle will never be cleaned up.

This commit fixes it. The core idea is to avoid increasing contention on purely
read transactions; so only clean up the transaction oracle when write
transactions are committed even if technically we could free memory sooner;
Split the big `oracle.commit` map into one map per previously committed
transaction; (this allows Go to release memory sooner than when performing
deletes on a single map);

Take advantage of the fact that we have acquired the oracle lock in
oracle.newCommitTs to do the cleanup I am assuming here that the number of
committed-but-still-tracked transactions is small, which makes an
implementation based on a simple slice reasonable. If that's not the case we
will need some form of a sorted data-structure (i.e. a b-tree) here.

Co-authored-by: Damien Tournoud <[email protected]>
  • Loading branch information
muXxer and damz authored May 18, 2020
1 parent ef28ef3 commit 62b7a10
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 68 deletions.
18 changes: 10 additions & 8 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ type WriteBatch struct {
db *DB
throttle *y.Throttle
err error
commitTs uint64

isManaged bool
commitTs uint64
}

// NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes,
Expand All @@ -43,14 +45,15 @@ func (db *DB) NewWriteBatch() *WriteBatch {
if db.opt.managedTxns {
panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead")
}
return db.newWriteBatch()
return db.newWriteBatch(false)
}

func (db *DB) newWriteBatch() *WriteBatch {
func (db *DB) newWriteBatch(isManaged bool) *WriteBatch {
return &WriteBatch{
db: db,
txn: db.newTransaction(true, true),
throttle: y.NewThrottle(16),
db: db,
isManaged: isManaged,
txn: db.newTransaction(true, isManaged),
throttle: y.NewThrottle(16),
}
}

Expand Down Expand Up @@ -181,8 +184,7 @@ func (wb *WriteBatch) commit() error {
return err
}
wb.txn.CommitWith(wb.callback)
wb.txn = wb.db.newTransaction(true, true)
wb.txn.readTs = 0 // We're not reading anything.
wb.txn = wb.db.newTransaction(true, wb.isManaged)
wb.txn.commitTs = wb.commitTs
return wb.err
}
Expand Down
4 changes: 2 additions & 2 deletions managed_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {
panic("cannot use NewWriteBatchAt with managedDB=false. Use NewWriteBatch instead")
}

wb := db.newWriteBatch()
wb := db.newWriteBatch(true)
wb.commitTs = commitTs
wb.txn.commitTs = commitTs
return wb
Expand All @@ -57,7 +57,7 @@ func (db *DB) NewManagedWriteBatch() *WriteBatch {
panic("cannot use NewManagedWriteBatch with managedDB=false. Use NewWriteBatch instead")
}

wb := db.newWriteBatch()
wb := db.newWriteBatch(true)
return wb
}

Expand Down
148 changes: 90 additions & 58 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (
)

type oracle struct {
// A 64-bit integer must be at the top for memory alignment. See issue #311.
refCount int64
isManaged bool // Does not change value, so no locking required.

sync.Mutex // For nextTxnTs and commits.
Expand All @@ -50,18 +48,23 @@ type oracle struct {
discardTs uint64 // Used by ManagedDB.
readMark *y.WaterMark // Used by DB.

// commits stores a key fingerprint and latest commit counter for it.
// refCount is used to clear out commits map to avoid a memory blowup.
commits map[uint64]uint64
// committedTxns contains all committed writes (contains fingerprints
// of keys written and their latest commit counter).
committedTxns []committedTxn
lastCleanupTs uint64

// closer is used to stop watermarks.
closer *y.Closer
}

type committedTxn struct {
ts uint64
writes map[uint64]struct{}
}

func newOracle(opt Options) *oracle {
orc := &oracle{
isManaged: opt.managedTxns,
commits: make(map[uint64]uint64),
// We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
//
// WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
Expand All @@ -79,28 +82,6 @@ func (o *oracle) Stop() {
o.closer.SignalAndWait()
}

func (o *oracle) addRef() {
atomic.AddInt64(&o.refCount, 1)
}

func (o *oracle) decrRef() {
if atomic.AddInt64(&o.refCount, -1) != 0 {
return
}

// Clear out commits maps to release memory.
o.Lock()
defer o.Unlock()
// Avoids the race where something new is added to commitsMap
// after we check refCount and before we take Lock.
if atomic.LoadInt64(&o.refCount) != 0 {
return
}
if len(o.commits) >= 1000 { // If the map is still small, let it slide.
o.commits = make(map[uint64]uint64)
}
}

func (o *oracle) readTs() uint64 {
if o.isManaged {
panic("ReadTs should not be retrieved for managed DB")
Expand Down Expand Up @@ -138,6 +119,7 @@ func (o *oracle) setDiscardTs(ts uint64) {
o.Lock()
defer o.Unlock()
o.discardTs = ts
o.cleanupCommittedTransactions()
}

func (o *oracle) discardAtOrBelow() uint64 {
Expand All @@ -154,13 +136,24 @@ func (o *oracle) hasConflict(txn *Txn) bool {
if len(txn.reads) == 0 {
return false
}
for _, ro := range txn.reads {
// A commit at the read timestamp is expected.
// But, any commit after the read timestamp should cause a conflict.
if ts, has := o.commits[ro]; has && ts > txn.readTs {
return true
for _, committedTxn := range o.committedTxns {
// If the committedTxn.ts is less than txn.readTs that implies that the
// committedTxn finished before the current transaction started.
// We don't need to check for conflict in that case.
// This change assumes linearizability. Lack of linearizability could
// cause the read ts of a new txn to be lower than the commit ts of
// a txn before it (@mrjn).
if committedTxn.ts <= txn.readTs {
continue
}

for _, ro := range txn.reads {
if _, has := committedTxn.writes[ro]; has {
return true
}
}
}

return false
}

Expand All @@ -174,6 +167,9 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 {

var ts uint64
if !o.isManaged {
o.doneRead(txn)
o.cleanupCommittedTransactions()

// This is the general case, when user doesn't specify the read and commit ts.
ts = o.nextTxnTs
o.nextTxnTs++
Expand All @@ -184,12 +180,62 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 {
ts = txn.commitTs
}

for _, w := range txn.writes {
o.commits[w] = ts // Update the commitTs.
y.AssertTrue(ts >= o.lastCleanupTs)

if !o.isManaged {
// We should ensure that txns are not added to o.committedTxns slice in
// managed mode. If the user doesn't set o.discardTs, the commitTxns
// slice would keep growing in managed mode.
o.committedTxns = append(o.committedTxns, committedTxn{
ts: ts,
writes: txn.writes,
})
}

return ts
}

func (o *oracle) doneRead(txn *Txn) {
if !txn.doneRead {
txn.doneRead = true
o.readMark.Done(txn.readTs)
}
}

func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
if o.isManaged {
// In managedMode, we do not store any committedTxns. It is expected
// that the system using badger in managedmode performs it's own
// conflict detection.
return
}
// Same logic as discardAtOrBelow but unlocked
var maxReadTs uint64
if o.isManaged {
maxReadTs = o.discardTs
} else {
maxReadTs = o.readMark.DoneUntil()
}

y.AssertTrue(maxReadTs >= o.lastCleanupTs)

// do not run clean up if the maxReadTs (read timestamp of the
// oldest transaction that is still in flight) has not increased
if maxReadTs == o.lastCleanupTs {
return
}
o.lastCleanupTs = maxReadTs

tmp := o.committedTxns[:0]
for _, txn := range o.committedTxns {
if txn.ts <= maxReadTs {
continue
}
tmp = append(tmp, txn)
}
o.committedTxns = tmp
}

func (o *oracle) doneCommit(cts uint64) {
if o.isManaged {
// No need to update anything.
Expand All @@ -203,16 +249,17 @@ type Txn struct {
readTs uint64
commitTs uint64

update bool // update is used to conditionally keep track of reads.
readsLock sync.Mutex // guards the reads slice. See addReadKey.
reads []uint64 // contains fingerprints of keys read.
writes []uint64 // contains fingerprints of keys written.
update bool // update is used to conditionally keep track of reads.
reads []uint64 // contains fingerprints of keys read.
writes map[uint64]struct{} // contains fingerprints of keys written.
readsLock sync.Mutex // guards the reads slice. See addReadKey.

pendingWrites map[string]*Entry // cache stores any writes done by txn.
duplicateWrites []*Entry // Used in managed mode to store duplicate entries.

db *DB
discarded bool
doneRead bool

size int64
count int64
Expand Down Expand Up @@ -337,7 +384,7 @@ func (txn *Txn) modify(e *Entry) error {
return err
}
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.writes = append(txn.writes, fp)
txn.writes[fp] = struct{}{}
// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
// Add the entry to duplicateWrites only if both the entries have different versions. For
// same versions, we will overwrite the existing entry.
Expand Down Expand Up @@ -464,10 +511,7 @@ func (txn *Txn) Discard() {
}
txn.discarded = true
if !txn.db.orc.isManaged {
txn.db.orc.readMark.Done(txn.readTs)
}
if txn.update {
txn.db.orc.decrRef()
txn.db.orc.doneRead(txn)
}
}

Expand Down Expand Up @@ -720,21 +764,9 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn {
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
}
if update {
txn.writes = make(map[uint64]struct{})
txn.pendingWrites = make(map[string]*Entry)
txn.db.orc.addRef()
}
// It is important that the oracle addRef happens BEFORE we retrieve a read
// timestamp. Otherwise, it is possible that the oracle commit map would
// become nil after we get the read timestamp.
// The sequence of events can be:
// 1. This txn gets a read timestamp.
// 2. Another txn working on the same keyset commits them, and decrements
// the reference to oracle.
// 3. Oracle ref reaches zero, resetting commit map.
// 4. This txn increments the oracle reference.
// 5. Now this txn would go on to commit the keyset, and no conflicts
// would be detected.
// See issue: https://github.com/dgraph-io/badger/issues/574
}
if !isManaged {
txn.readTs = db.orc.readTs()
}
Expand Down

0 comments on commit 62b7a10

Please sign in to comment.