Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions beacon_node/network/src/sync/custody_backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ pub struct CustodyBackFillSync<T: BeaconChainTypes> {
skipped_batches: HashSet<BatchId>,

/// When a custody backfill sync fails, we keep track of whether a new fully synced peer has joined.
/// This signifies that we are able to attempt to restart a failed chain.
/// This signifies that we are able to attempt to restart a failed sync.
restart_failed_sync: bool,

/// Indicates that the custody backfill sync has failed and is waiting to be retried.
failed_sync: bool,

/// Reference to the beacon chain to obtain initial starting points for custody backfill sync.
beacon_chain: Arc<BeaconChain<T>>,

Expand All @@ -142,6 +145,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
current_processing_batch: None,
validated_batches: 0,
restart_failed_sync: false,
failed_sync: false,
beacon_chain,
network_globals,
}
Expand Down Expand Up @@ -202,6 +206,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
self.batches.clear();
self.skipped_batches.clear();
self.restart_failed_sync = false;
self.failed_sync = false;

// Reset all downloading and processing targets
// NOTE: Lets keep validated_batches for posterity
Expand Down Expand Up @@ -244,12 +249,16 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
}

if self.check_completed() {
self.failed_sync = false;
self.restart_failed_sync = false;
self.set_state(CustodyBackFillState::Completed);
return Ok(SyncStart::NotSyncing);
}
}
CustodyBackFillState::Pending(_) | CustodyBackFillState::Completed => {
if self.check_completed() {
self.failed_sync = false;
self.restart_failed_sync = false;
self.set_state(CustodyBackFillState::Completed);
return Ok(SyncStart::NotSyncing);
}
Expand All @@ -258,7 +267,18 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
if !self.should_start_custody_backfill_sync() {
return Ok(SyncStart::NotSyncing);
}
self.set_start_epoch();

// If the last custody backfill attempt failed, only restart once a new fully
// synced peer has joined and set `restart_failed_sync`.
if self.failed_sync {
if !self.restart_failed_sync {
return Ok(SyncStart::NotSyncing);
}
// We can now safely restart a failed sync with a fresh run id.
self.restart_sync();
} else {
self.set_start_epoch();
}
if self
.network_globals
.peers
Expand Down Expand Up @@ -734,7 +754,6 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
"Custody backfill sync completed"
);
self.batches.clear();
self.restart_failed_sync = false;
self.processing_target = self.current_start;
self.to_be_downloaded = self.current_start;
self.last_batch_downloaded = false;
Expand Down Expand Up @@ -1089,19 +1108,22 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
return Ok(());
}

// Mark this sync as failed and wait for a new fully synced peer before restarting.
self.failed_sync = true;
self.restart_failed_sync = false;

// Set the state
self.pause("Sync has failed".to_string());
// Remove all batches and active requests.
self.batches.clear();
self.restart_failed_sync = false;
self.skipped_batches.clear();

// Reset all downloading and processing targets
// NOTE: Lets keep validated_batches for posterity
self.processing_target = self.current_start;
self.to_be_downloaded = self.current_start;
self.last_batch_downloaded = false;
self.current_processing_batch = None;
self.restart_sync();

Err(error)
}
Expand All @@ -1116,10 +1138,10 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
}

/// A fully synced peer has joined us.
/// If we are in a failed state, update a local variable to indicate we are able to restart
/// the failed sync on the next attempt.
/// If the last custody backfill sync failed, update a local variable to indicate we are able
/// to restart the failed sync on the next attempt.
pub fn fully_synced_peer_joined(&mut self) {
if matches!(self.state(), CustodyBackFillState::Pending(_)) {
if self.failed_sync && matches!(self.state(), CustodyBackFillState::Pending(_)) {
self.restart_failed_sync = true;
}
}
Expand Down