From 4081815b16f28ed4072b82345a9350218c97cab9 Mon Sep 17 00:00:00 2001 From: Vincent Lee Date: Mon, 25 Dec 2017 15:06:42 +0800 Subject: [PATCH] raft, etcdserver: allow Propose fail fast for dropping proposal in raft. --- etcdserver/server_test.go | 6 ++ etcdserver/v3_server.go | 2 +- raft/node.go | 54 +++++++++++++----- raft/node_test.go | 114 +++++++++++++++++++++++++++++++++++--- raft/raft.go | 58 ++++++++++--------- raft/raft_paper_test.go | 3 +- raft/raft_test.go | 7 ++- raft/rawnode_test.go | 3 +- 8 files changed, 197 insertions(+), 50 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e3ea0f9250c7..ecc91e06ab7a 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1534,6 +1534,12 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error { n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}}) return nil } + +func (n *nodeRecorder) ProposeWithCancel(ctx context.Context, cancel context.CancelFunc, data []byte) error { + n.Record(testutil.Action{Name: "ProposeWithCancel", Params: []interface{}{data}}) + return nil +} + func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error { n.Record(testutil.Action{Name: "ProposeConfChange"}) return nil diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 1f24f272faa6..c0b94e3af391 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -575,7 +575,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In defer cancel() start := time.Now() - s.r.Propose(cctx, data) + s.r.ProposeWithCancel(cctx, cancel, data) proposalsPending.Inc() defer proposalsPending.Dec() diff --git a/raft/node.go b/raft/node.go index 33a9db840012..78e3354264b8 100644 --- a/raft/node.go +++ b/raft/node.go @@ -118,6 +118,7 @@ type Node interface { Campaign(ctx context.Context) error // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error + ProposeWithCancel(ctx context.Context, cancel context.CancelFunc, data []byte) error // ProposeConfChange proposes config change. // At most one ConfChange can be in the process of going through consensus. // Application needs to call ApplyConfChange when applying EntryConfChange type entry. @@ -224,10 +225,15 @@ func RestartNode(c *Config) Node { return &n } +type messageWithCancel struct { + m pb.Message + cancel context.CancelFunc +} + // node is the canonical implementation of the Node interface type node struct { - propc chan pb.Message - recvc chan pb.Message + propc chan messageWithCancel + recvc chan messageWithCancel confc chan pb.ConfChange confstatec chan pb.ConfState readyc chan Ready @@ -242,8 +248,8 @@ type node struct { func newNode() node { return node{ - propc: make(chan pb.Message), - recvc: make(chan pb.Message), + propc: make(chan messageWithCancel), + recvc: make(chan messageWithCancel), confc: make(chan pb.ConfChange), confstatec: make(chan pb.ConfState), readyc: make(chan Ready), @@ -271,7 +277,7 @@ func (n *node) Stop() { } func (n *node) run(r *raft) { - var propc chan pb.Message + var propc chan messageWithCancel var readyc chan Ready var advancec chan struct{} var prevLastUnstablei, prevLastUnstablet uint64 @@ -314,13 +320,21 @@ func (n *node) run(r *raft) { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. - case m := <-propc: + case mc := <-propc: + m := mc.m m.From = r.id - r.Step(m) - case m := <-n.recvc: + err := r.Step(m) + if err == ErrProposalDropped && mc.cancel != nil { + mc.cancel() + } + case mc := <-n.recvc: + m := mc.m // filter out response message from unknown From. if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { - r.Step(m) // raft never returns an error + err := r.Step(m) // raft never returns an error + if err == ErrProposalDropped && mc.cancel != nil { + mc.cancel() + } } case cc := <-n.confc: if cc.NodeID == None { @@ -409,6 +423,13 @@ func (n *node) Propose(ctx context.Context, data []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) } +func (n *node) ProposeWithCancel(ctx context.Context, cancel context.CancelFunc, data []byte) error { + return n.stepWithCancel(ctx, cancel, pb.Message{ + Type: pb.MsgProp, + Entries: []pb.Entry{{Data: data}}, + }) +} + func (n *node) Step(ctx context.Context, m pb.Message) error { // ignore unexpected local messages receiving over network if IsLocalMsg(m.Type) { @@ -429,13 +450,18 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. func (n *node) step(ctx context.Context, m pb.Message) error { + return n.stepWithCancel(ctx, nil, m) +} + +func (n *node) stepWithCancel(ctx context.Context, cancel context.CancelFunc, m pb.Message) error { + mc := messageWithCancel{m: m, cancel: cancel} ch := n.recvc - if m.Type == pb.MsgProp { + if mc.m.Type == pb.MsgProp { ch = n.propc } select { - case ch <- m: + case ch <- mc: return nil case <-ctx.Done(): return ctx.Err() @@ -478,7 +504,7 @@ func (n *node) Status() Status { func (n *node) ReportUnreachable(id uint64) { select { - case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}: + case n.recvc <- messageWithCancel{m: pb.Message{Type: pb.MsgUnreachable, From: id}}: case <-n.done: } } @@ -487,7 +513,7 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { rej := status == SnapshotFailure select { - case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}: + case n.recvc <- messageWithCancel{m: pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}}: case <-n.done: } } @@ -495,7 +521,7 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) { select { // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership - case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}: + case n.recvc <- messageWithCancel{m: pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}}: case <-n.done: case <-ctx.Done(): } diff --git a/raft/node_test.go b/raft/node_test.go index 4401412e7743..62da226762b2 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "reflect" + "strings" "testing" "time" @@ -30,8 +31,8 @@ import ( func TestNodeStep(t *testing.T) { for i, msgn := range raftpb.MessageType_name { n := &node{ - propc: make(chan raftpb.Message, 1), - recvc: make(chan raftpb.Message, 1), + propc: make(chan messageWithCancel, 1), + recvc: make(chan messageWithCancel, 1), } msgt := raftpb.MessageType(i) n.Step(context.TODO(), raftpb.Message{Type: msgt}) @@ -64,7 +65,7 @@ func TestNodeStep(t *testing.T) { func TestNodeStepUnblock(t *testing.T) { // a node without buffer to block step n := &node{ - propc: make(chan raftpb.Message), + propc: make(chan messageWithCancel), done: make(chan struct{}), } @@ -109,8 +110,9 @@ func TestNodeStepUnblock(t *testing.T) { // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft. func TestNodePropose(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } n := newNode() @@ -147,8 +149,9 @@ func TestNodePropose(t *testing.T) { // It also ensures that ReadState can be read out through ready chan. func TestNodeReadIndex(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} @@ -214,7 +217,10 @@ func TestDisableProposalForwarding(t *testing.T) { } // send proposal to r3(follower) where DisableProposalForwarding is true - r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries}) + err := r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries}) + if err != ErrProposalDropped { + t.Fatalf("should return drop proposal error while disable proposal forwarding") + } // verify r3(follower) does not forward the proposal when DisableProposalForwarding is true if len(r3.msgs) != 0 { @@ -284,8 +290,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) { // to the underlying raft. func TestNodeProposeConfig(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } n := newNode() @@ -426,6 +433,99 @@ func TestBlockProposal(t *testing.T) { } } +func TestNodeDropPropose(t *testing.T) { + msgs := []raftpb.Message{} + droppingMsg := []byte("test_dropping") + normalMsg := []byte("normal_message") + normalDoneCh := make(chan bool) + dropStep := func(r *raft, m raftpb.Message) error { + if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(droppingMsg)) { + t.Logf("dropping message: %v", m.String()) + return ErrProposalDropped + } + if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(normalMsg)) { + close(normalDoneCh) + } + msgs = append(msgs, m) + return nil + } + + n := newNode() + s := NewMemoryStorage() + r := newTestRaft(1, []uint64{1}, 10, 1, s) + go n.run(r) + n.Campaign(context.TODO()) + for { + rd := <-n.Ready() + s.Append(rd.Entries) + // change the step function to dropStep until this raft becomes leader + if rd.SoftState.Lead == r.id { + r.step = dropStep + n.Advance() + break + } + n.Advance() + } + + proposalTimeout := time.Second + ctx, cancel := context.WithTimeout(context.Background(), proposalTimeout) + // propose with cancel should be cancelled earyly if dropped + err := n.ProposeWithCancel(ctx, cancel, droppingMsg) + if err != nil { + t.Errorf("should propose success: %v", err) + } + select { + case <-ctx.Done(): + if ctx.Err() != context.Canceled { + t.Errorf("should cancel propose for dropped proposal with cancel") + } + case <-time.After(proposalTimeout / 2): + t.Errorf("should return early for dropped proposal") + } + cancel() + + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + // other propose should wait until timeout if dropped + err = n.Propose(ctx, droppingMsg) + if err != nil { + t.Errorf("should propose success: %v", err) + } + select { + case <-ctx.Done(): + if ctx.Err() != context.DeadlineExceeded { + t.Errorf("should timeout propose for dropped proposal with no cancel") + } + case <-time.After(proposalTimeout * 2): + t.Errorf("should return early for dropped proposal") + } + cancel() + + ctx, cancel = context.WithTimeout(context.Background(), proposalTimeout) + err = n.ProposeWithCancel(ctx, cancel, normalMsg) + if err != nil { + t.Errorf("should propose success: %v", err) + } + select { + case <-ctx.Done(): + t.Errorf("should not fail for normal proposal: %v", ctx.Err()) + case <-time.After(proposalTimeout): + t.Errorf("should return early for normal proposal") + case <-normalDoneCh: + } + cancel() + + n.Stop() + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) + } + if msgs[0].Type != raftpb.MsgProp { + t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp) + } + if !bytes.Equal(msgs[0].Entries[0].Data, normalMsg) { + t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, normalMsg) + } +} + // TestNodeTick ensures that node.Tick() will increase the // elapsed of the underlying raft state machine. func TestNodeTick(t *testing.T) { diff --git a/raft/raft.go b/raft/raft.go index b4c0f0248ca2..d0f92819cdcb 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -67,6 +67,8 @@ const ( campaignTransfer CampaignType = "CampaignTransfer" ) +var ErrProposalDropped = errors.New("raft proposal dropped") + // lockedRand is a small wrapper around rand.Rand to provide // synchronization. Only the methods needed by the code are exposed // (e.g. Intn). @@ -865,25 +867,28 @@ func (r *raft) Step(m pb.Message) error { } default: - r.step(r, m) + err := r.step(r, m) + if err != nil { + return err + } } return nil } -type stepFunc func(r *raft, m pb.Message) +type stepFunc func(r *raft, m pb.Message) error -func stepLeader(r *raft, m pb.Message) { +func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { case pb.MsgBeat: r.bcastHeartbeat() - return + return nil case pb.MsgCheckQuorum: if !r.checkQuorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) } - return + return nil case pb.MsgProp: if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) @@ -892,11 +897,11 @@ func stepLeader(r *raft, m pb.Message) { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. - return + return ErrProposalDropped } if r.leadTransferee != None { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) - return + return ErrProposalDropped } for i, e := range m.Entries { @@ -910,12 +915,12 @@ func stepLeader(r *raft, m pb.Message) { } r.appendEntry(m.Entries...) r.bcastAppend() - return + return nil case pb.MsgReadIndex: if r.quorum() > 1 { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. - return + return nil } // thinking: use an interally defined context instead of the user given context. @@ -937,14 +942,14 @@ func stepLeader(r *raft, m pb.Message) { r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } - return + return nil } // All other message types require a progress for m.From (pr). pr := r.getProgress(m.From) if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) - return + return nil } switch m.Type { case pb.MsgAppResp: @@ -1000,12 +1005,12 @@ func stepLeader(r *raft, m pb.Message) { } if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { - return + return nil } ackCount := r.readOnly.recvAck(m) if ackCount < r.quorum() { - return + return nil } rss := r.readOnly.advance(m) @@ -1019,7 +1024,7 @@ func stepLeader(r *raft, m pb.Message) { } case pb.MsgSnapStatus: if pr.State != ProgressStateSnapshot { - return + return nil } if !m.Reject { pr.becomeProbe() @@ -1043,7 +1048,7 @@ func stepLeader(r *raft, m pb.Message) { case pb.MsgTransferLeader: if pr.IsLearner { r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id) - return + return nil } leadTransferee := m.From lastLeadTransferee := r.leadTransferee @@ -1051,14 +1056,14 @@ func stepLeader(r *raft, m pb.Message) { if lastLeadTransferee == leadTransferee { r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x", r.id, r.Term, leadTransferee, leadTransferee) - return + return nil } r.abortLeaderTransfer() r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee) } if leadTransferee == r.id { r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id) - return + return nil } // Transfer leadership to third party. r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee) @@ -1072,11 +1077,12 @@ func stepLeader(r *raft, m pb.Message) { r.sendAppend(leadTransferee) } } + return nil } // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is // whether they respond to MsgVoteResp or MsgPreVoteResp. -func stepCandidate(r *raft, m pb.Message) { +func stepCandidate(r *raft, m pb.Message) error { // Only handle vote responses corresponding to our candidacy (while in // StateCandidate, we may get stale MsgPreVoteResp messages in this term from // our pre-candidate state). @@ -1089,7 +1095,7 @@ func stepCandidate(r *raft, m pb.Message) { switch m.Type { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return + return ErrProposalDropped case pb.MsgApp: r.becomeFollower(r.Term, m.From) r.handleAppendEntries(m) @@ -1116,17 +1122,18 @@ func stepCandidate(r *raft, m pb.Message) { case pb.MsgTimeoutNow: r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) } + return nil } -func stepFollower(r *raft, m pb.Message) { +func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgProp: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return + return ErrProposalDropped } else if r.disableProposalForwarding { r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) - return + return ErrProposalDropped } m.To = r.lead r.send(m) @@ -1145,7 +1152,7 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgTransferLeader: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term) - return + return nil } m.To = r.lead r.send(m) @@ -1162,17 +1169,18 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgReadIndex: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) - return + return nil } m.To = r.lead r.send(m) case pb.MsgReadIndexResp: if len(m.Entries) != 1 { r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) - return + return nil } r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) } + return nil } func (r *raft) handleAppendEntries(m pb.Message) { diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 2911e8aa333a..71a7d14ace16 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -79,8 +79,9 @@ func testUpdateTermFromMessage(t *testing.T, state StateType) { // Reference: section 5.1 func TestRejectStaleTermMessage(t *testing.T) { called := false - fakeStep := func(r *raft, m pb.Message) { + fakeStep := func(r *raft, m pb.Message) error { called = true + return nil } r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) r.step = fakeStep diff --git a/raft/raft_test.go b/raft/raft_test.go index c1cf7cc557cd..49fbb3916e74 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1231,8 +1231,9 @@ func TestPastElectionTimeout(t *testing.T) { // actual stepX function. func TestStepIgnoreOldTermMsg(t *testing.T) { called := false - fakeStep := func(r *raft, m pb.Message) { + fakeStep := func(r *raft, m pb.Message) error { called = true + return nil } sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) sm.step = fakeStep @@ -3250,6 +3251,10 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { } nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + if err != ErrProposalDropped { + t.Fatalf("should return drop proposal error while transferring") + } if lead.prs[1].Match != 1 { t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 4ccf72de45a7..f034521be575 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -190,8 +190,9 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { // to the underlying raft. It also ensures that ReadState can be read out. func TestRawNodeReadIndex(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}