From 536fff746c95acff2f4004a8f67569917650052e Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Mon, 13 Jan 2025 12:54:27 -0800 Subject: [PATCH] [consensus] cherrypick pr#20845 (#20867) ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --- consensus/core/src/core.rs | 96 +++++++++++++++++++++++++------- consensus/core/src/dag_state.rs | 98 +++++++++++++++++++++++++++++---- 2 files changed, 163 insertions(+), 31 deletions(-) diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 87e2eacdf50d7..7aae08598d537 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -699,8 +699,24 @@ impl Core { received_quorum_rounds: Vec, accepted_quorum_rounds: Vec, ) { - info!("Received quorum round per authority in ancestor state manager set to: {received_quorum_rounds:?}"); - info!("Accepted quorum round per authority in ancestor state manager set to: {accepted_quorum_rounds:?}"); + info!( + "Received quorum round per authority in ancestor state manager set to: {}", + self.context + .committee + .authorities() + .zip(received_quorum_rounds.iter()) + .map(|((i, _), rounds)| format!("{i}: {rounds:?}")) + .join(", ") + ); + info!( + "Accepted quorum round per authority in ancestor state manager set to: {}", + self.context + .committee + .authorities() + .zip(accepted_quorum_rounds.iter()) + .map(|((i, _), rounds)| format!("{i}: {rounds:?}")) + .join(", ") + ); self.ancestor_state_manager .set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds); info!("Propagation round delay set 
to: {delay}"); @@ -831,10 +847,8 @@ impl Core { clock_round: Round, smart_select: bool, ) -> Vec { - let _s = self - .context - .metrics - .node_metrics + let node_metrics = &self.context.metrics.node_metrics; + let _s = node_metrics .scope_processing_time .with_label_values(&["Core::smart_ancestors_to_propose"]) .start_timer(); @@ -904,7 +918,7 @@ impl Core { } if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) { - self.context.metrics.node_metrics.smart_selection_wait.inc(); + node_metrics.smart_selection_wait.inc(); debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake()); return vec![]; } @@ -924,32 +938,76 @@ impl Core { debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}"); parent_round_quorum.add(ancestor.author(), &self.context.committee); ancestors_to_propose.push(ancestor); - self.context - .metrics - .node_metrics + node_metrics .included_excluded_proposal_ancestors_count_by_authority - .with_label_values(&[block_hostname, "strong"]) + .with_label_values(&[block_hostname, "timeout"]) .inc(); } else { excluded_ancestors.push((score, ancestor)); } } - assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); - + // Include partially propagated blocks from excluded authorities, to help propagate the blocks + // across the network with less latency impact. + // TODO: use a separate mechanism to propagate excluded ancestor blocks and remove this logic. 
for (score, ancestor) in excluded_ancestors.iter() { let excluded_author = ancestor.author(); let block_hostname = &self.context.committee.authority(excluded_author).hostname; + // A quorum of validators reported to have accepted blocks from the excluded_author up to the low quorum round. + let mut accepted_low_quorum_round = self + .ancestor_state_manager + .accepted_quorum_round_per_authority[excluded_author] + .0; + // If the accepted quorum round of this ancestor is greater than or equal + // to the clock round then we want to make sure to set it to clock_round - 1 + // as that is the max round the new block can include as an ancestor. + accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round); + + let last_included_round = self.last_included_ancestors[excluded_author] + .map(|block_ref| block_ref.round) + .unwrap_or(GENESIS_ROUND); + if last_included_round >= accepted_low_quorum_round { + trace!( + "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {} >= accepted low quorum round {}", + ancestor.reference(), last_included_round, accepted_low_quorum_round, + ); + node_metrics + .excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + continue; + } - trace!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}"); - self.context - .metrics - .node_metrics - .excluded_proposal_ancestors_count_by_authority - .with_label_values(&[block_hostname]) + // Include the ancestor block as it has been seen & accepted by a strong quorum. + let ancestor = if ancestor.round() == accepted_low_quorum_round { + ancestor.clone() + } else { + // Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated. 
+ let Some(ancestor) = self.dag_state.read().get_last_cached_block_in_range( + excluded_author, + last_included_round + 1, + accepted_low_quorum_round + 1, + ) else { + trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: no suitable block found", ancestor.reference()); + node_metrics + .excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + continue; + }; + ancestor + }; + self.last_included_ancestors[excluded_author] = Some(ancestor.reference()); + ancestors_to_propose.push(ancestor.clone()); + trace!("Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}", ancestor.reference()); + node_metrics + .included_excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname, "quorum"]) .inc(); } + assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); + info!( "Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}", ancestors_to_propose.len(), diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index b4981b2ae2e66..4c93cc59d701f 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -502,10 +502,38 @@ impl DagState { blocks } - /// Returns the last block proposed per authority with `round < end_round`. + // Retrieves the cached block within the range [start_round, end_round) from a given authority. + // NOTE: end_round must be greater than GENESIS_ROUND. 
+ pub(crate) fn get_last_cached_block_in_range( + &self, + authority: AuthorityIndex, + start_round: Round, + end_round: Round, + ) -> Option { + if end_round == GENESIS_ROUND { + panic!( + "Attempted to retrieve blocks earlier than the genesis round which is impossible" + ); + } + + let block_ref = self.recent_refs_by_authority[authority] + .range(( + Included(BlockRef::new(start_round, authority, BlockDigest::MIN)), + Excluded(BlockRef::new( + end_round, + AuthorityIndex::MIN, + BlockDigest::MIN, + )), + )) + .last()?; + + self.recent_blocks.get(block_ref).cloned() + } + + /// Returns the last block proposed per authority with `evicted round < round < end_round`. /// The method is guaranteed to return results only when the `end_round` is not earlier of the - /// available cached data for each authority, otherwise the method will panic - it's the caller's - /// responsibility to ensure that is not requesting filtering for earlier rounds . + /// available cached data for each authority (evicted round + 1), otherwise the method will panic. + /// It's the caller's responsibility to ensure that it is not requesting earlier rounds. /// In case of equivocation for an authority's last slot only one block will be returned (the last in order). 
pub(crate) fn get_last_cached_block_per_authority( &self, @@ -1957,7 +1985,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) { + async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) { // GIVEN const CACHED_ROUNDS: Round = 2; let (mut context, _) = Context::new_for_test(4); @@ -2010,14 +2038,46 @@ mod test { // WHEN search for the latest blocks let end_round = 4; + let expected_rounds = vec![0, 1, 2, 3]; + + // THEN let last_blocks = dag_state.get_last_cached_block_per_authority(end_round); + assert_eq!( + last_blocks.iter().map(|b| b.round()).collect::>(), + expected_rounds + ); + + // THEN + for (i, expected_round) in expected_rounds.iter().enumerate() { + let round = dag_state + .get_last_cached_block_in_range( + context.committee.to_authority_index(i).unwrap(), + 0, + end_round, + ) + .map(|b| b.round()) + .unwrap_or_default(); + assert_eq!(round, *expected_round, "Authority {i}"); + } + + // WHEN starting from round 2 + let start_round = 2; + let expected_rounds = [0, 0, 2, 3]; // THEN - assert_eq!(last_blocks[0].round(), 0); - assert_eq!(last_blocks[1].round(), 1); - assert_eq!(last_blocks[2].round(), 2); - assert_eq!(last_blocks[3].round(), 3); + for (i, expected_round) in expected_rounds.iter().enumerate() { + let round = dag_state + .get_last_cached_block_in_range( + context.committee.to_authority_index(i).unwrap(), + start_round, + end_round, + ) + .map(|b| b.round()) + .unwrap_or_default(); + assert_eq!(round, *expected_round, "Authority {i}"); + } + // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND. 
// @@ -2027,13 +2087,27 @@ mod test { // AND we request before round 3 let end_round = 3; + let expected_rounds = vec![0, 1, 2, 2]; + + // THEN let last_blocks = dag_state.get_last_cached_block_per_authority(end_round); + assert_eq!( + last_blocks.iter().map(|b| b.round()).collect::>(), + expected_rounds + ); // THEN - assert_eq!(last_blocks[0].round(), 0); - assert_eq!(last_blocks[1].round(), 1); - assert_eq!(last_blocks[2].round(), 2); - assert_eq!(last_blocks[3].round(), 2); + for (i, expected_round) in expected_rounds.iter().enumerate() { + let round = dag_state + .get_last_cached_block_in_range( + context.committee.to_authority_index(i).unwrap(), + 0, + end_round, + ) + .map(|b| b.round()) + .unwrap_or_default(); + assert_eq!(round, *expected_round, "Authority {i}"); + } } #[tokio::test]