Skip to content

Commit

Permalink
[consensus] cherrypick pr#20845 (#20867)
Browse files Browse the repository at this point in the history
## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
arun-koshy authored Jan 13, 2025
1 parent c07e414 commit 536fff7
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 31 deletions.
96 changes: 77 additions & 19 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,24 @@ impl Core {
received_quorum_rounds: Vec<QuorumRound>,
accepted_quorum_rounds: Vec<QuorumRound>,
) {
info!("Received quorum round per authority in ancestor state manager set to: {received_quorum_rounds:?}");
info!("Accepted quorum round per authority in ancestor state manager set to: {accepted_quorum_rounds:?}");
info!(
"Received quorum round per authority in ancestor state manager set to: {}",
self.context
.committee
.authorities()
.zip(received_quorum_rounds.iter())
.map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
.join(", ")
);
info!(
"Accepted quorum round per authority in ancestor state manager set to: {}",
self.context
.committee
.authorities()
.zip(accepted_quorum_rounds.iter())
.map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
.join(", ")
);
self.ancestor_state_manager
.set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
info!("Propagation round delay set to: {delay}");
Expand Down Expand Up @@ -831,10 +847,8 @@ impl Core {
clock_round: Round,
smart_select: bool,
) -> Vec<VerifiedBlock> {
let _s = self
.context
.metrics
.node_metrics
let node_metrics = &self.context.metrics.node_metrics;
let _s = node_metrics
.scope_processing_time
.with_label_values(&["Core::smart_ancestors_to_propose"])
.start_timer();
Expand Down Expand Up @@ -904,7 +918,7 @@ impl Core {
}

if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
self.context.metrics.node_metrics.smart_selection_wait.inc();
node_metrics.smart_selection_wait.inc();
debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
return vec![];
}
Expand All @@ -924,32 +938,76 @@ impl Core {
debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
parent_round_quorum.add(ancestor.author(), &self.context.committee);
ancestors_to_propose.push(ancestor);
self.context
.metrics
.node_metrics
node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "strong"])
.with_label_values(&[block_hostname, "timeout"])
.inc();
} else {
excluded_ancestors.push((score, ancestor));
}
}

assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core.");

// Include partially propagated blocks from excluded authorities, to help propagate the blocks
// across the network with less latency impact.
// TODO: use a separate mechanism to propagate excluded ancestor blocks and remove this logic.
for (score, ancestor) in excluded_ancestors.iter() {
let excluded_author = ancestor.author();
let block_hostname = &self.context.committee.authority(excluded_author).hostname;
// A quorum of validators reported to have accepted blocks from the excluded_author up to the low quorum round.
let mut accepted_low_quorum_round = self
.ancestor_state_manager
.accepted_quorum_round_per_authority[excluded_author]
.0;
// If the accepted quorum round of this ancestor is greater than or equal
// to the clock round then we want to make sure to set it to clock_round - 1
// as that is the max round the new block can include as an ancestor.
accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);

let last_included_round = self.last_included_ancestors[excluded_author]
.map(|block_ref| block_ref.round)
.unwrap_or(GENESIS_ROUND);
if last_included_round >= accepted_low_quorum_round {
trace!(
"Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {} >= accepted low quorum round {}",
ancestor.reference(), last_included_round, accepted_low_quorum_round,
);
node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.inc();
continue;
}

trace!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
// Include the ancestor block as it has been seen & accepted by a strong quorum.
let ancestor = if ancestor.round() == accepted_low_quorum_round {
ancestor.clone()
} else {
// Only cached blocks need to be propagated. Committed and GC'ed blocks do not need to be propagated.
let Some(ancestor) = self.dag_state.read().get_last_cached_block_in_range(
excluded_author,
last_included_round + 1,
accepted_low_quorum_round + 1,
) else {
trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: no suitable block found", ancestor.reference());
node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.inc();
continue;
};
ancestor
};
self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor.clone());
trace!("Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}", ancestor.reference());
node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "quorum"])
.inc();
}

assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core.");

info!(
"Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}",
ancestors_to_propose.len(),
Expand Down
98 changes: 86 additions & 12 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,38 @@ impl DagState {
blocks
}

/// Returns the last block proposed per authority with `round < end_round`.
// Retrieves the cached block within the range [start_round, end_round) from a given authority.
// NOTE: end_round must be greater than GENESIS_ROUND.
pub(crate) fn get_last_cached_block_in_range(
&self,
authority: AuthorityIndex,
start_round: Round,
end_round: Round,
) -> Option<VerifiedBlock> {
if end_round == GENESIS_ROUND {
panic!(
"Attempted to retrieve blocks earlier than the genesis round which is impossible"
);
}

let block_ref = self.recent_refs_by_authority[authority]
.range((
Included(BlockRef::new(start_round, authority, BlockDigest::MIN)),
Excluded(BlockRef::new(
end_round,
AuthorityIndex::MIN,
BlockDigest::MIN,
)),
))
.last()?;

self.recent_blocks.get(block_ref).cloned()
}

/// Returns the last block proposed per authority with `evicted round < round < end_round`.
/// The method is guaranteed to return results only when the `end_round` is not earlier of the
/// available cached data for each authority, otherwise the method will panic - it's the caller's
/// responsibility to ensure that is not requesting filtering for earlier rounds .
/// available cached data for each authority (evicted round + 1), otherwise the method will panic.
/// It's the caller's responsibility to ensure that is not requesting for earlier rounds.
/// In case of equivocation for an authority's last slot only one block will be returned (the last in order).
pub(crate) fn get_last_cached_block_per_authority(
&self,
Expand Down Expand Up @@ -1957,7 +1985,7 @@ mod test {

#[rstest]
#[tokio::test]
async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) {
async fn test_get_last_cached_block(#[values(0, 1)] gc_depth: u32) {
// GIVEN
const CACHED_ROUNDS: Round = 2;
let (mut context, _) = Context::new_for_test(4);
Expand Down Expand Up @@ -2010,14 +2038,46 @@ mod test {

// WHEN search for the latest blocks
let end_round = 4;
let expected_rounds = vec![0, 1, 2, 3];

// THEN
let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
assert_eq!(
last_blocks.iter().map(|b| b.round()).collect::<Vec<_>>(),
expected_rounds
);

// THEN
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
0,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}

// WHEN starting from round 2
let start_round = 2;
let expected_rounds = [0, 0, 2, 3];

// THEN
assert_eq!(last_blocks[0].round(), 0);
assert_eq!(last_blocks[1].round(), 1);
assert_eq!(last_blocks[2].round(), 2);
assert_eq!(last_blocks[3].round(), 3);
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
start_round,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}

// WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
// WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
// a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
//
Expand All @@ -2027,13 +2087,27 @@ mod test {

// AND we request before round 3
let end_round = 3;
let expected_rounds = vec![0, 1, 2, 2];

// THEN
let last_blocks = dag_state.get_last_cached_block_per_authority(end_round);
assert_eq!(
last_blocks.iter().map(|b| b.round()).collect::<Vec<_>>(),
expected_rounds
);

// THEN
assert_eq!(last_blocks[0].round(), 0);
assert_eq!(last_blocks[1].round(), 1);
assert_eq!(last_blocks[2].round(), 2);
assert_eq!(last_blocks[3].round(), 2);
for (i, expected_round) in expected_rounds.iter().enumerate() {
let round = dag_state
.get_last_cached_block_in_range(
context.committee.to_authority_index(i).unwrap(),
0,
end_round,
)
.map(|b| b.round())
.unwrap_or_default();
assert_eq!(round, *expected_round, "Authority {i}");
}
}

#[tokio::test]
Expand Down

0 comments on commit 536fff7

Please sign in to comment.