Skip to content

Commit

Permalink
move Config ownership when creating raft and port etcd #10063 fixing
Browse files Browse the repository at this point in the history
Signed-off-by: Fullstop000 <[email protected]>
  • Loading branch information
Fullstop000 committed Jun 11, 2020
1 parent 592a95a commit bfc4ea9
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 54 deletions.
2 changes: 1 addition & 1 deletion benches/suites/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn quick_raft(voters: usize, learners: usize, logger: &slog::Logger) -> Raft<Mem
let id = 1;
let storage = MemStorage::default();
let config = Config::new(id);
let mut raft = Raft::new(&config, storage, logger).unwrap();
let mut raft = Raft::new(config, storage, logger).unwrap();
(0..voters).for_each(|id| {
raft.add_node(id as u64).unwrap();
});
Expand Down
2 changes: 1 addition & 1 deletion harness/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Network {
nstorage.insert(*id, store.clone());
let mut config = config.clone();
config.id = *id;
let r = Raft::new(&config, store, l).unwrap().into();
let r = Raft::new(config, store, l).unwrap().into();
npeers.insert(*id, r);
}
Some(r) => {
Expand Down
26 changes: 13 additions & 13 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ fn test_progress_flow_control() {
cfg.max_inflight_msgs = 3;
cfg.max_size_per_msg = 2048;
let s = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
let mut r = new_test_raft_with_config(&cfg, s, &l);
let mut r = new_test_raft_with_config(cfg, s, &l);
r.become_candidate();
r.become_leader();

Expand Down Expand Up @@ -1203,7 +1203,7 @@ fn test_commit() {
hs.term = sm_term;
store.wl().set_hardstate(hs);
let cfg = new_test_config(1, 5, 1);
let mut sm = new_test_raft_with_config(&cfg, store, &l);
let mut sm = new_test_raft_with_config(cfg, store, &l);

for (j, v) in matches.iter().enumerate() {
let id = j as u64 + 1;
Expand Down Expand Up @@ -1348,7 +1348,7 @@ fn test_handle_heartbeat() {
.append(&[empty_entry(1, 1), empty_entry(2, 2), empty_entry(3, 3)])
.unwrap();
let cfg = new_test_config(1, 5, 1);
let mut sm = new_test_raft_with_config(&cfg, store, &l);
let mut sm = new_test_raft_with_config(cfg, store, &l);
sm.become_follower(2, 2);
sm.raft_log.commit_to(commit);
sm.handle_heartbeat(m);
Expand Down Expand Up @@ -2491,7 +2491,7 @@ fn test_read_only_for_new_leader() {
if compact_index != 0 {
storage.wl().compact(compact_index).unwrap();
}
let i = new_test_raft_with_config(&cfg, storage, &l);
let i = new_test_raft_with_config(cfg, storage, &l);
peers.push(Some(i));
}
let mut nt = Network::new(peers, &l);
Expand Down Expand Up @@ -3472,11 +3472,11 @@ fn test_leader_transfer_to_learner() {
let l = default_logger();
let s = MemStorage::new_with_conf_state((vec![1], vec![2]));
let c = new_test_config(1, 10, 1);
let leader = new_test_raft_with_config(&c, s, &l);
let leader = new_test_raft_with_config(c, s, &l);

let s = MemStorage::new_with_conf_state((vec![1], vec![2]));
let c = new_test_config(2, 10, 1);
let learner = new_test_raft_with_config(&c, s, &l);
let learner = new_test_raft_with_config(c, s, &l);

let mut nt = Network::new(vec![Some(leader), Some(learner)], &l);
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
Expand Down Expand Up @@ -3753,7 +3753,7 @@ pub fn new_test_learner_raft(
storage.initialize_with_conf_state((peers, learners));
}
let cfg = new_test_config(id, election, heartbeat);
new_test_raft_with_config(&cfg, storage, logger)
new_test_raft_with_config(cfg, storage, logger)
}

// TestLearnerElectionTimeout verfies that the leader should not start election
Expand Down Expand Up @@ -4187,7 +4187,7 @@ fn test_election_tick_range() {
let l = default_logger();
let mut cfg = new_test_config(1, 10, 1);
let s = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
let mut raft = new_test_raft_with_config(&cfg, s, &l).raft.unwrap();
let mut raft = new_test_raft_with_config(cfg.clone(), s, &l).raft.unwrap();
for _ in 0..1000 {
raft.reset_randomized_election_timeout();
let randomized_timeout = raft.randomized_election_timeout();
Expand All @@ -4209,7 +4209,7 @@ fn test_election_tick_range() {
cfg.validate().unwrap_err();

cfg.max_election_tick = cfg.election_tick + 1;
raft = new_test_raft_with_config(&cfg, new_storage(), &l)
raft = new_test_raft_with_config(cfg.clone(), new_storage(), &l)
.raft
.unwrap();
for _ in 0..100 {
Expand Down Expand Up @@ -4277,7 +4277,7 @@ fn test_prevote_with_check_quorum() {
cfg.pre_vote = true;
cfg.check_quorum = true;
let s = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
let mut i = new_test_raft_with_config(&cfg, s, &l);
let mut i = new_test_raft_with_config(cfg, s, &l);
i.become_follower(1, INVALID_ID);
i
};
Expand Down Expand Up @@ -4342,7 +4342,7 @@ fn test_prevote_with_check_quorum() {
fn test_new_raft_with_bad_config_errors() {
let invalid_config = new_test_config(INVALID_ID, 1, 1);
let s = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
let raft = Raft::new(&invalid_config, s, &default_logger());
let raft = Raft::new(invalid_config, s, &default_logger());
assert!(raft.is_err())
}

Expand Down Expand Up @@ -4762,7 +4762,7 @@ fn test_group_commit() {
hs.term = 1;
store.wl().set_hardstate(hs);
let cfg = new_test_config(1, 5, 1);
let mut sm = new_test_raft_with_config(&cfg, store, &l);
let mut sm = new_test_raft_with_config(cfg, store, &l);

let mut groups = vec![];
for (j, (m, g)) in matches.into_iter().zip(group_ids).enumerate() {
Expand Down Expand Up @@ -4890,7 +4890,7 @@ fn test_group_commit_consistent() {
store.wl().set_hardstate(hs);
let mut cfg = new_test_config(1, 5, 1);
cfg.applied = applied;
let mut sm = new_test_raft_with_config(&cfg, store, &l);
let mut sm = new_test_raft_with_config(cfg, store, &l);
sm.state = role;

let mut groups = vec![];
Expand Down
14 changes: 7 additions & 7 deletions harness/tests/integration_cases/test_raft_paper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ fn test_leader_commit_preceding_entries() {
let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
store.wl().append(&tt).unwrap();
let cfg = new_test_config(1, 10, 1);
new_test_raft_with_config(&cfg, store, &l)
new_test_raft_with_config(cfg, store, &l)
};
r.load_state(&hard_state(2, 0, 0));
r.become_candidate();
Expand Down Expand Up @@ -661,7 +661,7 @@ fn test_follower_check_msg_append() {
let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
store.wl().append(&ents).unwrap();
let cfg = new_test_config(1, 10, 1);
new_test_raft_with_config(&cfg, store, &l)
new_test_raft_with_config(cfg, store, &l)
};
r.load_state(&hard_state(0, 1, 0));
r.become_follower(2, 2);
Expand Down Expand Up @@ -733,7 +733,7 @@ fn test_follower_append_entries() {
.append(&[empty_entry(1, 1), empty_entry(2, 2)])
.unwrap();
let cfg = new_test_config(1, 10, 1);
new_test_raft_with_config(&cfg, store, &l)
new_test_raft_with_config(cfg, store, &l)
};
r.become_follower(2, 2);

Expand Down Expand Up @@ -852,7 +852,7 @@ fn test_leader_sync_follower_log() {
let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
store.wl().append(&ents).unwrap();
let cfg = new_test_config(1, 10, 1);
new_test_raft_with_config(&cfg, store, &l)
new_test_raft_with_config(cfg, store, &l)
};
let last_index = lead.raft_log.last_index();
lead.load_state(&hard_state(term, last_index, 0));
Expand All @@ -861,7 +861,7 @@ fn test_leader_sync_follower_log() {
let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
store.wl().append(&tt).unwrap();
let cfg = new_test_config(2, 10, 1);
new_test_raft_with_config(&cfg, store, &l)
new_test_raft_with_config(cfg, store, &l)
};
follower.load_state(&hard_state(term - 1, 0, 0));

Expand Down Expand Up @@ -971,7 +971,7 @@ fn test_voter() {
let s = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
s.wl().append(&ents).unwrap();
let cfg = new_test_config(1, 10, 1);
let mut r = new_test_raft_with_config(&cfg, s, &l);
let mut r = new_test_raft_with_config(cfg, s, &l);

let mut m = new_message(2, 1, MessageType::MsgRequestVote, 0);
m.term = 3;
Expand Down Expand Up @@ -1016,7 +1016,7 @@ fn test_leader_only_commits_log_from_current_term() {
let store = MemStorage::new_with_conf_state((vec![1, 2], vec![]));
store.wl().append(&ents).unwrap();
let cfg = new_test_config(1, 10, 1);
new_test_raft_with_config(&cfg, store, &l)
new_test_raft_with_config(cfg, store, &l)
};
r.load_state(&hard_state(2, 0, 0));

Expand Down
69 changes: 65 additions & 4 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn new_raw_node(
.apply_snapshot(new_snapshot(1, 1, peers))
.unwrap();
}
RawNode::new(&config, storage, logger).unwrap()
RawNode::new(config, storage, logger).unwrap()
}

// test_raw_node_step ensures that RawNode.Step ignore local message.
Expand Down Expand Up @@ -392,7 +392,7 @@ fn test_raw_node_restart_from_snapshot() {
store.wl().apply_snapshot(snap).unwrap();
store.wl().append(&entries).unwrap();
store.wl().set_hardstate(hard_state(1, 3, 0));
RawNode::new(&new_test_config(1, 10, 1), store, &l).unwrap()
RawNode::new(new_test_config(1, 10, 1), store, &l).unwrap()
};

let rd = raw_node.ready();
Expand All @@ -409,7 +409,7 @@ fn test_skip_bcast_commit() {
let mut config = new_test_config(1, 10, 1);
config.skip_bcast_commit = true;
let s = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
let r1 = new_test_raft_with_config(&config, s, &l);
let r1 = new_test_raft_with_config(config, s, &l);
let r2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
let r3 = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut nt = Network::new(vec![Some(r1), Some(r2), Some(r3)], &l);
Expand Down Expand Up @@ -525,7 +525,7 @@ fn test_commit_pagination() {
let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
let mut config = new_test_config(1, 10, 1);
config.max_committed_size_per_ready = 2048;
let mut raw_node = RawNode::new(&config, storage, &l).unwrap();
let mut raw_node = RawNode::new(config, storage, &l).unwrap();
raw_node.campaign().unwrap();
let rd = raw_node.ready();
let committed_len = rd.committed_entries.as_ref().unwrap().len();
Expand Down Expand Up @@ -561,3 +561,64 @@ fn test_commit_pagination() {
raw_node.mut_store().wl().append(rd.entries()).unwrap();
raw_node.advance(rd);
}


// test_commit_pagination_after_restart regression tests a scenario in which the
// Storage's Entries size limitation is slightly more permissive than Raft's
// internal one
//
// - node learns that index 11 is committed
// - next_entries returns index 1..10 in committed_entries (but index 10 already
// exceeds maxBytes), which isn't noticed internally by Raft
// - Commit index gets bumped to 10
// - the node persists the HardState, but crashes before applying the entries
// - upon restart, the storage returns the same entries, but `slice` takes a
// different code path and removes the last entry.
// - Raft does not emit a HardState, but when the app calls advance(), it bumps
// its internal applied index cursor to 10 (when it should be 9)
// - the next Ready asks the app to apply index 11 (omitting index 10), losing a
// write.
#[test]
fn test_commit_pagination_after_restart() {
let mut persisted_hard_state = HardState::default();
persisted_hard_state.set_term(1);
persisted_hard_state.set_vote(1);
persisted_hard_state.set_commit(10);
let s = IgnoreSizeHintMemStorage::default();
s.inner.wl().set_hardstate(persisted_hard_state);
let ents_count = 10;
let mut ents = Vec::with_capacity(ents_count);
let mut size = 0u64;
for i in 0..ents_count {
let e = new_entry(1, i as u64 + 1, Some("a"));
size += u64::from(e.compute_size());
ents.push(e);
}
s.inner.wl().append(&ents).unwrap();

let mut cfg = new_test_config(1, 10, 1);
// Set a max_size_per_msg that would suggest to Raft that the last committed entry should
// not be included in the initial rd.committed_entries. However, our storage will ignore
// this and *will* return it (which is how the Commit index ended up being 10 initially).
cfg.max_size_per_msg = size - 1;

s.inner.wl().append(&vec![new_entry(1, 11, Some("boom"))]).unwrap();
let mut raw_node = RawNode::with_default_logger(cfg, s).unwrap();
let mut highest_applied = 0;
while highest_applied != 11 {
let rd = raw_node.ready();
dbg!(&rd);
let committed_entries = rd.committed_entries.clone().unwrap();
assert!(committed_entries.len() > 0, "stop applying entries at index {}", highest_applied);
let next = committed_entries.first().unwrap().get_index();
if highest_applied != 0 {
assert_eq!(highest_applied + 1, next, "attempting to apply index {} after index {}, leaving a gap", next, highest_applied)
}
highest_applied = rd.committed_entries.as_ref().unwrap().last().unwrap().get_index();
raw_node.advance(rd);
let mut m = new_message(1, 1, MessageType::MsgHeartbeat, 0);
m.set_term(1);
m.set_commit(11); // leader learns commit index is 11
raw_node.step(m).unwrap();
}
}
40 changes: 36 additions & 4 deletions harness/tests/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub fn new_test_raft(
if !peers.is_empty() && !storage.initial_state().unwrap().initialized() {
storage.initialize_with_conf_state((peers, vec![]));
}
new_test_raft_with_config(&config, storage, l)
new_test_raft_with_config(config, storage, l)
}

pub fn new_test_raft_with_prevote(
Expand All @@ -82,7 +82,7 @@ pub fn new_test_raft_with_prevote(
if !peers.is_empty() && !storage.initial_state().unwrap().initialized() {
storage.initialize_with_conf_state((peers, vec![]));
}
new_test_raft_with_config(&config, storage, l)
new_test_raft_with_config(config, storage, l)
}

pub fn new_test_raft_with_logs(
Expand All @@ -102,10 +102,10 @@ pub fn new_test_raft_with_logs(
storage.initialize_with_conf_state((peers, vec![]));
}
storage.wl().append(logs).unwrap();
new_test_raft_with_config(&config, storage, l)
new_test_raft_with_config(config, storage, l)
}

pub fn new_test_raft_with_config(config: &Config, storage: MemStorage, l: &Logger) -> Interface {
pub fn new_test_raft_with_config(config: Config, storage: MemStorage, l: &Logger) -> Interface {
Interface::new(Raft::new(config, storage, l).unwrap())
}

Expand Down Expand Up @@ -163,3 +163,35 @@ pub fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {
s.mut_metadata().mut_conf_state().voters = voters;
s
}

#[derive(Default)]
pub struct IgnoreSizeHintMemStorage {
pub inner: MemStorage
}

impl Storage for IgnoreSizeHintMemStorage {
fn initial_state(&self) -> Result<RaftState> {
self.inner.initial_state()
}

fn entries(&self, low: u64, high: u64, _max_size: impl Into<Option<u64>>) -> Result<Vec<Entry>> {
self.inner.entries(low, high, u64::MAX)
}

fn term(&self, idx: u64) -> Result<u64> {
self.inner.term(idx)
}

fn first_index(&self) -> Result<u64> {
self.inner.first_index()
}

fn last_index(&self) -> Result<u64> {
self.inner.last_index()
}

fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
self.inner.snapshot(request_index)
}
}

Loading

0 comments on commit bfc4ea9

Please sign in to comment.