Watch transactions pool#10558
Conversation
|
It looks like @IntegralTeam hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement at the following URL: https://cla.parity.io Once you've signed, please reply to this thread with Many thanks, Parity Technologies CLA Bot |
|
[clabot:check] |
|
It looks like @IntegralTeam signed our Contributor License Agreement. 👍 Many thanks, Parity Technologies CLA Bot |
tomusdrw
left a comment
There was a problem hiding this comment.
Looks really good for a WiP, few things to polish.
| } | ||
|
|
||
| /// Set a callback to be notified | ||
| pub fn get_tx_pool_receiver(&self) -> mpsc::Receiver<(H256, String)> { |
| /// Transactions pool notifier | ||
| #[derive(Default)] | ||
| pub struct TransactionsPoolNotifier { | ||
| listeners: Vec<Arc<Mutex<mpsc::Sender<(H256, String)>>>>, |
There was a problem hiding this comment.
We don't really need Arc<Mutex here afaict.
| listeners: Vec<Arc<Mutex<mpsc::Sender<(H256, String)>>>>, | |
| listeners: Vec<mpsc::Sender<(H256, String)>>, |
There was a problem hiding this comment.
@tomusdrw If I delete the Arc<Mutex<, it leads to errors
Compiling ethcore v1.12.0 (/home/user/Work/parity-ethereum/ethcore)
error[E0277]: `std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>` cannot be shared between threads safely
--> ethcore/src/client/client.rs:1305:6
|
1305 | impl snapshot::DatabaseRestore for Client {
| ^^^^^^^^^^^^^^^^^^^^^^^^^ `std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>` cannot be shared between threads safely
|
= help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>>`
= note: required because it appears within the type `alloc::raw_vec::RawVec<std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>>`
= note: required because it appears within the type `std::vec::Vec<std::sync::mpsc::Sender<ethcore_miner::pool::listener::TxStatus>>`
= note: required because it appears within the type `ethcore_miner::pool::listener::TransactionsPoolNotifier`
= note: required because it appears within the type `(ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier)`
= note: required because it appears within the type `(ethcore_miner::pool::listener::Notifier, (ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier))`
= note: required because it appears within the type `(ethcore_miner::pool::local_transactions::LocalTransactionsList, (ethcore_miner::pool::listener::Notifier, (ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier)))`
= note: required because it appears within the type `transaction_pool::pool::Pool<ethcore_miner::pool::VerifiedTransaction, ethcore_miner::pool::scoring::NonceAndGasPrice, (ethcore_miner::pool::local_transactions::LocalTransactionsList, (ethcore_miner::pool::listener::Notifier, (ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier)))>`
= note: required because of the requirements on the impl of `std::marker::Sync` for `lock_api::rwlock::RwLock<parking_lot::RawRwLock, transaction_pool::pool::Pool<ethcore_miner::pool::VerifiedTransaction, ethcore_miner::pool::scoring::NonceAndGasPrice, (ethcore_miner::pool::local_transactions::LocalTransactionsList, (ethcore_miner::pool::listener::Notifier, (ethcore_miner::pool::listener::Logger, ethcore_miner::pool::listener::TransactionsPoolNotifier)))>>`
= note: required because it appears within the type `ethcore_miner::pool::TransactionQueue`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<ethcore_miner::pool::TransactionQueue>`
= note: required because it appears within the type `miner::miner::Miner`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::Arc<miner::miner::Miner>`
= note: required because it appears within the type `client::client::Importer`
= note: required because it appears within the type `client::client::Client`
...
Found this: Allowing Access from Multiple Threads with Sync
And it seems to me that either Mutex is needed here, or I don't understand what am I doing wrong.
There was a problem hiding this comment.
Ah, I see. Yes, unfortunatelly we can't satisfy Sync with std::mpsc::Sender. To overcome this I'd go with mpsc::unbounded_channel from futures crate, this will also allow us to get rid of this extra thread responsible for polling the channel.
| impl TransactionsPoolNotifier { | ||
| /// Add new listener to receive notifications. | ||
| pub fn add(&mut self, f: mpsc::Sender<(H256, String)>) { | ||
| self.listeners.push(Arc::new(Mutex::new(f))); |
There was a problem hiding this comment.
| self.listeners.push(Arc::new(Mutex::new(f))); | |
| self.listeners.push(f); |
| /// Notify listeners about all currently transactions. | ||
| fn notify(& mut self, hash: H256, status: String) { | ||
| for l in &self.listeners { | ||
| let l = l.lock(); |
There was a problem hiding this comment.
This would also clear dropped listeners:
| let l = l.lock(); | |
| self.listeners | |
| .retain(|sender| sender.send((hash, status.clone()).is_ok()) |
| impl txpool::Listener<Transaction> for TransactionsPoolNotifier { | ||
| fn added(&mut self, tx: &Arc<Transaction>, _old: Option<&Arc<Transaction>>) { | ||
| let hash = tx.hash.clone(); | ||
| self.notify(hash, "added".to_string()); |
There was a problem hiding this comment.
I think it would be better to have an enum like this:
#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
enum Status {
Added,
Rejected,
...
Culled,
}
And the same status enum should be sent in the notification.
| } | ||
| pub fn run(&self, pool_receiver: mpsc::Receiver<(H256, String)>) { | ||
| let handler = self.handler.clone(); | ||
| thread::spawn(move || loop { |
There was a problem hiding this comment.
Can't we use future-aware channels instead to avoid spawning a new thread?
|
|
||
| } | ||
|
|
||
| pub struct TransactionsNotificationHandler { |
|
|
||
| pub fn notify_transaction(&self, status: (H256, String)) { | ||
| for subscriber in self.transactions_pool_subscribers.read().values() { | ||
| let status = (status.0.clone(), status.1.clone()); |
There was a problem hiding this comment.
| let status = (status.0.clone(), status.1.clone()); | |
| let status = status.clone(); |
or even inline it into the next line.
|
|
||
| /// Subscribe to Transactions Pool subscription. | ||
| #[pubsub(subscription = "parity_watchTransactionsPool", subscribe, name = "parity_watchTransactionsPool")] | ||
| fn subscribe(&self, Self::Metadata, typed::Subscriber<pubsub::Result>, Option<pubsub::Params>); |
There was a problem hiding this comment.
If you don't want any params just leave this out:
| fn subscribe(&self, Self::Metadata, typed::Subscriber<pubsub::Result>, Option<pubsub::Params>); | |
| fn subscribe(&self, Self::Metadata, typed::Subscriber<pubsub::Result>); |
| impl TransactionsPool for TransactionsPoolClient { | ||
| type Metadata = Metadata; | ||
|
|
||
| fn subscribe(&self, _meta: Metadata, subscriber: Subscriber<pubsub::Result>, params: Option<pubsub::Params>) { |
There was a problem hiding this comment.
using pubsub::Result here kind of suggests that we might get different values of this enum as well. I think it should be safe to just use the tuple directly:
| fn subscribe(&self, _meta: Metadata, subscriber: Subscriber<pubsub::Result>, params: Option<pubsub::Params>) { | |
| fn subscribe(&self, _meta: Metadata, subscriber: Subscriber<(H256, Status)>, params: Option<pubsub::Params>) { |
| self.notify(hash, TxStatus::Added); | ||
| } | ||
|
|
||
| fn rejected(&mut self, tx: &Arc<Transaction>, _reason: &txpool::ErrorKind) { |
There was a problem hiding this comment.
I just realised that this might be called really frequently, and currently we need to iterate over all senders and have a unique lock for this, so this will have drastic performance consequences.
Instead I think we should do something very similar that we do with Notifer in the same file, i.e:
- We first aggregate all the notifications that should be sent
- Then we send them all at once after batch import to the queue is finished.
It might be even worth to remove Notifier and convert this other notification to use this (channel-based) notifier instead.
struct Notifier {
pending: HashMap<H256, TxStatus>,
listeners: Vec<futures::sync::mpsc::UnboundedSender<Arc<HashMap<H256, TxStatus>>>>,
}
impl Notifier {
fn notify(&mut self) {
let to_send = Arc::new(std::mem::replace(&mut self.pending, HashMap::new));
self.listeners.retain(|listener| listener.unbounded_send(to_send.clone()).is_ok());
}
}
impl txpool::Listener<Transaction> for TransactionPoolNotifier {
fn added(&mut self, ...) {
self.pending.insert(tx.hash, TxStatus::Added);
}
...
}There was a problem hiding this comment.
@IntegralTeam Please let me know if you need any more explanations or help with this.
There was a problem hiding this comment.
@tomusdrw Thank you, I've got what I need to do.
But, I think, it's not quite right to use HashMap here, because transaction firstly could be added with according Added status, and then before sending notification, same transaction could get status Culled and it will lead that Added status will be erased from HashMap
Maybe it will be better to use tuple vector Vec<(H256, TxStatus)>?
There was a problem hiding this comment.
@IntegralTeam good point, yeah if we never want to miss any event and have them in order, then Vec is indeed a good choice here.
There was a problem hiding this comment.
Ok, let's do it this way.
tomusdrw
left a comment
There was a problem hiding this comment.
Looks nice now, I'd like to see the other Notifier rewritten to futures as well, but we can leave that as a separate PR - please create an issue for this if that happens to be the case.
| @@ -303,6 +303,8 @@ impl TransactionQueue { | |||
| // Notify about imported transactions. | |||
| (self.pool.write().listener_mut().1).0.notify(); | |||
There was a problem hiding this comment.
This listener now seems obsolete, since it just duplicates the same data and is less efficient than the new channel-based one.
There was a problem hiding this comment.
Looks nice now, I'd like to see the other
Notifierrewritten to futures as well, but we can leave that as a separate PR - please create an issue for this if that happens to be the case.
Ok, then let`s do it in other PR
This listener now seems obsolete, since it just duplicates the same data and is less efficient than the new channel-based one.
Maybe we can make one Notifier by combining old one Notifier and TransactionsPoolNotifier and store two different lists of listeners for parity_watchTransactionsPool and eth_pubsub in it?
# Conflicts: # ethcore/src/miner/miner.rs # miner/src/pool/queue.rs # rpc/src/v1/types/pubsub.rs
|
@tomusdrw Please review |
tomusdrw
left a comment
There was a problem hiding this comment.
One last tiny grumble regarding recently added commit, but looks good otherwise.
Could you also create an issue to merge the two listeners?
|
|
||
| let to_send: Vec<(H256, TxStatus)> = hashes.into_iter().map(|hash| (hash.clone(), status)).collect(); | ||
|
|
||
| self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(Arc::new(to_send.clone())).is_ok()); |
There was a problem hiding this comment.
That's incorrect, the reason we have Arc is to avoid cloning the entire Vec. So:
- First create the
Arc - Then clone
ArcnotVec
18b0800 to
286ba53
Compare
|
Perfect 👌 |
Issue #9713
Still in progress, but would like some review on current approach.
What is not ready:
I will add more if feedback will be positive