Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
7ee1652
tokio rwlock + sync code bridge
michalkucharczyk May 14, 2025
2153a5d
wip: new async trackedmap
michalkucharczyk May 15, 2025
32caa5e
report_invalid is async now
michalkucharczyk May 16, 2025
2650303
clean-up + HRTB fun
michalkucharczyk May 16, 2025
b2f6764
debug_assert enabled
michalkucharczyk May 19, 2025
d6418fb
sync <> async bridghe + todos cleanup
michalkucharczyk May 19, 2025
d4275bf
rename
michalkucharczyk May 19, 2025
d0e28a3
tracked_map: removed unused methods
michalkucharczyk May 19, 2025
6d5c3e9
transaction-network: consts updated
michalkucharczyk Mar 4, 2025
a507ff8
FullChainApi: async_trait used, tokio locking primitives
michalkucharczyk May 21, 2025
8c71aaf
ChainAPi: async_trait + fixes
michalkucharczyk May 21, 2025
f3292bb
tx_mem_pool: limit adjusted to x2
michalkucharczyk May 21, 2025
7c57cd8
view_store: most_recent_view is a ref to view now
michalkucharczyk May 21, 2025
d563da9
view_store: apply_pending_tx_replacements bug fix
michalkucharczyk May 21, 2025
9ead46d
debug levels adjusted
michalkucharczyk May 21, 2025
74dafae
fatp: most_recent_view is a view ref
michalkucharczyk May 21, 2025
9cbffa2
VadlidationFuture removed
michalkucharczyk May 21, 2025
c1570e3
fatp: self review cleanup
michalkucharczyk May 21, 2025
4870135
tests: blocking tasks spawned
michalkucharczyk May 21, 2025
1cf3616
view_store: doc
michalkucharczyk May 21, 2025
6caa9d8
sliding_stat helper added
michalkucharczyk May 23, 2025
cf1f050
fatp: stats added
michalkucharczyk May 23, 2025
1b71082
fatp: attempt_transaction_replacement_inner_inner -> post_attempt_tra…
michalkucharczyk May 23, 2025
e8ac935
submit_and_watch + submit_at -> inner versions
michalkucharczyk May 23, 2025
addf0de
dropped stats
michalkucharczyk May 26, 2025
b025e5c
common: validate tx api stats
michalkucharczyk May 27, 2025
35d1097
more stats added
michalkucharczyk May 27, 2025
4d66dfc
fix
michalkucharczyk May 27, 2025
69dfbc3
instrument macro added
michalkucharczyk May 27, 2025
48fa685
debugs
michalkucharczyk May 27, 2025
0f29287
ChainApi: ValidateTransactionPriority added
michalkucharczyk May 29, 2025
ac8feb3
transaction-network: consts updated - 2
michalkucharczyk May 30, 2025
860ed1f
tx_mem_pool: limit multiplier removed
michalkucharczyk May 30, 2025
5ebd88f
doc fix
michalkucharczyk May 30, 2025
2c6e0c5
Revert "transaction-network: consts updated - 2"
michalkucharczyk May 30, 2025
045bc6d
Revert "transaction-network: consts updated"
michalkucharczyk May 30, 2025
1387f9f
Merge remote-tracking branch 'origin/master' into mku-txpool-limits-fix
michalkucharczyk Jun 2, 2025
113bf22
post merge fix
michalkucharczyk Jun 2, 2025
e963dba
Cargo.lock
michalkucharczyk Jun 2, 2025
abd9a58
fatp: ready_at_light: better debug
michalkucharczyk Jun 3, 2025
0bf0b96
txpoolstat target added
michalkucharczyk Jun 6, 2025
1e7cbb3
mempool: send to view is now throttled
michalkucharczyk Jun 12, 2025
5f1bacc
maintain queue is used for revalidation / pending replacements
michalkucharczyk Jun 12, 2025
a25d50b
revalidation: debugs
michalkucharczyk Jun 12, 2025
030a857
Apply suggestions from code review
michalkucharczyk Jun 12, 2025
b11cf8e
Merge remote-tracking branch 'origin/master' into mku-txpool-limits-fix
michalkucharczyk Jun 12, 2025
90c54c2
Revert "report_invalid is async now"
michalkucharczyk Jun 12, 2025
a7e70e0
Update from github-actions[bot] running command 'prdoc --bump minor -…
github-actions[bot] Jun 12, 2025
2ef575d
Update prdoc/pr_8596.prdoc
michalkucharczyk Jun 12, 2025
0714273
Update prdoc/pr_8596.prdoc
michalkucharczyk Jun 12, 2025
71c5ce7
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Jun 12, 2025
48eed5f
fix
michalkucharczyk Jun 12, 2025
86be44a
clippy happy now
michalkucharczyk Jun 12, 2025
2b36584
Revert "TransactionPool::ChainApi is now async"
michalkucharczyk Jun 17, 2025
ad943a8
format
michalkucharczyk Jun 17, 2025
5eb0413
ValidateTransactionPriority: doc
michalkucharczyk Jun 18, 2025
8201098
tx_mem_pool_map: naming fixed
michalkucharczyk Jun 18, 2025
870147a
fixing ChainApi revert
michalkucharczyk Jun 18, 2025
be46fff
tx_mem_pool_map: derive(Ord,PartialOrd)
michalkucharczyk Jun 18, 2025
67ee638
spelling
michalkucharczyk Jun 18, 2025
0dcb0ac
fmt
michalkucharczyk Jun 24, 2025
dab9113
comment added: 8912
michalkucharczyk Jun 24, 2025
ad3c17e
todo comment removed
michalkucharczyk Jun 24, 2025
9da6712
Merge branch 'master' into mku-txpool-limits-fix
michalkucharczyk Jun 24, 2025
13c7cc6
txmempool: sync bridge note added
michalkucharczyk Jun 24, 2025
a069036
rust doc
michalkucharczyk Jun 24, 2025
2aa0670
Merge remote-tracking branch 'origin/master' into mku-txpool-limits-fix
michalkucharczyk Jun 24, 2025
1bd13ed
random md fixed
michalkucharczyk Jun 24, 2025
4fc51f5
Merge branch 'master' into mku-txpool-limits-fix
michalkucharczyk Jun 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions prdoc/pr_8596.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
title: '`fatxpool`: limits handling optimizations and fixes'
doc:
- audience: Node Dev
description: |-
This PR adds some optimization and fixes in handling limits in fork-aware transaction pool.
crates:
- name: sc-transaction-pool
bump: minor
1 change: 1 addition & 0 deletions substrate/client/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ tracing = { workspace = true, default-features = true }
anyhow = { workspace = true }
assert_matches = { workspace = true }
criterion = { workspace = true, default-features = true }
rstest = { workspace = true }
sc-block-builder = { workspace = true, default-features = true }
sp-consensus = { workspace = true, default-features = true }
substrate-test-runtime = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ impl ChainApi for TestApi {
fn validate_transaction(
&self,
at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_: TransactionSource,
uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
_: ValidateTransactionPriority,
) -> Self::ValidationFuture {
let uxt = (*uxt).clone();
let transfer = TransferData::try_from(&uxt)
Expand Down
150 changes: 125 additions & 25 deletions substrate/client/transaction-pool/src/common/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@

//! Chain api required for the transaction pool.

use crate::LOG_TARGET;
use codec::Encode;
use futures::{
channel::{mpsc, oneshot},
future::{ready, Future, FutureExt, Ready},
lock::Mutex,
SinkExt, StreamExt,
use crate::{
common::{sliding_stat::DurationSlidingStats, STAT_SLIDING_WINDOW},
graph::ValidateTransactionPriority,
insert_and_log_throttled, LOG_TARGET, LOG_TARGET_STAT,
};
use std::{marker::PhantomData, pin::Pin, sync::Arc};

use codec::Encode;
use futures::future::{ready, Future, FutureExt, Ready};
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_client_api::{blockchain::HeaderBackend, BlockBackend};
use sp_api::{ApiExt, ProvideRuntimeApi};
Expand All @@ -39,38 +36,85 @@ use sp_runtime::{
transaction_validity::{TransactionSource, TransactionValidity},
};
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use std::{
marker::PhantomData,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::{mpsc, oneshot, Mutex};

use super::{
error::{self, Error},
metrics::{ApiMetrics, ApiMetricsExt},
};
use crate::graph;
use tracing::{trace, warn};
use tracing::{trace, warn, Level};

/// The transaction pool logic for full client.
pub struct FullChainApi<Client, Block> {
client: Arc<Client>,
_marker: PhantomData<Block>,
metrics: Option<Arc<ApiMetrics>>,
validation_pool: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
validation_pool_normal: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
validation_pool_maintained: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
validate_transaction_normal_stats: DurationSlidingStats,
validate_transaction_maintained_stats: DurationSlidingStats,
}

/// Spawn a validation task that will be used by the transaction pool to validate transactions.
fn spawn_validation_pool_task(
name: &'static str,
receiver: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
receiver_normal: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
receiver_maintained: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
spawner: &impl SpawnEssentialNamed,
stats: DurationSlidingStats,
blocking_stats: DurationSlidingStats,
) {
spawner.spawn_essential_blocking(
name,
Some("transaction-pool"),
async move {
loop {
let task = receiver.lock().await.next().await;
match task {
None => return,
Some(task) => task.await,
}
let start = Instant::now();

let task = {
let receiver_maintained = receiver_maintained.clone();
let receiver_normal = receiver_normal.clone();
tokio::select! {
Some(task) = async {
receiver_maintained.lock().await.recv().await
} => { task }
Some(task) = async {
receiver_normal.lock().await.recv().await
} => { task }
else => {
return
}
}
};

let blocking_duration = {
let start = Instant::now();
task.await;
start.elapsed()
};

insert_and_log_throttled!(
Level::DEBUG,
target:LOG_TARGET_STAT,
prefix:format!("validate_transaction_inner_stats"),
stats,
start.elapsed().into()
);
insert_and_log_throttled!(
Level::DEBUG,
target:LOG_TARGET_STAT,
prefix:format!("validate_transaction_blocking_stats"),
blocking_stats,
blocking_duration.into()
);
trace!(target:LOG_TARGET, duration=?start.elapsed(), "spawn_validation_pool_task");
}
}
.boxed(),
Expand All @@ -84,6 +128,9 @@ impl<Client, Block> FullChainApi<Client, Block> {
prometheus: Option<&PrometheusRegistry>,
spawner: &impl SpawnEssentialNamed,
) -> Self {
let stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
let blocking_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));

let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
Err(error) => {
warn!(
Expand All @@ -96,13 +143,41 @@ impl<Client, Block> FullChainApi<Client, Block> {
Ok(api) => Some(Arc::new(api)),
});

let (sender, receiver) = mpsc::channel(0);
let (sender, receiver) = mpsc::channel(1);
let (sender_maintained, receiver_maintained) = mpsc::channel(1);

let receiver = Arc::new(Mutex::new(receiver));
spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner);
spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner);

FullChainApi { client, validation_pool: sender, _marker: Default::default(), metrics }
let receiver_maintained = Arc::new(Mutex::new(receiver_maintained));
spawn_validation_pool_task(
"transaction-pool-task-0",
receiver.clone(),
receiver_maintained.clone(),
spawner,
stats.clone(),
blocking_stats.clone(),
);
spawn_validation_pool_task(
"transaction-pool-task-1",
receiver,
receiver_maintained,
spawner,
stats.clone(),
blocking_stats.clone(),
);

FullChainApi {
client,
validation_pool_normal: sender,
validation_pool_maintained: sender_maintained,
_marker: Default::default(),
metrics,
validate_transaction_normal_stats: DurationSlidingStats::new(Duration::from_secs(
STAT_SLIDING_WINDOW,
)),
validate_transaction_maintained_stats: DurationSlidingStats::new(Duration::from_secs(
STAT_SLIDING_WINDOW,
)),
}
}
}

Expand Down Expand Up @@ -132,10 +207,25 @@ where
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
validation_priority: ValidateTransactionPriority,
) -> Self::ValidationFuture {
let start = Instant::now();
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let mut validation_pool = self.validation_pool.clone();
let (stats, validation_pool, prefix) =
if validation_priority == ValidateTransactionPriority::Maintained {
(
self.validate_transaction_maintained_stats.clone(),
self.validation_pool_maintained.clone(),
"validate_transaction_maintained_stats",
)
} else {
(
self.validate_transaction_normal_stats.clone(),
self.validation_pool_normal.clone(),
"validate_transaction_stats",
)
};
let metrics = self.metrics.clone();

async move {
Expand All @@ -155,10 +245,20 @@ where
.map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
}

match rx.await {
let validity = match rx.await {
Ok(r) => r,
Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
}
};

insert_and_log_throttled!(
Level::DEBUG,
target:LOG_TARGET_STAT,
prefix:prefix,
stats,
start.elapsed().into()
);

validity
}
.boxed()
}
Expand Down
4 changes: 4 additions & 0 deletions substrate/client/transaction-pool/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ pub(crate) mod api;
pub(crate) mod enactment_state;
pub(crate) mod error;
pub(crate) mod metrics;
pub(crate) mod sliding_stat;
#[cfg(test)]
pub(crate) mod tests;
pub(crate) mod tracing_log_xt;

use futures::StreamExt;
use std::sync::Arc;

/// Stat sliding window, in seconds for per-transaction activities.
pub(crate) const STAT_SLIDING_WINDOW: u64 = 3;

/// Inform the transaction pool about imported and finalized blocks.
pub async fn notification_future<Client, Pool, Block>(client: Arc<Client>, txpool: Arc<Pool>)
where
Expand Down
Loading
Loading