Skip to content

Commit

Permalink
fatp: avoid some of empty blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Jun 18, 2024
1 parent b64120a commit 62b08b0
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ where
enactment_state: Arc<Mutex<EnactmentState<Block>>>,
revalidation_queue: Arc<view_revalidation::RevalidationQueue<PoolApi, Block>>,

most_recent_view: RwLock<Option<Block::Hash>>,
import_notification_sink:
MultiViewImportNotificationSink<Block::Hash, graph::ExtrinsicHash<PoolApi>>,
// todo: this are coming from ValidatedPool, some of them maybe needed here
Expand Down Expand Up @@ -222,7 +221,6 @@ where
finalized_hash,
))),
revalidation_queue: Arc::from(view_revalidation::RevalidationQueue::new()),
most_recent_view: RwLock::from(None),
import_notification_sink,
options: graph::Options::default(),
},
Expand Down Expand Up @@ -268,7 +266,6 @@ where
finalized_hash,
))),
revalidation_queue: Arc::from(revalidation_queue),
most_recent_view: RwLock::from(None),
import_notification_sink,
options,
}
Expand Down Expand Up @@ -636,7 +633,8 @@ where
// todo: probably API change to:
// status(Hash) -> Option<PoolStatus>
fn status(&self) -> PoolStatus {
self.most_recent_view
self.view_store
.most_recent_view
.read()
.map(|hash| self.view_store.status()[&hash].clone())
.unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 })
Expand All @@ -662,24 +660,23 @@ where
// todo: api change we should have at here?
fn ready_transaction(&self, tx_hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
// unimplemented!()
let result = self
.most_recent_view
.read()
let most_recent_view = self.view_store.most_recent_view.read();
let result = most_recent_view
.map(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash))
.flatten();
log::trace!(
target: LOG_TARGET,
"[{tx_hash:?}] ready_transaction: {} {:?}",
result.is_some(),
self.most_recent_view.read()
most_recent_view
);
result
}

// todo: API change? ready at hash (not number)?
fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> PolledIterator<PoolApi> {
if let Some(view) = self.view_store.views.read().get(&at) {
log::info!( target: LOG_TARGET, "fatp::ready_at {:?}", at);
if let Some((view, retracted)) = self.view_store.get_view_at(at, true) {
log::info!( target: LOG_TARGET, "fatp::ready_at {:?} (retracted:{:?})", at, retracted);
let iterator: ReadyIteratorFor<PoolApi> = Box::new(view.pool.validated_pool().ready());
return async move { iterator }.boxed();
}
Expand Down Expand Up @@ -816,25 +813,8 @@ where
let duration = start.elapsed();
log::info!(target: LOG_TARGET, "update_view_fork: at {at:?} took {duration:?}");

//todo: refactor this: maybe single object with one mutex?
// shall be property of views_store + insert/remove/get wrappers?
let view = {
let mut most_recent_view_lock = self.most_recent_view.write();
let views_to_be_removed = {
let views = self.view_store.views.read();
std::iter::once(tree_route.common_block())
.chain(tree_route.enacted().iter())
.map(|block| block.hash)
.collect::<Vec<_>>()
};
self.view_store.views.write().retain(|v, _| !views_to_be_removed.contains(v));

let view = Arc::from(view);
self.view_store.views.write().insert(new_block_hash, view.clone());
most_recent_view_lock.replace(view.at.hash);
view
};

let view = Arc::from(view);
self.view_store.insert_new_view(view.clone(), tree_route).await;
Some(view)
}

Expand Down Expand Up @@ -1047,17 +1027,8 @@ where
let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));
log::info!(target: LOG_TARGET, "handle_finalized {finalized_number:?} tree_route: {tree_route:?}");

let finalized_xts = self.view_store.finalize_route(finalized_hash, tree_route).await;
let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await;
log::debug!(target: LOG_TARGET, "handle_finalized b:{:?}", self.views_len());
{
//clean up older then finalized
let mut views = self.view_store.views.write();
views.retain(|hash, v| match finalized_number {
Err(_) | Ok(None) => *hash == finalized_hash,
Ok(Some(n)) if v.at.number == n => *hash == finalized_hash,
Ok(Some(n)) => v.at.number > n,
})
}

self.mempool.purge_finalized_transactions(&finalized_xts).await;
self.import_notification_sink.clean_filter(&finalized_xts).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub type TxStatusStream<T> = Pin<Box<TransactionStatusStream<TxHash<T>, BlockHas

enum ListenerAction<PoolApi: ChainApi> {
ViewAdded(BlockHash<PoolApi>, TxStatusStream<PoolApi>),
ViewRemoved(BlockHash<PoolApi>),
InvalidateTransaction,
FinalizeTransaction(BlockHash<PoolApi>, TxIndex),
}
Expand All @@ -50,6 +51,7 @@ where
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ListenerAction::ViewAdded(h, _) => write!(f, "ListenerAction::ViewAdded({})", h),
ListenerAction::ViewRemoved(h) => write!(f, "ListenerAction::ViewRemoved({})", h),
ListenerAction::InvalidateTransaction => {
write!(f, "ListenerAction::InvalidateTransaction")
},
Expand Down Expand Up @@ -174,6 +176,11 @@ where
self.fused.get_mut().insert(block_hash, stream);
trace!(target: LOG_TARGET, "[{:?}] ViewAdded view: {:?} views:{:?}", self.tx_hash, block_hash, self.fused.get_ref().keys().collect::<Vec<_>>());
}

fn remove_view(&mut self, block_hash: BlockHash<PoolApi>) {
self.fused.get_mut().remove(&block_hash);
trace!(target: LOG_TARGET, "[{:?}] ViewRemoved view: {:?} views:{:?}", self.tx_hash, block_hash, self.fused.get_ref().keys().collect::<Vec<_>>());
}
}

impl<PoolApi> MultiViewListener<PoolApi>
Expand Down Expand Up @@ -226,6 +233,9 @@ where
Some(ListenerAction::ViewAdded(h,stream)) => {
ctx.add_stream(h, stream);
},
Some(ListenerAction::ViewRemoved(h)) => {
ctx.remove_view(h);
},
Some(ListenerAction::InvalidateTransaction) => {
if ctx.handle_invalidate_transaction() {
log::debug!(target: LOG_TARGET, "[{:?}] sending out: Invalid", ctx.tx_hash);
Expand Down Expand Up @@ -268,6 +278,19 @@ where
}
}

pub(crate) async fn remove_view(&self, block_hash: BlockHash<PoolApi>) {
let controllers = self.controllers.write().await;
for (tx_hash, sender) in controllers.iter() {
match sender.send(ListenerAction::ViewRemoved(block_hash)).await {
Err(mpsc::error::SendError(e)) => {
log::trace!(target: LOG_TARGET, "[{:?}] remove_view: SendError: {:?}", tx_hash, e);
// todo:
// controllers.remove(&tx_hash);
},
Ok(_) => {},
}
}
}
pub(crate) async fn invalidate_transactions(&self, invalid_hashes: Vec<TxHash<PoolApi>>) {
use futures::future::FutureExt;
let mut controllers = self.controllers.write().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use std::{collections::HashMap, sync::Arc};

use crate::graph::ExtrinsicHash;
use sc_transaction_pool_api::{PoolStatus, TransactionSource};
use sp_runtime::traits::Block as BlockT;

use super::multi_view_listener::{MultiViewListener, TxStatusStream};
use crate::{ReadyIteratorFor, LOG_TARGET};
use sp_blockchain::TreeRoute;
use sp_runtime::{generic::BlockId, traits::Block as BlockT};

use super::view::View;

Expand All @@ -40,7 +40,9 @@ where
{
pub(super) api: Arc<PoolApi>,
pub(super) views: RwLock<HashMap<Block::Hash, Arc<View<PoolApi>>>>,
pub(super) retracted_views: RwLock<HashMap<Block::Hash, Arc<View<PoolApi>>>>,
pub(super) listener: Arc<MultiViewListener<PoolApi>>,
pub(super) most_recent_view: RwLock<Option<Block::Hash>>,
}

impl<PoolApi, Block> ViewStore<PoolApi, Block>
Expand All @@ -50,7 +52,13 @@ where
<Block as BlockT>::Hash: Unpin,
{
pub(super) fn new(api: Arc<PoolApi>, listener: Arc<MultiViewListener<PoolApi>>) -> Self {
Self { api, views: Default::default(), listener }
Self {
api,
views: Default::default(),
retracted_views: Default::default(),
listener,
most_recent_view: RwLock::from(None),
}
}

/// Imports a bunch of unverified extrinsics to every view
Expand Down Expand Up @@ -258,4 +266,90 @@ where
.get(&at)
.and_then(|v| v.pool.validated_pool().ready_by_hash(tx_hash))
}

pub(super) async fn insert_new_view(
&self,
view: Arc<View<PoolApi>>,
tree_route: &TreeRoute<Block>,
) {
//todo: refactor this: maybe single object with one mutex?
// shall be property of views_store + insert/remove/get wrappers?
let views_to_be_removed = {
let mut most_recent_view_lock = self.most_recent_view.write();
let mut views_to_be_removed = {
std::iter::once(tree_route.common_block())
.chain(tree_route.enacted().iter())
.map(|block| block.hash)
.collect::<Vec<_>>()
};
let mut views = self.views.write();
let mut retracted_views = self.retracted_views.write();
views_to_be_removed.retain(|hash| {
let view = views.remove(hash);
if let Some(view) = view {
retracted_views.insert(*hash, view);
true
} else {
false
}
});
views.insert(view.at.hash, view.clone());
most_recent_view_lock.replace(view.at.hash);
views_to_be_removed
};
{
log::debug!(target:LOG_TARGET,"insert_new_view: retracted_views: {:?}", self.retracted_views.read().keys());
}
for hash in &views_to_be_removed {
self.listener.remove_view(*hash).await;
}
}

pub(super) fn get_view_at(
&self,
at: Block::Hash,
allow_retracted: bool,
) -> Option<(Arc<View<PoolApi>>, bool)> {
if let Some(view) = self.views.read().get(&at) {
return Some((view.clone(), false));
}
if allow_retracted {
if let Some(view) = self.retracted_views.read().get(&at) {
return Some((view.clone(), true))
}
};
None
}

pub(crate) async fn handle_finalized(
&self,
finalized_hash: Block::Hash,
tree_route: &[Block::Hash],
) -> Vec<ExtrinsicHash<PoolApi>> {
let finalized_xts = self.finalize_route(finalized_hash, tree_route).await;

let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash));

//clean up older then finalized
{
let mut views = self.views.write();
views.retain(|hash, v| match finalized_number {
Err(_) | Ok(None) => *hash == finalized_hash,
Ok(Some(n)) if v.at.number == n => *hash == finalized_hash,
Ok(Some(n)) => v.at.number > n,
});
}

{
let mut retracted_views = self.retracted_views.write();
retracted_views.retain(|_, v| match finalized_number {
Err(_) | Ok(None) => false,
Ok(Some(n)) => v.at.number > n,
});

log::debug!(target:LOG_TARGET,"finalize_route: retracted_views: {:?}", retracted_views.keys());
}

finalized_xts
}
}

0 comments on commit 62b08b0

Please sign in to comment.