Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ethereum-types = "0.4"
ethjson = { path = "../json" }
ethkey = { path = "../accounts/ethkey" }
evm = { path = "evm" }
futures = "0.1"
hash-db = "0.11.0"
heapsize = "0.4"
itertools = "0.5"
Expand Down
1 change: 1 addition & 0 deletions ethcore/light/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ memory-db = "0.11.0"
trie-db = "0.11.0"
patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" }
ethcore-network = { path = "../../util/network" }
ethcore-miner = { path = "../../miner" }
ethcore-io = { path = "../../util/io" }
hash-db = "0.11.0"
heapsize = "0.4"
Expand Down
1 change: 1 addition & 0 deletions ethcore/light/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ extern crate ethcore_io as io;
extern crate ethcore_network as network;
extern crate parity_bytes as bytes;
extern crate ethereum_types;
extern crate ethcore_miner as miner;
extern crate ethcore;
extern crate hash_db;
extern crate heapsize;
Expand Down
25 changes: 23 additions & 2 deletions ethcore/light/src/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
//! address-wise manner.

use std::fmt;
use std::sync::Arc;
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry;

use common_types::transaction::{self, Condition, PendingTransaction, SignedTransaction};
use ethereum_types::{H256, U256, Address};
use fastmap::H256FastMap;
use futures::sync::mpsc;
use miner::pool::TxStatus;

// Knowledge of an account's current nonce.
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -134,6 +137,7 @@ pub struct TransactionQueue {
by_account: HashMap<Address, AccountTransactions>,
by_hash: H256FastMap<PendingTransaction>,
listeners: Vec<Listener>,
tx_statuses_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
}

impl fmt::Debug for TransactionQueue {
Expand Down Expand Up @@ -231,7 +235,7 @@ impl TransactionQueue {
};

self.by_hash.insert(hash, tx);
self.notify(&promoted);
self.notify(&promoted, TxStatus::Added);
Ok(res)
}

Expand Down Expand Up @@ -343,6 +347,8 @@ impl TransactionQueue {
trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})",
removed_hashes.len(), address, cur_nonce);

self.notify(&removed_hashes, TxStatus::Culled);

for hash in removed_hashes {
self.by_hash.remove(&hash);
}
Expand All @@ -358,11 +364,26 @@ impl TransactionQueue {
self.listeners.push(f);
}

/// Add a transaction queue listener.
pub fn tx_statuses_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.tx_statuses_listeners.push(sender);
receiver
}

/// Notifies all listeners about new pending transaction.
fn notify(&self, hashes: &[H256]) {
fn notify(&mut self, hashes: &[H256], status: TxStatus) {
for listener in &self.listeners {
listener(hashes)
}

let to_send: Arc<Vec<(H256, TxStatus)>> = Arc::new(
Comment thread
seunlanlege marked this conversation as resolved.
hashes
.into_iter()
.map(|hash| (hash.clone(), status)).collect()
);

self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(to_send.clone()).is_ok());
}
}

Expand Down
1 change: 1 addition & 0 deletions ethcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ extern crate ethcore_miner;
extern crate ethereum_types;
extern crate ethjson;
extern crate ethkey;
extern crate futures;
extern crate hash_db;
extern crate heapsize;
extern crate itertools;
Expand Down
10 changes: 9 additions & 1 deletion ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ use bytes::Bytes;
use call_contract::CallContract;
use ethcore_miner::gas_pricer::GasPricer;
use ethcore_miner::local_accounts::LocalAccounts;
use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy};
use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy, TxStatus};
use ethcore_miner::service_transaction_checker::ServiceTransactionChecker;
#[cfg(feature = "work-notify")]
use ethcore_miner::work_notify::NotifyWork;
use ethereum_types::{H256, U256, Address};
use futures::sync::mpsc;
use io::IoChannel;
use miner::pool_client::{PoolClient, CachedNonceClient, NonceCache};
use miner;
Expand Down Expand Up @@ -263,6 +264,13 @@ impl Miner {
self.transaction_queue.add_listener(f);
}

/// Set a callback to be notified
pub fn tx_pool_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.transaction_queue.add_tx_pool_listener(sender);
receiver
}

/// Creates new instance of miner Arc.
pub fn new<A: LocalAccounts + 'static>(
options: MinerOptions,
Expand Down
3 changes: 3 additions & 0 deletions miner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ parity-runtime = { path = "../util/runtime" }
parking_lot = "0.7"
price-info = { path = "./price-info", optional = true }
rlp = { version = "0.3.0", features = ["ethereum"] }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
trace-time = "0.1"
transaction-pool = "2.0"

Expand Down
3 changes: 3 additions & 0 deletions miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ extern crate parking_lot;
extern crate price_info;
extern crate rlp;
extern crate transaction_pool as txpool;
extern crate serde;

#[macro_use]
extern crate ethabi_contract;
Expand All @@ -44,6 +45,8 @@ extern crate error_chain;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate trace_time;

#[cfg(test)]
Expand Down
61 changes: 61 additions & 0 deletions miner/src/pool/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use std::fmt;
use std::sync::Arc;

use ethereum_types::H256;
use futures::sync::mpsc;
use txpool::{self, VerifiedTransaction};

use pool::VerifiedTransaction as Transaction;
use pool::TxStatus;

type Listener = Box<Fn(&[H256]) + Send + Sync>;

Expand Down Expand Up @@ -116,6 +118,65 @@ impl txpool::Listener<Transaction> for Logger {
}
}

/// Transactions pool notifier
#[derive(Default)]
pub struct TransactionsPoolNotifier {
listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
tx_statuses: Vec<(H256, TxStatus)>,
}

impl TransactionsPoolNotifier {
/// Add new listener to receive notifications.
pub fn add(&mut self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
self.listeners.push(f);
}

/// Notify listeners about all currently transactions.
pub fn notify(&mut self) {
if self.tx_statuses.is_empty() {
return;
}

let to_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
self.listeners
.retain(|listener| listener.unbounded_send(to_send.clone()).is_ok());
}
}

impl fmt::Debug for TransactionsPoolNotifier {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TransactionsPoolNotifier")
.field("listeners", &self.listeners.len())
.finish()
}
}

impl txpool::Listener<Transaction> for TransactionsPoolNotifier {
fn added(&mut self, tx: &Arc<Transaction>, _old: Option<&Arc<Transaction>>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Added));
}

fn rejected<H: fmt::Debug + fmt::LowerHex>(&mut self, tx: &Arc<Transaction>, _reason: &txpool::Error<H>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Rejected));
}

fn dropped(&mut self, tx: &Arc<Transaction>, _new: Option<&Transaction>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Dropped));
}

fn invalid(&mut self, tx: &Arc<Transaction>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Invalid));
}

fn canceled(&mut self, tx: &Arc<Transaction>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Canceled));
}

fn culled(&mut self, tx: &Arc<Transaction>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Culled));
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
18 changes: 18 additions & 0 deletions miner/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,21 @@ impl ScoredTransaction for VerifiedTransaction {
self.transaction.nonce
}
}

/// Pool transactions status
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TxStatus {
/// Added transaction
Added,
/// Rejected transaction
Rejected,
/// Dropped transaction
Dropped,
/// Invalid transaction
Invalid,
/// Canceled transaction
Canceled,
/// Culled transaction
Culled,
}
13 changes: 11 additions & 2 deletions miner/src/pool/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ use std::sync::atomic::{self, AtomicUsize};
use std::collections::{BTreeMap, BTreeSet, HashMap};

use ethereum_types::{H256, U256, Address};
use futures::sync::mpsc;
use parking_lot::RwLock;
use txpool::{self, Verifier};
use types::transaction;

use pool::{
self, replace, scoring, verifier, client, ready, listener,
PrioritizationStrategy, PendingOrdering, PendingSettings,
PrioritizationStrategy, PendingOrdering, PendingSettings, TxStatus
};
use pool::local_transactions::LocalTransactionsList;

type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger));
type Listener = (LocalTransactionsList, (listener::Notifier, (listener::Logger, listener::TransactionsPoolNotifier)));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That starts to look pretty ugly, we could have a macro in parity-common to implement Listener for tuples of different sizes.
(Note it's not strictly related to this PR)

type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, Listener>;

/// Max cache time in milliseconds for pending transactions.
Expand Down Expand Up @@ -304,6 +305,8 @@ impl TransactionQueue {
// Notify about imported transactions.
(self.pool.write().listener_mut().1).0.notify();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This listener now seems obsolete, since it just duplicates the same data and is less efficient than the new channel-based one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomusdrw

Looks nice now, I'd like to see the other Notifier rewritten to futures as well, but we can leave that as a separate PR - please create an issue for this if that happens to be the case.

Ok, then let`s do it in other PR

This listener now seems obsolete, since it just duplicates the same data and is less efficient than the new channel-based one.

Maybe we can make one Notifier by combining old one Notifier and TransactionsPoolNotifier and store two different lists of listeners for parity_watchTransactionsPool and eth_pubsub in it?


((self.pool.write().listener_mut().1).1).1.notify();

if results.iter().any(|r| r.is_ok()) {
self.cached_pending.write().clear();
}
Expand Down Expand Up @@ -574,6 +577,12 @@ impl TransactionQueue {
(pool.listener_mut().1).0.add(f);
}

/// Add a listener to be notified about all transactions the pool
pub fn add_tx_pool_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
let mut pool = self.pool.write();
((pool.listener_mut().1).1).1.add(f);
}

/// Check if pending set is cached.
#[cfg(test)]
pub fn is_pending_cached(&self) -> bool {
Expand Down
6 changes: 3 additions & 3 deletions parity/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ usage! {
"--jsonrpc-interface=[IP]",
"Specify the hostname portion of the HTTP JSON-RPC API server, IP should be an interface's IP address, or all (all interfaces) or local.",

ARG arg_jsonrpc_apis: (String) = "web3,eth,pubsub,net,parity,private,parity_pubsub,traces,rpc,shh,shh_pubsub", or |c: &Config| c.rpc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
ARG arg_jsonrpc_apis: (String) = "web3,eth,pubsub,net,parity,private,parity_pubsub,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.rpc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
"--jsonrpc-apis=[APIS]",
"Specify the APIs available through the HTTP JSON-RPC interface using a comma-delimited list of API names. Possible names are: all, safe, debug, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub",

Expand Down Expand Up @@ -539,7 +539,7 @@ usage! {
"--ws-interface=[IP]",
"Specify the hostname portion of the WebSockets JSON-RPC server, IP should be an interface's IP address, or all (all interfaces) or local.",

ARG arg_ws_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,private,traces,rpc,shh,shh_pubsub", or |c: &Config| c.websockets.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
ARG arg_ws_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,private,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.websockets.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
"--ws-apis=[APIS]",
"Specify the JSON-RPC APIs available through the WebSockets interface using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub",

Expand All @@ -564,7 +564,7 @@ usage! {
"--ipc-path=[PATH]",
"Specify custom path for JSON-RPC over IPC service.",

ARG arg_ipc_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,private,traces,rpc,shh,shh_pubsub", or |c: &Config| c.ipc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
ARG arg_ipc_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,private,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.ipc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
"--ipc-apis=[APIS]",
"Specify custom API set available via JSON-RPC over IPC using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub",

Expand Down
Loading