diff --git a/Cargo.lock b/Cargo.lock index 051e6448b36bf..57aa346c2632e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6765,6 +6765,7 @@ dependencies = [ "sp-runtime", "sp-transaction-pool", "sp-utils", + "substrate-prometheus-endpoint", "substrate-test-runtime-client", "substrate-test-runtime-transaction-pool", "tracing", diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 7c4a574f6be04..b8e4d73db69f5 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -35,9 +35,9 @@ macro_rules! new_full_start { .with_select_chain(|_config, backend| { Ok(sc_client::LongestChain::new(backend.clone())) })? - .with_transaction_pool(|config, client, _fetcher| { + .with_transaction_pool(|config, client, _fetcher, prometheus_registry| { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); - Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api))) + Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry)) })? .with_import_queue(|_config, client, mut select_chain, _transaction_pool| { let select_chain = select_chain.take() @@ -183,13 +183,13 @@ pub fn new_light(config: Configuration) .with_select_chain(|_config, backend| { Ok(LongestChain::new(backend.clone())) })? - .with_transaction_pool(|config, client, fetcher| { + .with_transaction_pool(|config, client, fetcher, prometheus_registry| { let fetcher = fetcher .ok_or_else(|| "Trying to start light transaction pool without active fetcher")?; let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone()); let pool = sc_transaction_pool::BasicPool::with_revalidation_type( - config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light, + config, Arc::new(pool_api), prometheus_registry, sc_transaction_pool::RevalidationType::Light, ); Ok(pool) })? diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 07099d9c97634..0acd553ea0127 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -56,9 +56,9 @@ macro_rules! new_full_start { .with_select_chain(|_config, backend| { Ok(sc_client::LongestChain::new(backend.clone())) })? - .with_transaction_pool(|config, client, _fetcher| { + .with_transaction_pool(|config, client, _fetcher, prometheus_registry| { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); - Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api))) + Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry)) })? .with_import_queue(|_config, client, mut select_chain, _transaction_pool| { let select_chain = select_chain.take() @@ -312,12 +312,12 @@ pub fn new_light(config: Configuration) .with_select_chain(|_config, backend| { Ok(LongestChain::new(backend.clone())) })? - .with_transaction_pool(|config, client, fetcher| { + .with_transaction_pool(|config, client, fetcher, prometheus_registry| { let fetcher = fetcher .ok_or_else(|| "Trying to start light transaction pool without active fetcher")?; let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone()); let pool = sc_transaction_pool::BasicPool::with_revalidation_type( - config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light, + config, Arc::new(pool_api), prometheus_registry, sc_transaction_pool::RevalidationType::Light, ); Ok(pool) })? diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index 37bb34a4b67d9..e1e99938e375e 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -360,7 +360,11 @@ mod tests { // given let client = Arc::new(substrate_test_runtime_client::new()); let txpool = Arc::new( - BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0 + BasicPool::new( + Default::default(), + Arc::new(FullChainApi::new(client.clone())), + None, + ).0 ); futures::executor::block_on( @@ -408,7 +412,11 @@ mod tests { fn should_not_panic_when_deadline_is_reached() { let client = Arc::new(substrate_test_runtime_client::new()); let txpool = Arc::new( - BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0 + BasicPool::new( + Default::default(), + Arc::new(FullChainApi::new(client.clone())), + None, + ).0 ); let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone()); @@ -440,8 +448,13 @@ mod tests { .build_with_backend(); let client = Arc::new(client); let txpool = Arc::new( - BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0 + BasicPool::new( + Default::default(), + Arc::new(FullChainApi::new(client.clone())), + None, + ).0 ); + let genesis_hash = client.info().best_hash; let block_id = BlockId::Hash(genesis_hash); @@ -493,7 +506,11 @@ mod tests { // given let mut client = Arc::new(substrate_test_runtime_client::new()); let txpool = Arc::new( - BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0 + BasicPool::new( + Default::default(), + Arc::new(FullChainApi::new(client.clone())), + None, + ).0 ); futures::executor::block_on( diff --git a/client/basic-authorship/src/lib.rs b/client/basic-authorship/src/lib.rs index 5ec0bc6f9a520..5eb60f1cd586c 100644 --- a/client/basic-authorship/src/lib.rs +++ b/client/basic-authorship/src/lib.rs @@ -26,7 +26,7 @@ //! # use substrate_test_runtime_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring}; //! # use sc_transaction_pool::{BasicPool, FullChainApi}; //! # let client = Arc::new(substrate_test_runtime_client::new()); -//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0); +//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone())), None).0); //! // The first step is to create a `ProposerFactory`. //! let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone()); //! diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index 8294ae049f658..687d072aaa058 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -217,7 +217,7 @@ mod tests { let (client, select_chain) = builder.build_with_longest_chain(); let client = Arc::new(client); let inherent_data_providers = InherentDataProviders::new(); - let pool = Arc::new(BasicPool::new(Options::default(), api()).0); + let pool = Arc::new(BasicPool::new(Options::default(), api(), None).0); let env = ProposerFactory::new( client.clone(), pool.clone() @@ -281,7 +281,7 @@ mod tests { let (client, select_chain) = builder.build_with_longest_chain(); let client = Arc::new(client); let inherent_data_providers = InherentDataProviders::new(); - let pool = Arc::new(BasicPool::new(Options::default(), api()).0); + let pool = Arc::new(BasicPool::new(Options::default(), api(), None).0); let env = ProposerFactory::new( client.clone(), pool.clone() @@ -349,7 +349,7 @@ mod tests { let client = Arc::new(client); let inherent_data_providers = InherentDataProviders::new(); let pool_api = api(); - let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone()).0); + let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone(), None).0); let env = ProposerFactory::new( client.clone(), pool.clone(), diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index 94850e3fd3461..332e9f779a8e2 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -206,6 +206,7 @@ mod tests { let pool = Arc::new(TestPool(BasicPool::new( Default::default(), Arc::new(FullChainApi::new(client.clone())), + None, ).0)); client.execution_extensions() .register_transaction_pool(Arc::downgrade(&pool.clone()) as _); diff --git a/client/rpc/src/author/tests.rs b/client/rpc/src/author/tests.rs index 8b956c23a5e66..445888c523fe3 100644 --- a/client/rpc/src/author/tests.rs +++ b/client/rpc/src/author/tests.rs @@ -65,6 +65,7 @@ impl Default for TestSetup { let pool = Arc::new(BasicPool::new( Default::default(), Arc::new(FullChainApi::new(client.clone())), + None, ).0); TestSetup { runtime: runtime::Runtime::new().expect("Failed to create runtime in test setup"), diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 4f370c1118fb2..90e644481f76e 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -53,6 +53,7 @@ use wasm_timer::SystemTime; use sc_telemetry::{telemetry, SUBSTRATE_INFO}; use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent}; use sp_blockchain; +use prometheus_endpoint::Registry as PrometheusRegistry; pub type BackgroundTask = Pin + Send>>; @@ -585,6 +586,7 @@ impl sc_transaction_pool::txpool::Options, Arc, Option, + Option<&PrometheusRegistry>, ) -> Result<(UExPool, Option), Error> ) -> Result, Error> @@ -593,6 +595,7 @@ impl self.config.transaction_pool.clone(), self.client.clone(), self.fetcher.clone(), + self.config.prometheus_config.as_ref().map(|config| &config.registry), )?; if let Some(background_task) = background_task{ diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index f416d363deba9..97481fcc251b3 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -696,6 +696,7 @@ mod tests { let pool = Arc::new(BasicPool::new( Default::default(), Arc::new(FullChainApi::new(client.clone())), + None, ).0); let source = sp_runtime::transaction_validity::TransactionSource::External; let best = longest_chain.best_chain().unwrap(); diff --git a/client/transaction-pool/Cargo.toml b/client/transaction-pool/Cargo.toml index 2a757652f92ab..c96e4c03324e9 100644 --- a/client/transaction-pool/Cargo.toml +++ b/client/transaction-pool/Cargo.toml @@ -20,6 +20,7 @@ intervalier = "0.4.0" log = "0.4.8" parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] } parking_lot = "0.10.0" +prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-dev"} sc-client-api = { version = "2.0.0-dev", path = "../api" } sc-transaction-graph = { version = "2.0.0-dev", path = "./graph" } sp-api = { version = "2.0.0-dev", path = "../../primitives/api" } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index c50d9dbbb45ac..e095191c574fa 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -21,8 +21,10 @@ #![warn(unused_extern_crates)] mod api; -pub mod error; mod revalidation; +mod metrics; + +pub mod error; #[cfg(any(feature = "test-helpers", test))] pub mod testing; @@ -45,6 +47,9 @@ use sp_transaction_pool::{ }; use wasm_timer::Instant; +use prometheus_endpoint::Registry as PrometheusRegistry; +use crate::metrics::MetricsLink as PrometheusMetrics; + type BoxedReadyIterator = Box>> + Send>; type ReadyIteratorFor = BoxedReadyIterator, sc_transaction_graph::ExtrinsicFor>; @@ -62,6 +67,7 @@ pub struct BasicPool revalidation_strategy: Arc>>>, revalidation_queue: Arc>, ready_poll: Arc, Block>>>, + metrics: PrometheusMetrics, } struct ReadyPoll { @@ -147,8 +153,9 @@ impl BasicPool pub fn new( options: sc_transaction_graph::Options, pool_api: Arc, + prometheus: Option<&PrometheusRegistry>, ) -> (Self, Option + Send>>>) { - Self::with_revalidation_type(options, pool_api, RevalidationType::Full) + Self::with_revalidation_type(options, pool_api, prometheus, RevalidationType::Full) } /// Create new basic transaction pool with provided api, for tests. @@ -166,6 +173,7 @@ impl BasicPool revalidation_queue: Arc::new(revalidation_queue), revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)), ready_poll: Default::default(), + metrics: Default::default(), }, background_task, notifier, @@ -177,6 +185,7 @@ impl BasicPool pub fn with_revalidation_type( options: sc_transaction_graph::Options, pool_api: Arc, + prometheus: Option<&PrometheusRegistry>, revalidation_type: RevalidationType, ) -> (Self, Option + Send>>>) { let pool = Arc::new(sc_transaction_graph::Pool::new(options, pool_api.clone())); @@ -187,6 +196,7 @@ impl BasicPool (queue, Some(background)) }, }; + ( BasicPool { api: pool_api, @@ -199,6 +209,7 @@ impl BasicPool } )), ready_poll: Default::default(), + metrics: PrometheusMetrics::new(prometheus), }, background_task, ) @@ -228,8 +239,15 @@ impl TransactionPool for BasicPool ) -> PoolFuture, Self::Error>>, Self::Error> { let pool = self.pool.clone(); let at = *at; + + self.metrics.report(|metrics| metrics.validations_scheduled.inc_by(xts.len() as u64)); + + let metrics = self.metrics.clone(); async move { - pool.submit_at(&at, source, xts, false).await + let tx_count = xts.len(); + let res = pool.submit_at(&at, source, xts, false).await; + metrics.report(|metrics| metrics.validations_finished.inc_by(tx_count as u64)); + res }.boxed() } @@ -241,8 +259,16 @@ impl TransactionPool for BasicPool ) -> PoolFuture, Self::Error> { let pool = self.pool.clone(); let at = *at; + + self.metrics.report(|metrics| metrics.validations_scheduled.inc()); + + let metrics = self.metrics.clone(); async move { - pool.submit_one(&at, source, xt).await + let res = pool.submit_one(&at, source, xt).await; + + metrics.report(|metrics| metrics.validations_finished.inc()); + res + }.boxed() } @@ -255,10 +281,17 @@ impl TransactionPool for BasicPool let at = *at; let pool = self.pool.clone(); + self.metrics.report(|metrics| metrics.validations_scheduled.inc()); + + let metrics = self.metrics.clone(); async move { - pool.submit_and_watch(&at, source, xt) + let result = pool.submit_and_watch(&at, source, xt) .map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _)) - .await + .await; + + metrics.report(|metrics| metrics.validations_finished.inc()); + + result }.boxed() } diff --git a/client/transaction-pool/src/metrics.rs b/client/transaction-pool/src/metrics.rs new file mode 100644 index 0000000000000..78e49b3ca53af --- /dev/null +++ b/client/transaction-pool/src/metrics.rs @@ -0,0 +1,69 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Transaction pool Prometheus metrics. + +use std::sync::Arc; + +use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; + +#[derive(Clone, Default)] +pub struct MetricsLink(Arc>); + +impl MetricsLink { + pub fn new(registry: Option<&Registry>) -> Self { + Self(Arc::new( + registry.and_then(|registry| + Metrics::register(registry) + .map_err(|err| { log::warn!("Failed to register prometheus metrics: {}", err); }) + .ok() + ) + )) + } + + pub fn report(&self, do_this: impl FnOnce(&Metrics)) { + if let Some(metrics) = self.0.as_ref() { + do_this(metrics); + } + } +} + +/// Transaction pool Prometheus metrics. +pub struct Metrics { + pub validations_scheduled: Counter, + pub validations_finished: Counter, +} + +impl Metrics { + pub fn register(registry: &Registry) -> Result { + Ok(Self { + validations_scheduled: register( + Counter::new( + "sub_txpool_validations_scheduled", + "Total number of transactions scheduled for validation", + )?, + registry, + )?, + validations_finished: register( + Counter::new( + "sub_txpool_validations_finished", + "Total number of transactions that finished validation", + )?, + registry, + )?, + }) + } +} diff --git a/client/transaction-pool/src/testing/pool.rs b/client/transaction-pool/src/testing/pool.rs index f2815db1c36a7..45fb6f42c321f 100644 --- a/client/transaction-pool/src/testing/pool.rs +++ b/client/transaction-pool/src/testing/pool.rs @@ -415,7 +415,7 @@ fn finalization() { let xt = uxt(Alice, 209); let api = TestApi::with_alice_nonce(209); api.push_block(1, vec![]); - let (pool, _background) = BasicPool::new(Default::default(), api.into()); + let (pool, _background, _) = BasicPool::new_test(api.into()); let watcher = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, xt.clone()) ).expect("1. Imported"); @@ -446,7 +446,7 @@ fn fork_aware_finalization() { // starting block A1 (last finalized.) api.push_block(1, vec![]); - let (pool, _background) = BasicPool::new(Default::default(), api.into()); + let (pool, _background, _) = BasicPool::new_test(api.into()); let mut canon_watchers = vec![]; let from_alice = uxt(Alice, 1); @@ -677,7 +677,7 @@ fn should_not_accept_old_signatures() { let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client))).0 + BasicPool::new_test(Arc::new(FullChainApi::new(client))).0 ); let transfer = Transfer { diff --git a/utils/frame/rpc/system/src/lib.rs b/utils/frame/rpc/system/src/lib.rs index c73ddfe93efa0..4838e8e8f436d 100644 --- a/utils/frame/rpc/system/src/lib.rs +++ b/utils/frame/rpc/system/src/lib.rs @@ -236,7 +236,11 @@ mod tests { let _ = env_logger::try_init(); let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( - BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0 + BasicPool::new( + Default::default(), + Arc::new(FullChainApi::new(client.clone())), + None, + ).0 ); let source = sp_runtime::transaction_validity::TransactionSource::External;