Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion demo/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use demo_executor::NativeExecutor;
struct DummyPool;
impl extrinsic_pool::api::ExtrinsicPool<UncheckedExtrinsic, BlockId, Hash> for DummyPool {
type Error = extrinsic_pool::txpool::Error;
type InPool = ();

fn submit(&self, _block: BlockId, _: Vec<UncheckedExtrinsic>)
-> Result<Vec<Hash>, Self::Error>
Expand All @@ -79,6 +80,10 @@ impl extrinsic_pool::api::ExtrinsicPool<UncheckedExtrinsic, BlockId, Hash> for D
fn import_notification_stream(&self) -> extrinsic_pool::api::EventStream {
unreachable!()
}

fn all(&self) -> Self::InPool {
unreachable!()
}
}

struct DummySystem;
Expand Down Expand Up @@ -176,7 +181,7 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let state = rpc::apis::state::State::new(client.clone(), runtime.executor());
let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor());
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor());
rpc::rpc_handler::<Block, _, _, _, _>(state, chain, author, DummySystem)
rpc::rpc_handler::<Block, _, _, _, _, _>(state, chain, author, DummySystem)
};
let http_address = "127.0.0.1:9933".parse().unwrap();
let ws_address = "127.0.0.1:9944".parse().unwrap();
Expand Down
26 changes: 22 additions & 4 deletions polkadot/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod error;

use std::{
cmp::Ordering,
collections::HashMap,
collections::{BTreeMap, HashMap},
ops::Deref,
sync::Arc,
};
Expand All @@ -54,6 +54,7 @@ use extrinsic_pool::{
use polkadot_api::PolkadotApi;
use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic};
use runtime::{Address, UncheckedExtrinsic};
use substrate_primitives::Bytes;
use substrate_runtime_primitives::traits::{Bounded, Checkable, Hash as HashT, BlakeTwo256};

pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps};
Expand Down Expand Up @@ -182,9 +183,14 @@ impl txpool::Scoring<VerifiedTransaction> for Scoring {
}
}

fn should_replace(&self, old: &VerifiedTransaction, _new: &VerifiedTransaction) -> bool {
// Always replace not fully verified transactions.
!old.is_fully_verified()
fn should_replace(&self, old: &VerifiedTransaction, _new: &VerifiedTransaction) -> Choice {
if old.is_fully_verified() {
// Don't allow new transactions if we are reaching the limit.
Choice::RejectNew
} else {
// Always replace not fully verified transactions.
Choice::ReplaceOld
}
}
}

Expand Down Expand Up @@ -415,6 +421,7 @@ impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for Transact
A: PolkadotApi,
{
type Error = Error;
type InPool = BTreeMap<AccountId, Vec<Bytes>>;

fn submit(&self, block: BlockId, xts: Vec<FutureProofUncheckedExtrinsic>) -> Result<Vec<Hash>> {
xts.into_iter()
Expand Down Expand Up @@ -446,6 +453,17 @@ impl<A> ExtrinsicPool<FutureProofUncheckedExtrinsic, BlockId, Hash> for Transact
fn import_notification_stream(&self) -> EventStream {
self.inner.import_notification_stream()
}

fn all(&self) -> Self::InPool {
self.inner.all(|it| it.fold(Default::default(), |mut map: Self::InPool, tx| {
// Map with `null` key is not serializable, so we fallback to default accountId.
map.entry(tx.sender().unwrap_or_default())
.or_insert_with(Vec::new)
// use bytes type to make it serialize nicer.
.push(Bytes(tx.primitive_extrinsic()));
map
}))
}
}

#[cfg(test)]
Expand Down
9 changes: 9 additions & 0 deletions substrate/extrinsic-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

//! External API for extrinsic pool.

use std::fmt::Debug;

use serde::{Serialize, de::DeserializeOwned};
use txpool;
use futures::sync::mpsc;

Expand Down Expand Up @@ -43,6 +46,9 @@ pub trait ExtrinsicPool<Ex, BlockId, Hash>: Send + Sync + 'static {
/// Error type
type Error: Error;

/// Pooled extrinsics
type InPool: Debug + Serialize + DeserializeOwned + Send + Sync + 'static;

/// Submit a collection of extrinsics to the pool.
fn submit(&self, block: BlockId, xt: Vec<Ex>) -> Result<Vec<Hash>, Self::Error>;

Expand All @@ -54,4 +60,7 @@ pub trait ExtrinsicPool<Ex, BlockId, Hash>: Send + Sync + 'static {

/// Return an event stream of transactions imported to the pool.
fn import_notification_stream(&self) -> EventStream;

/// Return all extrinsics in the pool aggregated by the sender.
fn all(&self) -> Self::InPool;
}
15 changes: 15 additions & 0 deletions substrate/extrinsic-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,19 @@ impl<Hash, VEx, S, E> Pool<Hash, VEx, S, E> where
{
f(self.pool.read().pending(ready))
}

/// Retrieve all transactions in the pool. The transactions might be unordered.
pub fn all<F, T>(&self, f: F) -> T where
F: FnOnce(txpool::UnorderedIterator<VEx, AlwaysReady, S>) -> T,
{
f(self.pool.read().unordered_pending(AlwaysReady))
}
}

/// A Readiness implementation that returns `Ready` for all transactions.
pub struct AlwaysReady;
impl<VEx> txpool::Ready<VEx> for AlwaysReady {
fn is_ready(&mut self, _tx: &VEx) -> txpool::Readiness {
txpool::Readiness::Ready
}
}
1 change: 1 addition & 0 deletions substrate/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git" }
jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git" }
jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc.git" }
log = "0.3"
serde = "1.0"
substrate-rpc = { path = "../rpc", version = "0.1" }
substrate-runtime-primitives = { path = "../runtime/primitives" }
8 changes: 5 additions & 3 deletions substrate/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern crate jsonrpc_core as rpc;
extern crate jsonrpc_http_server as http;
extern crate jsonrpc_pubsub as pubsub;
extern crate jsonrpc_ws_server as ws;
extern crate serde;
extern crate substrate_runtime_primitives;

#[macro_use]
Expand All @@ -38,16 +39,17 @@ pub type HttpServer = http::Server;
pub type WsServer = ws::Server;

/// Construct rpc `IoHandler`
pub fn rpc_handler<Block: BlockT, S, C, A, Y>(
pub fn rpc_handler<Block, PendingExtrinsics, S, C, A, Y>(
state: S,
chain: C,
author: A,
system: Y,
) -> RpcHandler where
Block: 'static,
Block: BlockT + 'static,
PendingExtrinsics: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
S: apis::state::StateApi<Block::Hash, Metadata=Metadata>,
C: apis::chain::ChainApi<Block::Hash, Block::Header, Metadata=Metadata>,
A: apis::author::AuthorApi<Block::Hash, Block::Extrinsic, Metadata=Metadata>,
A: apis::author::AuthorApi<Block::Hash, Block::Extrinsic, PendingExtrinsics, Metadata=Metadata>,
Y: apis::system::SystemApi,
{
let mut io = pubsub::PubSubHandler::default();
Expand Down
19 changes: 14 additions & 5 deletions substrate/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use self::error::Result;

build_rpc_trait! {
/// Substrate authoring RPC API
pub trait AuthorApi<Hash, Extrinsic> {
pub trait AuthorApi<Hash, Extrinsic, PendingExtrinsics> {
type Metadata;

/// Submit extrinsic for inclusion in block.
Expand All @@ -51,6 +51,10 @@ build_rpc_trait! {
#[rpc(name = "author_submitExtrinsic")]
fn submit_extrinsic(&self, Bytes) -> Result<Hash>;

/// Returns all pending extrinsics, potentially grouped by sender.
#[rpc(name = "author_pendingExtrinsics")]
fn pending_extrinsics(&self) -> Result<PendingExtrinsics>;

#[pubsub(name = "author_extrinsicUpdate")] {
/// Submit an extrinsic to watch.
#[rpc(name = "author_submitAndWatchExtrinsic")]
Expand All @@ -60,7 +64,6 @@ build_rpc_trait! {
#[rpc(name = "author_unwatchExtrinsic")]
fn unwatch_extrinsic(&self, SubscriptionId) -> Result<bool>;
}

}
}

Expand All @@ -85,12 +88,13 @@ impl<B, E, Block: traits::Block, P> Author<B, E, Block, P> {
}
}

impl<B, E, Block, P, Ex, Hash> AuthorApi<Hash, Ex> for Author<B, E, Block, P> where
impl<B, E, Block, P, Ex, Hash, InPool> AuthorApi<Hash, Ex, InPool> for Author<B, E, Block, P> where
B: client::backend::Backend<Block> + Send + Sync + 'static,
E: client::CallExecutor<Block> + Send + Sync + 'static,
Block: traits::Block + 'static,
Hash: traits::MaybeSerializeDebug + Sync + Send + 'static,
P: ExtrinsicPool<Ex, generic::BlockId<Block>, Hash>,
Hash: traits::MaybeSerializeDebug + Send + Sync + 'static,
InPool: traits::MaybeSerializeDebug + Send + Sync + 'static,
P: ExtrinsicPool<Ex, generic::BlockId<Block>, Hash, InPool=InPool>,
P::Error: 'static,
Ex: Codec,
{
Expand All @@ -112,6 +116,10 @@ impl<B, E, Block, P, Ex, Hash> AuthorApi<Hash, Ex> for Author<B, E, Block, P> wh
)
}

fn pending_extrinsics(&self) -> Result<InPool> {
Ok(self.pool.all())
}

fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<Hash>>, xt: Bytes) {

let submit = || -> Result<_> {
Expand Down Expand Up @@ -146,3 +154,4 @@ impl<B, E, Block, P, Ex, Hash> AuthorApi<Hash, Ex> for Author<B, E, Block, P> wh
Ok(self.subscriptions.cancel(id))
}
}

20 changes: 20 additions & 0 deletions substrate/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl fmt::Display for Error {

impl<BlockHash> api::ExtrinsicPool<Extrinsic, BlockHash, u64> for DummyTxPool {
type Error = Error;
type InPool = Vec<u8>;

/// Submit extrinsic for inclusion in block.
fn submit(&self, _block: BlockHash, xt: Vec<Extrinsic>) -> Result<Vec<Hash>, Self::Error> {
Expand Down Expand Up @@ -79,6 +80,10 @@ impl<BlockHash> api::ExtrinsicPool<Extrinsic, BlockHash, u64> for DummyTxPool {
fn import_notification_stream(&self) -> api::EventStream {
unreachable!()
}

fn all(&self) -> Self::InPool {
vec![1, 2, 3, 4, 5]
}
}

#[test]
Expand Down Expand Up @@ -143,3 +148,18 @@ fn should_watch_extrinsic() {
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":5},"subscription":0}}"#.into())
);
}

#[test]
fn should_return_pending_extrinsics() {
let runtime = runtime::Runtime::new().unwrap();
let p = Author {
client: Arc::new(test_client::new()),
pool: Arc::new(DummyTxPool::default()),
subscriptions: Subscriptions::new(runtime.executor()),
};

assert_matches!(
p.pending_extrinsics(),
Ok(ref expected) if expected == &[1u8, 2, 3, 4, 5]
);
}
3 changes: 2 additions & 1 deletion substrate/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl<Components> Service<Components>
let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone());
let state = rpc::apis::state::State::new(client.clone(), task_executor.clone());
let author = rpc::apis::author::Author::new(client.clone(), extrinsic_pool.api(), task_executor.clone());
rpc::rpc_handler::<ComponentBlock<Components>, _, _, _, _>(

rpc::rpc_handler::<ComponentBlock<Components>, _, _, _, _, _>(
state,
chain,
author,
Expand Down