Skip to content

Commit

Permalink
Remove Clone from Synchronizer
Browse files Browse the repository at this point in the history
Signed-off-by: Eval EXEC <[email protected]>
  • Loading branch information
eval-exec committed Sep 17, 2023
1 parent 5ba499a commit b65176e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 36 deletions.
62 changes: 36 additions & 26 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, HeaderIndexView, IBDState};
use crate::SyncShared;
use ckb_constant::sync::{
BLOCK_DOWNLOAD_WINDOW, CHECK_POINT_WINDOW, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
};
Expand All @@ -9,34 +9,38 @@ use ckb_network::PeerIndex;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::packed;
use std::cmp::min;
use std::sync::Arc;

pub struct BlockFetcher<'a> {
synchronizer: &'a Synchronizer,
pub struct BlockFetcher {
sync_shared: Arc<SyncShared>,
peer: PeerIndex,
active_chain: ActiveChain,
ibd: IBDState,
}

impl<'a> BlockFetcher<'a> {
pub fn new(synchronizer: &'a Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
let active_chain = synchronizer.shared.active_chain();
impl BlockFetcher {
pub fn new(sync_shared: Arc<SyncShared>, peer: PeerIndex, ibd: IBDState) -> Self {
let active_chain = sync_shared.active_chain();
BlockFetcher {
sync_shared,
peer,
synchronizer,
active_chain,
ibd,
}
}

pub fn reached_inflight_limit(&self) -> bool {
let inflight = self.synchronizer.shared().state().read_inflight_blocks();
let inflight = self.sync_shared.state().read_inflight_blocks();

// Can't download any more from this peer
inflight.peer_can_fetch_count(self.peer) == 0
}

pub fn peer_best_known_header(&self) -> Option<HeaderIndex> {
self.synchronizer.peers().get_best_known_header(self.peer)
self.sync_shared
.state()
.peers()
.get_best_known_header(self.peer)
}

pub fn update_last_common_header(
Expand All @@ -45,23 +49,28 @@ impl<'a> BlockFetcher<'a> {
) -> Option<BlockNumberAndHash> {
// Bootstrap quickly by guessing an ancestor of our best tip is forking point.
// Guessing wrong in either direction is not a problem.
let mut last_common =
if let Some(header) = self.synchronizer.peers().get_last_common_header(self.peer) {
header
} else {
let tip_header = self.active_chain.tip_header();
let guess_number = min(tip_header.number(), best_known.number());
let guess_hash = self.active_chain.get_block_hash(guess_number)?;
(guess_number, guess_hash).into()
};
let mut last_common = if let Some(header) = self
.sync_shared
.state()
.peers()
.get_last_common_header(self.peer)
{
header
} else {
let tip_header = self.active_chain.tip_header();
let guess_number = min(tip_header.number(), best_known.number());
let guess_hash = self.active_chain.get_block_hash(guess_number)?;
(guess_number, guess_hash).into()
};

// If the peer reorganized, our previous last_common_header may not be an ancestor
// of its current tip anymore. Go back enough to fix that.
last_common = self
.active_chain
.last_common_ancestor(&last_common, best_known)?;

self.synchronizer
self.sync_shared
.state()
.peers()
.set_last_common_header(self.peer, last_common.clone());

Expand All @@ -80,13 +89,13 @@ impl<'a> BlockFetcher<'a> {
// Update `best_known_header` based on `unknown_header_list`. It must be involved before
// our acquiring the newest `best_known_header`.
if let IBDState::In = self.ibd {
let state = self.synchronizer.shared.state();
let state = self.sync_shared.state();
// unknown list is an ordered list, sorted from highest to lowest,
// when header hash unknown, break loop is ok
while let Some(hash) = state.peers().take_unknown_last(self.peer) {
// Here we need to first try search from headermap, if not, fallback to search from the db.
// if not search from db, it can stuck here when the headermap may have been removed just as the block was downloaded
if let Some(header) = self.synchronizer.shared.get_header_index_view(&hash, false) {
if let Some(header) = self.sync_shared.get_header_index_view(&hash, false) {
state
.peers()
.may_set_best_known_header(self.peer, header.as_header_index());
Expand Down Expand Up @@ -114,7 +123,8 @@ impl<'a> BlockFetcher<'a> {
// specially advance this peer's last_common_header at the case of both us on the same
// active chain.
if self.active_chain.is_main_chain(&best_known.hash()) {
self.synchronizer
self.sync_shared
.state()
.peers()
.set_last_common_header(self.peer, best_known.number_and_hash());
}
Expand All @@ -128,7 +138,7 @@ impl<'a> BlockFetcher<'a> {
return None;
}

let state = self.synchronizer.shared().state();
let state = self.sync_shared.state();
let mut inflight = state.write_inflight_blocks();
let mut start = last_common.number() + 1;
let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
Expand Down Expand Up @@ -156,7 +166,8 @@ impl<'a> BlockFetcher<'a> {
if status.contains(BlockStatus::BLOCK_STORED) {
// If the block is stored, its ancestor must on store
// So we can skip the search of this space directly
self.synchronizer
self.sync_shared
.state()
.peers()
.set_last_common_header(self.peer, header.number_and_hash());
end = min(best_known.number(), header.number() + BLOCK_DOWNLOAD_WINDOW);
Expand All @@ -172,8 +183,7 @@ impl<'a> BlockFetcher<'a> {

status = self.active_chain.get_block_status(&parent_hash);
header = self
.synchronizer
.shared
.sync_shared
.get_header_index_view(&parent_hash, false)?;
}

Expand Down
21 changes: 11 additions & 10 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ enum FetchCMD {
}

struct BlockFetchCMD {
sync: Synchronizer,
sync_shared: Arc<SyncShared>,
p2p_control: ServiceControl,
recv: channel::Receiver<FetchCMD>,
can_start: CanStart,
Expand All @@ -93,15 +93,17 @@ impl BlockFetchCMD {
FetchCMD::Fetch((peers, state)) => match self.can_start() {
CanStart::Ready => {
for peer in peers {
if let Some(fetch) = BlockFetcher::new(&self.sync, peer, state).fetch() {
if let Some(fetch) =
BlockFetcher::new(Arc::clone(&self.sync_shared), peer, state).fetch()
{
for item in fetch {
BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer);
}
}
}
}
CanStart::MinWorkNotReach => {
let best_known = self.sync.shared.state().shared_best_header_ref();
let best_known = self.sync_shared.state().shared_best_header_ref();
let number = best_known.number();
if number != self.number && (number - self.number) % 10000 == 0 {
self.number = number;
Expand All @@ -111,12 +113,12 @@ impl BlockFetchCMD {
then start to download block",
number,
best_known.total_difficulty(),
self.sync.shared.state().min_chain_work()
self.sync_shared.state().min_chain_work()
);
}
}
CanStart::AssumeValidNotFound => {
let state = self.sync.shared.state();
let state = self.sync_shared.state();
let best_known = state.shared_best_header_ref();
let number = best_known.number();
let assume_valid_target: Byte32 = state
Expand Down Expand Up @@ -161,7 +163,7 @@ impl BlockFetchCMD {
return self.can_start;
}

let state = self.sync.shared.state();
let state = self.sync_shared.state();

let min_work_reach = |flag: &mut CanStart| {
if state.min_chain_work_ready() {
Expand Down Expand Up @@ -230,7 +232,6 @@ impl BlockFetchCMD {
}

/// Sync protocol handle
#[derive(Clone)]
pub struct Synchronizer {
pub(crate) chain: ChainController,
/// Sync shared state
Expand Down Expand Up @@ -364,7 +365,7 @@ impl Synchronizer {
peer: PeerIndex,
ibd: IBDState,
) -> Option<Vec<Vec<packed::Byte32>>> {
BlockFetcher::new(self, peer, ibd).fetch()
BlockFetcher::new(Arc::to_owned(self.shared()), peer, ibd).fetch()
}

pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
Expand Down Expand Up @@ -632,20 +633,20 @@ impl Synchronizer {
}
None => {
let p2p_control = raw.clone();
let sync = self.clone();
let (sender, recv) = channel::bounded(2);
let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
sender.send(FetchCMD::Fetch((peers, ibd))).unwrap();
self.fetch_channel = Some(sender);
let thread = ::std::thread::Builder::new();
let number = self.shared.state().shared_best_header_ref().number();
const THREAD_NAME: &str = "BlockDownload";
let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
let blockdownload_jh = thread
.name(THREAD_NAME.into())
.spawn(move || {
let stop_signal = new_crossbeam_exit_rx();
BlockFetchCMD {
sync,
sync_shared,
p2p_control,
recv,
number,
Expand Down

0 comments on commit b65176e

Please sign in to comment.