Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove #[derive(Clone)] from Synchronizer #4161

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, HeaderIndexView, IBDState};
use crate::SyncShared;
use ckb_constant::sync::{
BLOCK_DOWNLOAD_WINDOW, CHECK_POINT_WINDOW, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
};
Expand All @@ -9,34 +9,38 @@ use ckb_network::PeerIndex;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::packed;
use std::cmp::min;
use std::sync::Arc;

pub struct BlockFetcher<'a> {
synchronizer: &'a Synchronizer,
pub struct BlockFetcher {
sync_shared: Arc<SyncShared>,
peer: PeerIndex,
active_chain: ActiveChain,
ibd: IBDState,
}

impl<'a> BlockFetcher<'a> {
pub fn new(synchronizer: &'a Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
let active_chain = synchronizer.shared.active_chain();
impl BlockFetcher {
pub fn new(sync_shared: Arc<SyncShared>, peer: PeerIndex, ibd: IBDState) -> Self {
let active_chain = sync_shared.active_chain();
BlockFetcher {
sync_shared,
peer,
synchronizer,
active_chain,
ibd,
}
}

pub fn reached_inflight_limit(&self) -> bool {
let inflight = self.synchronizer.shared().state().read_inflight_blocks();
let inflight = self.sync_shared.state().read_inflight_blocks();

// Can't download any more from this peer
inflight.peer_can_fetch_count(self.peer) == 0
}

pub fn peer_best_known_header(&self) -> Option<HeaderIndex> {
self.synchronizer.peers().get_best_known_header(self.peer)
self.sync_shared
.state()
.peers()
.get_best_known_header(self.peer)
}

pub fn update_last_common_header(
Expand All @@ -45,23 +49,28 @@ impl<'a> BlockFetcher<'a> {
) -> Option<BlockNumberAndHash> {
// Bootstrap quickly by guessing an ancestor of our best tip is forking point.
// Guessing wrong in either direction is not a problem.
let mut last_common =
if let Some(header) = self.synchronizer.peers().get_last_common_header(self.peer) {
header
} else {
let tip_header = self.active_chain.tip_header();
let guess_number = min(tip_header.number(), best_known.number());
let guess_hash = self.active_chain.get_block_hash(guess_number)?;
(guess_number, guess_hash).into()
};
let mut last_common = if let Some(header) = self
.sync_shared
.state()
.peers()
.get_last_common_header(self.peer)
{
header
} else {
let tip_header = self.active_chain.tip_header();
let guess_number = min(tip_header.number(), best_known.number());
let guess_hash = self.active_chain.get_block_hash(guess_number)?;
(guess_number, guess_hash).into()
};

// If the peer reorganized, our previous last_common_header may not be an ancestor
// of its current tip anymore. Go back enough to fix that.
last_common = self
.active_chain
.last_common_ancestor(&last_common, best_known)?;

self.synchronizer
self.sync_shared
.state()
.peers()
.set_last_common_header(self.peer, last_common.clone());

Expand All @@ -80,13 +89,13 @@ impl<'a> BlockFetcher<'a> {
// Update `best_known_header` based on `unknown_header_list`. It must be involved before
// our acquiring the newest `best_known_header`.
if let IBDState::In = self.ibd {
let state = self.synchronizer.shared.state();
let state = self.sync_shared.state();
// unknown list is an ordered list, sorted from highest to lowest,
// when header hash unknown, break loop is ok
while let Some(hash) = state.peers().take_unknown_last(self.peer) {
// Here we need to first try search from headermap, if not, fallback to search from the db.
// if not search from db, it can stuck here when the headermap may have been removed just as the block was downloaded
if let Some(header) = self.synchronizer.shared.get_header_index_view(&hash, false) {
if let Some(header) = self.sync_shared.get_header_index_view(&hash, false) {
state
.peers()
.may_set_best_known_header(self.peer, header.as_header_index());
Expand Down Expand Up @@ -114,7 +123,8 @@ impl<'a> BlockFetcher<'a> {
// specially advance this peer's last_common_header at the case of both us on the same
// active chain.
if self.active_chain.is_main_chain(&best_known.hash()) {
self.synchronizer
self.sync_shared
.state()
.peers()
.set_last_common_header(self.peer, best_known.number_and_hash());
}
Expand All @@ -128,7 +138,7 @@ impl<'a> BlockFetcher<'a> {
return None;
}

let state = self.synchronizer.shared().state();
let state = self.sync_shared.state();
let mut inflight = state.write_inflight_blocks();
let mut start = last_common.number() + 1;
let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
Expand Down Expand Up @@ -156,7 +166,8 @@ impl<'a> BlockFetcher<'a> {
if status.contains(BlockStatus::BLOCK_STORED) {
// If the block is stored, its ancestor must on store
// So we can skip the search of this space directly
self.synchronizer
self.sync_shared
.state()
.peers()
.set_last_common_header(self.peer, header.number_and_hash());
end = min(best_known.number(), header.number() + BLOCK_DOWNLOAD_WINDOW);
Expand All @@ -172,8 +183,7 @@ impl<'a> BlockFetcher<'a> {

status = self.active_chain.get_block_status(&parent_hash);
header = self
.synchronizer
.shared
.sync_shared
.get_header_index_view(&parent_hash, false)?;
}

Expand Down
21 changes: 11 additions & 10 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ enum FetchCMD {
}

struct BlockFetchCMD {
sync: Synchronizer,
sync_shared: Arc<SyncShared>,
p2p_control: ServiceControl,
recv: channel::Receiver<FetchCMD>,
can_start: CanStart,
Expand All @@ -93,15 +93,17 @@ impl BlockFetchCMD {
FetchCMD::Fetch((peers, state)) => match self.can_start() {
CanStart::Ready => {
for peer in peers {
if let Some(fetch) = BlockFetcher::new(&self.sync, peer, state).fetch() {
if let Some(fetch) =
BlockFetcher::new(Arc::clone(&self.sync_shared), peer, state).fetch()
{
for item in fetch {
BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer);
}
}
}
}
CanStart::MinWorkNotReach => {
let best_known = self.sync.shared.state().shared_best_header_ref();
let best_known = self.sync_shared.state().shared_best_header_ref();
let number = best_known.number();
if number != self.number && (number - self.number) % 10000 == 0 {
self.number = number;
Expand All @@ -111,12 +113,12 @@ impl BlockFetchCMD {
then start to download block",
number,
best_known.total_difficulty(),
self.sync.shared.state().min_chain_work()
self.sync_shared.state().min_chain_work()
);
}
}
CanStart::AssumeValidNotFound => {
let state = self.sync.shared.state();
let state = self.sync_shared.state();
let best_known = state.shared_best_header_ref();
let number = best_known.number();
let assume_valid_target: Byte32 = state
Expand Down Expand Up @@ -161,7 +163,7 @@ impl BlockFetchCMD {
return self.can_start;
}

let state = self.sync.shared.state();
let state = self.sync_shared.state();

let min_work_reach = |flag: &mut CanStart| {
if state.min_chain_work_ready() {
Expand Down Expand Up @@ -230,7 +232,6 @@ impl BlockFetchCMD {
}

/// Sync protocol handle
#[derive(Clone)]
pub struct Synchronizer {
pub(crate) chain: ChainController,
/// Sync shared state
Expand Down Expand Up @@ -364,7 +365,7 @@ impl Synchronizer {
peer: PeerIndex,
ibd: IBDState,
) -> Option<Vec<Vec<packed::Byte32>>> {
BlockFetcher::new(self, peer, ibd).fetch()
BlockFetcher::new(Arc::to_owned(self.shared()), peer, ibd).fetch()
}

pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
Expand Down Expand Up @@ -632,20 +633,20 @@ impl Synchronizer {
}
None => {
let p2p_control = raw.clone();
let sync = self.clone();
let (sender, recv) = channel::bounded(2);
let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
sender.send(FetchCMD::Fetch((peers, ibd))).unwrap();
self.fetch_channel = Some(sender);
let thread = ::std::thread::Builder::new();
let number = self.shared.state().shared_best_header_ref().number();
const THREAD_NAME: &str = "BlockDownload";
let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
let blockdownload_jh = thread
.name(THREAD_NAME.into())
.spawn(move || {
let stop_signal = new_crossbeam_exit_rx();
BlockFetchCMD {
sync,
sync_shared,
p2p_control,
recv,
number,
Expand Down
2 changes: 1 addition & 1 deletion sync/src/tests/synchronizer/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ fn test_fix_last_common_header() {
}

let expected = fix_last_common.map(|mark| mark.to_string());
let actual = BlockFetcher::new(&synchronizer, peer, IBDState::In)
let actual = BlockFetcher::new(Arc::clone(&synchronizer.shared), peer, IBDState::In)
.update_last_common_header(&best_known_header.number_and_hash())
.map(|header| {
if graph
Expand Down
Loading