From 47dd180f9caa1be3fa8e11ef41a7d729f75bd44d Mon Sep 17 00:00:00 2001
From: Manish R Jain
Date: Fri, 21 Sep 2018 14:54:14 -0700
Subject: [PATCH] Fix a bug in Raft.Run loop. (#2606)

When retrieving a snapshot, wait until all the pending updates are applied
via the applyCh. Otherwise, these updates would get applied after applying
the snapshot, which would cause Dgraph to change the state of the data in
unpredictable ways.

Also send the entire list of Committed Entries in one shot to applyCh,
instead of sending each entry one by one. This is less contentious for Go
channels.
---
 edgraph/server.go |  2 +-
 worker/draft.go   | 75 +++++++++++++++++++++++------------------------
 2 files changed, 38 insertions(+), 39 deletions(-)

diff --git a/edgraph/server.go b/edgraph/server.go
index a0f7c0894ca..fb32b1585d5 100644
--- a/edgraph/server.go
+++ b/edgraph/server.go
@@ -133,7 +133,7 @@ func (s *ServerState) initStorage() {
 	// Write Ahead Log directory
 	x.Checkf(os.MkdirAll(Config.WALDir, 0700), "Error while creating WAL dir.")
 	opt := badger.LSMOnlyOptions
-	opt.ValueLogMaxEntries = 1000 // Allow for easy space reclamation.
+	opt.ValueLogMaxEntries = 10000 // Allow for easy space reclamation.
 	opt = setBadgerOptions(opt, Config.WALDir)
 
 	glog.Infof("Opening write-ahead log BadgerDB with options: %+v\n", opt)
diff --git a/worker/draft.go b/worker/draft.go
index e2c51a7dce2..3039f218134 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -43,7 +43,7 @@ type node struct {
 	*conn.Node
 
 	// Fields which are never changed after init.
-	applyCh chan raftpb.Entry
+	applyCh chan []raftpb.Entry
 	ctx     context.Context
 	gid     uint32
 	closer  *y.Closer
@@ -73,7 +73,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
 		gid: gid,
 		// processConfChange etc are not throttled so some extra delta, so that we don't
 		// block tick when applyCh is full
-		applyCh: make(chan raftpb.Entry, Config.NumPendingProposals+1000),
+		applyCh: make(chan []raftpb.Entry, 100),
 		elog:    trace.NewEventLog("Dgraph", "ApplyCh"),
 		closer:  y.NewCloser(2), // Matches CLOSER:1
 	}
@@ -421,16 +421,27 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
 
 func (n *node) processApplyCh() {
 	defer n.closer.Done() // CLOSER:1
-	for e := range n.applyCh {
-		proposal := &intern.Proposal{}
-		if err := proposal.Unmarshal(e.Data); err != nil {
-			x.Fatalf("Unable to unmarshal proposal: %v %q\n", err, e.Data)
-		}
+	for entries := range n.applyCh {
+		for _, e := range entries {
+			switch {
+			case e.Type == raftpb.EntryConfChange:
+				// Already handled in the main Run loop.
+			case len(e.Data) == 0:
+				n.elog.Printf("Found empty data at index: %d", e.Index)
+				n.Applied.Done(e.Index)
+			default:
+				proposal := &intern.Proposal{}
+				if err := proposal.Unmarshal(e.Data); err != nil {
+					x.Fatalf("Unable to unmarshal proposal: %v %q\n", err, e.Data)
+				}
 
-		err := n.applyCommitted(proposal, e.Index)
-		n.elog.Printf("Applied proposal with key: %s, index: %d. Err: %v", proposal.Key, e.Index, err)
-		n.Proposals.Done(proposal.Key, err)
-		n.Applied.Done(e.Index)
+				err := n.applyCommitted(proposal, e.Index)
+				n.elog.Printf("Applied proposal with key: %s, index: %d. Err: %v",
+					proposal.Key, e.Index, err)
+				n.Proposals.Done(proposal.Key, err)
+				n.Applied.Done(e.Index)
+			}
+		}
 	}
 }
 
@@ -644,14 +655,19 @@ func (n *node) Run() {
 				rc := snap.GetContext()
 				x.AssertTrue(rc.GetGroup() == n.gid)
 				if rc.Id != n.Id {
-					// NOTE: Retrieving snapshot here is OK, after storing it above in WAL, because
-					// rc.Id != n.Id.
-					x.Printf("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
-					// It's ok to block tick while retrieving snapshot, since it's a follower
+					// We are getting a new snapshot from leader. We need to wait for the applyCh to
+					// finish applying the updates, otherwise, we'll end up overwriting the data
+					// from the new snapshot that we retrieved.
+					maxIndex := n.Applied.LastIndex()
+					glog.Infof("Waiting for applyCh to reach %d before taking snapshot\n", maxIndex)
+					n.Applied.WaitForMark(context.Background(), maxIndex)
+
+					// It's ok to block ticks while retrieving snapshot, since it's a follower.
+					glog.Infof("-------> SNAPSHOT [%d] from %d\n", n.gid, rc.Id)
 					n.retryUntilSuccess(n.retrieveSnapshot, 100*time.Millisecond)
-					x.Printf("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
+					glog.Infof("-------> SNAPSHOT [%d]. DONE.\n", n.gid)
 				} else {
-					x.Printf("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
+					glog.Infof("-------> SNAPSHOT [%d] from %d [SELF]. Ignoring.\n", n.gid, rc.Id)
 				}
 			}
 			if tr != nil {
@@ -659,7 +675,7 @@ func (n *node) Run() {
 			}
 
 			// Now schedule or apply committed entries.
-			for idx, entry := range rd.CommittedEntries {
+			for _, entry := range rd.CommittedEntries {
 				// Need applied watermarks for schema mutation also for read linearazibility
 				// Applied watermarks needs to be emitted as soon as possible sequentially.
 				// If we emit Mark{4, false} and Mark{4, true} before emitting Mark{3, false}
@@ -673,28 +689,11 @@ func (n *node) Run() {
 					// Not present in proposal map.
 					n.Applied.Done(entry.Index)
 					groups().triggerMembershipSync()
-
-				} else if len(entry.Data) == 0 {
-					// TODO: Say something. Do something.
-					tr.LazyPrintf("Found empty data at index: %d", entry.Index)
-					tr.SetError()
-					n.Applied.Done(entry.Index)
-
-				} else {
-					// When applyCh fills up, this would automatically block.
-					// TODO: Instead of sending each entry, we should just send
-					// the whole array of CommittedEntries. It would avoid
-					// blocking this loop.
-					n.applyCh <- entry
-				}
-
-				// Move to debug log later.
-				// Sometimes after restart there are too many entries to replay, so log so that we
-				// know Run loop is replaying them.
-				if tr != nil && idx%5000 == 4999 {
-					tr.LazyPrintf("Handling committed entries. At idx: [%v]\n", idx)
 				}
 			}
+			// Send the whole lot to applyCh in one go, instead of sending entries one by one.
+			n.applyCh <- rd.CommittedEntries
+
 			if tr != nil {
 				tr.LazyPrintf("Handled %d committed entries.", len(rd.CommittedEntries))
 			}
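
Note: the sketch below is not part of the patch. It is a standalone Go program, with a made-up Entry type, that illustrates the batching change above: the Run loop hands rd.CommittedEntries to applyCh as a single slice, and processApplyCh loops over each batch, so there is one channel operation per Ready rather than one per entry.

package main

import "fmt"

// Entry is a stand-in for raftpb.Entry; only the fields used here.
type Entry struct {
	Index uint64
	Data  []byte
}

func main() {
	// One channel element per batch of committed entries, not one per entry.
	// A small buffer (100, as in the patch) absorbs bursts of batches.
	applyCh := make(chan []Entry, 100)
	done := make(chan struct{})

	// Consumer side, shaped like processApplyCh: drain a whole batch at a time.
	go func() {
		defer close(done)
		for entries := range applyCh {
			for _, e := range entries {
				fmt.Printf("applying index %d (%d bytes)\n", e.Index, len(e.Data))
			}
		}
	}()

	// Producer side, shaped like the Run loop: hand over the committed entries in one go.
	committed := []Entry{
		{Index: 1, Data: []byte("first")},
		{Index: 2, Data: []byte("second")},
	}
	applyCh <- committed // one send for the whole batch

	close(applyCh)
	<-done
}

Sending the whole slice means the Run loop blocks at most once per Ready when the consumer falls behind, and a burst of committed entries costs a single channel send regardless of its size.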
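Note: the following standalone sketch is also not part of the patch. It illustrates the snapshot fix: before installing a snapshot from the leader, the node waits for the applied watermark to catch up with everything already queued on applyCh, so stale entries cannot overwrite the snapshot's data. The watermark type here is an invented, simplified stand-in for the Applied tracker (LastIndex/WaitForMark) used in the patch.

package main

import (
	"fmt"
	"sync"
)

type entry struct{ Index uint64 }

// watermark is a simplified, invented stand-in for the Applied tracker used in
// the patch (Begin/Done/LastIndex/WaitForMark).
type watermark struct {
	mu      sync.Mutex
	cond    *sync.Cond
	last    uint64 // highest index handed to applyCh
	applied uint64 // highest index actually applied
}

func newWatermark() *watermark {
	w := &watermark{}
	w.cond = sync.NewCond(&w.mu)
	return w
}

// Begin records that an entry has been queued for application.
func (w *watermark) Begin(index uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if index > w.last {
		w.last = index
	}
}

// Done records that an entry has been applied and wakes any waiters.
func (w *watermark) Done(index uint64) {
	w.mu.Lock()
	if index > w.applied {
		w.applied = index
	}
	w.mu.Unlock()
	w.cond.Broadcast()
}

func (w *watermark) LastIndex() uint64 {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.last
}

// WaitForMark blocks until everything up to index has been applied.
func (w *watermark) WaitForMark(index uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for w.applied < index {
		w.cond.Wait()
	}
}

func main() {
	applied := newWatermark()
	applyCh := make(chan []entry, 100)

	// Apply loop: marks each entry done once it has been applied.
	go func() {
		for batch := range applyCh {
			for _, e := range batch {
				applied.Done(e.Index)
			}
		}
	}()

	// Queue some committed entries, registering them with the watermark first.
	batch := []entry{{Index: 1}, {Index: 2}, {Index: 3}}
	for _, e := range batch {
		applied.Begin(e.Index)
	}
	applyCh <- batch

	// Snapshot path: wait for everything already queued to be applied before
	// installing the snapshot, so stale entries cannot overwrite its data.
	maxIndex := applied.LastIndex()
	fmt.Printf("waiting for applyCh to reach %d before installing snapshot\n", maxIndex)
	applied.WaitForMark(maxIndex)
	fmt.Println("applyCh drained; safe to install snapshot")
	close(applyCh)
}

The key invariant is that anything sent to applyCh is registered with the watermark first; waiting on LastIndex() then guarantees the apply loop has drained every entry that predates the snapshot.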