Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/rpc/api/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ impl Subscriptions {
}
}

/// Borrows the internal task executor.
///
/// This can be used to spawn additional tasks on the underyling event loop.
pub fn executor(&self) -> &TaskExecutor {
&self.executor
}

/// Creates new subscription for given subscriber.
///
/// Second parameter is a function that converts Subscriber sink into a future.
Expand Down
71 changes: 36 additions & 35 deletions core/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ use log::warn;
use client::{self, Client};
use rpc::futures::{
Sink, Future,
stream::Stream as _,
future::result,
};
use futures03::{StreamExt as _, compat::Compat};
use futures03::{StreamExt as _, compat::Compat, future::ready};
use api::Subscriptions;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use codec::{Encode, Decode};
Expand Down Expand Up @@ -162,42 +161,44 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
let best_block_hash = self.client.info().chain.best_hash;
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])
.map_err(error::Error::from)?;
Ok(self.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.boxed()
.compat()
.map_err(|e| e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
))
Ok(
self.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.map_err(|e| e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
)
)
};

let future_watcher = match submit() {
Ok(future_watcher) => future_watcher,
Err(err) => {
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
return;
},
};

// make 'future' watcher be a future with output = stream of watcher events
let future_watcher = future_watcher
.map_err(|err| { warn!("Failed to submit extrinsic: {}", err); })
.map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))));

// convert a 'future' watcher into the stream with single element = stream of watcher events
let watcher_stream = future_watcher.into_stream();

// and now flatten the 'watcher_stream' so that we'll have the stream with watcher events
let watcher_stream = watcher_stream.flatten();
let subscriptions = self.subscriptions.clone();
let future = ready(submit())
.and_then(|res| res)
// convert the watcher into a `Stream`
.map(|res| res.map(|watcher| watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
// now handle the import result,
// start a new subscrition
.map(move |result| match result {
Ok(watcher) => {
subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|_| unimplemented!())
.send_all(Compat::new(watcher))
.map(|_| ())
});
},
Err(err) => {
warn!("Failed to submit extrinsic: {}", err);
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
},
});

self.subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(watcher_stream)
.map(|_| ())
});
let res = self.subscriptions.executor()
.execute(Box::new(Compat::new(future.map(|_| Ok(())))));
if res.is_err() {
warn!("Error spawning subscription RPC task.");
}
}

fn unwatch_extrinsic(&self, _metadata: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
Expand Down
34 changes: 29 additions & 5 deletions core/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ use super::*;
use std::sync::Arc;
use assert_matches::assert_matches;
use codec::Encode;
use transaction_pool::{
txpool::Pool,
FullChainApi,
};
use primitives::{
H256, blake2_256, hexdisplay::HexDisplay, testing::{ED25519, SR25519, KeyStore}, ed25519,
crypto::Pair,
};
use rpc::futures::Stream as _;
use test_client::{
self, AccountKeyring, runtime::{Extrinsic, Transfer, SessionKeys}, DefaultTestClientBuilderExt,
TestClientBuilderExt,
};
use transaction_pool::{
txpool::Pool,
FullChainApi,
};
use tokio::runtime;

fn uxt(sender: AccountKeyring, nonce: u64) -> Extrinsic {
Expand Down Expand Up @@ -102,7 +103,7 @@ fn should_watch_extrinsic() {
subscriptions: Subscriptions::new(Arc::new(runtime.executor())),
keystore: keystore.clone(),
};
let (subscriber, id_rx, data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test");
let (subscriber, id_rx, data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");

// when
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 0).encode().into());
Expand Down Expand Up @@ -132,6 +133,29 @@ fn should_watch_extrinsic() {
);
}

#[test]
fn should_return_watch_validation_error() {
//given
let mut runtime = runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new());
let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone())));
let keystore = KeyStore::new();
let p = Author {
client,
pool: pool.clone(),
subscriptions: Subscriptions::new(Arc::new(runtime.executor())),
keystore: keystore.clone(),
};
let (subscriber, id_rx, _data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");

// when
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into());

// then
let res = runtime.block_on(id_rx).unwrap();
assert!(res.is_err(), "Expected the transaction to be rejected as invalid.");
}

#[test]
fn should_return_pending_extrinsics() {
let runtime = runtime::Runtime::new().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub trait ChainApi: Send + Sync {
/// Error type.
type Error: From<error::Error> + error::IntoPoolError;
/// Validate transaction future.
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send;
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;

/// Verify extrinsic at given block.
fn validate_transaction(
Expand Down