diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index a7af2f7b..9a22d7c5 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -107,7 +107,7 @@ fn next_ents(r: &mut Raft, s: &MemStorage) -> Vec { s.wl().append(&unstable).expect(""); r.on_persist_entries(last_idx, last_term); } - let ents = r.raft_log.next_entries(); + let ents = r.raft_log.next_entries(None); r.commit_apply(r.raft_log.committed); ents.unwrap_or_else(Vec::new) } diff --git a/harness/tests/integration_cases/test_raft_paper.rs b/harness/tests/integration_cases/test_raft_paper.rs index fbda6519..3e435839 100644 --- a/harness/tests/integration_cases/test_raft_paper.rs +++ b/harness/tests/integration_cases/test_raft_paper.rs @@ -482,7 +482,7 @@ fn test_leader_commit_entry() { assert_eq!(r.raft_log.committed, li + 1); let wents = vec![new_entry(1, li + 1, SOME_DATA)]; - assert_eq!(r.raft_log.next_entries(), Some(wents)); + assert_eq!(r.raft_log.next_entries(None), Some(wents)); let mut msgs = r.read_messages(); msgs.sort_by_key(|m| format!("{:?}", m)); for (i, m) in msgs.drain(..).enumerate() { @@ -572,7 +572,7 @@ fn test_leader_commit_preceding_entries() { empty_entry(3, li + 1), new_entry(3, li + 2, SOME_DATA), ]); - let g = r.raft_log.next_entries(); + let g = r.raft_log.next_entries(None); let wg = Some(tt); if g != wg { panic!("#{}: ents = {:?}, want {:?}", i, g, wg); @@ -629,7 +629,7 @@ fn test_follower_commit_entry() { ); } let wents = Some(ents[..commit as usize].to_vec()); - let g = r.raft_log.next_entries(); + let g = r.raft_log.next_entries(None); if g != wents { panic!("#{}: next_ents = {:?}, want {:?}", i, g, wents); } diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index 81987401..d9248572 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -1581,3 
+1581,140 @@ fn test_async_ready_multiple_snapshot() { raw_node.advance_apply_to(20); } + +#[test] +fn test_committed_entries_pagination() { + let l = default_logger(); + let s = new_storage(); + let mut raw_node = new_raw_node(1, vec![1, 2, 3], 10, 1, s, &l); + + let mut entries = vec![]; + for i in 2..10 { + entries.push(new_entry(1, i, None)); + } + let mut msg = new_message_with_entries(3, 1, MessageType::MsgAppend, entries.to_vec()); + msg.set_term(1); + msg.set_index(1); + msg.set_log_term(1); + msg.set_commit(9); + raw_node.step(msg).unwrap(); + + // Test unpersisted entries won't be fetched. + // NOTE: maybe it's better to allow fetching unpersisted committed entries. + let rd = raw_node.ready(); + assert!(rd.committed_entries().is_empty()); + assert!(raw_node.has_ready()); + + // Persist entries. + assert!(!rd.entries().is_empty()); + raw_node.store().wl().append(rd.entries()).unwrap(); + + // Advance the ready, and we can get committed_entries as expected. + // Test using 0 as `committed_entries_max_size` works as expected. + raw_node.raft.set_max_committed_size_per_ready(0); + let rd = raw_node.advance(rd); + // `MemStorage::entries` uses `util::limit_size` to limit size of committed entries. + // So there will be at least one entry. + assert_eq!(rd.committed_entries().len(), 1); + + // Fetch a `Ready` again without size limit for committed entries. + assert!(raw_node.has_ready()); + raw_node.raft.set_max_committed_size_per_ready(u64::MAX); + let rd = raw_node.ready(); + assert_eq!(rd.committed_entries().len(), 7); + + // No more `Ready`s. + assert!(!raw_node.has_ready()); +} + +/// Test with `commit_since_index`, committed entries can be fetched correctly after restart. 
+/// +/// Case steps: +/// - Node learns that index 10 is committed +/// - `next_entries` returns entries [2..11) in committed_entries (but index 10 already +/// exceeds maxBytes), which isn't noticed internally by Raft +/// - Commit index gets bumped to 10 +/// - The node persists the `HardState`, but crashes before applying the entries +/// - Upon restart, the storage returns the same entries, but `slice` takes a +/// different code path and removes the last entry. +/// - Raft does not emit a HardState, but when the app calls advance(), it bumps +/// its internal applied index cursor to 10 (when it should be 9) +/// - The next `Ready` asks the app to apply index 11 (omitting index 10), losing a +/// write. +#[test] +fn test_committed_entries_pagination_after_restart() { + let l = default_logger(); + let s = IgnoreSizeHintMemStorage::default(); + s.inner + .wl() + .apply_snapshot(new_snapshot(1, 1, vec![1, 2, 3])) + .unwrap(); + + let (mut entries, mut size) = (vec![], 0); + for i in 2..=10 { + let e = new_entry(1, i, Some("test data")); + size += e.compute_size() as u64; + entries.push(e); + } + s.inner.wl().append(&entries).unwrap(); + s.inner.wl().mut_hard_state().commit = 10; + + s.inner + .wl() + .append(&[new_entry(1, 11, Some("boom"))]) + .unwrap(); + + let config = new_test_config(1, 10, 1); + let mut raw_node = RawNode::new(&config, s, &l).unwrap(); + + // `IgnoreSizeHintMemStorage` will ignore `max_committed_size_per_ready` but + // `RaftLog::slice` won't. + raw_node.raft.set_max_committed_size_per_ready(size - 1); + + let mut highest_applied = 1; + while highest_applied != 11 { + let mut rd = raw_node.ready(); + let committed_entries = rd.take_committed_entries(); + let next = committed_entries.first().map(|x| x.index).unwrap(); + assert_eq!(highest_applied + 1, next); + + highest_applied = committed_entries.last().map(|x| x.index).unwrap(); + raw_node.raft.raft_log.commit_to(11); + } +} + +#[derive(Default)] +struct IgnoreSizeHintMemStorage { + inner: 
MemStorage, +} + +impl Storage for IgnoreSizeHintMemStorage { + fn initial_state(&self) -> Result { + self.inner.initial_state() + } + + fn entries( + &self, + low: u64, + high: u64, + _max_size: impl Into>, + ) -> Result> { + self.inner.entries(low, high, u64::MAX) + } + + fn term(&self, idx: u64) -> Result { + self.inner.term(idx) + } + + fn first_index(&self) -> Result { + self.inner.first_index() + } + + fn last_index(&self) -> Result { + self.inner.last_index() + } + + fn snapshot(&self, request_index: u64) -> Result { + self.inner.snapshot(request_index) + } +} diff --git a/src/config.rs b/src/config.rs index 5d170a8b..392540db 100644 --- a/src/config.rs +++ b/src/config.rs @@ -95,6 +95,9 @@ pub struct Config { /// Specify maximum of uncommitted entry size. /// When this limit is reached, all proposals to append new log will be dropped pub max_uncommitted_size: u64, + + /// Max size for committed entries in a `Ready`. + pub max_committed_size_per_ready: u64, } impl Default for Config { @@ -116,6 +119,7 @@ impl Default for Config { batch_append: false, priority: 0, max_uncommitted_size: NO_LIMIT, + max_committed_size_per_ready: NO_LIMIT, } } } diff --git a/src/log_unstable.rs b/src/log_unstable.rs index 58b8de01..c4491a97 100644 --- a/src/log_unstable.rs +++ b/src/log_unstable.rs @@ -150,20 +150,18 @@ impl Unstable { let after = ents[0].index; if after == self.offset + self.entries.len() as u64 { // after is the next index in the self.entries, append directly - self.entries.extend_from_slice(ents); } else if after <= self.offset { // The log is being truncated to before our current offset // portion, so set the offset and replace the entries self.offset = after; self.entries.clear(); - self.entries.extend_from_slice(ents); } else { // truncate to after and copy to self.entries then append let off = self.offset; self.must_check_outofbounds(off, after); self.entries.truncate((after - off) as usize); - self.entries.extend_from_slice(ents); } + 
self.entries.extend_from_slice(ents); } /// Returns a slice of entries between the high and low. diff --git a/src/raft.rs b/src/raft.rs index c540bfd7..8d93fe38 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -257,6 +257,9 @@ pub struct RaftCore<T: Storage> { /// Track uncommitted log entry on this node uncommitted_state: UncommittedState, + + /// Max size for committed entries in a `Ready`. + pub(crate) max_committed_size_per_ready: u64, } /// A struct that represents the raft consensus itself. Stores details concerning the current @@ -361,6 +364,7 @@ impl<T: Storage> Raft<T> { uncommitted_size: 0, last_log_tail_index: 0, }, + max_committed_size_per_ready: c.max_committed_size_per_ready, }, }; confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?; @@ -586,6 +590,11 @@ impl<T: Storage> Raft<T> { .term(self.raft_log.applied) .map_or(false, |t| t == self.term) } + + /// Set `max_committed_size_per_ready` to `size`. + pub fn set_max_committed_size_per_ready(&mut self, size: u64) { + self.max_committed_size_per_ready = size; + } } impl<T: Storage> RaftCore<T> { diff --git a/src/raft_log.rs b/src/raft_log.rs index 15412faa..dae4c03e 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -414,11 +414,11 @@ impl<T: Storage> RaftLog<T> { } /// Returns committed and persisted entries since max(`since_idx` + 1, first_index). - pub fn next_entries_since(&self, since_idx: u64) -> Option<Vec<Entry>> { + pub fn next_entries_since(&self, since_idx: u64, max_size: Option<u64>) -> Option<Vec<Entry>> { let offset = cmp::max(since_idx + 1, self.first_index()); let high = cmp::min(self.committed, self.persisted) + 1; if high > offset { - match self.slice(offset, high, None) { + match self.slice(offset, high, max_size) { Ok(vec) => return Some(vec), Err(e) => fatal!(self.unstable.logger, "{}", e), } @@ -429,8 +429,8 @@ impl<T: Storage> RaftLog<T> { /// Returns all the available entries for execution. /// If applied is smaller than the index of snapshot, it returns all committed /// entries after the index of snapshot. 
- pub fn next_entries(&self) -> Option<Vec<Entry>> { - self.next_entries_since(self.applied) + pub fn next_entries(&self, max_size: Option<u64>) -> Option<Vec<Entry>> { + self.next_entries_since(self.applied, max_size) } /// Returns whether there are committed and persisted entries since @@ -1126,7 +1126,7 @@ mod test { ); } - let next_entries = raft_log.next_entries(); + let next_entries = raft_log.next_entries(None); if next_entries != expect_entries.map(|n| n.to_vec()) { panic!( "#{}: next_entries = {:?}, want {:?}", diff --git a/src/raw_node.rs b/src/raw_node.rs index bc407fb8..c7f77dfe 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -413,10 +413,11 @@ impl<T: Storage> RawNode<T> { /// Generates a LightReady that has the committed entries and messages but no commit index. fn gen_light_ready(&mut self) -> LightReady { let mut rd = LightReady::default(); + let max_size = Some(self.raft.max_committed_size_per_ready); let raft = &mut self.raft; rd.committed_entries = raft .raft_log - .next_entries_since(self.commit_since_index) + .next_entries_since(self.commit_since_index, max_size) .unwrap_or_default(); // Update raft uncommitted entries size raft.reduce_uncommitted_size(&rd.committed_entries); diff --git a/src/util.rs b/src/util.rs index 28f870d0..d8cb92de 100644 --- a/src/util.rs +++ b/src/util.rs @@ -64,11 +64,10 @@ pub fn limit_size<T: Message + Clone>(entries: &mut Vec<T>, max: Option<u64>) .take_while(|&e| { if size == 0 { size += u64::from(e.compute_size()); - true - } else { - size += u64::from(e.compute_size()); - size <= max + return true; } + size += u64::from(e.compute_size()); + size <= max }) .count();