Skip to content

Commit 17aadad

Browse files
committed
Transition block lookup sync to range sync
1 parent 99e53b8 commit 17aadad

File tree

8 files changed

+197
-31
lines changed

8 files changed

+197
-31
lines changed

beacon_node/beacon_chain/src/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2604,7 +2604,7 @@ pub struct MakeAttestationOptions {
26042604
pub fn build_log(level: slog::Level, enabled: bool) -> Logger {
26052605
let decorator = TermDecorator::new().build();
26062606
let drain = FullFormat::new(decorator).build().fuse();
2607-
let drain = Async::new(drain).build().fuse();
2607+
let drain = Async::new(drain).chan_size(10_000).build().fuse();
26082608

26092609
if enabled {
26102610
Logger::root(drain.filter_level(level).fuse(), o!())

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::sync::Arc;
2323
use std::time::Duration;
2424
use store::MemoryStore;
2525
use task_executor::TaskExecutor;
26+
use tokio::sync::mpsc::UnboundedSender;
2627
use tokio::sync::mpsc::{self, error::TrySendError};
2728
use types::*;
2829

@@ -751,7 +752,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
751752
/// Send a message to `sync_tx`.
752753
///
753754
/// Creates a log if there is an internal error.
754-
fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
755+
pub(crate) fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
755756
self.sync_tx.send(message).unwrap_or_else(|e| {
756757
debug!(self.log, "Could not send message to the sync service";
757758
"error" => %e)
@@ -779,6 +780,7 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
779780
// processor (but not much else).
780781
pub fn null_for_testing(
781782
network_globals: Arc<NetworkGlobals<E>>,
783+
sync_tx: UnboundedSender<SyncMessage<E>>,
782784
chain: Arc<BeaconChain<TestBeaconChainType<E>>>,
783785
executor: TaskExecutor,
784786
log: Logger,
@@ -791,7 +793,6 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
791793
} = <_>::default();
792794

793795
let (network_tx, _network_rx) = mpsc::unbounded_channel();
794-
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();
795796

796797
let network_beacon_processor = Self {
797798
beacon_processor_send: beacon_processor_tx,

beacon_node/network/src/sync/block_lookups/mod.rs

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
2828
use crate::metrics;
2929
use crate::sync::block_lookups::common::ResponseType;
3030
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
31+
use crate::sync::SyncMessage;
3132
use beacon_chain::block_verification_types::AsBlock;
3233
use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
3334
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
@@ -53,7 +54,10 @@ mod tests;
5354
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
5455
/// canonical chain to its head once the peer connects. A chain should not appear where its depth
5556
/// is further back than the most recent head slot.
56-
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
57+
///
58+
/// Has the same value as range sync's tolerance to consider a peer synced. Once sync lookup
59+
/// reaches the maximum depth it will force trigger range sync.
60+
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE;
5761

5862
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
5963
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
@@ -252,22 +256,52 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
252256
// blocks on top of A forming A -> C. The malicious peer forces us to fetch C
253257
// from it, which will result in parent A hitting the chain_too_long error. Then
254258
// the valid chain A -> B is dropped too.
255-
if let Ok(block_to_drop) = find_oldest_fork_ancestor(parent_chains, chain_idx) {
256-
// Drop all lookups descending from the child of the too long parent chain
257-
if let Some((lookup_id, lookup)) = self
259+
//
260+
// `find_oldest_fork_ancestor` should never return Err, unwrapping to tip for
261+
// completeness
262+
let parent_chain_tip = parent_chain.tip;
263+
let block_to_drop =
264+
find_oldest_fork_ancestor(parent_chains, chain_idx).unwrap_or(parent_chain_tip);
265+
// Drop all lookups descending from the child of the too long parent chain
266+
if let Some((lookup_id, lookup)) = self
267+
.single_block_lookups
268+
.iter()
269+
.find(|(_, l)| l.block_root() == block_to_drop)
270+
{
271+
// If a lookup chain is too long, we can't distinguish a valid chain from a
272+
// malicious one. We must attempt to sync this chain to not lose liveness. If
273+
// the chain grows too long, we stop lookup sync and transition this head to
274+
// forward range sync. We need to tell range sync which head to sync to, and
275+
// from which peers. The lookup of the very tip of this chain may contain zero
276+
// peers if it's the parent-child lookup. So we do a bit of a trick here:
277+
// - Tell range sync to sync to the tip's root (if available, else its ancestor)
278+
// - But use all peers in the ancestor lookup, which should have at least one
279+
// peer, and its peer set is a strict superset of the tip's lookup.
280+
let (remote_head_root, remote_head_slot) = match self
258281
.single_block_lookups
259282
.iter()
260-
.find(|(_, l)| l.block_root() == block_to_drop)
283+
.find(|(_, l)| l.block_root() == parent_chain_tip)
261284
{
262-
for &peer_id in lookup.all_peers() {
263-
cx.report_peer(
264-
peer_id,
265-
PeerAction::LowToleranceError,
266-
"chain_too_long",
267-
);
285+
Some((_, tip_lookup)) => {
286+
(parent_chain_tip, tip_lookup.peek_downloaded_block_slot())
268287
}
269-
self.drop_lookup_and_children(*lookup_id);
270-
}
288+
None => (lookup.block_root(), lookup.peek_downloaded_block_slot()),
289+
};
290+
291+
cx.send_sync_message(SyncMessage::AddPeersForceRangeSync {
292+
peers: lookup.all_peers().copied().collect(),
293+
head_slot: remote_head_slot,
294+
head_root: remote_head_root,
295+
});
296+
297+
// Do not downscore peers here. Because we can't distinguish a valid chain from
298+
// a malicious one, we may penalize honest peers for attempting to help us discover a
299+
// valid chain. Until blocks_by_range allows specifying a tip, for example with
300+
// https://github.com/ethereum/consensus-specs/pull/3845 we will have poor
301+
// attributability. A peer can send us garbage blocks over blocks_by_root, and
302+
// then correct blocks via blocks_by_range.
303+
304+
self.drop_lookup_and_children(*lookup_id);
271305
}
272306

273307
return false;

beacon_node/network/src/sync/block_lookups/single_block_lookup.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::time::{Duration, Instant};
1616
use store::Hash256;
1717
use strum::IntoStaticStr;
1818
use types::blob_sidecar::FixedBlobSidecarList;
19-
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};
19+
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
2020

2121
// Dedicated enum for LookupResult to force its usage
2222
#[must_use = "LookupResult must be handled with on_lookup_result"]
@@ -92,6 +92,14 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
9292
}
9393
}
9494

95+
/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`.
///
/// Reads the block request state via `peek_downloaded_data()` and maps the block, when one is
/// present, to its slot. Returns `None` when no downloaded block is available to peek at.
96+
pub fn peek_downloaded_block_slot(&self) -> Option<Slot> {
97+
self.block_request_state
98+
.state
99+
.peek_downloaded_data()
100+
.map(|block| block.slot())
101+
}
102+
95103
/// Get the block root that is being requested.
96104
pub fn block_root(&self) -> Hash256 {
97105
self.block_root

beacon_node/network/src/sync/block_lookups/tests.rs

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::network_beacon_processor::NetworkBeaconProcessor;
22
use crate::sync::manager::{BlockProcessType, SyncManager};
3+
use crate::sync::range_sync::RangeSyncType;
34
use crate::sync::sampling::SamplingConfig;
45
use crate::sync::{SamplingId, SyncMessage};
56
use crate::NetworkMessage;
@@ -77,6 +78,8 @@ struct TestRig {
7778
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
7879
/// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests)
7980
network_rx_queue: Vec<NetworkMessage<E>>,
81+
/// Receiver for `SyncMessage` from the network
82+
sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
8083
/// To send `SyncMessage`. For sending RPC responses or block processing results to sync.
8184
sync_manager: SyncManager<T>,
8285
/// To manipulate sync state and peer connection status
@@ -130,22 +133,20 @@ impl TestRig {
130133
let chain = harness.chain.clone();
131134

132135
let (network_tx, network_rx) = mpsc::unbounded_channel();
133-
// TODO(das): make the generation of the ENR use the deterministic rng to have consistent
134-
// column assignments
136+
let (sync_tx, sync_rx) = mpsc::unbounded_channel::<SyncMessage<E>>();
135137
let globals = Arc::new(NetworkGlobals::new_test_globals(
136138
Vec::new(),
137139
&log,
138140
chain.spec.clone(),
139141
));
140142
let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing(
141143
globals,
144+
sync_tx,
142145
chain.clone(),
143146
harness.runtime.task_executor.clone(),
144147
log.clone(),
145148
);
146149

147-
let (_sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<E>>();
148-
149150
let fork_name = chain.spec.fork_name_at_slot::<E>(chain.slot().unwrap());
150151

151152
// All current tests expect synced and EL online state
@@ -159,13 +160,15 @@ impl TestRig {
159160
beacon_processor_rx_queue: vec![],
160161
network_rx,
161162
network_rx_queue: vec![],
163+
sync_rx,
162164
rng,
163165
network_globals: beacon_processor.network_globals.clone(),
164166
sync_manager: SyncManager::new(
165167
chain,
166168
network_tx,
167169
beacon_processor.into(),
168-
sync_recv,
170+
// Pass empty recv not tied to any tx
171+
mpsc::unbounded_channel().1,
169172
SamplingConfig::Custom {
170173
required_successes: vec![SAMPLING_REQUIRED_SUCCESSES],
171174
},
@@ -232,6 +235,13 @@ impl TestRig {
232235
self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot))
233236
}
234237

238+
/// Drain all sync messages in the `sync_rx` attached to the beacon processor.
///
/// Each message received is forwarded through `send_sync_message`. The loop uses the
/// non-blocking `try_recv`, so it stops as soon as `try_recv` returns an error.
239+
fn drain_sync_rx(&mut self) {
240+
while let Ok(sync_message) = self.sync_rx.try_recv() {
241+
self.send_sync_message(sync_message);
242+
}
243+
}
244+
235245
fn rand_block(&mut self) -> SignedBeaconBlock<E> {
236246
self.rand_block_and_blobs(NumBlobs::None).0
237247
}
@@ -288,6 +298,10 @@ impl TestRig {
288298
self.sync_manager.active_parent_lookups().len()
289299
}
290300

301+
/// Return the `(RangeSyncType, Slot, Slot)` tuple reported by the sync manager's
/// `get_range_sync_chains()`.
///
/// Test helper: both layers of the result are unwrapped, so this panics if either `unwrap`
/// fails. NOTE(review): the two slots are presumably the chain's start and target slots;
/// confirm against `get_range_sync_chains`.
fn active_range_sync_chain(&self) -> (RangeSyncType, Slot, Slot) {
302+
self.sync_manager.get_range_sync_chains().unwrap().unwrap()
303+
}
304+
291305
fn assert_single_lookups_count(&self, count: usize) {
292306
assert_eq!(
293307
self.active_single_lookups_count(),
@@ -1665,7 +1679,18 @@ fn test_parent_lookup_too_deep_grow_ancestor() {
16651679
)
16661680
}
16671681

1668-
rig.expect_penalty(peer_id, "chain_too_long");
1682+
// Should create a new syncing chain
1683+
rig.drain_sync_rx();
1684+
assert_eq!(
1685+
rig.active_range_sync_chain(),
1686+
(
1687+
RangeSyncType::Head,
1688+
Slot::new(0),
1689+
Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 1)
1690+
)
1691+
);
1692+
// Should not penalize peer, but network is not clear because of the blocks_by_range requests
1693+
rig.expect_no_penalty_for(peer_id);
16691694
rig.assert_failed_chain(chain_hash);
16701695
}
16711696

@@ -1689,7 +1714,18 @@ fn test_parent_lookup_too_deep_grow_tip() {
16891714
);
16901715
}
16911716

1692-
rig.expect_penalty(peer_id, "chain_too_long");
1717+
// Should create a new syncing chain
1718+
rig.drain_sync_rx();
1719+
assert_eq!(
1720+
rig.active_range_sync_chain(),
1721+
(
1722+
RangeSyncType::Head,
1723+
Slot::new(0),
1724+
Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 2)
1725+
)
1726+
);
1727+
// Should not penalize peer, but network is not clear because of the blocks_by_range requests
1728+
rig.expect_no_penalty_for(peer_id);
16931729
rig.assert_failed_chain(tip.canonical_root());
16941730
}
16951731

0 commit comments

Comments
 (0)