diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 90f1b2ab09ef..af9fd6d62fcc 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -186,7 +186,6 @@ where enactment_state: Arc>>, revalidation_queue: Arc>, - most_recent_view: RwLock>, import_notification_sink: MultiViewImportNotificationSink>, // todo: this are coming from ValidatedPool, some of them maybe needed here @@ -222,7 +221,6 @@ where finalized_hash, ))), revalidation_queue: Arc::from(view_revalidation::RevalidationQueue::new()), - most_recent_view: RwLock::from(None), import_notification_sink, options: graph::Options::default(), }, @@ -268,7 +266,6 @@ where finalized_hash, ))), revalidation_queue: Arc::from(revalidation_queue), - most_recent_view: RwLock::from(None), import_notification_sink, options, } @@ -636,7 +633,8 @@ where // todo: probably API change to: // status(Hash) -> Option fn status(&self) -> PoolStatus { - self.most_recent_view + self.view_store + .most_recent_view .read() .map(|hash| self.view_store.status()[&hash].clone()) .unwrap_or(PoolStatus { ready: 0, ready_bytes: 0, future: 0, future_bytes: 0 }) @@ -662,24 +660,23 @@ where // todo: api change we should have at here? fn ready_transaction(&self, tx_hash: &TxHash) -> Option> { // unimplemented!() - let result = self - .most_recent_view - .read() + let most_recent_view = self.view_store.most_recent_view.read(); + let result = most_recent_view .map(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash)) .flatten(); log::trace!( target: LOG_TARGET, "[{tx_hash:?}] ready_transaction: {} {:?}", result.is_some(), - self.most_recent_view.read() + most_recent_view ); result } // todo: API change? ready at hash (not number)? fn ready_at(&self, at: ::Hash) -> PolledIterator { - if let Some(view) = self.view_store.views.read().get(&at) { - log::info!( target: LOG_TARGET, "fatp::ready_at {:?}", at); + if let Some((view, retracted)) = self.view_store.get_view_at(at, true) { + log::info!( target: LOG_TARGET, "fatp::ready_at {:?} (retracted:{:?})", at, retracted); let iterator: ReadyIteratorFor = Box::new(view.pool.validated_pool().ready()); return async move { iterator }.boxed(); } @@ -816,25 +813,8 @@ where let duration = start.elapsed(); log::info!(target: LOG_TARGET, "update_view_fork: at {at:?} took {duration:?}"); - //todo: refactor this: maybe single object with one mutex? - // shall be property of views_store + insert/remove/get wrappers? - let view = { - let mut most_recent_view_lock = self.most_recent_view.write(); - let views_to_be_removed = { - let views = self.view_store.views.read(); - std::iter::once(tree_route.common_block()) - .chain(tree_route.enacted().iter()) - .map(|block| block.hash) - .collect::>() - }; - self.view_store.views.write().retain(|v, _| !views_to_be_removed.contains(v)); - - let view = Arc::from(view); - self.view_store.views.write().insert(new_block_hash, view.clone()); - most_recent_view_lock.replace(view.at.hash); - view - }; - + let view = Arc::from(view); + self.view_store.insert_new_view(view.clone(), tree_route).await; Some(view) } @@ -1047,17 +1027,8 @@ where let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash)); log::info!(target: LOG_TARGET, "handle_finalized {finalized_number:?} tree_route: {tree_route:?}"); - let finalized_xts = self.view_store.finalize_route(finalized_hash, tree_route).await; + let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await; log::debug!(target: LOG_TARGET, "handle_finalized b:{:?}", self.views_len()); - { - //clean up older then finalized - let mut views = self.view_store.views.write(); - views.retain(|hash, v| match finalized_number { - Err(_) | Ok(None) => *hash == finalized_hash, - Ok(Some(n)) if v.at.number == n => *hash == finalized_hash, - Ok(Some(n)) => v.at.number > n, - }) - } self.mempool.purge_finalized_transactions(&finalized_xts).await; self.import_notification_sink.clean_filter(&finalized_xts).await; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index c48e914e2761..534057366890 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -39,6 +39,7 @@ pub type TxStatusStream = Pin, BlockHas enum ListenerAction { ViewAdded(BlockHash, TxStatusStream), + ViewRemoved(BlockHash), InvalidateTransaction, FinalizeTransaction(BlockHash, TxIndex), } @@ -50,6 +51,7 @@ where fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { ListenerAction::ViewAdded(h, _) => write!(f, "ListenerAction::ViewAdded({})", h), + ListenerAction::ViewRemoved(h) => write!(f, "ListenerAction::ViewRemoved({})", h), ListenerAction::InvalidateTransaction => { write!(f, "ListenerAction::InvalidateTransaction") }, @@ -174,6 +176,11 @@ where self.fused.get_mut().insert(block_hash, stream); trace!(target: LOG_TARGET, "[{:?}] ViewAdded view: {:?} views:{:?}", self.tx_hash, block_hash, self.fused.get_ref().keys().collect::>()); } + + fn remove_view(&mut self, block_hash: BlockHash) { + self.fused.get_mut().remove(&block_hash); + trace!(target: LOG_TARGET, "[{:?}] ViewRemoved view: {:?} views:{:?}", self.tx_hash, block_hash, self.fused.get_ref().keys().collect::>()); + } } impl MultiViewListener @@ -226,6 +233,9 @@ where Some(ListenerAction::ViewAdded(h,stream)) => { ctx.add_stream(h, stream); }, + Some(ListenerAction::ViewRemoved(h)) => { + ctx.remove_view(h); + }, Some(ListenerAction::InvalidateTransaction) => { if ctx.handle_invalidate_transaction() { log::debug!(target: LOG_TARGET, "[{:?}] sending out: Invalid", ctx.tx_hash); @@ -268,6 +278,19 @@ where } } + pub(crate) async fn remove_view(&self, block_hash: BlockHash) { + let controllers = self.controllers.write().await; + for (tx_hash, sender) in controllers.iter() { + match sender.send(ListenerAction::ViewRemoved(block_hash)).await { + Err(mpsc::error::SendError(e)) => { + log::trace!(target: LOG_TARGET, "[{:?}] remove_view: SendError: {:?}", tx_hash, e); + // todo: + // controllers.remove(&tx_hash); + }, + Ok(_) => {}, + } + } + } pub(crate) async fn invalidate_transactions(&self, invalid_hashes: Vec>) { use futures::future::FutureExt; let mut controllers = self.controllers.write().await; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index d16911f25a09..32ae295a812c 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -25,11 +25,11 @@ use std::{collections::HashMap, sync::Arc}; use crate::graph::ExtrinsicHash; use sc_transaction_pool_api::{PoolStatus, TransactionSource}; -use sp_runtime::traits::Block as BlockT; use super::multi_view_listener::{MultiViewListener, TxStatusStream}; use crate::{ReadyIteratorFor, LOG_TARGET}; use sp_blockchain::TreeRoute; +use sp_runtime::{generic::BlockId, traits::Block as BlockT}; use super::view::View; @@ -40,7 +40,9 @@ where { pub(super) api: Arc, pub(super) views: RwLock>>>, + pub(super) retracted_views: RwLock>>>, pub(super) listener: Arc>, + pub(super) most_recent_view: RwLock>, } impl ViewStore @@ -50,7 +52,13 @@ where ::Hash: Unpin, { pub(super) fn new(api: Arc, listener: Arc>) -> Self { - Self { api, views: Default::default(), listener } + Self { + api, + views: Default::default(), + retracted_views: Default::default(), + listener, + most_recent_view: RwLock::from(None), + } } /// Imports a bunch of unverified extrinsics to every view @@ -258,4 +266,90 @@ where .get(&at) .and_then(|v| v.pool.validated_pool().ready_by_hash(tx_hash)) } + + pub(super) async fn insert_new_view( + &self, + view: Arc>, + tree_route: &TreeRoute, + ) { + //todo: refactor this: maybe single object with one mutex? + // shall be property of views_store + insert/remove/get wrappers? + let views_to_be_removed = { + let mut most_recent_view_lock = self.most_recent_view.write(); + let mut views_to_be_removed = { + std::iter::once(tree_route.common_block()) + .chain(tree_route.enacted().iter()) + .map(|block| block.hash) + .collect::>() + }; + let mut views = self.views.write(); + let mut retracted_views = self.retracted_views.write(); + views_to_be_removed.retain(|hash| { + let view = views.remove(hash); + if let Some(view) = view { + retracted_views.insert(*hash, view); + true + } else { + false + } + }); + views.insert(view.at.hash, view.clone()); + most_recent_view_lock.replace(view.at.hash); + views_to_be_removed + }; + { + log::debug!(target:LOG_TARGET,"insert_new_view: retracted_views: {:?}", self.retracted_views.read().keys()); + } + for hash in &views_to_be_removed { + self.listener.remove_view(*hash).await; + } + } + + pub(super) fn get_view_at( + &self, + at: Block::Hash, + allow_retracted: bool, + ) -> Option<(Arc>, bool)> { + if let Some(view) = self.views.read().get(&at) { + return Some((view.clone(), false)); + } + if allow_retracted { + if let Some(view) = self.retracted_views.read().get(&at) { + return Some((view.clone(), true)) + } + }; + None + } + + pub(crate) async fn handle_finalized( + &self, + finalized_hash: Block::Hash, + tree_route: &[Block::Hash], + ) -> Vec> { + let finalized_xts = self.finalize_route(finalized_hash, tree_route).await; + + let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash)); + + //clean up older then finalized + { + let mut views = self.views.write(); + views.retain(|hash, v| match finalized_number { + Err(_) | Ok(None) => *hash == finalized_hash, + Ok(Some(n)) if v.at.number == n => *hash == finalized_hash, + Ok(Some(n)) => v.at.number > n, + }); + } + + { + let mut retracted_views = self.retracted_views.write(); + retracted_views.retain(|_, v| match finalized_number { + Err(_) | Ok(None) => false, + Ok(Some(n)) => v.at.number > n, + }); + + log::debug!(target:LOG_TARGET,"finalize_route: retracted_views: {:?}", retracted_views.keys()); + } + + finalized_xts + } }