diff --git a/Cargo.lock b/Cargo.lock index 5f069b7..62740d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3311,7 +3311,7 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "substrate-txtesttool" -version = "0.5.0" +version = "0.6.0" dependencies = [ "async-trait", "average", diff --git a/Cargo.toml b/Cargo.toml index a4080d8..05c2a93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "substrate-txtesttool" -version = "0.5.0" +version = "0.6.0" edition = "2021" description = "A library and CLI tool for sending transactions to substrate-based chains, enabling developers to test and monitor transaction scenarios." license = "Apache-2.0 OR GPL-3.0" diff --git a/bin/main.rs b/bin/main.rs index 8657f2d..e6b6b29 100644 --- a/bin/main.rs +++ b/bin/main.rs @@ -59,6 +59,7 @@ async fn main() -> Result<(), Box> { send_threshold, remark, tip, + use_legacy_backend, } => match chain { ChainType::Fake => { unimplemented!() @@ -71,6 +72,7 @@ async fn main() -> Result<(), Box> { .with_block_monitoring(*block_monitor) .with_watched_txs(!unwatched) .with_installed_ctrlc_stop_hook(true) + .with_legacy_backend(*use_legacy_backend) .with_tip(*tip); scenario_builder = populate_scenario_builder!(scenario_builder, scenario); @@ -90,6 +92,7 @@ async fn main() -> Result<(), Box> { .with_block_monitoring(*block_monitor) .with_watched_txs(!unwatched) .with_installed_ctrlc_stop_hook(true) + .with_legacy_backend(*use_legacy_backend) .with_tip(*tip); scenario_builder = populate_scenario_builder!(scenario_builder, scenario); @@ -118,6 +121,7 @@ async fn main() -> Result<(), Box> { desc, generate_ecdsa_keypair, None, + false, ) .await; let account = @@ -136,6 +140,7 @@ async fn main() -> Result<(), Box> { desc, generate_sr25519_keypair, None, + false, ) .await; let account = @@ -153,10 +158,11 @@ async fn main() -> Result<(), Box> { runtime_apis.call_raw("Metadata_metadata", None).await?; println!("{meta:#?}"); }, - CliCommand::BlockMonitor { chain, ws } => { + CliCommand::BlockMonitor { chain, ws, display } => { match chain { ChainType::Sub => { - let block_monitor = BlockMonitor::::new(ws).await; + let block_monitor = + BlockMonitor::::new_with_options(ws, *display).await; async { loop { tokio::time::sleep(Duration::from_secs(10)).await @@ -165,7 +171,8 @@ async fn main() -> Result<(), Box> { .await; }, ChainType::Eth => { - let block_monitor = BlockMonitor::::new(ws).await; + let block_monitor = + BlockMonitor::::new_with_options(ws, *display).await; async { loop { tokio::time::sleep(Duration::from_secs(10)).await diff --git a/src/block_monitor.rs b/src/block_monitor.rs index c378ede..8968cec 100644 --- a/src/block_monitor.rs +++ b/src/block_monitor.rs @@ -6,6 +6,7 @@ use std::{collections::HashMap, marker::PhantomData, pin::Pin}; use crate::transaction::TransactionMonitor; use async_trait::async_trait; +use clap::ValueEnum; use futures::Future; use subxt::{blocks::Block, OnlineClient}; use subxt_core::config::Header; @@ -23,6 +24,22 @@ type TxFoundListener = oneshot::Receiver; type TxFoundListenerTrigger = oneshot::Sender; type HashOf = ::Hash; +#[derive(ValueEnum, Copy, Clone, Debug)] +pub enum BlockMonitorDisplayOptions { + Best, + Finalized, + All, +} + +impl BlockMonitorDisplayOptions { + fn display_best(&self) -> bool { + matches!(self, Self::Best | Self::All) + } + fn display_finalized(&self) -> bool { + matches!(self, Self::Finalized | Self::All) + } +} + #[derive(Clone)] pub struct BlockMonitor { listener_request_tx: mpsc::Sender<(HashOf, TxFoundListenerTrigger>)>, @@ -45,7 +62,17 @@ impl BlockMonitor { .await .expect("should connect to rpc client"); let (listener_request_tx, rx) = mpsc::channel(100); - tokio::spawn(async { Self::run(api, rx).await }); + tokio::spawn(async { Self::run(api, rx, BlockMonitorDisplayOptions::All).await }); + Self { listener_request_tx, _p: Default::default() } + } + + pub async fn new_with_options(uri: &str, options: BlockMonitorDisplayOptions) -> Self { + trace!(uri, "BlockNumber::new"); + let api = OnlineClient::::from_insecure_url(uri) + .await + .expect("should connect to rpc client"); + let (listener_request_tx, rx) = mpsc::channel(100); + tokio::spawn(async move { Self::run(api, rx, options).await }); Self { listener_request_tx, _p: Default::default() } } @@ -62,6 +89,7 @@ impl BlockMonitor { async fn handle_block( callbacks: &mut HashMap, TxFoundListenerTrigger>>, block: Block>, + options: BlockMonitorDisplayOptions, finalized: bool, ) -> Result<(), Box> { let block_number: u64 = block.header().number().into(); @@ -77,8 +105,10 @@ impl BlockMonitor { trigger.send(block_hash).unwrap(); } } - info!(block_number, extrinsics_count, "FINALIZED block"); - } else { + if options.display_finalized() { + info!(block_number, extrinsics_count, "FINALIZED block"); + } + } else if options.display_best() { info!(block_number, extrinsics_count, " BEST block"); } Ok(()) @@ -87,6 +117,7 @@ impl BlockMonitor { async fn block_monitor_inner( api: OnlineClient, mut listener_request_rx: mpsc::Receiver<(HashOf, TxFoundListenerTrigger>)>, + options: BlockMonitorDisplayOptions, ) -> Result<(), Box> { let mut finalized_blocks_sub = api.blocks().subscribe_finalized().await?; let mut best_blocks_sub = api.blocks().subscribe_best().await?; @@ -95,11 +126,11 @@ impl BlockMonitor { loop { select! { Some(Ok(block)) = finalized_blocks_sub.next() => { - Self::handle_block(&mut callbacks, block, true).await?; + Self::handle_block(&mut callbacks, block, options, true).await?; } Some(Ok(block)) = best_blocks_sub.next() => { - Self::handle_block(&mut callbacks, block, false).await?; + Self::handle_block(&mut callbacks, block, options, false).await?; } Some((hash, tx)) = listener_request_rx.recv() => { @@ -113,7 +144,8 @@ impl BlockMonitor { async fn run( api: OnlineClient, listener_requrest_rx: mpsc::Receiver<(HashOf, TxFoundListenerTrigger>)>, + options: BlockMonitorDisplayOptions, ) { - let _ = Self::block_monitor_inner(api, listener_requrest_rx).await; + let _ = Self::block_monitor_inner(api, listener_requrest_rx, options).await; } } diff --git a/src/cli.rs b/src/cli.rs index ca67961..0e64185 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -2,10 +2,12 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. +use crate::{ + block_monitor::BlockMonitorDisplayOptions, + scenario::{ChainType, ScenarioType}, +}; use clap::{Parser, Subcommand}; -use crate::scenario::{ChainType, ScenarioType}; - #[derive(Parser)] #[clap(name = "txtt")] pub struct Cli { @@ -48,6 +50,9 @@ pub enum CliCommand { /// Accounts range used for building/seding transactions. #[clap(subcommand)] scenario: ScenarioType, + /// Use legacy backend + #[clap(long, default_value_t = false)] + use_legacy_backend: bool, }, /// Check nonce for given account. CheckNonce { @@ -76,6 +81,8 @@ pub enum CliCommand { /// The RPC endpoint of the node to be used. #[clap(long, default_value = "ws://127.0.0.1:9933")] ws: String, + #[clap(long, default_value = "all")] + display: BlockMonitorDisplayOptions, }, /// Load and inspect existing log file. LoadLog { diff --git a/src/fake_transaction_sink.rs b/src/fake_transaction_sink.rs index a166e1f..372eed0 100644 --- a/src/fake_transaction_sink.rs +++ b/src/fake_transaction_sink.rs @@ -59,7 +59,7 @@ impl TransactionsSink for FakeTransactionsSink { } ///Current count of transactions being processed by sink - async fn count(&self) -> usize { + async fn pending_extrinsics(&self) -> usize { self.txs.read().len() } @@ -85,9 +85,9 @@ mod test { let t: Box> = Box::from(t); let events = rpc.submit_and_watch(&*t).await.unwrap(); - assert_eq!(rpc.count().await, 1); + assert_eq!(rpc.pending_extrinsics().await, 1); let v = events.collect::>().await; - assert_eq!(rpc.count().await, 0); + assert_eq!(rpc.pending_extrinsics().await, 0); assert_eq!( v, vec![ @@ -149,14 +149,14 @@ mod test { let f = || async { tokio::time::sleep(Duration::from_millis(200)).await; - rpc.count().await + rpc.pending_extrinsics().await }; let result = join3(rpc.submit(&*t1), rpc.submit(&*t2), f()).await; let r1 = result.0; let r2 = result.1; assert_eq!(result.2, 2); - assert_eq!(rpc.count().await, 0); + assert_eq!(rpc.pending_extrinsics().await, 0); assert_eq!(r1.unwrap(), 111u32.to_le_bytes().into()); assert_eq!(r2.unwrap(), 222u32.to_le_bytes().into()); } diff --git a/src/helpers.rs b/src/helpers.rs index 529f7a6..3971ea4 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -6,7 +6,7 @@ pub use jsonrpsee::{ client_transport::ws::{self, EitherStream, Url, WsTransportClientBuilder}, core::client::{Client, Error}, }; -use std::pin::Pin; +use std::{pin::Pin, time::Duration}; use tokio_util::compat::Compat; /// Helper type for a futures stream. @@ -22,7 +22,7 @@ pub(crate) async fn client(url: &str) -> Result { let (sender, receiver) = ws_transport(url).await?; Ok(Client::builder() .max_buffer_capacity_per_subscription(4096) - .max_concurrent_requests(128000) + .max_concurrent_requests(1280000) .build_with_tokio(sender, receiver)) } @@ -31,6 +31,7 @@ async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> { WsTransportClientBuilder::default() .max_request_size(400 * 1024 * 1024) .max_response_size(400 * 1024 * 1024) + .connection_timeout(Duration::from_secs(600)) .build(url) .await .map_err(|e| Error::Transport(e.into())) diff --git a/src/runner.rs b/src/runner.rs index a590776..02f3316 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -22,7 +22,7 @@ use tokio::{ select, sync::mpsc::{channel, Receiver, Sender}, }; -use tracing::{debug, info, instrument, trace, Span}; +use tracing::{debug, info, instrument, trace, warn, Span}; const LOG_TARGET: &str = "runner"; @@ -122,9 +122,9 @@ impl + Send> TxTask for DefaultTxTask continue; } } - //shall not happen to be here - panic!(); - // ExecutionResult::Error(self.tx().hash()) + //shall not happen to be here, return error. + warn!(target:LOG_TARGET,tx=?self,"stream error"); + ExecutionResult::Error(self.tx().hash()) }, Err(e) => { info!(nonce=?self.tx().nonce(),"submit_and_watch: error: {e:?}"); @@ -184,7 +184,7 @@ impl DefaultTxTask { /// Holds the logic that handles multiple transactions execution on a specific chain. pub struct Runner>> { - initial_tasks: usize, + send_threshold: usize, logs: Logs, transactions: Vec, done: Vec>, @@ -206,7 +206,7 @@ where { /// Instantiates a new transactions [`Runner`]. pub fn new( - initial_tasks: usize, + send_threshold: usize, rpc: Sink, transactions: Vec, log_file_name: Option, @@ -229,7 +229,7 @@ where ( tx, Self { - initial_tasks, + send_threshold, logs, transactions, rpc: rpc.into(), @@ -257,11 +257,17 @@ where &mut self, workers: &mut FuturesUnordered> + Send>>>, ) { - let current_count = self.rpc.count().await; - let to_consume = self - .initial_tasks - .saturating_sub(current_count) - .saturating_sub(self.event_counters.buffered()); + let (to_consume, current_count) = if self.send_threshold == usize::MAX { + (usize::MAX, None) + } else { + let current_count = self.rpc.pending_extrinsics().await; + ( + self.send_threshold + .saturating_sub(current_count) + .saturating_sub(self.event_counters.buffered()), + Some(current_count), + ) + }; let mut pushed = 0; let counters_displayed = format!("{}", self.event_counters); let mut nonces = vec![]; @@ -286,9 +292,17 @@ where } else { true }; + if display { self.last_displayed = Some(Instant::now()); - info!(current_count, pushed, min_nonce_sent, "consume pending {}", counters_displayed,); + if let Some(current_count) = current_count { + info!( + current_count, + pushed, min_nonce_sent, "consume pending {}", counters_displayed, + ); + } else { + info!(pushed, min_nonce_sent, "consume pending {}", counters_displayed,); + } }; } @@ -325,7 +339,7 @@ where let original_transactions_count = self.transactions.len(); let mut workers = FuturesUnordered::new(); - for _ in 0..self.initial_tasks { + for _ in 0..self.send_threshold { if let Some(t) = self.transactions.pop() { let t = Box::new(t); let log = self.logs[&t.tx().hash()].clone(); @@ -482,7 +496,7 @@ mod tests { // let api = OnlineClient::::from_insecure_url("ws://127.0.0.1:9933") // .await // .unwrap(); - let api = subxt_api_connector::connect("ws://127.0.0.1:9933").await.unwrap(); + let api = subxt_api_connector::connect("ws://127.0.0.1:9933", false).await.unwrap(); let rpc = EthTransactionsSink::new().await; diff --git a/src/scenario.rs b/src/scenario.rs index 5686fab..6f9bb9c 100644 --- a/src/scenario.rs +++ b/src/scenario.rs @@ -180,6 +180,7 @@ pub struct ScenarioBuilder { log_file_name_prefix: Option, base_dir_path: Option, timeout: Option, + use_legacy_backend: bool, } impl Default for ScenarioBuilder { @@ -213,6 +214,7 @@ impl ScenarioBuilder { log_file_name_prefix: None, base_dir_path: None, timeout: None, + use_legacy_backend: false, } } @@ -312,6 +314,9 @@ impl ScenarioBuilder { /// Specifies how many transactions in transaction pool on the node side will be maintained at /// the fork of the best chain. + /// + /// `usize::MAX` means that the count of `pending_extrinsics` on node side is not called, and an + /// executor will send as much as possible. pub fn with_send_threshold(mut self, send_threshold: usize) -> Self { self.send_threshold = Some(send_threshold); self @@ -351,6 +356,13 @@ impl ScenarioBuilder { self } + /// Use legacy backend. In some scenarios using this may help overcome some RPC related + /// problems. Shall be removed in some point in future. + pub fn with_legacy_backend(mut self, use_legacy_backend: bool) -> Self { + self.use_legacy_backend = use_legacy_backend; + self + } + /// Returns a set of tasks that handle transaction execution. async fn build_transactions(&self, builder: B, sink: S) -> Vec> where @@ -471,6 +483,7 @@ impl ScenarioBuilder { } else { None }, + self.use_legacy_backend, ); let sink = new_with_uri_with_accounts_description.await; let txs = self.build_transactions(builder, sink.clone()).await; @@ -499,6 +512,7 @@ impl ScenarioBuilder { } else { None }, + self.use_legacy_backend, ) .await; let txs = self.build_transactions(builder, sink.clone()).await; diff --git a/src/subxt_api_connector.rs b/src/subxt_api_connector.rs index 9e71592..60bcee8 100644 --- a/src/subxt_api_connector.rs +++ b/src/subxt_api_connector.rs @@ -15,17 +15,26 @@ const RETRY_DELAY: Duration = Duration::from_secs(1); /// Connect to a RPC node. pub(crate) async fn connect( url: &str, + use_legacy_backend: bool, ) -> Result, Box> { for i in 0..MAX_ATTEMPTS { info!("Attempt #{}: Connecting to {}", i, url); - let backend = subxt::backend::chain_head::ChainHeadBackend::builder() - .transaction_timeout(6 * 3600) - //note: This required new subxt release - //.submit_transactions_ignoring_follow_events() - .build_with_background_driver(subxt::backend::rpc::RpcClient::new( - helpers::client(url).await?, - )); - let maybe_client = OnlineClient::from_backend(Arc::new(backend)).await; + let maybe_client = if use_legacy_backend { + let backend = subxt::backend::legacy::LegacyBackend::builder() + .build(subxt::backend::rpc::RpcClient::new(helpers::client(url).await?)); + OnlineClient::from_backend(Arc::new(backend)).await + } else { + let backend = subxt::backend::chain_head::ChainHeadBackend::builder() + .transaction_timeout(6 * 3600) + //note: This required new subxt release + // subxt 0.42.1: + // .submit_transactions_ignoring_follow_events() + .build_with_background_driver(subxt::backend::rpc::RpcClient::new( + helpers::client(url).await?, + )); + + OnlineClient::from_backend(Arc::new(backend)).await + }; // let maybe_client = OnlineClient::::from_rpc_client(client); match maybe_client { diff --git a/src/subxt_transaction.rs b/src/subxt_transaction.rs index 8a1aa46..d3810ef 100644 --- a/src/subxt_transaction.rs +++ b/src/subxt_transaction.rs @@ -123,7 +123,7 @@ where { pub async fn new() -> Self { Self { - api: crate::subxt_api_connector::connect("ws://127.0.0.1:9933") + api: crate::subxt_api_connector::connect("ws://127.0.0.1:9933", false) .await .expect(EXPECT_CONNECT), from_accounts: Default::default(), @@ -137,7 +137,7 @@ where pub async fn new_with_uri(uri: &String) -> Self { Self { - api: crate::subxt_api_connector::connect(uri).await.expect(EXPECT_CONNECT), + api: crate::subxt_api_connector::connect(uri, false).await.expect(EXPECT_CONNECT), from_accounts: Default::default(), to_accounts: Default::default(), nonces: Default::default(), @@ -152,6 +152,7 @@ where accounts_description: AccountsDescription, generate_pair: G, transaction_monitor: Option>, + use_legacy_backend: bool, ) -> Self where G: GenerateKeyPairFunction, @@ -160,7 +161,9 @@ where derive_accounts(accounts_description.clone(), SENDER_SEED, generate_pair); let to_accounts = derive_accounts(accounts_description, RECEIVER_SEED, generate_pair); Self { - api: crate::subxt_api_connector::connect(uri).await.expect(EXPECT_CONNECT), + api: crate::subxt_api_connector::connect(uri, use_legacy_backend) + .await + .expect(EXPECT_CONNECT), from_accounts: Arc::from(RwLock::from(from_accounts)), to_accounts: Arc::from(RwLock::from(to_accounts)), nonces: Default::default(), @@ -217,15 +220,16 @@ where async fn update_count(&self) { let i = Instant::now(); - let xts = self + let xts_len = self .rpc_client .request::>( "author_pendingExtrinsics", subxt_rpcs::rpc_params!(), ) .await - .expect("author_pendingExtrinsics should not fail"); - *self.current_pending_extrinsics.write() = Some((i, xts.len())); + .expect("author_pendingExtrinsics should not fail") + .len(); + *self.current_pending_extrinsics.write() = Some((i, xts_len)); } } @@ -285,7 +289,7 @@ where } /// Current count of transactions being processed by sink. - async fn count(&self) -> usize { + async fn pending_extrinsics(&self) -> usize { let current_pending_extrinsics = { *self.current_pending_extrinsics.read() }; if let Some((ts, _)) = current_pending_extrinsics { if ts.elapsed() > Duration::from_millis(1000) { diff --git a/src/transaction.rs b/src/transaction.rs index ced39cb..10647e4 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -266,7 +266,7 @@ pub trait TransactionsSink: Send + Sync { async fn submit(&self, tx: &dyn Transaction) -> Result; ///Current count of transactions being processed by sink - async fn count(&self) -> usize; + async fn pending_extrinsics(&self) -> usize; fn transaction_monitor(&self) -> Option<&dyn TransactionMonitor>; }