Skip to content

Commit

Permalink
Bug fix: Send commit timestamps in order (#2687)
Browse files Browse the repository at this point in the history
Zero had a race condition, where it could send commit timestamps out of order. This can cause invalid reads anyway. But, even more serious than that, Alpha was recently changed to ensure that it does not write over a newer version of the key (a4bd06f2f). If commits come out of order, some of these commits can be dropped on the floor.

This change fixes that by ensuring that Zero would send all the commits in the right order.

P.S. Also did a change for @danielmai , which sets `stderrthreshold=0` always, and doesn't let a user overwrite it.

Changelog:

* Fix to ensure that Zero sends out all commit timestamps in order.
* Do the sorting of txn status in Alpha, because Alpha also batches multiple updates from Zero.
  • Loading branch information
manishrjain authored Oct 23, 2018
1 parent 8f56eb9 commit af08300
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 8 deletions.
5 changes: 5 additions & 0 deletions dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ func init() {
RootCmd.PersistentFlags().Bool("expose_trace", false,
"Allow trace endpoint to be accessible from remote")
rootConf.BindPFlags(RootCmd.PersistentFlags())

flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
// Always set stderrthreshold=0. Don't let users set it themselves.
x.Check(flag.Set("stderrthreshold", "0"))
x.Check(flag.CommandLine.MarkDeprecated("stderrthreshold",
"Dgraph always sets this flag to 0. It can't be overwritten."))

var subcommands = []*x.SubCommand{
&bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero,
Expand Down
24 changes: 16 additions & 8 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package zero
import (
"errors"
"math/rand"
"sort"
"time"

"github.com/dgraph-io/dgo/protos/api"
Expand Down Expand Up @@ -116,12 +115,6 @@ func (o *Oracle) commit(src *api.TxnContext) error {
return nil
}

func sortTxns(delta *pb.OracleDelta) {
sort.Slice(delta.Txns, func(i, j int) bool {
return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
})
}

func (o *Oracle) currentState() *pb.OracleDelta {
o.AssertRLock()
resp := &pb.OracleDelta{}
Expand Down Expand Up @@ -175,7 +168,22 @@ func (o *Oracle) sendDeltasToSubscribers() {
break slurp_loop
}
}
sortTxns(delta) // Sort them in increasing order of CommitTs.
// No need to sort the txn updates here. Alpha would sort them before
// applying.

// Let's ensure that we have all the commits up until the max here.
// Otherwise, we'll be sending commit timestamps out of order, which
// would cause Alphas to drop some of them, during writes to Badger.
waitFor := delta.MaxAssigned
for _, txn := range delta.Txns {
waitFor = x.Max(waitFor, txn.CommitTs)
}
if o.doneUntil.DoneUntil() < waitFor {
continue // The for loop doing blocking reads from o.updates.
// We need at least one entry from the updates channel to pick up a missing update.
// Don't goto slurp_loop, because it would break from select immediately.
}

o.Lock()
for id, ch := range o.subscribers {
select {
Expand Down
8 changes: 8 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type node struct {
gid uint32
closer *y.Closer

lastCommitTs uint64 // Only used to ensure that our commit Ts is monotonically increasing.

streaming int32 // Used to avoid calculating snapshot

canCampaign bool
Expand Down Expand Up @@ -449,8 +451,14 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
time.Sleep(10 * time.Millisecond)
}
}

for _, status := range delta.Txns {
if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs {
glog.Errorf("Lastcommit %d > current %d. This would cause some commits to be lost.",
n.lastCommitTs, status.CommitTs)
}
toDisk(status.StartTs, status.CommitTs)
n.lastCommitTs = status.CommitTs
}
if err := writer.Flush(); err != nil {
x.Errorf("Error while flushing to disk: %v", err)
Expand Down
10 changes: 10 additions & 0 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package worker
import (
"fmt"
"math"
"sort"
"sync/atomic"
"time"

Expand Down Expand Up @@ -801,7 +802,16 @@ func (g *groupi) processOracleDeltaStream() {
elog.Errorf("No longer the leader of group %d. Exiting", g.groupId())
return
}

// We should always sort the txns before applying. Otherwise, we might lose some of
// these updates, becuase we never write over a new version.
sort.Slice(delta.Txns, func(i, j int) bool {
return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
})
elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta)
if glog.V(2) {
glog.Infof("Batched %d updates. Proposing Delta: %v.", batch, delta)
}
for {
// Block forever trying to propose this.
err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta})
Expand Down

0 comments on commit af08300

Please sign in to comment.