From b25c2f2de346b2503a295534437c666065a3e03a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 7 Jul 2020 11:33:20 +0200 Subject: [PATCH 1/5] Improve transaction submission Before this pr the transaction pool validated each transaction, even if the transaction was already known to the pool. This pr changes the behavior to first check if we are already aware of a transaction and thus, to only validate them if we don't know them yet. However, there is still the possibility that a given transaction is validated multiple times. This can happen if the transaction is added the first time, but is not yet validated and added to the validated pool. Besides that, this pr fixes the wrong metrics of gossiped transactions in the network. It also moves some metrics to the transaction pool api, to better track when a transaction actually is scheduled for validation. --- bin/node-template/node/src/service.rs | 1 + bin/node/cli/src/service.rs | 1 + .../basic-authorship/src/basic_authorship.rs | 8 +- client/basic-authorship/src/lib.rs | 6 +- client/network/src/protocol.rs | 16 ++-- client/offchain/src/lib.rs | 2 +- client/rpc/src/author/tests.rs | 2 +- client/service/src/lib.rs | 2 +- .../transaction-pool/graph/src/base_pool.rs | 7 +- client/transaction-pool/graph/src/pool.rs | 78 +++++++++---------- .../graph/src/validated_pool.rs | 21 ++++- client/transaction-pool/src/api.rs | 28 ++++++- client/transaction-pool/src/lib.rs | 35 ++------- client/transaction-pool/src/metrics.rs | 58 +++++++++++--- client/transaction-pool/src/testing/pool.rs | 4 +- utils/frame/rpc/system/src/lib.rs | 8 +- 16 files changed, 168 insertions(+), 109 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 89bf159927fc6..3961971fbe37f 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -47,6 +47,7 @@ macro_rules! new_full_start { .with_transaction_pool(|builder| { let pool_api = sc_transaction_pool::FullChainApi::new( builder.client().clone(), + None, ); Ok(sc_transaction_pool::BasicPool::new( builder.config().transaction_pool.clone(), diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 632092cdaa188..70c2d10964f2f 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -61,6 +61,7 @@ macro_rules! new_full_start { .with_transaction_pool(|builder| { let pool_api = sc_transaction_pool::FullChainApi::new( builder.client().clone(), + builder.prometheus_registry(), ); let config = builder.config(); diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index 7343b13c04031..581da62737ad1 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -361,7 +361,7 @@ mod tests { let txpool = Arc::new( BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0 ); @@ -414,7 +414,7 @@ mod tests { let txpool = Arc::new( BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0 ); @@ -449,7 +449,7 @@ mod tests { let txpool = Arc::new( BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0 ); @@ -511,7 +511,7 @@ mod tests { let txpool = Arc::new( BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0 ); diff --git a/client/basic-authorship/src/lib.rs b/client/basic-authorship/src/lib.rs index 4f53c87de3979..bc51037277612 100644 --- a/client/basic-authorship/src/lib.rs +++ b/client/basic-authorship/src/lib.rs @@ -31,7 +31,11 @@ //! # }; //! # use sc_transaction_pool::{BasicPool, FullChainApi}; //! # let client = Arc::new(substrate_test_runtime_client::new()); -//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone())), None).0); +//! # let txpool = Arc::new(BasicPool::new( +//! # Default::default(), +//! # Arc::new(FullChainApi::new(client.clone(), None)), +//! # None).0, +//! # ); //! // The first step is to create a `ProposerFactory`. //! let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone(), None); //! diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index ff3748bd55cf2..9a273413f7287 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -1141,7 +1141,7 @@ impl Protocol { fn on_transactions( &mut self, who: PeerId, - transactions: message::Transactions + transactions: message::Transactions, ) { // sending transaction to light node is considered a bad behavior if !self.config.roles.is_full() { @@ -1211,7 +1211,9 @@ impl Protocol { &mut self, transactions: &[(H, B::Extrinsic)], ) -> HashMap> { - let mut propagated_to = HashMap::new(); + let mut propagated_to = HashMap::<_, Vec<_>>::new(); + let mut propagated_transactions = 0; + for (who, peer) in self.context_data.peers.iter_mut() { // never send transactions to the light node if !peer.info.roles.is_full() { @@ -1224,11 +1226,13 @@ impl Protocol { .cloned() .unzip(); + propagated_transactions += hashes.len(); + if !to_send.is_empty() { for hash in hashes { propagated_to .entry(hash) - .or_insert_with(Vec::new) + .or_default() .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); @@ -1243,10 +1247,8 @@ impl Protocol { } } - if propagated_to.len() > 0 { - if let Some(ref metrics) = self.metrics { - metrics.propagated_transactions.inc(); - } + if let Some(ref metrics) = self.metrics { + metrics.propagated_transactions.inc_by(propagated_transactions as _) } propagated_to diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index 7c90065746aa3..2f50ede7adef2 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -250,7 +250,7 @@ mod tests { let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new(TestPool(BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0)); client.execution_extensions() diff --git a/client/rpc/src/author/tests.rs b/client/rpc/src/author/tests.rs index f2f4ddebb2f1d..870390969c225 100644 --- a/client/rpc/src/author/tests.rs +++ b/client/rpc/src/author/tests.rs @@ -63,7 +63,7 @@ impl Default for TestSetup { let pool = Arc::new(BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0); TestSetup { diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 978b77974fbb3..4614d1f97a5bc 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -557,7 +557,7 @@ mod tests { let client = Arc::new(client); let pool = Arc::new(BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0); let source = sp_runtime::transaction_validity::TransactionSource::External; diff --git a/client/transaction-pool/graph/src/base_pool.rs b/client/transaction-pool/graph/src/base_pool.rs index 0128e94675e23..25da341e6794e 100644 --- a/client/transaction-pool/graph/src/base_pool.rs +++ b/client/transaction-pool/graph/src/base_pool.rs @@ -261,6 +261,11 @@ impl BasePool bool { + self.future.contains(tx_hash) || self.ready.contains(tx_hash) + } + /// Imports transaction to the pool. /// /// The pool consists of two parts: Future and Ready. @@ -272,7 +277,7 @@ impl BasePool, ) -> error::Result> { - if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) { + if self.is_imported(&tx.hash) { return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone()))) } diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index e4d81c38ae38e..6a18e5c9d4935 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -23,7 +23,7 @@ use std::{ use crate::{base_pool as base, watcher::Watcher}; -use futures::{Future, FutureExt}; +use futures::Future; use sp_runtime::{ generic::BlockId, traits::{self, SaturatedConversion, Block as BlockT}, @@ -149,23 +149,27 @@ impl Pool { } /// Imports a bunch of unverified extrinsics to the pool - pub async fn submit_at( + pub async fn submit_at( &self, at: &BlockId, source: TransactionSource, - xts: T, - force: bool, - ) -> Result, B::Error>>, B::Error> where - T: IntoIterator>, - { - let validated_pool = self.validated_pool.clone(); + xts: impl IntoIterator>, + ) -> Result, B::Error>>, B::Error> { let xts = xts.into_iter().map(|xt| (source, xt)); - self.verify(at, xts, force) - .map(move |validated_transactions| validated_transactions - .map(|validated_transactions| validated_pool.submit(validated_transactions - .into_iter() - .map(|(_, tx)| tx)))) - .await + let validated_transactions = self.verify(at, xts, true).await?; + Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx))) + } + + /// Resubmit the given extrinsics to the pool. + pub async fn resubmit_at( + &self, + at: &BlockId, + source: TransactionSource, + xts: impl IntoIterator>, + ) -> Result, B::Error>>, B::Error> { + let xts = xts.into_iter().map(|xt| (source, xt)); + let validated_transactions = self.verify(at, xts, false).await?; + Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx))) } /// Imports one unverified extrinsic to the pool @@ -175,12 +179,8 @@ impl Pool { source: TransactionSource, xt: ExtrinsicFor, ) -> Result, B::Error> { - self.submit_at(at, source, std::iter::once(xt), false) - .map(|import_result| import_result.and_then(|mut import_result| import_result - .pop() - .expect("One extrinsic passed; one result returned; qed") - )) - .await + let res = self.submit_at(at, source, std::iter::once(xt)).await?.pop(); + res.expect("One extrinsic passed; one result returned; qed") } /// Import a single extrinsic and starts to watch their progress in the pool. @@ -191,9 +191,7 @@ impl Pool { xt: ExtrinsicFor, ) -> Result, ExtrinsicHash>, B::Error> { let block_number = self.resolve_block_number(at)?; - let (_, tx) = self.verify_one( - at, block_number, source, xt, false - ).await; + let (_, tx) = self.verify_one(at, block_number, source, xt, true).await; self.validated_pool.submit_and_watch(tx) } @@ -328,7 +326,7 @@ impl Pool { .into_iter() .map(|tx| (tx.source, tx.data.clone())); - let reverified_transactions = self.verify(at, pruned_transactions, false).await?; + let reverified_transactions = self.verify(at, pruned_transactions, true).await?; log::trace!(target: "txpool", "Pruning at {:?}. Resubmitting transactions.", at); // And finally - submit reverified transactions back to the pool @@ -358,23 +356,17 @@ impl Pool { &self, at: &BlockId, xts: impl IntoIterator)>, - force: bool, + check_is_known: bool, ) -> Result, ValidatedTransactionFor>, B::Error> { // we need a block number to compute tx validity let block_number = self.resolve_block_number(at)?; - let mut result = HashMap::new(); - - for (hash, validated_tx) in - futures::future::join_all( - xts.into_iter() - .map(|(source, xt)| self.verify_one(at, block_number, source, xt, force)) - ) - .await - { - result.insert(hash, validated_tx); - } - Ok(result) + let res = futures::future::join_all( + xts.into_iter() + .map(|(source, xt)| self.verify_one(at, block_number, source, xt, check_is_known)) + ).await.into_iter().collect::>(); + + Ok(res) } /// Returns future that validates single transaction at given block. @@ -384,14 +376,14 @@ impl Pool { block_number: NumberFor, source: TransactionSource, xt: ExtrinsicFor, - force: bool, + check_is_known: bool, ) -> (ExtrinsicHash, ValidatedTransactionFor) { let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt); - if !force && self.validated_pool.is_banned(&hash) { - return ( - hash.clone(), - ValidatedTransaction::Invalid(hash, error::Error::TemporarilyBanned.into()), - ) + + if check_is_known { + if let Err(err) = self.validated_pool.check_is_known(&hash) { + return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into())) + } } let validation_result = self.validated_pool.api().validate_transaction( diff --git a/client/transaction-pool/graph/src/validated_pool.rs b/client/transaction-pool/graph/src/validated_pool.rs index d730b892e3502..a406c39f4f6b3 100644 --- a/client/transaction-pool/graph/src/validated_pool.rs +++ b/client/transaction-pool/graph/src/validated_pool.rs @@ -137,10 +137,24 @@ impl ValidatedPool { self.rotator.is_banned(hash) } + /// A fast check before doing any further processing of a transaction, like validation. + /// + /// It checks if the transaction is already imported or banned. If so, it returns an error. + pub fn check_is_known(&self, tx_hash: &ExtrinsicHash) -> Result<(), B::Error> { + if self.is_banned(tx_hash) { + Err(error::Error::TemporarilyBanned.into()) + } else if self.pool.read().is_imported(tx_hash) { + Err(error::Error::AlreadyImported(Box::new(tx_hash.clone())).into()) + } else { + Ok(()) + } + } + /// Imports a bunch of pre-validated transactions to the pool. - pub fn submit(&self, txs: T) -> Vec, B::Error>> where - T: IntoIterator> - { + pub fn submit( + &self, + txs: impl IntoIterator>, + ) -> Vec, B::Error>> { let results = txs.into_iter() .map(|validated_tx| self.submit_one(validated_tx)) .collect::>(); @@ -175,6 +189,7 @@ impl ValidatedPool { }, ValidatedTransaction::Invalid(hash, err) => { self.rotator.ban(&Instant::now(), std::iter::once(hash)); + self.listener.write().invalid(&hash, false); Err(err.into()) }, ValidatedTransaction::Unknown(hash, err) => { diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 10ac4aa46960d..a14d5b0db18ea 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -33,19 +33,38 @@ use sp_runtime::{ }; use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use sp_api::{ProvideRuntimeApi, ApiExt}; +use prometheus_endpoint::Registry as PrometheusRegistry; -use crate::error::{self, Error}; +use crate::{metrics::{ApiMetrics, ApiMetricsExt}, error::{self, Error}}; /// The transaction pool logic for full client. pub struct FullChainApi { client: Arc, pool: ThreadPool, _marker: PhantomData, + metrics: Option>, } impl FullChainApi { /// Create new transaction pool logic. - pub fn new(client: Arc) -> Self { + pub fn new( + client: Arc, + prometheus: Option<&PrometheusRegistry>, + ) -> Self { + let metrics = prometheus.map(ApiMetrics::register).and_then(|r| { + match r { + Err(err) => { + log::warn!( + target: "txpool", + "Failed to register transaction pool api prometheus metrics: {:?}", + err, + ); + None + }, + Ok(api) => Some(Arc::new(api)) + } + }); + FullChainApi { client, pool: ThreadPoolBuilder::new() @@ -54,6 +73,7 @@ impl FullChainApi { .create() .expect("Failed to spawn verifier threads, that are critical for node operation."), _marker: Default::default(), + metrics, } } } @@ -87,6 +107,9 @@ where let client = self.client.clone(); let at = at.clone(); + let metrics = self.metrics.clone(); + metrics.report(|m| m.validations_scheduled.inc()); + self.pool.spawn_ok(futures_diagnose::diagnose( "validate-transaction", async move { @@ -94,6 +117,7 @@ where if let Err(e) = tx.send(res) { log::warn!("Unable to send a validate transaction result: {:?}", e); } + metrics.report(|m| m.validations_finished.inc()); }, )); diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index ea8b4bf9dec81..a7504eb694378 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -248,15 +248,9 @@ impl TransactionPool for BasicPool let pool = self.pool.clone(); let at = *at; - self.metrics.report(|metrics| metrics.validations_scheduled.inc_by(xts.len() as u64)); + self.metrics.report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64)); - let metrics = self.metrics.clone(); - async move { - let tx_count = xts.len(); - let res = pool.submit_at(&at, source, xts, false).await; - metrics.report(|metrics| metrics.validations_finished.inc_by(tx_count as u64)); - res - }.boxed() + async move { pool.submit_at(&at, source, xts).await }.boxed() } fn submit_one( @@ -268,16 +262,9 @@ impl TransactionPool for BasicPool let pool = self.pool.clone(); let at = *at; - self.metrics.report(|metrics| metrics.validations_scheduled.inc()); - - let metrics = self.metrics.clone(); - async move { - let res = pool.submit_one(&at, source, xt).await; - - metrics.report(|metrics| metrics.validations_finished.inc()); - res + self.metrics.report(|metrics| metrics.submitted_transactions.inc()); - }.boxed() + async move { pool.submit_one(&at, source, xt).await }.boxed() } fn submit_and_watch( @@ -289,17 +276,12 @@ impl TransactionPool for BasicPool let at = *at; let pool = self.pool.clone(); - self.metrics.report(|metrics| metrics.validations_scheduled.inc()); + self.metrics.report(|metrics| metrics.submitted_transactions.inc()); - let metrics = self.metrics.clone(); async move { - let result = pool.submit_and_watch(&at, source, xt) + pool.submit_and_watch(&at, source, xt) .map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _)) - .await; - - metrics.report(|metrics| metrics.validations_finished.inc()); - - result + .await }.boxed() } @@ -632,13 +614,12 @@ impl MaintainedTransactionPool for BasicPool ); } - if let Err(e) = pool.submit_at( + if let Err(e) = pool.resubmit_at( &id, // These transactions are coming from retracted blocks, we should // simply consider them external. TransactionSource::External, resubmit_transactions, - true, ).await { log::debug!( target: "txpool", diff --git a/client/transaction-pool/src/metrics.rs b/client/transaction-pool/src/metrics.rs index d5a10dfd6f4bd..376e6dfe94488 100644 --- a/client/transaction-pool/src/metrics.rs +++ b/client/transaction-pool/src/metrics.rs @@ -45,8 +45,7 @@ impl MetricsLink { /// Transaction pool Prometheus metrics. pub struct Metrics { - pub validations_scheduled: Counter, - pub validations_finished: Counter, + pub submitted_transactions: Counter, pub validations_invalid: Counter, pub block_transactions_pruned: Counter, pub block_transactions_resubmitted: Counter, @@ -55,17 +54,10 @@ pub struct Metrics { impl Metrics { pub fn register(registry: &Registry) -> Result { Ok(Self { - validations_scheduled: register( + submitted_transactions: register( Counter::new( - "sub_txpool_validations_scheduled", - "Total number of transactions scheduled for validation", - )?, - registry, - )?, - validations_finished: register( - Counter::new( - "sub_txpool_validations_finished", - "Total number of transactions that finished validation", + "sub_txpool_submitted_transactions", + "Total number of transactions submitted", )?, registry, )?, @@ -93,3 +85,45 @@ impl Metrics { }) } } + +/// Transaction pool api Prometheus metrics. +pub struct ApiMetrics { + pub validations_scheduled: Counter, + pub validations_finished: Counter, +} + +impl ApiMetrics { + /// Register the metrics at the given Prometheus registry. + pub fn register(registry: &Registry) -> Result { + Ok(Self { + validations_scheduled: register( + Counter::new( + "sub_txpool_validations_scheduled", + "Total number of transactions scheduled for validation", + )?, + registry, + )?, + validations_finished: register( + Counter::new( + "sub_txpool_validations_finished", + "Total number of transactions that finished validation", + )?, + registry, + )?, + }) + } +} + +/// An extension trait for [`ApiMetrics`]. +pub trait ApiMetricsExt { + /// Report an event to the metrics. + fn report(&self, report: impl FnOnce(&ApiMetrics)); +} + +impl ApiMetricsExt for Option> { + fn report(&self, report: impl FnOnce(&ApiMetrics)) { + if let Some(metrics) = self.as_ref() { + report(metrics) + } + } +} diff --git a/client/transaction-pool/src/testing/pool.rs b/client/transaction-pool/src/testing/pool.rs index 5ad79a6f75d87..a938313733ecd 100644 --- a/client/transaction-pool/src/testing/pool.rs +++ b/client/transaction-pool/src/testing/pool.rs @@ -1008,7 +1008,7 @@ fn should_not_accept_old_signatures() { let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client))).0 + BasicPool::new_test(Arc::new(FullChainApi::new(client, None))).0 ); let transfer = Transfer { @@ -1044,7 +1044,7 @@ fn import_notification_to_pool_maintain_works() { let mut client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new_test(Arc::new(FullChainApi::new(client.clone()))).0 + BasicPool::new_test(Arc::new(FullChainApi::new(client.clone(), None))).0 ); // Prepare the extrisic, push it to the pool and check that it was added. diff --git a/utils/frame/rpc/system/src/lib.rs b/utils/frame/rpc/system/src/lib.rs index 6927f05b4f05b..3382453b1dd08 100644 --- a/utils/frame/rpc/system/src/lib.rs +++ b/utils/frame/rpc/system/src/lib.rs @@ -301,7 +301,7 @@ mod tests { let pool = Arc::new( BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0 ); @@ -340,7 +340,7 @@ mod tests { let pool = Arc::new( BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0 ); @@ -363,7 +363,7 @@ mod tests { let pool = Arc::new( BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0 ); @@ -395,7 +395,7 @@ mod tests { let pool = Arc::new( BasicPool::new( Default::default(), - Arc::new(FullChainApi::new(client.clone())), + Arc::new(FullChainApi::new(client.clone(), None)), None, ).0 ); From 16c65bf7e59a1706132bf0e64e3a4c6afa6320d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 7 Jul 2020 21:21:50 +0200 Subject: [PATCH 2/5] Make sure we don't submit the same transaction twice from the network concurrently --- client/network/src/protocol.rs | 53 ++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 9a273413f7287..88eb7a0e40574 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -51,7 +51,7 @@ use message::generic::{Message as GenericMessage, ConsensusMessage, Roles}; use prometheus_endpoint::{Registry, Gauge, Counter, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64}; use sync::{ChainSync, SyncState}; use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map::Entry}; use std::sync::Arc; use std::fmt::Write; use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time}; @@ -199,18 +199,21 @@ impl Metrics { } } -struct PendingTransaction { +#[pin_project::pin_project] +struct PendingTransaction { + #[pin] validation: TransactionImportFuture, - peer_id: PeerId, + tx_hash: H, } -impl Future for PendingTransaction { - type Output = (PeerId, TransactionImport); +impl Future for PendingTransaction { + type Output = (H, TransactionImport); fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let this = Pin::into_inner(self); - if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) { - return Poll::Ready((this.peer_id.clone(), import_result)); + let mut this = self.project(); + + if let Poll::Ready(import_result) = Pin::new(&mut this.validation).poll_unpin(cx) { + return Poll::Ready((this.tx_hash.clone(), import_result)); } Poll::Pending @@ -226,7 +229,12 @@ pub struct Protocol { /// Pending list of messages to return from `poll` as a priority. pending_messages: VecDeque>, /// Pending transactions verification tasks. - pending_transactions: FuturesUnordered, + pending_transactions: FuturesUnordered>, + /// As multiple peers can send us the same transaction, we group + /// these peers using the transaction hash while the transaction is + /// imported. This prevents that we import the same transaction + /// multiple times concurrently. + pending_transactions_peers: HashMap>, config: ProtocolConfig, genesis_hash: B::Hash, sync: ChainSync, @@ -435,6 +443,7 @@ impl Protocol { propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), pending_messages: VecDeque::new(), pending_transactions: FuturesUnordered::new(), + pending_transactions_peers: HashMap::new(), config, context_data: ContextData { peers: HashMap::new(), @@ -1170,14 +1179,22 @@ impl Protocol { } let hash = self.transaction_pool.hash_of(&t); - peer.known_transactions.insert(hash); + peer.known_transactions.insert(hash.clone()); self.peerset_handle.report_peer(who.clone(), rep::ANY_TRANSACTION); - self.pending_transactions.push(PendingTransaction { - peer_id: who.clone(), - validation: self.transaction_pool.import(t), - }); + match self.pending_transactions_peers.entry(hash.clone()) { + Entry::Vacant(entry) => { + self.pending_transactions.push(PendingTransaction { + validation: self.transaction_pool.import(t), + tx_hash: hash, + }); + entry.insert(vec![who.clone()]); + }, + Entry::Occupied(mut entry) => { + entry.get_mut().push(who.clone()); + } + } } } } @@ -2013,8 +2030,12 @@ impl NetworkBehaviour for Protocol { }; self.pending_messages.push_back(event); } - if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) { - self.on_handle_transaction_import(peer_id, result); + if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) { + if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) { + peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result)); + } else { + warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!"); + } } if let Some(message) = self.pending_messages.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)); From 9c4e575359c21263062c52b6a20c0bcaad1f74c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 7 Jul 2020 21:59:25 +0200 Subject: [PATCH 3/5] Remove added listener call --- client/transaction-pool/graph/src/validated_pool.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/transaction-pool/graph/src/validated_pool.rs b/client/transaction-pool/graph/src/validated_pool.rs index a406c39f4f6b3..49ddb1105db4a 100644 --- a/client/transaction-pool/graph/src/validated_pool.rs +++ b/client/transaction-pool/graph/src/validated_pool.rs @@ -189,7 +189,6 @@ impl ValidatedPool { }, ValidatedTransaction::Invalid(hash, err) => { self.rotator.ban(&Instant::now(), std::iter::once(hash)); - self.listener.write().invalid(&hash, false); Err(err.into()) }, ValidatedTransaction::Unknown(hash, err) => { From 76381454a4c45a8fb329e1706133dde38e77cec6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 8 Jul 2020 15:21:27 +0200 Subject: [PATCH 4/5] Feedback --- client/transaction-pool/graph/src/pool.rs | 26 ++++++++++++++++------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index 6a18e5c9d4935..4378d88dc733a 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -125,6 +125,14 @@ impl Default for Options { } } +/// Should we check that the transaction exists +/// in the pool, before we verify it? +#[derive(Copy, Clone)] +enum CheckBeforeVerify { + Yes, + No, +} + /// Extrinsics pool that performs validation. pub struct Pool { validated_pool: Arc>, @@ -156,11 +164,13 @@ impl Pool { xts: impl IntoIterator>, ) -> Result, B::Error>>, B::Error> { let xts = xts.into_iter().map(|xt| (source, xt)); - let validated_transactions = self.verify(at, xts, true).await?; + let validated_transactions = self.verify(at, xts, CheckBeforeVerify::Yes).await?; Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx))) } /// Resubmit the given extrinsics to the pool. + /// + /// This does not check if a transaction is banned, before we verify it again. pub async fn resubmit_at( &self, at: &BlockId, @@ -168,7 +178,7 @@ impl Pool { xts: impl IntoIterator>, ) -> Result, B::Error>>, B::Error> { let xts = xts.into_iter().map(|xt| (source, xt)); - let validated_transactions = self.verify(at, xts, false).await?; + let validated_transactions = self.verify(at, xts, CheckBeforeVerify::No).await?; Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx))) } @@ -191,7 +201,7 @@ impl Pool { xt: ExtrinsicFor, ) -> Result, ExtrinsicHash>, B::Error> { let block_number = self.resolve_block_number(at)?; - let (_, tx) = self.verify_one(at, block_number, source, xt, true).await; + let (_, tx) = self.verify_one(at, block_number, source, xt, CheckBeforeVerify::Yes).await; self.validated_pool.submit_and_watch(tx) } @@ -326,7 +336,7 @@ impl Pool { .into_iter() .map(|tx| (tx.source, tx.data.clone())); - let reverified_transactions = self.verify(at, pruned_transactions, true).await?; + let reverified_transactions = self.verify(at, pruned_transactions, CheckBeforeVerify::Yes).await?; log::trace!(target: "txpool", "Pruning at {:?}. Resubmitting transactions.", at); // And finally - submit reverified transactions back to the pool @@ -356,14 +366,14 @@ impl Pool { &self, at: &BlockId, xts: impl IntoIterator)>, - check_is_known: bool, + check: CheckBeforeVerify, ) -> Result, ValidatedTransactionFor>, B::Error> { // we need a block number to compute tx validity let block_number = self.resolve_block_number(at)?; let res = futures::future::join_all( xts.into_iter() - .map(|(source, xt)| self.verify_one(at, block_number, source, xt, check_is_known)) + .map(|(source, xt)| self.verify_one(at, block_number, source, xt, check)) ).await.into_iter().collect::>(); Ok(res) @@ -376,11 +386,11 @@ impl Pool { block_number: NumberFor, source: TransactionSource, xt: ExtrinsicFor, - check_is_known: bool, + check: CheckBeforeVerify, ) -> (ExtrinsicHash, ValidatedTransactionFor) { let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt); - if check_is_known { + if let CheckBeforeVerify::Yes = check { if let Err(err) = self.validated_pool.check_is_known(&hash) { return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into())) } From 24ff60195a18f6e3c5c842f0913ad465c018fbd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 8 Jul 2020 15:57:58 +0200 Subject: [PATCH 5/5] Ignore banned on resubmit --- client/transaction-pool/graph/src/pool.rs | 33 ++++++++++++------- .../graph/src/validated_pool.rs | 10 ++++-- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index 4378d88dc733a..750d5f5d10eb7 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -125,10 +125,10 @@ impl Default for Options { } } -/// Should we check that the transaction exists +/// Should we check that the transaction is banned /// in the pool, before we verify it? #[derive(Copy, Clone)] -enum CheckBeforeVerify { +enum CheckBannedBeforeVerify { Yes, No, } @@ -164,7 +164,7 @@ impl Pool { xts: impl IntoIterator>, ) -> Result, B::Error>>, B::Error> { let xts = xts.into_iter().map(|xt| (source, xt)); - let validated_transactions = self.verify(at, xts, CheckBeforeVerify::Yes).await?; + let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await?; Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx))) } @@ -178,7 +178,7 @@ impl Pool { xts: impl IntoIterator>, ) -> Result, B::Error>>, B::Error> { let xts = xts.into_iter().map(|xt| (source, xt)); - let validated_transactions = self.verify(at, xts, CheckBeforeVerify::No).await?; + let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await?; Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx))) } @@ -201,7 +201,13 @@ impl Pool { xt: ExtrinsicFor, ) -> Result, ExtrinsicHash>, B::Error> { let block_number = self.resolve_block_number(at)?; - let (_, tx) = self.verify_one(at, block_number, source, xt, CheckBeforeVerify::Yes).await; + let (_, tx) = self.verify_one( + at, + block_number, + source, + xt, + CheckBannedBeforeVerify::Yes, + ).await; self.validated_pool.submit_and_watch(tx) } @@ -336,7 +342,11 @@ impl Pool { .into_iter() .map(|tx| (tx.source, tx.data.clone())); - let reverified_transactions = self.verify(at, pruned_transactions, CheckBeforeVerify::Yes).await?; + let reverified_transactions = self.verify( + at, + pruned_transactions, + CheckBannedBeforeVerify::Yes, + ).await?; log::trace!(target: "txpool", "Pruning at {:?}. Resubmitting transactions.", at); // And finally - submit reverified transactions back to the pool @@ -366,7 +376,7 @@ impl Pool { &self, at: &BlockId, xts: impl IntoIterator)>, - check: CheckBeforeVerify, + check: CheckBannedBeforeVerify, ) -> Result, ValidatedTransactionFor>, B::Error> { // we need a block number to compute tx validity let block_number = self.resolve_block_number(at)?; @@ -386,14 +396,13 @@ impl Pool { block_number: NumberFor, source: TransactionSource, xt: ExtrinsicFor, - check: CheckBeforeVerify, + check: CheckBannedBeforeVerify, ) -> (ExtrinsicHash, ValidatedTransactionFor) { let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt); - if let CheckBeforeVerify::Yes = check { - if let Err(err) = self.validated_pool.check_is_known(&hash) { - return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into())) - } + let ignore_banned = matches!(check, CheckBannedBeforeVerify::No); + if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) { + return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into())) } let validation_result = self.validated_pool.api().validate_transaction( diff --git a/client/transaction-pool/graph/src/validated_pool.rs b/client/transaction-pool/graph/src/validated_pool.rs index 49ddb1105db4a..bde76196ec4c5 100644 --- a/client/transaction-pool/graph/src/validated_pool.rs +++ b/client/transaction-pool/graph/src/validated_pool.rs @@ -139,9 +139,15 @@ impl ValidatedPool { /// A fast check before doing any further processing of a transaction, like validation. /// + /// If `ingore_banned` is `true`, it will not check if the transaction is banned. + /// /// It checks if the transaction is already imported or banned. If so, it returns an error. - pub fn check_is_known(&self, tx_hash: &ExtrinsicHash) -> Result<(), B::Error> { - if self.is_banned(tx_hash) { + pub fn check_is_known( + &self, + tx_hash: &ExtrinsicHash, + ignore_banned: bool, + ) -> Result<(), B::Error> { + if !ignore_banned && self.is_banned(tx_hash) { Err(error::Error::TemporarilyBanned.into()) } else if self.pool.read().is_imported(tx_hash) { Err(error::Error::AlreadyImported(Box::new(tx_hash.clone())).into())