Skip to content

Commit

Permalink
fix(txn): Fix data races in transaction code (#8060)
Browse files Browse the repository at this point in the history
Fix data races.

(cherry picked from commit cf22bf7)
ahsanbarkati committed Oct 4, 2021
1 parent a0e38da commit a93d027
Showing 4 changed files with 28 additions and 16 deletions.
8 changes: 8 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
@@ -163,6 +163,14 @@ func (lc *LocalCache) getNoStore(key string) *List {
return nil
}

func (lc *LocalCache) ReadKeys() map[uint64]struct{} {
return lc.readKeys
}

func (lc *LocalCache) Deltas() map[string][]byte {
return lc.deltas
}

// SetIfAbsent adds the list for the specified key to the cache. If a list for the same
// key already exists, the cache will not be modified and the existing list
// will be returned instead. This behavior is meant to prevent the goroutines
12 changes: 2 additions & 10 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
@@ -247,16 +247,8 @@ func (txn *Txn) addConflictKey(conflictKey uint64) {
}
}

func (txn *Txn) ReadKeys() map[uint64]struct{} {
txn.Lock()
defer txn.Unlock()
return txn.cache.readKeys
}

func (txn *Txn) Deltas() map[string][]byte {
txn.Lock()
defer txn.Unlock()
return txn.cache.deltas
func (txn *Txn) Cache() *LocalCache {
return txn.cache
}

// FillContext updates the given transaction context with data from this transaction.
5 changes: 4 additions & 1 deletion posting/oracle.go
Original file line number Diff line number Diff line change
@@ -275,9 +275,12 @@ func (o *oracle) DeleteTxnsAndRollupKeys(delta *pb.OracleDelta) {
for _, status := range delta.Txns {
txn := o.pendingTxns[status.StartTs]
if txn != nil && status.CommitTs > 0 {
for k := range txn.Deltas() {
c := txn.Cache()
c.RLock()
for k := range c.Deltas() {
IncrRollup.addKeyToBatch([]byte(k), 0)
}
c.RUnlock()
}
delete(o.pendingTxns, status.StartTs)
}
19 changes: 14 additions & 5 deletions worker/draft.go
Original file line number Diff line number Diff line change
@@ -132,7 +132,11 @@ func (kw *keysWritten) StillValid(txn *posting.Txn) bool {
kw.validTxns++
return true
}
for hash := range txn.ReadKeys() {

c := txn.Cache()
c.Lock()
defer c.Unlock()
for hash := range c.ReadKeys() {
// If the commitTs is between (MaxAssignedSeen, StartTs], the txn reads were invalid. If the
// commitTs is > StartTs, then it doesn't matter for reads. If the commit ts is <
// MaxAssignedSeen, that means our reads are valid.
@@ -1081,12 +1085,17 @@ func (n *node) commitOrAbort(_ uint64, delta *pb.OracleDelta) error {
if txn == nil || status.CommitTs == 0 {
continue
}
for k := range txn.Deltas() {
c := txn.Cache()
c.RLock()
for k := range c.Deltas() {
n.keysWritten.keyCommitTs[z.MemHashString(k)] = status.CommitTs
}
n.keysWritten.totalKeys += len(txn.Deltas())
numKeys += len(txn.Deltas())
if len(txn.Deltas()) == 0 {
num := len(c.Deltas())
c.RUnlock()

n.keysWritten.totalKeys += num
numKeys += num
if num == 0 {
continue
}
txns = append(txns, txn)

0 comments on commit a93d027

Please sign in to comment.