Skip to content

Commit 4b4e533

Browse files
arun-koshy and mwtian
authored
[CP] [consensus] Send excluded ancestors with new block messages (#20896) (#20924)
To keep using smart ancestor selection, we have to ensure that we are not sacrificing block propagation. This PR adds the excluded ancestors to the message sent when a new block is created, so that peers can optimistically fetch them if they don't already have these block refs.
---
Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.
- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] gRPC:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
Co-authored-by: Mingwei Tian <[email protected]>
1 parent 82d0a22 commit 4b4e533

17 files changed

+638
-163
lines changed

consensus/core/src/authority_service.rs

+97-15
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ use tokio_util::sync::ReusableBoxFuture;
1414
use tracing::{debug, info, warn};
1515

1616
use crate::{
17-
block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
17+
block::{BlockAPI as _, BlockRef, ExtendedBlock, SignedBlock, VerifiedBlock, GENESIS_ROUND},
1818
block_verifier::BlockVerifier,
1919
commit::{CommitAPI as _, CommitRange, TrustedCommit},
2020
commit_vote_monitor::CommitVoteMonitor,
2121
context::Context,
2222
core_thread::CoreThreadDispatcher,
2323
dag_state::DagState,
2424
error::{ConsensusError, ConsensusResult},
25-
network::{BlockStream, NetworkService},
25+
network::{BlockStream, ExtendedSerializedBlock, NetworkService},
2626
stake_aggregator::{QuorumThreshold, StakeAggregator},
2727
storage::Store,
2828
synchronizer::SynchronizerHandle,
@@ -38,7 +38,7 @@ pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
3838
block_verifier: Arc<dyn BlockVerifier>,
3939
synchronizer: Arc<SynchronizerHandle>,
4040
core_dispatcher: Arc<C>,
41-
rx_block_broadcaster: broadcast::Receiver<VerifiedBlock>,
41+
rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
4242
subscription_counter: Arc<SubscriptionCounter>,
4343
dag_state: Arc<RwLock<DagState>>,
4444
store: Arc<dyn Store>,
@@ -51,7 +51,7 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
5151
commit_vote_monitor: Arc<CommitVoteMonitor>,
5252
synchronizer: Arc<SynchronizerHandle>,
5353
core_dispatcher: Arc<C>,
54-
rx_block_broadcaster: broadcast::Receiver<VerifiedBlock>,
54+
rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
5555
dag_state: Arc<RwLock<DagState>>,
5656
store: Arc<dyn Store>,
5757
) -> Self {
@@ -78,15 +78,15 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
7878
async fn handle_send_block(
7979
&self,
8080
peer: AuthorityIndex,
81-
serialized_block: Bytes,
81+
serialized_block: ExtendedSerializedBlock,
8282
) -> ConsensusResult<()> {
8383
fail_point_async!("consensus-rpc-response");
8484

8585
let peer_hostname = &self.context.committee.authority(peer).hostname;
8686

8787
// TODO: dedup block verifications, here and with fetched blocks.
8888
let signed_block: SignedBlock =
89-
bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
89+
bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
9090

9191
// Reject blocks not produced by the peer.
9292
if peer != signed_block.author() {
@@ -113,7 +113,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
113113
info!("Invalid block from {}: {}", peer, e);
114114
return Err(e);
115115
}
116-
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
116+
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
117117
let block_ref = verified_block.reference();
118118
debug!("Received block {} via send block.", block_ref);
119119

@@ -225,6 +225,75 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
225225
}
226226
}
227227

228+
// ------------ After processing the block, process the excluded ancestors ------------
229+
230+
let mut excluded_ancestors = serialized_block
231+
.excluded_ancestors
232+
.into_iter()
233+
.map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
234+
.collect::<Result<Vec<BlockRef>, bcs::Error>>()
235+
.map_err(ConsensusError::MalformedBlock)?;
236+
237+
let excluded_ancestors_limit = self.context.committee.size() * 2;
238+
if excluded_ancestors.len() > excluded_ancestors_limit {
239+
debug!(
240+
"Dropping {} excluded ancestor(s) from {} {} due to size limit",
241+
excluded_ancestors.len() - excluded_ancestors_limit,
242+
peer,
243+
peer_hostname,
244+
);
245+
excluded_ancestors.truncate(excluded_ancestors_limit);
246+
}
247+
248+
self.context
249+
.metrics
250+
.node_metrics
251+
.network_received_excluded_ancestors_from_authority
252+
.with_label_values(&[peer_hostname])
253+
.inc_by(excluded_ancestors.len() as u64);
254+
255+
for excluded_ancestor in &excluded_ancestors {
256+
let excluded_ancestor_hostname = &self
257+
.context
258+
.committee
259+
.authority(excluded_ancestor.author)
260+
.hostname;
261+
self.context
262+
.metrics
263+
.node_metrics
264+
.network_excluded_ancestors_count_by_authority
265+
.with_label_values(&[excluded_ancestor_hostname])
266+
.inc();
267+
}
268+
269+
let missing_excluded_ancestors = self
270+
.core_dispatcher
271+
.check_block_refs(excluded_ancestors)
272+
.await
273+
.map_err(|_| ConsensusError::Shutdown)?;
274+
275+
if !missing_excluded_ancestors.is_empty() {
276+
self.context
277+
.metrics
278+
.node_metrics
279+
.network_excluded_ancestors_sent_to_fetch
280+
.with_label_values(&[peer_hostname])
281+
.inc_by(missing_excluded_ancestors.len() as u64);
282+
283+
let synchronizer = self.synchronizer.clone();
284+
tokio::spawn(async move {
285+
// Schedule fetching of the missing excluded ancestors from this peer in the background.
286+
if let Err(err) = synchronizer
287+
.fetch_blocks(missing_excluded_ancestors, peer)
288+
.await
289+
{
290+
warn!(
291+
"Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
292+
);
293+
}
294+
});
295+
}
296+
228297
Ok(())
229298
}
230299

@@ -243,7 +312,10 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
243312
dag_state
244313
.get_cached_blocks(self.context.own_index, last_received + 1)
245314
.into_iter()
246-
.map(|block| block.serialized().clone()),
315+
.map(|block| ExtendedSerializedBlock {
316+
block: block.serialized().clone(),
317+
excluded_ancestors: vec![],
318+
}),
247319
);
248320

249321
let broadcasted_blocks = BroadcastedBlockStream::new(
@@ -254,7 +326,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
254326

255327
// Return a stream of blocks that first yields missed blocks as requested, then new blocks.
256328
Ok(Box::pin(missed_blocks.chain(
257-
broadcasted_blocks.map(|block| block.serialized().clone()),
329+
broadcasted_blocks.map(ExtendedSerializedBlock::from),
258330
)))
259331
}
260332

@@ -423,7 +495,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
423495
.get_last_cached_block_per_authority(Round::MAX);
424496
let highest_accepted_rounds = blocks
425497
.into_iter()
426-
.map(|block| block.round())
498+
.map(|(block, _)| block.round())
427499
.collect::<Vec<_>>();
428500

429501
// Own blocks do not go through the core dispatcher, so they need to be set separately.
@@ -516,7 +588,7 @@ impl SubscriptionCounter {
516588

517589
/// Each broadcasted block stream wraps a broadcast receiver for blocks.
518590
/// It yields blocks that are broadcasted after the stream is created.
519-
type BroadcastedBlockStream = BroadcastStream<VerifiedBlock>;
591+
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
520592

521593
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
522594
/// this tolerates lags with only logging, without yielding errors.
@@ -612,15 +684,14 @@ async fn make_recv_future<T: Clone>(
612684
mod tests {
613685
use crate::{
614686
authority_service::AuthorityService,
615-
block::BlockAPI,
616-
block::{BlockRef, SignedBlock, TestBlock, VerifiedBlock},
687+
block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
617688
commit::CommitRange,
618689
commit_vote_monitor::CommitVoteMonitor,
619690
context::Context,
620691
core_thread::{CoreError, CoreThreadDispatcher},
621692
dag_state::DagState,
622693
error::ConsensusResult,
623-
network::{BlockStream, NetworkClient, NetworkService},
694+
network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
624695
round_prober::QuorumRound,
625696
storage::mem_store::MemStore,
626697
synchronizer::Synchronizer,
@@ -664,6 +735,13 @@ mod tests {
664735
Ok(block_refs)
665736
}
666737

738+
async fn check_block_refs(
739+
&self,
740+
_block_refs: Vec<BlockRef>,
741+
) -> Result<BTreeSet<BlockRef>, CoreError> {
742+
Ok(BTreeSet::new())
743+
}
744+
667745
async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
668746
Ok(())
669747
}
@@ -797,7 +875,11 @@ mod tests {
797875
);
798876

799877
let service = authority_service.clone();
800-
let serialized = input_block.serialized().clone();
878+
let serialized = ExtendedSerializedBlock {
879+
block: input_block.serialized().clone(),
880+
excluded_ancestors: vec![],
881+
};
882+
801883
tokio::spawn(async move {
802884
service
803885
.handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)

consensus/core/src/block.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use serde::{Deserialize, Serialize};
1919
use shared_crypto::intent::{Intent, IntentMessage, IntentScope};
2020

2121
use crate::{
22-
commit::CommitVote, context::Context, ensure, error::ConsensusError, error::ConsensusResult,
22+
commit::CommitVote,
23+
context::Context,
24+
ensure,
25+
error::{ConsensusError, ConsensusResult},
2326
};
2427

2528
/// Round number of a block.
@@ -638,6 +641,15 @@ impl fmt::Debug for VerifiedBlock {
638641
}
639642
}
640643

644+
/// Block with extended additional information, such as
645+
/// local blocks that are excluded from the block's ancestors.
646+
/// The extended information does not need to be certified or forwarded to other authorities.
647+
#[derive(Clone, Debug)]
648+
pub(crate) struct ExtendedBlock {
649+
pub block: VerifiedBlock,
650+
pub excluded_ancestors: Vec<BlockRef>,
651+
}
652+
641653
/// Generates the genesis blocks for the current Committee.
642654
/// The blocks are returned in authority index order.
643655
pub(crate) fn genesis_blocks(context: Arc<Context>) -> Vec<VerifiedBlock> {

0 commit comments

Comments
 (0)