From 592a95af2085bc005b177204a589d0a197f731b0 Mon Sep 17 00:00:00 2001
From: Fullstop000
Date: Thu, 2 Apr 2020 11:34:52 +0800
Subject: [PATCH] implement committed entries pagination

Signed-off-by: Fullstop000
---
 harness/src/network.rs                        | 13 ++-
 .../tests/integration_cases/test_raw_node.rs | 89 +++++++++++++++++++
 src/config.rs                                 | 10 ++-
 src/raft.rs                                   |  4 +-
 src/raft_log.rs                               |  8 +-
 src/raw_node.rs                               | 10 ++-
 6 files changed, 128 insertions(+), 6 deletions(-)

diff --git a/harness/src/network.rs b/harness/src/network.rs
index 1da2dc624..ffa130bce 100644
--- a/harness/src/network.rs
+++ b/harness/src/network.rs
@@ -49,6 +49,9 @@ pub struct Network {
     dropm: HashMap<Connem, f64>,
     /// Drop messages of type `MessageType`.
     ignorem: HashMap<MessageType, bool>,
+    /// msg_hook is called for each message sent. It may inspect the
+    /// message and return true to send it or false to drop it.
+    pub msg_hook: Option<Box<dyn Fn(&Message) -> bool>>,
 }
 
 impl Network {
@@ -141,7 +144,15 @@ impl Network {
                     })
                     .cloned()
                     .unwrap_or(0f64);
-                rand::random::<f64>() >= perc
+                if rand::random::<f64>() < perc {
+                    return false;
+                }
+                if let Some(hook) = &self.msg_hook {
+                    if !hook(&m) {
+                        return false;
+                    }
+                }
+                true
             })
             .collect()
     }
diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs
index 384b23b39..c73b4b386 100644
--- a/harness/tests/integration_cases/test_raw_node.rs
+++ b/harness/tests/integration_cases/test_raw_node.rs
@@ -472,3 +472,92 @@ fn test_skip_bcast_commit() {
     assert_eq!(nt.peers[&2].raft_log.committed, 6);
     assert_eq!(nt.peers[&3].raft_log.committed, 6);
 }
+
+// test_append_pagination ensures that no MsgAppend is ever sent whose aggregate entry size exceeds `max_size_per_msg`.
+#[test]
+fn test_append_pagination() {
+    use std::cell::Cell;
+    use std::rc::Rc;
+    let l = default_logger();
+    let mut config = new_test_config(1, 10, 1);
+    let max_size_per_msg = 2048;
+    config.max_size_per_msg = max_size_per_msg;
+    let mut nt = Network::new_with_config(vec![None, None, None], &config, &l);
+    let seen_full_msg = Rc::new(Cell::new(false));
+    let b = seen_full_msg.clone();
+    nt.msg_hook = Some(Box::new(move |m: &Message| -> bool {
+        if m.msg_type == MessageType::MsgAppend {
+            let total_size = m.entries.iter().fold(0, |acc, e| acc + e.data.len());
+            if total_size as u64 > max_size_per_msg {
+                panic!("sent MsgApp that is too large: {} bytes", total_size);
+            }
+            if total_size as u64 > max_size_per_msg / 2 {
+                b.set(true);
+            }
+        }
+        true
+    }));
+    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
+    nt.isolate(1);
+    for _ in 0..5 {
+        let data = "a".repeat(1000);
+        nt.send(vec![new_message_with_entries(
+            1,
+            1,
+            MessageType::MsgPropose,
+            vec![new_entry(0, 0, Some(&data))],
+        )]);
+    }
+    nt.recover();
+    // After the partition recovers, tick the clock to wake everything
+    // back up and send the messages.
+    nt.send(vec![new_message(1, 1, MessageType::MsgBeat, 0)]);
+    assert!(
+        seen_full_msg.get(),
+        "didn't see any messages more than half the max size; something is wrong with this test"
+    );
+}
+
+// test_commit_pagination ensures that the committed entries returned in a single Ready never exceed `max_committed_size_per_ready` in aggregate.
+#[test]
+fn test_commit_pagination() {
+    let l = default_logger();
+    let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
+    let mut config = new_test_config(1, 10, 1);
+    config.max_committed_size_per_ready = 2048;
+    let mut raw_node = RawNode::new(&config, storage, &l).unwrap();
+    raw_node.campaign().unwrap();
+    let rd = raw_node.ready();
+    let committed_len = rd.committed_entries.as_ref().unwrap().len();
+    assert_eq!(
+        committed_len, 1,
+        "expected 1 (empty) entry, got {}",
+        committed_len
+    );
+    raw_node.mut_store().wl().append(rd.entries()).unwrap();
+    raw_node.advance(rd);
+    let blob = "a".repeat(1000).into_bytes();
+    for _ in 0..3 {
+        raw_node.propose(vec![], blob.clone()).unwrap();
+    }
+    // The 3 proposals will commit in two batches.
+    let rd = raw_node.ready();
+    let committed_len = rd.committed_entries.as_ref().unwrap().len();
+    assert_eq!(
+        committed_len, 2,
+        "expected 2 entries in first batch, got {}",
+        committed_len
+    );
+    raw_node.mut_store().wl().append(rd.entries()).unwrap();
+    raw_node.advance(rd);
+
+    let rd = raw_node.ready();
+    let committed_len = rd.committed_entries.as_ref().unwrap().len();
+    assert_eq!(
+        committed_len, 1,
+        "expected 1 entry in second batch, got {}",
+        committed_len
+    );
+    raw_node.mut_store().wl().append(rd.entries()).unwrap();
+    raw_node.advance(rd);
+}
diff --git a/src/config.rs b/src/config.rs
index 37de52b3f..effa0c8ce 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -17,7 +17,7 @@ pub use super::read_only::{ReadOnlyOption, ReadState};
 
 use super::{
     errors::{Error, Result},
-    INVALID_ID,
+    INVALID_ID, NO_LIMIT,
 };
 
 /// Config contains the parameters to start a raft.
@@ -48,9 +48,14 @@ pub struct Config {
     /// Limit the max size of each append message. Smaller value lowers
     /// the raft recovery cost(initial probing and message lost during normal operation).
     /// On the other side, it might affect the throughput during normal replication.
-    /// Note: math.MaxUusize64 for unlimited, 0 for at most one entry per message.
+    /// Note: raft::NO_LIMIT for unlimited, 0 for at most one entry per message.
     pub max_size_per_msg: u64,
 
+    /// Limit the aggregate byte size of the committed entries that can be
+    /// applied in a single Ready.
+    /// If not set, the default is `NO_LIMIT` (unlimited).
+    pub max_committed_size_per_ready: u64,
+
     /// Limit the max number of in-flight append messages during optimistic
     /// replication phase. The application transportation layer usually has its own sending
     /// buffer over TCP/UDP. Set to avoid overflowing that sending buffer.
@@ -98,6 +103,7 @@ impl Default for Config {
             heartbeat_tick: HEARTBEAT_TICK,
             applied: 0,
             max_size_per_msg: 0,
+            max_committed_size_per_ready: NO_LIMIT,
             max_inflight_msgs: 256,
             check_quorum: false,
             pre_vote: false,
diff --git a/src/raft.rs b/src/raft.rs
index 7c10550bc..dbd1e40ec 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -221,11 +221,13 @@ impl<T: Storage> Raft<T> {
         let conf_state = &raft_state.conf_state;
         let voters = &conf_state.voters;
         let learners = &conf_state.learners;
+        let mut raft_log = RaftLog::new(store, logger.clone());
+        raft_log.max_next_ents_size = c.max_committed_size_per_ready;
 
         let mut r = Raft {
             id: c.id,
             read_states: Default::default(),
-            raft_log: RaftLog::new(store, logger.clone()),
+            raft_log,
             max_inflight: c.max_inflight_msgs,
             max_msg_size: c.max_size_per_msg,
             prs: Some(ProgressSet::with_capacity(
diff --git a/src/raft_log.rs b/src/raft_log.rs
index 33c560c5b..5dec3cc85 100644
--- a/src/raft_log.rs
+++ b/src/raft_log.rs
@@ -44,6 +44,10 @@ pub struct RaftLog<T: Storage> {
     ///
     /// Invariant: applied <= committed
     pub applied: u64,
+
+    /// max_next_ents_size is the maximum aggregate byte size of the entries
+    /// returned from calls to `next_entries`.
+    pub max_next_ents_size: u64,
 }
 
 impl<T> ToString for RaftLog<T>
@@ -73,6 +77,8 @@ impl<T: Storage> RaftLog<T> {
             committed: first_index - 1,
             applied: first_index - 1,
             unstable: Unstable::new(last_index + 1, logger),
+            // Use NO_LIMIT as the default to match the default of `max_committed_size_per_ready`.
+            max_next_ents_size: NO_LIMIT,
         }
     }
 
@@ -359,7 +365,7 @@ impl<T: Storage> RaftLog<T> {
         let offset = cmp::max(since_idx + 1, self.first_index());
         let committed = self.committed;
         if committed + 1 > offset {
-            match self.slice(offset, committed + 1, None) {
+            match self.slice(offset, committed + 1, self.max_next_ents_size) {
                 Ok(vec) => return Some(vec),
                 Err(e) => fatal!(self.unstable.logger, "{}", e),
             }
diff --git a/src/raw_node.rs b/src/raw_node.rs
index 55bbc1f5c..e7c2d78c2 100644
--- a/src/raw_node.rs
+++ b/src/raw_node.rs
@@ -134,11 +134,19 @@ impl Ready {
         if &ss != prev_ss {
             rd.ss = Some(ss);
         }
-        let hs = raft.hard_state();
+        let mut hs = raft.hard_state();
         if &hs != prev_hs {
             if hs.vote != prev_hs.vote || hs.term != prev_hs.term {
                 rd.must_sync = true;
             }
+            // If we hit a size limit when loading committed_entries, clamp
+            // our hard_state.commit to what we're actually returning. This is
+            // also used as our cursor to resume for the next Ready.
+            if let Some(last) = rd.committed_entries.as_ref().and_then(|c| c.last()) {
+                if last.index < hs.get_commit() {
+                    hs.set_commit(last.index);
+                }
+            }
             rd.hs = Some(hs);
         }
         if raft.raft_log.unstable.snapshot.is_some() {
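
Note (not part of the patch): for reviewers who want to see the pagination
from the application side, here is a minimal sketch of a Ready loop driving
a single-node RawNode with `max_committed_size_per_ready` set. It assumes
the public API on this branch (`Config::new`, `RawNode::new`, `has_ready`,
`advance`, `MemStorage`) plus a `slog` dependency; the variable names and
payload sizes are illustrative only.

    use raft::{prelude::*, storage::MemStorage};

    fn main() {
        // Any slog logger works; discard output for brevity.
        let logger = slog::Logger::root(slog::Discard, slog::o!());
        let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
        let mut cfg = Config::new(1);
        // Cap the committed entries returned by a single Ready at 2 KiB.
        cfg.max_committed_size_per_ready = 2048;
        let mut node = RawNode::new(&cfg, storage, &logger).unwrap();
        node.campaign().unwrap();

        // Propose ~3 KiB in total, more than one Ready may carry under the cap.
        for _ in 0..3 {
            node.propose(vec![], vec![b'a'; 1000]).unwrap();
        }

        // Drain the committed entries. With pagination this takes several
        // ready/advance cycles; hard_state.commit is clamped to the last
        // entry returned, so each cycle resumes where the previous stopped.
        while node.has_ready() {
            let mut rd = node.ready();
            node.mut_store().wl().append(rd.entries()).unwrap();
            if let Some(committed) = rd.committed_entries.take() {
                println!("applying a batch of {} committed entries", committed.len());
            }
            node.advance(rd);
        }
    }

With the 2048-byte cap and three 1000-byte proposals, the loop above should
observe the same two batches (2 entries, then 1) that test_commit_pagination
asserts.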