Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(

update_peer_info(
&event,
network.clone(),
&network,
state_manager.chain_store().clone(),
&genesis,
);
Expand All @@ -169,8 +169,8 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
NetworkEvent::HelloResponseOutbound { request, source } => {
let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
get_full_tipset(
network.clone(),
state_manager.chain_store().clone(),
&network,
state_manager.chain_store(),
Some(source),
&tipset_keys,
)
Expand All @@ -180,13 +180,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
NetworkEvent::PubsubMessage { message } => match message {
PubsubMessage::Block(b) => {
let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
get_full_tipset(
network.clone(),
state_manager.chain_store().clone(),
None,
&key,
)
.await
get_full_tipset(&network, state_manager.chain_store(), None, &key).await
}
PubsubMessage::Message(m) => {
if let Err(why) = mem_pool.add(m) {
Expand Down Expand Up @@ -252,22 +246,22 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
// insert task into tasks. If task is already in tasks, skip. If it is not, spawn it.
let new = tasks_set.insert(task.clone());
if new {
let tasks_clone = tasks.clone();
let action = task.clone().execute(
network.clone(),
state_manager.clone(),
stateless_mode,
bad_block_cache.clone(),
);
tokio::spawn({
let tasks = tasks.clone();
let state_machine = state_machine.clone();
let state_changed = state_changed.clone();
async move {
if let Some(event) = action.await {
state_machine.lock().update(event);
state_changed.notify_one();
}
tasks_clone.lock().remove(&task);
tasks.lock().remove(&task);
}
});
}
Expand Down Expand Up @@ -361,7 +355,7 @@ fn inc_gossipsub_event_metrics(event: &NetworkEvent) {
// Keep our peer manager up to date.
fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
event: &NetworkEvent,
network: SyncNetworkContext<DB>,
network: &SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
genesis: &Tipset,
) {
Expand All @@ -370,7 +364,7 @@ fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
let genesis_cid = *genesis.block_headers().first().cid();
// Spawn and immediately move on to the next event
tokio::task::spawn(handle_peer_connected_event(
network,
network.clone(),
chain_store,
*peer_id,
genesis_cid,
Expand Down Expand Up @@ -422,21 +416,21 @@ async fn handle_peer_connected_event<DB: Blockstore + Sync + Send + 'static>(
}

fn handle_peer_disconnected_event<DB: Blockstore + Sync + Send + 'static>(
network: SyncNetworkContext<DB>,
network: &SyncNetworkContext<DB>,
peer_id: PeerId,
) {
network.peer_manager().remove_peer(&peer_id);
network.peer_manager().unmark_peer_bad(&peer_id);
}

pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
network: SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
network: &SyncNetworkContext<DB>,
chain_store: &ChainStore<DB>,
peer_id: Option<PeerId>,
tipset_keys: &TipsetKey,
) -> anyhow::Result<FullTipset> {
// Attempt to load from the store
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
return Ok(full_tipset);
}
// Load from the network
Expand All @@ -450,13 +444,13 @@ pub async fn get_full_tipset<DB: Blockstore + Sync + Send + 'static>(
}

async fn get_full_tipset_batch<DB: Blockstore + Sync + Send + 'static>(
network: SyncNetworkContext<DB>,
chain_store: Arc<ChainStore<DB>>,
network: &SyncNetworkContext<DB>,
chain_store: &ChainStore<DB>,
peer_id: Option<PeerId>,
tipset_keys: &TipsetKey,
) -> anyhow::Result<Vec<FullTipset>> {
// Attempt to load from the store
if let Ok(full_tipset) = load_full_tipset(&chain_store, tipset_keys) {
if let Ok(full_tipset) = load_full_tipset(chain_store, tipset_keys) {
return Ok(vec![full_tipset]);
}
// Load from the network
Expand Down Expand Up @@ -868,7 +862,7 @@ impl SyncTask {
}
}
SyncTask::FetchTipset(key, epoch) => {
match get_full_tipset_batch(network.clone(), cs.clone(), None, &key).await {
match get_full_tipset_batch(&network, cs, None, &key).await {
Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)),
Err(e) => {
tracing::warn!(%key, %epoch, "failed to fetch tipset: {e}");
Expand Down
Loading