Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ macro_rules! new_full_start {
.with_select_chain(|_config, backend| {
Ok(sc_client::LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, _fetcher| {
.with_transaction_pool(|config, client, _fetcher, prometheus_registry| {
Copy link
Contributor

@tomaka tomaka Apr 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is the most straight-forward solution for this PR, but ugh this API. It doesn't make sense at all.
We should really remove these closures and simply have prometheus_registry (and all the other components) be a local variable.
We can't just continue adding parameters to these closures (which is a breaking change every single time) whenever we need something that is in the builder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not requesting any change on this PR in particular, but I thought I'd comment on that in general.

Copy link
Contributor Author

@NikVolf NikVolf Apr 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you seen this PR?
#5557

let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api)))
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
Expand Down Expand Up @@ -183,13 +183,13 @@ pub fn new_light(config: Configuration)
.with_select_chain(|_config, backend| {
Ok(LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, fetcher| {
.with_transaction_pool(|config, client, fetcher, prometheus_registry| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;

let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light,
config, Arc::new(pool_api), prometheus_registry, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
Expand Down
8 changes: 4 additions & 4 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ macro_rules! new_full_start {
.with_select_chain(|_config, backend| {
Ok(sc_client::LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, _fetcher| {
.with_transaction_pool(|config, client, _fetcher, prometheus_registry| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api)))
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
Expand Down Expand Up @@ -295,12 +295,12 @@ pub fn new_light(config: Configuration)
.with_select_chain(|_config, backend| {
Ok(LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, fetcher| {
.with_transaction_pool(|config, client, fetcher, prometheus_registry| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light,
config, Arc::new(pool_api), prometheus_registry, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
Expand Down
25 changes: 21 additions & 4 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,11 @@ mod tests {
// given
let client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);

futures::executor::block_on(
Expand Down Expand Up @@ -408,7 +412,11 @@ mod tests {
fn should_not_panic_when_deadline_is_reached() {
let client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);

let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
Expand Down Expand Up @@ -440,8 +448,13 @@ mod tests {
.build_with_backend();
let client = Arc::new(client);
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);

let genesis_hash = client.info().best_hash;
let block_id = BlockId::Hash(genesis_hash);

Expand Down Expand Up @@ -493,7 +506,11 @@ mod tests {
// given
let mut client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);

futures::executor::block_on(
Expand Down
2 changes: 1 addition & 1 deletion client/basic-authorship/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! # use substrate_test_runtime_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring};
//! # use sc_transaction_pool::{BasicPool, FullChainApi};
//! # let client = Arc::new(substrate_test_runtime_client::new());
//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0);
//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone())), None).0);
//! // The first step is to create a `ProposerFactory`.
//! let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
//!
Expand Down
6 changes: 3 additions & 3 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod tests {
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
let pool = Arc::new(BasicPool::new(Options::default(), api(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone()
Expand Down Expand Up @@ -281,7 +281,7 @@ mod tests {
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
let pool = Arc::new(BasicPool::new(Options::default(), api(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone()
Expand Down Expand Up @@ -349,7 +349,7 @@ mod tests {
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool_api = api();
let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone()).0);
let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone(),
Expand Down
1 change: 1 addition & 0 deletions client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ mod tests {
let pool = Arc::new(TestPool(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0));
client.execution_extensions()
.register_transaction_pool(Arc::downgrade(&pool.clone()) as _);
Expand Down
1 change: 1 addition & 0 deletions client/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl Default for TestSetup {
let pool = Arc::new(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0);
TestSetup {
runtime: runtime::Runtime::new().expect("Failed to create runtime in test setup"),
Expand Down
3 changes: 3 additions & 0 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use wasm_timer::SystemTime;
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent};
use sp_blockchain;
use prometheus_endpoint::Registry as PrometheusRegistry;

pub type BackgroundTask = Pin<Box<dyn Future<Output=()> + Send>>;

Expand Down Expand Up @@ -585,6 +586,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
sc_transaction_pool::txpool::Options,
Arc<TCl>,
Option<TFchr>,
Option<&PrometheusRegistry>,
) -> Result<(UExPool, Option<BackgroundTask>), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
UExPool, TRpc, Backend>, Error>
Expand All @@ -593,6 +595,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
self.config.transaction_pool.clone(),
self.client.clone(),
self.fetcher.clone(),
self.config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

if let Some(background_task) = background_task{
Expand Down
1 change: 1 addition & 0 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ mod tests {
let pool = Arc::new(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0);
let source = sp_runtime::transaction_validity::TransactionSource::External;
let best = longest_chain.best_chain().unwrap();
Expand Down
1 change: 1 addition & 0 deletions client/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ sc-client-api = { version = "2.0.0-dev", path = "../api" }
sp-blockchain = { version = "2.0.0-dev", path = "../../primitives/blockchain" }
intervalier = "0.4.0"
parity-util-mem = { version = "0.6.0", default-features = false, features = ["primitive-types"] }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-dev"}

[dev-dependencies]
assert_matches = "1.3.0"
Expand Down
69 changes: 63 additions & 6 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#![warn(unused_extern_crates)]

mod api;
pub mod error;
mod revalidation;
mod metrics;

pub mod error;

#[cfg(any(feature = "test-helpers", test))]
pub mod testing;
Expand All @@ -45,6 +47,9 @@ use sp_transaction_pool::{
};
use wasm_timer::Instant;

use prometheus_endpoint::Registry as PrometheusRegistry;
use crate::metrics::Metrics as PrometheusMetrics;

type BoxedReadyIterator<Hash, Data> = Box<dyn Iterator<Item=Arc<sc_transaction_graph::base_pool::Transaction<Hash, Data>>> + Send>;

type ReadyIteratorFor<PoolApi> = BoxedReadyIterator<sc_transaction_graph::ExHash<PoolApi>, sc_transaction_graph::ExtrinsicFor<PoolApi>>;
Expand All @@ -62,6 +67,7 @@ pub struct BasicPool<PoolApi, Block>
revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
metrics: Arc<Option<PrometheusMetrics>>,
}

struct ReadyPoll<T, Block: BlockT> {
Expand Down Expand Up @@ -147,8 +153,9 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
pub fn new(
options: sc_transaction_graph::Options,
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
) -> (Self, Option<Pin<Box<dyn Future<Output=()> + Send>>>) {
Self::with_revalidation_type(options, pool_api, RevalidationType::Full)
Self::with_revalidation_type(options, pool_api, prometheus, RevalidationType::Full)
}

/// Create new basic transaction pool with provided api, for tests.
Expand All @@ -166,6 +173,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
revalidation_queue: Arc::new(revalidation_queue),
revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
ready_poll: Default::default(),
metrics: Arc::new(None),
},
background_task,
notifier,
Expand All @@ -177,6 +185,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
pub fn with_revalidation_type(
options: sc_transaction_graph::Options,
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
revalidation_type: RevalidationType,
) -> (Self, Option<Pin<Box<dyn Future<Output=()> + Send>>>) {
let pool = Arc::new(sc_transaction_graph::Pool::new(options, pool_api.clone()));
Expand All @@ -187,6 +196,16 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
(queue, Some(background))
},
};

let metrics = Arc::new(
prometheus.and_then(|registry|
PrometheusMetrics::register(registry).map_err(|err| {
log::warn!("Failed to register prometheus metrics: {}", err);
()
}).ok()
)
);

(
BasicPool {
api: pool_api,
Expand All @@ -199,6 +218,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
}
)),
ready_poll: Default::default(),
metrics,
},
background_task,
)
Expand Down Expand Up @@ -228,8 +248,20 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
let pool = self.pool.clone();
let at = *at;

if let Some(metrics) = self.metrics.as_ref() {
metrics.validation_pending.add(xts.len() as u64);
}
let metrics = self.metrics.clone();

async move {
pool.submit_at(&at, source, xts, false).await
let tx_count = xts.len();
let res = pool.submit_at(&at, source, xts, false).await;
if let Some(metrics) = metrics.as_ref() {
metrics.validation_pending.sub(tx_count as u64);
metrics.total_validated.inc_by(tx_count as u64);
}
res
}.boxed()
}

Expand All @@ -241,8 +273,21 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
) -> PoolFuture<TxHash<Self>, Self::Error> {
let pool = self.pool.clone();
let at = *at;

if let Some(metrics) = self.metrics.as_ref() {
metrics.validation_pending.inc();
}
let metrics = self.metrics.clone();

async move {
pool.submit_one(&at, source, xt).await
let res = pool.submit_one(&at, source, xt).await;

if let Some(metrics) = metrics.as_ref() {
metrics.validation_pending.dec();
metrics.total_validated.inc();
}
res

}.boxed()
}

Expand All @@ -255,10 +300,22 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
let at = *at;
let pool = self.pool.clone();

if let Some(metrics) = self.metrics.as_ref() {
metrics.validation_pending.inc();
}
let metrics = self.metrics.clone();

async move {
pool.submit_and_watch(&at, source, xt)
let result = pool.submit_and_watch(&at, source, xt)
.map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _))
.await
.await;

if let Some(metrics) = metrics.as_ref() {
metrics.validation_pending.dec();
metrics.total_validated.inc();
}

result
}.boxed()
}

Expand Down
Loading