From 843b2ba655d6b6cb9f8b007313aff03bdea1d720 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Tue, 8 May 2018 16:17:28 +0200 Subject: [PATCH 01/19] Verify private transaction before propagating --- ethcore/private-tx/src/lib.rs | 98 +++++++++---------- ethcore/private-tx/src/messages.rs | 5 + .../private-tx/src/private_transactions.rs | 17 ++-- ethcore/src/client/chain_notify.rs | 4 +- ethcore/src/test_helpers.rs | 4 +- ethcore/sync/src/api.rs | 6 +- ethcore/sync/src/chain/handler.rs | 2 + ethcore/sync/src/chain/mod.rs | 51 ++++++++-- ethcore/sync/src/chain/propagator.rs | 16 ++- ethcore/sync/src/tests/helpers.rs | 6 +- 10 files changed, 127 insertions(+), 82 deletions(-) diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 31abdb1eca5..9ef2ef937ea 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -216,7 +216,7 @@ impl Provider where { let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); trace!("Hashed effective private state for sender: {:?}", private_state_hash); self.transactions_for_signing.lock().add_transaction(private.hash(), signed_transaction, contract_validators, private_state, contract_nonce)?; - self.broadcast_private_transaction(private.rlp_bytes().into_vec()); + self.broadcast_private_transaction(private.hash(), private.rlp_bytes().into_vec()); Ok(Receipt { hash: tx_hash, contract_address: None, @@ -269,24 +269,34 @@ impl Provider where { let transaction_hash = transaction.signed().hash(); match verification_queue.private_transaction_descriptor(&transaction_hash) { Ok(desc) => { - if !self.validator_accounts.contains(&desc.validator_account) { - trace!("Cannot find validator account in config"); - bail!(ErrorKind::ValidatorAccountNotSet); - } - let account = desc.validator_account; - if let Action::Call(contract) = transaction.signed().action { - // TODO [ToDr] Usage of BlockId::Latest - let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; - let private_state = self.execute_private_transaction(BlockId::Latest, transaction.signed())?; - let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); - trace!("Hashed effective private state for validator: {:?}", private_state_hash); - let password = find_account_password(&self.passwords, &*self.accounts, &account); - let signed_state = self.accounts.sign(account, password, private_state_hash)?; - let signed_private_transaction = SignedPrivateTransaction::new(desc.private_hash, signed_state, None); - trace!("Sending signature for private transaction: {:?}", signed_private_transaction); - self.broadcast_signed_private_transaction(signed_private_transaction.rlp_bytes().into_vec()); - } else { - warn!("Incorrect type of action for the transaction"); + let private_hash = desc.private_transaction.hash(); + match desc.validator_account { + None => { + trace!("Propagating transaction further"); + self.broadcast_private_transaction(private_hash, desc.private_transaction.rlp_bytes().into_vec()); + return Ok(()) + } + Some(validator_account) => { + if !self.validator_accounts.contains(&validator_account) { + trace!("Propagating transaction further"); + self.broadcast_private_transaction(private_hash, desc.private_transaction.rlp_bytes().into_vec()); + return Ok(()) + } + if let Action::Call(contract) = transaction.signed().action { + // TODO [ToDr] Usage of BlockId::Latest + let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; + let private_state = self.execute_private_transaction(BlockId::Latest, transaction.signed())?; + let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); + trace!("Hashed effective private state for validator: {:?}", private_state_hash); + let password = find_account_password(&self.passwords, &*self.accounts, &validator_account); + let signed_state = self.accounts.sign(validator_account, password, private_state_hash)?; + let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); + trace!("Sending signature for private transaction: {:?}", signed_private_transaction); + self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes().into_vec()); + } else { + warn!("Incorrect type of action for the transaction"); + } + } } }, Err(e) => { @@ -324,13 +334,13 @@ impl Provider where { } /// Broadcast the private transaction message to the chain - fn broadcast_private_transaction(&self, message: Bytes) { - self.notify(|notify| notify.broadcast(ChainMessageType::PrivateTransaction(message.clone()))); + fn broadcast_private_transaction(&self, transaction_hash: H256, message: Bytes) { + self.notify(|notify| notify.broadcast(ChainMessageType::PrivateTransaction(transaction_hash.clone(), message.clone()))); } /// Broadcast signed private transaction message to the chain - fn broadcast_signed_private_transaction(&self, message: Bytes) { - self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(message.clone()))); + fn broadcast_signed_private_transaction(&self, transaction_hash: H256, message: Bytes) { + self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(transaction_hash.clone(), message.clone()))); } fn iv_from_transaction(transaction: &SignedTransaction) -> H128 { @@ -567,40 +577,22 @@ impl Importer for Arc { .iter() .find(|address| self.validator_accounts.contains(address)); - match validation_account { - None => { - // TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool. - // Importing to pool verifies correctness and nonce; here we are just blindly forwarding. - // - // Not for verification, broadcast further to peers - self.broadcast_private_transaction(rlp.into()); - return Ok(()); - }, - Some(&validation_account) => { - let hash = private_tx.hash(); - trace!("Private transaction taken for verification"); - let original_tx = self.extract_original_transaction(private_tx, &contract)?; - trace!("Validating transaction: {:?}", original_tx); - // Verify with the first account available - trace!("The following account will be used for verification: {:?}", validation_account); - let nonce_cache = Default::default(); - self.transactions_for_verification.lock().add_transaction( - original_tx, - contract, - validation_account, - hash, - self.pool_client(&nonce_cache), - )?; - let provider = Arc::downgrade(self); - self.channel.send(ClientIoMessage::execute(move |_| { + let original_tx = self.extract_original_transaction(private_tx.clone(), &contract)?; + trace!("Original transaction: {:?}", original_tx); + let nonce_cache = Default::default(); + self.transactions_for_verification.lock().add_transaction( + original_tx, + validation_account.map(|&account| account), + private_tx, + self.pool_client(&nonce_cache), + )?; + self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { if let Err(e) = provider.process_queue() { debug!("Unable to process the queue: {}", e); } } })).map_err(|_| ErrorKind::ClientIsMalformed.into()) - } - } } fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { @@ -609,10 +601,8 @@ impl Importer for Arc { let private_hash = tx.private_transaction_hash(); let desc = match self.transactions_for_signing.lock().get(&private_hash) { None => { - // TODO [ToDr] Verification (we can't just blindly forward every transaction) - // Not our transaction, broadcast further to peers - self.broadcast_signed_private_transaction(rlp.into()); + self.broadcast_signed_private_transaction(tx.hash(), rlp.into()); return Ok(()); }, Some(desc) => desc, diff --git a/ethcore/private-tx/src/messages.rs b/ethcore/private-tx/src/messages.rs index 57362e7ce62..c9bfb2cbfb3 100644 --- a/ethcore/private-tx/src/messages.rs +++ b/ethcore/private-tx/src/messages.rs @@ -73,4 +73,9 @@ impl SignedPrivateTransaction { pub fn private_transaction_hash(&self) -> H256 { self.private_transaction_hash } + + /// Own hash + pub fn hash(&self) -> H256 { + keccak(&*self.rlp_bytes()) + } } diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index fcc6da514ed..15abdfeec85 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -21,6 +21,7 @@ use bytes::Bytes; use ethcore_miner::pool; use ethereum_types::{H256, U256, Address}; use ethkey::Signature; +use messages::PrivateTransaction; use transaction::{UnverifiedTransaction, SignedTransaction}; use error::{Error, ErrorKind}; @@ -31,12 +32,10 @@ const MAX_QUEUE_LEN: usize = 8312; /// Desriptor for private transaction stored in queue for verification #[derive(Default, Debug, Clone, PartialEq, Eq)] pub struct PrivateTransactionDesc { - /// Hash of the private transaction - pub private_hash: H256, - /// Contract's address used in private transaction - pub contract: Address, + /// Original private transaction + pub private_transaction: PrivateTransaction, /// Address that should be used for verification - pub validator_account: Address, + pub validator_account: Option
, } /// Storage for private transactions for verification @@ -79,9 +78,8 @@ impl VerificationStore { pub fn add_transaction( &mut self, transaction: UnverifiedTransaction, - contract: Address, - validator_account: Address, - private_hash: H256, + validator_account: Option
, + private_transaction: PrivateTransaction, client: C, ) -> Result<(), Error> { if self.descriptors.len() > MAX_QUEUE_LEN { @@ -104,8 +102,7 @@ impl VerificationStore { .expect("One transaction inserted; one result returned; qed")?; self.descriptors.insert(transaction_hash, PrivateTransactionDesc { - private_hash, - contract, + private_transaction, validator_account, }); diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 62de03591df..ebfe7bdef5c 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -26,9 +26,9 @@ pub enum ChainMessageType { /// Consensus message Consensus(Vec), /// Message with private transaction - PrivateTransaction(Vec), + PrivateTransaction(H256, Vec), /// Message with signed private transaction - SignedPrivateTransaction(Vec), + SignedPrivateTransaction(H256, Vec), } /// Route type to indicate whether it is enacted or retracted. diff --git a/ethcore/src/test_helpers.rs b/ethcore/src/test_helpers.rs index 4a83752c046..c6f4337e5a7 100644 --- a/ethcore/src/test_helpers.rs +++ b/ethcore/src/test_helpers.rs @@ -403,8 +403,8 @@ impl ChainNotify for TestNotify { fn broadcast(&self, message: ChainMessageType) { let data = match message { ChainMessageType::Consensus(data) => data, - ChainMessageType::SignedPrivateTransaction(data) => data, - ChainMessageType::PrivateTransaction(data) => data, + ChainMessageType::SignedPrivateTransaction(_, data) => data, + ChainMessageType::PrivateTransaction(_, data) => data, }; self.messages.write().push(data); } diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 56bc579ad88..686c6d340e7 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -502,8 +502,10 @@ impl ChainNotify for EthSync { let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay); match message_type { ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), - ChainMessageType::PrivateTransaction(message) => self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, message), - ChainMessageType::SignedPrivateTransaction(message) => self.eth_handler.sync.write().propagate_signed_private_transaction(&mut sync_io, message), + ChainMessageType::PrivateTransaction(transaction_hash, message) => + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, message), + ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => + self.eth_handler.sync.write().propagate_signed_private_transaction(&mut sync_io, transaction_hash, message), } }); } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 75111a4d4a6..14349981de2 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -626,6 +626,8 @@ impl SyncHandler { asking_hash: None, ask_time: Instant::now(), last_sent_transactions: HashSet::new(), + last_sent_private_transactions: HashSet::new(), + last_sent_signed_private_transactions: HashSet::new(), expired: false, confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed }, asking_snapshot_data: None, diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 84e6344e688..fdfda275108 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -329,6 +329,10 @@ pub struct PeerInfo { ask_time: Instant, /// Holds a set of transactions recently sent to this peer to avoid spamming. last_sent_transactions: HashSet, + /// Holds a set of private transactions recently sent to this peer to avoid spamming. + last_sent_private_transactions: HashSet, + /// Holds a set of signed private transactions recently sent to this peer to avoid spamming. + last_sent_signed_private_transactions: HashSet, /// Pending request is expired and result should be ignored expired: bool, /// Peer fork confirmation status @@ -358,6 +362,11 @@ impl PeerInfo { self.expired = true; } } + + fn reset_private_stats(&mut self) { + self.last_sent_private_transactions.clear(); + self.last_sent_signed_private_transactions.clear(); + } } #[cfg(not(test))] @@ -1050,8 +1059,33 @@ impl ChainSync { self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect() } - fn get_private_transaction_peers(&self) -> Vec { - self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 { Some(*id) } else { None }).collect() + fn get_private_transaction_peers(&self, transaction_hash: &H256) -> Vec { + self.peers.iter().filter_map( + |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3 + && !p.last_sent_private_transactions.contains(transaction_hash) { + Some(*id) + } else { + None + } + ).collect() + } + + fn get_signed_private_transaction_peers(&self, transaction_hash: &H256) -> Vec { + self.peers.iter().filter_map( + |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3 + && !p.last_sent_signed_private_transactions.contains(transaction_hash) { + Some(*id) + } else { + None + } + ).collect() + } + + /// Clear private packets stats from peer infos + fn clear_private_stats(&mut self) { + for (_, ref mut peer) in &mut self.peers { + peer.reset_private_stats(); + } } /// Maintain other peers. Send out any new blocks and transactions @@ -1083,6 +1117,9 @@ impl ChainSync { peer_info.last_sent_transactions.clear() ); } + + // reset stats for private transaction packets + self.clear_private_stats(); } /// Dispatch incoming requests and responses @@ -1121,13 +1158,13 @@ impl ChainSync { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, packet: Bytes) { - SyncPropagator::propagate_private_transaction(self, io, packet); + pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { + SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet); } /// Broadcast signed private transaction message to peers. - pub fn propagate_signed_private_transaction(&mut self, io: &mut SyncIo, packet: Bytes) { - SyncPropagator::propagate_signed_private_transaction(self, io, packet); + pub fn propagate_signed_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { + SyncPropagator::propagate_signed_private_transaction(self, io, transaction_hash, packet); } } @@ -1250,6 +1287,8 @@ pub mod tests { asking_hash: None, ask_time: Instant::now(), last_sent_transactions: HashSet::new(), + last_sent_private_transactions: HashSet::new(), + last_sent_signed_private_transactions: HashSet::new(), expired: false, confirmation: super::ForkConfirmation::Confirmed, snapshot_number: None, diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 75cf550f28b..298c1627407 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -289,19 +289,25 @@ impl SyncPropagator { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) { - let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers()); + pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { + let lucky_peers = ChainSync::select_random_peers(&self.get_private_transaction_peers(&transaction_hash)); trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers); for peer_id in lucky_peers { + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.last_sent_private_transactions.insert(transaction_hash); + } SyncPropagator::send_packet(io, peer_id, PRIVATE_TRANSACTION_PACKET, packet.clone()); } } /// Broadcast signed private transaction message to peers. - pub fn propagate_signed_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) { - let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers()); + pub fn propagate_signed_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { + let lucky_peers = ChainSync::select_random_peers(&self.get_signed_private_transaction_peers(&transaction_hash)); trace!(target: "sync", "Sending signed private transaction packet to {:?}", lucky_peers); for peer_id in lucky_peers { + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.last_sent_signed_private_transactions.insert(transaction_hash); + } SyncPropagator::send_packet(io, peer_id, SIGNED_PRIVATE_TRANSACTION_PACKET, packet.clone()); } } @@ -424,6 +430,8 @@ mod tests { asking_hash: None, ask_time: Instant::now(), last_sent_transactions: HashSet::new(), + last_sent_private_transactions: HashSet::new(), + last_sent_signed_private_transactions: HashSet::new(), expired: false, confirmation: ForkConfirmation::Confirmed, snapshot_number: None, diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 112dab8a986..4f8bf0ed06d 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -229,8 +229,10 @@ impl EthPeer where C: FlushingBlockChainClient { let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); match message { ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), - ChainMessageType::PrivateTransaction(data) => self.sync.write().propagate_private_transaction(&mut io, data), - ChainMessageType::SignedPrivateTransaction(data) => self.sync.write().propagate_signed_private_transaction(&mut io, data), + ChainMessageType::PrivateTransaction(transaction_hash, data) => + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, data), + ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => + self.sync.write().propagate_signed_private_transaction(&mut io, transaction_hash, data), } } From fbdf3de87a830e975e81cdb47d245af248ea1621 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Wed, 23 May 2018 15:56:27 +0200 Subject: [PATCH 02/19] Private transactions queue reworked with tx pool queue direct usage --- Cargo.lock | 2 + ethcore/private-tx/Cargo.toml | 2 + ethcore/private-tx/src/error.rs | 13 + ethcore/private-tx/src/lib.rs | 79 +++--- .../private-tx/src/private_transactions.rs | 259 +++++++++++++----- 5 files changed, 235 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b1b156dbaff..ba056290ea8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -770,6 +770,7 @@ dependencies = [ "ethkey 0.3.0", "fetch 0.1.0", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hash 0.1.2", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -782,6 +783,7 @@ dependencies = [ "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "transaction-pool 1.12.0", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/ethcore/private-tx/Cargo.toml b/ethcore/private-tx/Cargo.toml index 441c9882e29..52a554ca873 100644 --- a/ethcore/private-tx/Cargo.toml +++ b/ethcore/private-tx/Cargo.toml @@ -22,6 +22,7 @@ ethjson = { path = "../../json" } ethkey = { path = "../../ethkey" } fetch = { path = "../../util/fetch" } futures = "0.1" +heapsize = "0.4" keccak-hash = { path = "../../util/hash" } log = "0.3" parking_lot = "0.5" @@ -34,6 +35,7 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" tiny-keccak = "1.4" +transaction-pool = { path = "../../transaction-pool" } url = "1" [dev-dependencies] diff --git a/ethcore/private-tx/src/error.rs b/ethcore/private-tx/src/error.rs index 0456b330530..0231eedbccd 100644 --- a/ethcore/private-tx/src/error.rs +++ b/ethcore/private-tx/src/error.rs @@ -21,6 +21,7 @@ use ethcore::account_provider::SignError; use ethcore::error::{Error as EthcoreError, ExecutionError}; use transaction::Error as TransactionError; use ethkey::Error as KeyError; +use txpool::Error as TxPoolError; error_chain! { foreign_links { @@ -167,6 +168,12 @@ error_chain! { description("General ethcore error."), display("General ethcore error {}", err), } + + #[doc = "Tx pool error."] + Txpool(err: TxPoolError) { + description("Tx pool error."), + display("Tx pool error {}", err), + } } } @@ -200,6 +207,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: TxPoolError) -> Self { + ErrorKind::Txpool(err).into() + } +} + impl From> for Error where Error: From { fn from(err: Box) -> Error { Error::from(*err) diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 9ef2ef937ea..8b6343eb00b 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -37,9 +37,11 @@ extern crate ethkey; extern crate ethjson; extern crate fetch; extern crate futures; +extern crate heapsize; extern crate keccak_hash as hash; extern crate parking_lot; extern crate patricia_trie as trie; +extern crate transaction_pool as txpool; extern crate rlp; extern crate url; extern crate rustc_hex; @@ -60,7 +62,7 @@ extern crate rand; extern crate ethcore_logger; pub use encryptor::{Encryptor, SecretStoreEncryptor, EncryptorConfig, NoopEncryptor}; -pub use private_transactions::{PrivateTransactionDesc, VerificationStore, PrivateTransactionSigningDesc, SigningStore}; +pub use private_transactions::{VerifiedPrivateTransaction, VerificationStore, PrivateTransactionSigningDesc, SigningStore}; pub use messages::{PrivateTransaction, SignedPrivateTransaction}; pub use error::{Error, ErrorKind}; @@ -124,8 +126,7 @@ pub struct Provider { passwords: Vec, notify: RwLock>>, transactions_for_signing: Mutex, - // TODO [ToDr] Move the Mutex/RwLock inside `VerificationStore` after refactored to `drain`. - transactions_for_verification: Mutex, + transactions_for_verification: VerificationStore, client: Arc, miner: Arc, accounts: Arc, @@ -157,7 +158,7 @@ impl Provider where { passwords: config.passwords, notify: RwLock::default(), transactions_for_signing: Mutex::default(), - transactions_for_verification: Mutex::default(), + transactions_for_verification: VerificationStore::default(), client, miner, accounts, @@ -256,54 +257,40 @@ impl Provider where { } /// Retrieve and verify the first available private transaction for every sender - /// - /// TODO [ToDr] It seems that: - /// The 3 methods `ready_transaction,get_descriptor,remove` are always used in conjuction so most likely - /// can be replaced with a single `drain()` method instead. - /// Thanks to this we also don't really need to lock the entire verification for the time of execution. fn process_queue(&self) -> Result<(), Error> { let nonce_cache = Default::default(); - let mut verification_queue = self.transactions_for_verification.lock(); - let ready_transactions = verification_queue.ready_transactions(self.pool_client(&nonce_cache)); + let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache)); for transaction in ready_transactions { - let transaction_hash = transaction.signed().hash(); - match verification_queue.private_transaction_descriptor(&transaction_hash) { - Ok(desc) => { - let private_hash = desc.private_transaction.hash(); - match desc.validator_account { - None => { - trace!("Propagating transaction further"); - self.broadcast_private_transaction(private_hash, desc.private_transaction.rlp_bytes().into_vec()); - return Ok(()) - } - Some(validator_account) => { - if !self.validator_accounts.contains(&validator_account) { - trace!("Propagating transaction further"); - self.broadcast_private_transaction(private_hash, desc.private_transaction.rlp_bytes().into_vec()); - return Ok(()) - } - if let Action::Call(contract) = transaction.signed().action { - // TODO [ToDr] Usage of BlockId::Latest - let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; - let private_state = self.execute_private_transaction(BlockId::Latest, transaction.signed())?; - let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); - trace!("Hashed effective private state for validator: {:?}", private_state_hash); - let password = find_account_password(&self.passwords, &*self.accounts, &validator_account); - let signed_state = self.accounts.sign(validator_account, password, private_state_hash)?; - let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); - trace!("Sending signature for private transaction: {:?}", signed_private_transaction); - self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes().into_vec()); - } else { - warn!("Incorrect type of action for the transaction"); - } - } + let private_hash = transaction.private_transaction.hash(); + match transaction.validator_account { + None => { + trace!("Propagating transaction further"); + self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); + return Ok(()) + } + Some(validator_account) => { + if !self.validator_accounts.contains(&validator_account) { + trace!("Propagating transaction further"); + self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); + return Ok(()) + } + let signed_tx = transaction.transaction.clone(); + if let Action::Call(contract) = signed_tx.action { + // TODO [ToDr] Usage of BlockId::Latest + let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; + let private_state = self.execute_private_transaction(BlockId::Latest, &signed_tx)?; + let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); + trace!("Hashed effective private state for validator: {:?}", private_state_hash); + let password = find_account_password(&self.passwords, &*self.accounts, &validator_account); + let signed_state = self.accounts.sign(validator_account, password, private_state_hash)?; + let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); + trace!("Sending signature for private transaction: {:?}", signed_private_transaction); + self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes().into_vec()); + } else { + warn!("Incorrect type of action for the transaction"); } - }, - Err(e) => { - warn!("Cannot retrieve descriptor for transaction with error {:?}", e); } } - verification_queue.remove_private_transaction(&transaction_hash); } Ok(()) } diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 15abdfeec85..715307cff23 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -15,60 +15,187 @@ // along with Parity. If not, see . use std::sync::Arc; -use std::collections::{HashMap, HashSet}; +use std::cmp; +use std::collections::HashMap; +use std::collections::hash_map::Entry; use bytes::Bytes; use ethcore_miner::pool; use ethereum_types::{H256, U256, Address}; +use heapsize::HeapSizeOf; use ethkey::Signature; use messages::PrivateTransaction; +use parking_lot::RwLock; use transaction::{UnverifiedTransaction, SignedTransaction}; - +use txpool; +use txpool::{VerifiedTransaction, Verifier}; use error::{Error, ErrorKind}; +type Pool = txpool::Pool; + /// Maximum length for private transactions queues. const MAX_QUEUE_LEN: usize = 8312; +/// Transaction with the same (sender, nonce) can be replaced only if +/// `new_gas_price > old_gas_price + old_gas_price >> SHIFT` +const GAS_PRICE_BUMP_SHIFT: usize = 3; // 2 = 25%, 3 = 12.5%, 4 = 6.25% /// Desriptor for private transaction stored in queue for verification -#[derive(Default, Debug, Clone, PartialEq, Eq)] -pub struct PrivateTransactionDesc { +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct VerifiedPrivateTransaction { /// Original private transaction pub private_transaction: PrivateTransaction, /// Address that should be used for verification pub validator_account: Option
, + /// Resulted verified + pub transaction: SignedTransaction, + /// Original transaction's hash + pub transaction_hash: H256, + /// Original transaction's sender + pub transaction_sender: Address, +} + +impl txpool::VerifiedTransaction for VerifiedPrivateTransaction { + type Hash = H256; + type Sender = Address; + + fn hash(&self) -> &H256 { + &self.transaction_hash + } + + fn mem_usage(&self) -> usize { + self.transaction.heap_size_of_children() + } + + fn sender(&self) -> &Address { + &self.transaction_sender + } +} + +#[derive(Debug)] +pub struct PrivateScorying; + +impl txpool::Scoring for PrivateScorying { + type Score = U256; + type Event = (); + + fn compare(&self, old: &VerifiedPrivateTransaction, other: &VerifiedPrivateTransaction) -> cmp::Ordering { + old.transaction.nonce.cmp(&other.transaction.nonce) + } + + fn choose(&self, old: &VerifiedPrivateTransaction, new: &VerifiedPrivateTransaction) -> txpool::scoring::Choice { + if old.transaction.nonce != new.transaction.nonce { + return txpool::scoring::Choice::InsertNew + } + + let old_gp = old.transaction.gas_price; + let new_gp = new.transaction.gas_price; + + let min_required_gp = old_gp + (old_gp >> GAS_PRICE_BUMP_SHIFT); + + match min_required_gp.cmp(&new_gp) { + cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew, + _ => txpool::scoring::Choice::ReplaceOld, + } + } + + fn update_scores(&self, txs: &[txpool::Transaction], scores: &mut [U256], change: txpool::scoring::Change) { + use self::txpool::scoring::Change; + + match change { + Change::Culled(_) => {}, + Change::RemovedAt(_) => {} + Change::InsertedAt(i) | Change::ReplacedAt(i) => { + assert!(i < txs.len()); + assert!(i < scores.len()); + + scores[i] = txs[i].transaction.transaction.gas_price; + }, + Change::Event(_) => {} + } + } + + fn should_replace(&self, old: &VerifiedPrivateTransaction, new: &VerifiedPrivateTransaction) -> bool { + if old.sender() == new.sender() { + // prefer earliest transaction + if new.transaction.nonce < old.transaction.nonce { + return true + } + } + + self.choose(old, new) == txpool::scoring::Choice::ReplaceOld + } +} + +/// Checks readiness of transactions by comparing the nonce to state nonce. +/// Guarantees only one transaction per sender +#[derive(Debug)] +pub struct PrivateReadyState { + nonces: HashMap, + state: C, +} + +impl PrivateReadyState { + /// Create new State checker, given client interface. + pub fn new( + state: C, + ) -> Self { + PrivateReadyState { + nonces: Default::default(), + state, + } + } +} + +impl txpool::Ready for PrivateReadyState { + fn is_ready(&mut self, tx: &VerifiedPrivateTransaction) -> txpool::Readiness { + let sender = tx.sender(); + let state = &self.state; + let state_nonce = state.account_nonce(sender); + match self.nonces.entry(*sender) { + Entry::Vacant(entry) => { + let nonce = entry.insert(state_nonce); + match tx.transaction.nonce.cmp(nonce) { + cmp::Ordering::Greater => txpool::Readiness::Future, + cmp::Ordering::Less => txpool::Readiness::Stale, + cmp::Ordering::Equal => { + *nonce = *nonce + 1.into(); + txpool::Readiness::Ready + }, + } + } + Entry::Occupied(_) => { + txpool::Readiness::Future + } + } + } } /// Storage for private transactions for verification pub struct VerificationStore { - /// Descriptors for private transactions in queue for verification with key - hash of the original transaction - descriptors: HashMap, - /// Queue with transactions for verification - /// - /// TODO [ToDr] Might actually be better to use `txpool` directly and: - /// 1. Store descriptors inside `VerifiedTransaction` - /// 2. Use custom `ready` implementation to only fetch one transaction per sender. - /// 3. Get rid of passing dummy `block_number` and `timestamp` - transactions: pool::TransactionQueue, + verification_pool: RwLock, + verification_options: pool::verifier::Options, } impl Default for VerificationStore { fn default() -> Self { VerificationStore { - descriptors: Default::default(), - transactions: pool::TransactionQueue::new( - pool::Options { - max_count: MAX_QUEUE_LEN, - max_per_sender: MAX_QUEUE_LEN / 10, - max_mem_usage: 8 * 1024 * 1024, - }, - pool::verifier::Options { - // TODO [ToDr] This should probably be based on some real values? - minimal_gas_price: 0.into(), - block_gas_limit: 8_000_000.into(), - tx_gas_limit: U256::max_value(), - }, - pool::PrioritizationStrategy::GasPriceOnly, - ) + verification_pool: RwLock::new( + txpool::Pool::new( + txpool::NoopListener, + PrivateScorying, + pool::Options { + max_count: MAX_QUEUE_LEN, + max_per_sender: MAX_QUEUE_LEN / 10, + max_mem_usage: 8 * 1024 * 1024, + }, + ) + ), + verification_options: pool::verifier::Options { + // TODO [ToDr] This should probably be based on some real values? + minimal_gas_price: 0.into(), + block_gas_limit: 8_000_000.into(), + tx_gas_limit: U256::max_value(), + }, } } } @@ -76,64 +203,48 @@ impl Default for VerificationStore { impl VerificationStore { /// Adds private transaction for verification into the store pub fn add_transaction( - &mut self, + &self, transaction: UnverifiedTransaction, validator_account: Option
, private_transaction: PrivateTransaction, client: C, ) -> Result<(), Error> { - if self.descriptors.len() > MAX_QUEUE_LEN { - bail!(ErrorKind::QueueIsFull); - } - - let transaction_hash = transaction.hash(); - if self.descriptors.get(&transaction_hash).is_some() { - bail!(ErrorKind::PrivateTransactionAlreadyImported); - } - - let results = self.transactions.import( - client, - vec![pool::verifier::Transaction::Unverified(transaction)], - ); - - // Verify that transaction was imported - results.into_iter() - .next() - .expect("One transaction inserted; one result returned; qed")?; - self.descriptors.insert(transaction_hash, PrivateTransactionDesc { + let options = self.verification_options.clone(); + // Use pool's verifying pipeline for original transaction's verification + let verifier = pool::verifier::Verifier::new(client, options, Default::default()); + let _verified_tx = verifier.verify_transaction(pool::verifier::Transaction::Unverified(transaction.clone()))?; + let signed_tx = SignedTransaction::new(transaction)?; + let verified = VerifiedPrivateTransaction { private_transaction, validator_account, - }); - + transaction: signed_tx.clone(), + transaction_hash: signed_tx.hash(), + transaction_sender: signed_tx.sender(), + }; + let mut pool = self.verification_pool.write(); + pool.import(verified)?; Ok(()) } - /// Returns transactions ready for verification + /// Drains transactions ready for verification from the pool /// Returns only one transaction per sender because several cannot be verified in a row without verification from other peers - pub fn ready_transactions(&self, client: C) -> Vec> { - // We never store PendingTransactions and we don't use internal cache, - // so we don't need to provide real block number of timestamp here - let block_number = 0; - let timestamp = 0; - let nonce_cap = None; - - self.transactions.collect_pending(client, block_number, timestamp, nonce_cap, |transactions| { - // take only one transaction per sender - let mut senders = HashSet::with_capacity(self.descriptors.len()); - transactions.filter(move |tx| senders.insert(tx.signed().sender())).collect() - }) - } - - /// Returns descriptor of the corresponding private transaction - pub fn private_transaction_descriptor(&self, transaction_hash: &H256) -> Result<&PrivateTransactionDesc, Error> { - self.descriptors.get(transaction_hash).ok_or(ErrorKind::PrivateTransactionNotFound.into()) - } - - /// Remove transaction from the queue for verification - pub fn remove_private_transaction(&mut self, transaction_hash: &H256) { - self.descriptors.remove(transaction_hash); - self.transactions.remove(&[*transaction_hash], true); + pub fn drain(&self, client: C) -> Vec> { + let ready = PrivateReadyState::new(client); + let mut hashes: Vec = Vec::new(); + let res: Vec> = self.verification_pool.read().pending(ready).collect(); + res + .iter() + .for_each(|tx| { + hashes.push(tx.hash().clone()); + }); + let mut pool = self.verification_pool.write(); + hashes + .iter() + .for_each(|hash| { + pool.remove(&hash, true); + }); + res } } From a4778e37c5d0f42e5db79ee2834a59f197bb3c29 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Fri, 25 May 2018 16:02:25 +0200 Subject: [PATCH 03/19] Styling fixed --- ethcore/private-tx/src/private_transactions.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 715307cff23..20d63487fe0 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -237,13 +237,15 @@ impl VerificationStore { .iter() .for_each(|tx| { hashes.push(tx.hash().clone()); - }); + } + ); let mut pool = self.verification_pool.write(); hashes .iter() .for_each(|hash| { pool.remove(&hash, true); - }); + } + ); res } } From fc113ba6d7fd4b5ed05fcebb13aed4c1d61cc09a Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Mon, 11 Jun 2018 11:51:13 +0200 Subject: [PATCH 04/19] Prevent resending private packets to the sender --- Cargo.lock | 1 + ethcore/private-tx/src/lib.rs | 21 +++++++++++--------- ethcore/service/Cargo.toml | 1 + ethcore/service/src/lib.rs | 1 + ethcore/service/src/service.rs | 5 +++-- ethcore/sync/src/chain/handler.rs | 32 +++++++++++++++++++++++-------- ethcore/sync/src/private_tx.rs | 23 ++++++++++++---------- 7 files changed, 55 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba056290ea8..62e83340a70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -836,6 +836,7 @@ dependencies = [ "ethcore-io 1.12.0", "ethcore-private-tx 1.0.0", "ethcore-sync 1.12.0", + "ethereum-types 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0", "kvdb-rocksdb 0.1.0", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 8b6343eb00b..99b1b20a8dc 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -540,12 +540,12 @@ impl Provider where { pub trait Importer { /// Process received private transaction - fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>; + fn import_private_transaction(&self, _rlp: &[u8]) -> Result; /// Add signed private transaction into the store /// /// Creates corresponding public transaction if last required signature collected and sends it to the chain - fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>; + fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result; } // TODO [ToDr] Offload more heavy stuff to the IoService thread. @@ -554,7 +554,7 @@ pub trait Importer { // for both verification and execution. impl Importer for Arc { - fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { + fn import_private_transaction(&self, rlp: &[u8]) -> Result { trace!("Private transaction received"); let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?; let contract = private_tx.contract; @@ -570,19 +570,22 @@ impl Importer for Arc { self.transactions_for_verification.lock().add_transaction( original_tx, validation_account.map(|&account| account), - private_tx, + private_tx.clone(), self.pool_client(&nonce_cache), )?; - self.channel.send(ClientIoMessage::execute(move |_| { + if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { if let Err(e) = provider.process_queue() { debug!("Unable to process the queue: {}", e); } } - })).map_err(|_| ErrorKind::ClientIsMalformed.into()) + })) { + trace!("Error sending NewPrivateTransaction message: {:?}", e); + } + Ok(private_tx.hash()) } - fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?; trace!("Signature for private transaction received: {:?}", tx); let private_hash = tx.private_transaction_hash(); @@ -590,7 +593,7 @@ impl Importer for Arc { None => { // Not our transaction, broadcast further to peers self.broadcast_signed_private_transaction(tx.hash(), rlp.into()); - return Ok(()); + return Ok(private_hash); }, Some(desc) => desc, }; @@ -642,7 +645,7 @@ impl Importer for Arc { } } } - Ok(()) + Ok(private_hash) } } diff --git a/ethcore/service/Cargo.toml b/ethcore/service/Cargo.toml index 634ee55dba4..f0c58e2d698 100644 --- a/ethcore/service/Cargo.toml +++ b/ethcore/service/Cargo.toml @@ -10,6 +10,7 @@ ethcore = { path = ".." } ethcore-io = { path = "../../util/io" } ethcore-private-tx = { path = "../private-tx" } ethcore-sync = { path = "../sync" } +ethereum-types = "0.3" kvdb = { path = "../../util/kvdb" } log = "0.3" stop-guard = { path = "../../util/stop-guard" } diff --git a/ethcore/service/src/lib.rs b/ethcore/service/src/lib.rs index d85a377cde2..7ded2af79ef 100644 --- a/ethcore/service/src/lib.rs +++ b/ethcore/service/src/lib.rs @@ -19,6 +19,7 @@ extern crate ethcore; extern crate ethcore_io as io; extern crate ethcore_private_tx; extern crate ethcore_sync as sync; +extern crate ethereum_types; extern crate kvdb; extern crate stop_guard; diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index 7248d97229f..b0ba36943f7 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -21,6 +21,7 @@ use std::path::Path; use std::time::Duration; use ansi_term::Colour; +use ethereum_types::H256; use io::{IoContext, TimerToken, IoHandler, IoService, IoError}; use kvdb::{KeyValueDB, KeyValueDBHandler}; use stop_guard::StopGuard; @@ -54,11 +55,11 @@ impl PrivateTxService { } impl PrivateTxHandler for PrivateTxService { - fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String> { + fn import_private_transaction(&self, rlp: &[u8]) -> Result { self.provider.import_private_transaction(rlp).map_err(|e| e.to_string()) } - fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String> { + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { self.provider.import_signed_private_transaction(rlp).map_err(|e| e.to_string()) } } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 14349981de2..81b1431737e 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -717,21 +717,29 @@ impl SyncHandler { } /// Called when peer sends us signed private transaction packet - fn on_signed_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); return Ok(()); } trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id); - if let Err(e) = sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) { - trace!(target: "sync", "Ignoring the message, error queueing: {}", e); - } + match self.private_tx_handler.import_signed_private_transaction(r.as_raw()) { + Ok(transaction_hash) => { + //don't send the packet back + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.last_sent_signed_private_transactions.insert(transaction_hash); + } + }, + Err(e) => { + trace!(target: "sync", "Ignoring the message, error queueing: {}", e); + } + } Ok(()) } /// Called when peer sends us new private transaction packet - fn on_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { + fn on_private_transaction(sync: &mut ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> { if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) { trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id); return Ok(()); @@ -739,9 +747,17 @@ impl SyncHandler { trace!(target: "sync", "Received private transaction packet from {:?}", peer_id); - if let Err(e) = sync.private_tx_handler.import_private_transaction(r.as_raw()) { - trace!(target: "sync", "Ignoring the message, error queueing: {}", e); - } + match self.private_tx_handler.import_private_transaction(r.as_raw()) { + Ok(transaction_hash) => { + //don't send the packet back + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.last_sent_private_transactions.insert(transaction_hash); + } + }, + Err(e) => { + trace!(target: "sync", "Ignoring the message, error queueing: {}", e); + } + } Ok(()) } } diff --git a/ethcore/sync/src/private_tx.rs b/ethcore/sync/src/private_tx.rs index d7434c8bd5b..c8c8528ed91 100644 --- a/ethcore/sync/src/private_tx.rs +++ b/ethcore/sync/src/private_tx.rs @@ -15,26 +15,29 @@ // along with Parity. If not, see . use parking_lot::Mutex; +use ethereum_types::H256; /// Trait which should be implemented by a private transaction handler. pub trait PrivateTxHandler: Send + Sync + 'static { /// Function called on new private transaction received. - fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String>; + /// Returns hash of the imported transaction + fn import_private_transaction(&self, rlp: &[u8]) -> Result; /// Function called on new signed private transaction received. - fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String>; + /// Returns hash of the imported transaction + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result; } /// Nonoperative private transaction handler. pub struct NoopPrivateTxHandler; impl PrivateTxHandler for NoopPrivateTxHandler { - fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), String> { - Ok(()) + fn import_private_transaction(&self, _rlp: &[u8]) -> Result { + Ok(H256::default()) } - fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), String> { - Ok(()) + fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result { + Ok(H256::default()) } } @@ -48,13 +51,13 @@ pub struct SimplePrivateTxHandler { } impl PrivateTxHandler for SimplePrivateTxHandler { - fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), String> { + fn import_private_transaction(&self, rlp: &[u8]) -> Result { self.txs.lock().push(rlp.to_vec()); - Ok(()) + Ok(H256::default()) } - fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), String> { + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { self.signed_txs.lock().push(rlp.to_vec()); - Ok(()) + Ok(H256::default()) } } From e5feccda1d2c9a9a243969024ba0cbc58b756ec8 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Mon, 11 Jun 2018 14:06:21 +0200 Subject: [PATCH 05/19] Process signed private transaction packets via io queue --- ethcore/private-tx/src/lib.rs | 135 +++++++++++++++++------------- ethcore/sync/src/tests/helpers.rs | 5 ++ 2 files changed, 83 insertions(+), 57 deletions(-) diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 99b1b20a8dc..e0944211d59 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -131,6 +131,7 @@ pub struct Provider { miner: Arc, accounts: Arc, channel: IoChannel, + imported_signed_transactions: Mutex>, } #[derive(Debug)] @@ -159,6 +160,7 @@ impl Provider where { notify: RwLock::default(), transactions_for_signing: Mutex::default(), transactions_for_verification: VerificationStore::default(), + imported_signed_transactions: Mutex::default(), client, miner, accounts, @@ -257,7 +259,7 @@ impl Provider where { } /// Retrieve and verify the first available private transaction for every sender - fn process_queue(&self) -> Result<(), Error> { + fn process_verification_queue(&self) -> Result<(), Error> { let nonce_cache = Default::default(); let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache)); for transaction in ready_transactions { @@ -295,6 +297,71 @@ impl Provider where { Ok(()) } + /// Add signed private transaction into the store /// Creates corresponding public transaction if last required signature collected and sends it to the chain + pub fn process_signatures(&self) -> Result<(), Error> { + trace!("Processing signed private transactions"); + let mut signed_transactions = self.imported_signed_transactions.lock(); + for tx in signed_transactions.drain(..) { + let private_hash = tx.private_transaction_hash(); + let desc = match self.transactions_for_signing.lock().get(&private_hash) { + None => { + // Not our transaction, broadcast further to peers + self.broadcast_signed_private_transaction(tx.hash(), tx.rlp_bytes().into_vec()); + return Ok(()); + }, + Some(desc) => desc, + }; + let last = self.last_required_signature(&desc, tx.signature())?; + + if last { + let mut signatures = desc.received_signatures.clone(); + signatures.push(tx.signature()); + let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); + //Create public transaction + let public_tx = self.public_transaction( + desc.state.clone(), + &desc.original_transaction, + &rsv, + desc.original_transaction.nonce, + desc.original_transaction.gas_price + )?; + trace!("Last required signature received, public transaction created: {:?}", public_tx); + //Sign and add it to the queue + let chain_id = desc.original_transaction.chain_id(); + let hash = public_tx.hash(chain_id); + let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?; + let password = find_account_password(&self.passwords, &*self.accounts, &signer_account); + let signature = self.accounts.sign(signer_account, password, hash)?; + let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; + match self.miner.import_own_transaction(&*self.client, signed.into()) { + Ok(_) => trace!("Public transaction added to queue"), + Err(err) => { + trace!("Failed to add transaction to queue, error: {:?}", err); + bail!(err); + } + } + //Remove from store for signing + match self.transactions_for_signing.lock().remove(&private_hash) { + Ok(_) => {} + Err(err) => { + trace!("Failed to remove transaction from signing store, error: {:?}", err); + bail!(err); + } + } + } else { + //Add signature to the store + match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) { + Ok(_) => trace!("Signature stored for private transaction"), + Err(err) => { + trace!("Failed to add signature to signing store, error: {:?}", err); + bail!(err); + } + } + } + } + Ok(()) + } + fn last_required_signature(&self, desc: &PrivateTransactionSigningDesc, sign: Signature) -> Result { if desc.received_signatures.contains(&sign) { return Ok(false); @@ -575,7 +642,7 @@ impl Importer for Arc { )?; if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { - if let Err(e) = provider.process_queue() { + if let Err(e) = provider.process_verification_queue() { debug!("Unable to process the queue: {}", e); } } @@ -589,61 +656,15 @@ impl Importer for Arc { let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?; trace!("Signature for private transaction received: {:?}", tx); let private_hash = tx.private_transaction_hash(); - let desc = match self.transactions_for_signing.lock().get(&private_hash) { - None => { - // Not our transaction, broadcast further to peers - self.broadcast_signed_private_transaction(tx.hash(), rlp.into()); - return Ok(private_hash); - }, - Some(desc) => desc, - }; - - let last = self.last_required_signature(&desc, tx.signature())?; - - if last { - let mut signatures = desc.received_signatures.clone(); - signatures.push(tx.signature()); - let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); - //Create public transaction - let public_tx = self.public_transaction( - desc.state.clone(), - &desc.original_transaction, - &rsv, - desc.original_transaction.nonce, - desc.original_transaction.gas_price - )?; - trace!("Last required signature received, public transaction created: {:?}", public_tx); - //Sign and add it to the queue - let chain_id = desc.original_transaction.chain_id(); - let hash = public_tx.hash(chain_id); - let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?; - let password = find_account_password(&self.passwords, &*self.accounts, &signer_account); - let signature = self.accounts.sign(signer_account, password, hash)?; - let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; - match self.miner.import_own_transaction(&*self.client, signed.into()) { - Ok(_) => trace!("Public transaction added to queue"), - Err(err) => { - trace!("Failed to add transaction to queue, error: {:?}", err); - bail!(err); - } - } - //Remove from store for signing - match self.transactions_for_signing.lock().remove(&private_hash) { - Ok(_) => {} - Err(err) => { - trace!("Failed to remove transaction from signing store, error: {:?}", err); - bail!(err); - } - } - } else { - //Add signature to the store - match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) { - Ok(_) => trace!("Signature stored for private transaction"), - Err(err) => { - trace!("Failed to add signature to signing store, error: {:?}", err); - bail!(err); - } - } + self.imported_signed_transactions.lock().push(tx); + if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { + if let Some(provider) = provider.upgrade() { + if let Err(e) = provider.process_signatures() { + debug!("Unable to process the queue: {}", e); + } + } + })) { + trace!("Error sending NewSignedPrivateTransaction message: {:?}", e); } Ok(private_hash) } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 4f8bf0ed06d..faca81cc04b 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -508,6 +508,7 @@ impl TestNet> { pub struct TestIoHandler { pub client: Arc, pub private_tx_queued: Mutex, + pub private_signed_tx_queued: Mutex, } impl TestIoHandler { @@ -515,6 +516,7 @@ impl TestIoHandler { TestIoHandler { client, private_tx_queued: Mutex::default(), + private_signed_tx_queued: Mutex::default(), } } } @@ -526,6 +528,9 @@ impl IoHandler for TestIoHandler { *self.private_tx_queued.lock() += 1; (*exec.0)(&self.client); }, + ClientIoMessage::NewSignedPrivateTransaction => { + *self.private_signed_tx_queued.lock() += 1; + }, _ => {} // ignore other messages } } From 64367454a4b9544950afb14be36a4a80a5a9690b Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Mon, 11 Jun 2018 14:20:29 +0200 Subject: [PATCH 06/19] Test fixed --- ethcore/sync/src/tests/private.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ethcore/sync/src/tests/private.rs b/ethcore/sync/src/tests/private.rs index 120dc8fc9c5..836e9825e48 100644 --- a/ethcore/sync/src/tests/private.rs +++ b/ethcore/sync/src/tests/private.rs @@ -144,6 +144,7 @@ fn send_private_transaction() { //process signed response let signed_private_transaction = received_signed_private_transactions[0].clone(); assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok()); + assert!(pm0.on_signed_private_transaction_queued().is_ok()); let local_transactions = net.peer(0).miner.local_transactions(); assert_eq!(local_transactions.len(), 1); } From 800aa194d0b8a1323fbabc3762b9ea9f5eafc890 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Tue, 19 Jun 2018 13:42:03 +0200 Subject: [PATCH 07/19] Build and test fixed after merge --- Cargo.lock | 4 ++-- ethcore/private-tx/src/lib.rs | 6 ++++-- ethcore/sync/src/chain/handler.rs | 8 ++++---- ethcore/sync/src/chain/mod.rs | 4 ++-- ethcore/sync/src/chain/propagator.rs | 8 ++++---- ethcore/sync/src/tests/helpers.rs | 5 ----- ethcore/sync/src/tests/private.rs | 2 +- 7 files changed, 17 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 62e83340a70..83f3cb364ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,7 +783,7 @@ dependencies = [ "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "transaction-pool 1.12.0", + "transaction-pool 1.12.1", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -836,7 +836,7 @@ dependencies = [ "ethcore-io 1.12.0", "ethcore-private-tx 1.0.0", "ethcore-sync 1.12.0", - "ethereum-types 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0", "kvdb-rocksdb 0.1.0", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index e0944211d59..8ba5ab2dab2 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -634,12 +634,13 @@ impl Importer for Arc { let original_tx = self.extract_original_transaction(private_tx.clone(), &contract)?; trace!("Original transaction: {:?}", original_tx); let nonce_cache = Default::default(); - self.transactions_for_verification.lock().add_transaction( + self.transactions_for_verification.add_transaction( original_tx, validation_account.map(|&account| account), private_tx.clone(), self.pool_client(&nonce_cache), )?; + let provider = Arc::downgrade(self); if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { if let Err(e) = provider.process_verification_queue() { @@ -657,6 +658,7 @@ impl Importer for Arc { trace!("Signature for private transaction received: {:?}", tx); let private_hash = tx.private_transaction_hash(); self.imported_signed_transactions.lock().push(tx); + let provider = Arc::downgrade(self); if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { if let Err(e) = provider.process_signatures() { @@ -684,7 +686,7 @@ impl ChainNotify for Provider { fn new_blocks(&self, imported: Vec, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { if !imported.is_empty() { trace!("New blocks imported, try to prune the queue"); - if let Err(err) = self.process_queue() { + if let Err(err) = self.process_verification_queue() { trace!("Cannot prune private transactions queue. error: {:?}", err); } } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 81b1431737e..1c966cf4fd1 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -724,10 +724,10 @@ impl SyncHandler { } trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id); - match self.private_tx_handler.import_signed_private_transaction(r.as_raw()) { + match sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) { Ok(transaction_hash) => { //don't send the packet back - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { peer.last_sent_signed_private_transactions.insert(transaction_hash); } }, @@ -747,10 +747,10 @@ impl SyncHandler { trace!(target: "sync", "Received private transaction packet from {:?}", peer_id); - match self.private_tx_handler.import_private_transaction(r.as_raw()) { + match sync.private_tx_handler.import_private_transaction(r.as_raw()) { Ok(transaction_hash) => { //don't send the packet back - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { peer.last_sent_private_transactions.insert(transaction_hash); } }, diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index fdfda275108..0510eb8077f 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -1061,7 +1061,7 @@ impl ChainSync { fn get_private_transaction_peers(&self, transaction_hash: &H256) -> Vec { self.peers.iter().filter_map( - |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3 + |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 && !p.last_sent_private_transactions.contains(transaction_hash) { Some(*id) } else { @@ -1072,7 +1072,7 @@ impl ChainSync { fn get_signed_private_transaction_peers(&self, transaction_hash: &H256) -> Vec { self.peers.iter().filter_map( - |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3 + |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 && !p.last_sent_signed_private_transactions.contains(transaction_hash) { Some(*id) } else { diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 298c1627407..918fd8d234a 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -290,10 +290,10 @@ impl SyncPropagator { /// Broadcast private transaction message to peers. pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { - let lucky_peers = ChainSync::select_random_peers(&self.get_private_transaction_peers(&transaction_hash)); + let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers); for peer_id in lucky_peers { - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { peer.last_sent_private_transactions.insert(transaction_hash); } SyncPropagator::send_packet(io, peer_id, PRIVATE_TRANSACTION_PACKET, packet.clone()); @@ -302,10 +302,10 @@ impl SyncPropagator { /// Broadcast signed private transaction message to peers. pub fn propagate_signed_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { - let lucky_peers = ChainSync::select_random_peers(&self.get_signed_private_transaction_peers(&transaction_hash)); + let lucky_peers = ChainSync::select_random_peers(&sync.get_signed_private_transaction_peers(&transaction_hash)); trace!(target: "sync", "Sending signed private transaction packet to {:?}", lucky_peers); for peer_id in lucky_peers { - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { peer.last_sent_signed_private_transactions.insert(transaction_hash); } SyncPropagator::send_packet(io, peer_id, SIGNED_PRIVATE_TRANSACTION_PACKET, packet.clone()); diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index faca81cc04b..4f8bf0ed06d 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -508,7 +508,6 @@ impl TestNet> { pub struct TestIoHandler { pub client: Arc, pub private_tx_queued: Mutex, - pub private_signed_tx_queued: Mutex, } impl TestIoHandler { @@ -516,7 +515,6 @@ impl TestIoHandler { TestIoHandler { client, private_tx_queued: Mutex::default(), - private_signed_tx_queued: Mutex::default(), } } } @@ -528,9 +526,6 @@ impl IoHandler for TestIoHandler { *self.private_tx_queued.lock() += 1; (*exec.0)(&self.client); }, - ClientIoMessage::NewSignedPrivateTransaction => { - *self.private_signed_tx_queued.lock() += 1; - }, _ => {} // ignore other messages } } diff --git a/ethcore/sync/src/tests/private.rs b/ethcore/sync/src/tests/private.rs index 836e9825e48..06d6e73a1f0 100644 --- a/ethcore/sync/src/tests/private.rs +++ b/ethcore/sync/src/tests/private.rs @@ -144,7 +144,7 @@ fn send_private_transaction() { //process signed response let signed_private_transaction = received_signed_private_transactions[0].clone(); assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok()); - assert!(pm0.on_signed_private_transaction_queued().is_ok()); + assert!(pm0.process_signatures().is_ok()); let local_transactions = net.peer(0).miner.local_transactions(); assert_eq!(local_transactions.len(), 1); } From 55eef9ec6939f3cb063777fba3b45ef91bfde2f1 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Fri, 13 Jul 2018 16:15:34 +0200 Subject: [PATCH 08/19] Comments after review fixed --- ethcore/private-tx/src/error.rs | 13 +- ethcore/private-tx/src/lib.rs | 180 +++++++++--------- ethcore/private-tx/src/messages.rs | 32 +++- .../private-tx/src/private_transactions.rs | 28 +-- ethcore/sync/src/api.rs | 7 +- ethcore/sync/src/chain/handler.rs | 3 +- ethcore/sync/src/chain/mod.rs | 45 +---- ethcore/sync/src/chain/propagator.rs | 19 +- ethcore/sync/src/tests/helpers.rs | 6 +- ethcore/sync/src/tests/private.rs | 6 +- 10 files changed, 156 insertions(+), 183 deletions(-) diff --git a/ethcore/private-tx/src/error.rs b/ethcore/private-tx/src/error.rs index 0231eedbccd..906195e97c6 100644 --- a/ethcore/private-tx/src/error.rs +++ b/ethcore/private-tx/src/error.rs @@ -28,6 +28,7 @@ error_chain! { Io(::std::io::Error) #[doc = "Error concerning the Rust standard library's IO subsystem."]; Decoder(DecoderError) #[doc = "RLP decoding error."]; Trie(TrieError) #[doc = "Error concerning TrieDBs."]; + Txpool(TxPoolError) #[doc = "Tx pool error."]; } errors { @@ -168,12 +169,6 @@ error_chain! { description("General ethcore error."), display("General ethcore error {}", err), } - - #[doc = "Tx pool error."] - Txpool(err: TxPoolError) { - description("Tx pool error."), - display("Tx pool error {}", err), - } } } @@ -207,12 +202,6 @@ impl From for Error { } } -impl From for Error { - fn from(err: TxPoolError) -> Self { - ErrorKind::Txpool(err).into() - } -} - impl From> for Error where Error: From { fn from(err: Box) -> Error { Error::from(*err) diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 8ba5ab2dab2..866d7ed87cc 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -72,7 +72,7 @@ use std::time::Duration; use ethereum_types::{H128, H256, U256, Address}; use hash::keccak; use rlp::*; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use bytes::Bytes; use ethkey::{Signature, recover, public_to_address}; use io::IoChannel; @@ -125,13 +125,12 @@ pub struct Provider { signer_account: Option
, passwords: Vec, notify: RwLock>>, - transactions_for_signing: Mutex, + transactions_for_signing: RwLock, transactions_for_verification: VerificationStore, client: Arc, miner: Arc, accounts: Arc, channel: IoChannel, - imported_signed_transactions: Mutex>, } #[derive(Debug)] @@ -158,9 +157,8 @@ impl Provider where { signer_account: config.signer_account, passwords: config.passwords, notify: RwLock::default(), - transactions_for_signing: Mutex::default(), + transactions_for_signing: RwLock::default(), transactions_for_verification: VerificationStore::default(), - imported_signed_transactions: Mutex::default(), client, miner, accounts, @@ -201,10 +199,7 @@ impl Provider where { Action::Call(contract) => { let data = signed_transaction.rlp_bytes(); let encrypted_transaction = self.encrypt(&contract, &Self::iv_from_transaction(&signed_transaction), &data)?; - let private = PrivateTransaction { - encrypted: encrypted_transaction, - contract, - }; + let private = PrivateTransaction::new(encrypted_transaction, contract); // TODO [ToDr] Using BlockId::Latest is bad here, // the block may change in the middle of execution // causing really weird stuff to happen. @@ -218,7 +213,7 @@ impl Provider where { trace!("Required validators: {:?}", contract_validators); let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); trace!("Hashed effective private state for sender: {:?}", private_state_hash); - self.transactions_for_signing.lock().add_transaction(private.hash(), signed_transaction, contract_validators, private_state, contract_nonce)?; + self.transactions_for_signing.write().add_transaction(private.hash(), signed_transaction, contract_validators, private_state, contract_nonce)?; self.broadcast_private_transaction(private.hash(), private.rlp_bytes().into_vec()); Ok(Receipt { hash: tx_hash, @@ -268,23 +263,38 @@ impl Provider where { None => { trace!("Propagating transaction further"); self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); - return Ok(()) + continue; } Some(validator_account) => { if !self.validator_accounts.contains(&validator_account) { trace!("Propagating transaction further"); self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); - return Ok(()) + continue; } - let signed_tx = transaction.transaction.clone(); - if let Action::Call(contract) = signed_tx.action { + let tx_action = transaction.transaction.action.clone(); + if let Action::Call(contract) = tx_action { // TODO [ToDr] Usage of BlockId::Latest - let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; - let private_state = self.execute_private_transaction(BlockId::Latest, &signed_tx)?; + let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest); + if let Err(e) = contract_nonce { + warn!("Cannot retrieve contract nonce: {:?}", e); + continue; + } + let contract_nonce = contract_nonce.expect("Error was checked before"); + let private_state = self.execute_private_transaction(BlockId::Latest, &transaction.transaction); + if let Err(e) = private_state { + warn!("Cannot retrieve private state: {:?}", e); + continue; + } + let private_state = private_state.expect("Error was checked before"); let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); trace!("Hashed effective private state for validator: {:?}", private_state_hash); let password = find_account_password(&self.passwords, &*self.accounts, &validator_account); - let signed_state = self.accounts.sign(validator_account, password, private_state_hash)?; + let signed_state = self.accounts.sign(validator_account, password, private_state_hash); + if let Err(e) = signed_state { + warn!("Cannot sign the state: {:?}", e); + continue; + } + let signed_state = signed_state.expect("Error was checked before"); let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); trace!("Sending signature for private transaction: {:?}", signed_private_transaction); self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes().into_vec()); @@ -297,68 +307,66 @@ impl Provider where { Ok(()) } - /// Add signed private transaction into the store /// Creates corresponding public transaction if last required signature collected and sends it to the chain - pub fn process_signatures(&self) -> Result<(), Error> { - trace!("Processing signed private transactions"); - let mut signed_transactions = self.imported_signed_transactions.lock(); - for tx in signed_transactions.drain(..) { - let private_hash = tx.private_transaction_hash(); - let desc = match self.transactions_for_signing.lock().get(&private_hash) { - None => { - // Not our transaction, broadcast further to peers - self.broadcast_signed_private_transaction(tx.hash(), tx.rlp_bytes().into_vec()); - return Ok(()); - }, - Some(desc) => desc, - }; - let last = self.last_required_signature(&desc, tx.signature())?; - - if last { - let mut signatures = desc.received_signatures.clone(); - signatures.push(tx.signature()); - let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); - //Create public transaction - let public_tx = self.public_transaction( - desc.state.clone(), - &desc.original_transaction, - &rsv, - desc.original_transaction.nonce, - desc.original_transaction.gas_price - )?; - trace!("Last required signature received, public transaction created: {:?}", public_tx); - //Sign and add it to the queue - let chain_id = desc.original_transaction.chain_id(); - let hash = public_tx.hash(chain_id); - let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?; - let password = find_account_password(&self.passwords, &*self.accounts, &signer_account); - let signature = self.accounts.sign(signer_account, password, hash)?; - let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; - match self.miner.import_own_transaction(&*self.client, signed.into()) { - Ok(_) => trace!("Public transaction added to queue"), - Err(err) => { - trace!("Failed to add transaction to queue, error: {:?}", err); - bail!(err); - } - } - //Remove from store for signing - match self.transactions_for_signing.lock().remove(&private_hash) { - Ok(_) => {} - Err(err) => { - trace!("Failed to remove transaction from signing store, error: {:?}", err); - bail!(err); - } - } - } else { - //Add signature to the store - match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) { - Ok(_) => trace!("Signature stored for private transaction"), - Err(err) => { - trace!("Failed to add signature to signing store, error: {:?}", err); - bail!(err); - } - } - } - } + /// Add signed private transaction into the store + /// Creates corresponding public transaction if last required signature collected and sends it to the chain + pub fn process_signature(&self, signed_tx: SignedPrivateTransaction) -> Result<(), Error> { + trace!("Processing signed private transaction"); + let private_hash = signed_tx.private_transaction_hash(); + let desc = match self.transactions_for_signing.read().get(&private_hash) { + None => { + // Not our transaction, broadcast further to peers + self.broadcast_signed_private_transaction(signed_tx.hash(), signed_tx.rlp_bytes().into_vec()); + return Ok(()); + }, + Some(desc) => desc, + }; + let last = self.last_required_signature(&desc, signed_tx.signature())?; + + if last { + let mut signatures = desc.received_signatures.clone(); + signatures.push(signed_tx.signature()); + let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); + //Create public transaction + let public_tx = self.public_transaction( + desc.state.clone(), + &desc.original_transaction, + &rsv, + desc.original_transaction.nonce, + desc.original_transaction.gas_price + )?; + trace!("Last required signature received, public transaction created: {:?}", public_tx); + //Sign and add it to the queue + let chain_id = desc.original_transaction.chain_id(); + let hash = public_tx.hash(chain_id); + let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?; + let password = find_account_password(&self.passwords, &*self.accounts, &signer_account); + let signature = self.accounts.sign(signer_account, password, hash)?; + let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; + match self.miner.import_own_transaction(&*self.client, signed.into()) { + Ok(_) => trace!("Public transaction added to queue"), + Err(err) => { + trace!("Failed to add transaction to queue, error: {:?}", err); + bail!(err); + } + } + //Remove from store for signing + match self.transactions_for_signing.write().remove(&private_hash) { + Ok(_) => {} + Err(err) => { + trace!("Failed to remove transaction from signing store, error: {:?}", err); + bail!(err); + } + } + } else { + //Add signature to the store + match self.transactions_for_signing.write().add_signature(&private_hash, signed_tx.signature()) { + Ok(_) => trace!("Signature stored for private transaction"), + Err(err) => { + trace!("Failed to add signature to signing store, error: {:?}", err); + bail!(err); + } + } + } Ok(()) } @@ -389,12 +397,12 @@ impl Provider where { /// Broadcast the private transaction message to the chain fn broadcast_private_transaction(&self, transaction_hash: H256, message: Bytes) { - self.notify(|notify| notify.broadcast(ChainMessageType::PrivateTransaction(transaction_hash.clone(), message.clone()))); + self.notify(|notify| notify.broadcast(ChainMessageType::PrivateTransaction(transaction_hash, message.clone()))); } /// Broadcast signed private transaction message to the chain fn broadcast_signed_private_transaction(&self, transaction_hash: H256, message: Bytes) { - self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(transaction_hash.clone(), message.clone()))); + self.notify(|notify| notify.broadcast(ChainMessageType::SignedPrivateTransaction(transaction_hash, message.clone()))); } fn iv_from_transaction(transaction: &SignedTransaction) -> H128 { @@ -624,6 +632,7 @@ impl Importer for Arc { fn import_private_transaction(&self, rlp: &[u8]) -> Result { trace!("Private transaction received"); let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?; + let private_tx_hash = private_tx.hash(); let contract = private_tx.contract; let contract_validators = self.get_validators(BlockId::Latest, &contract)?; @@ -637,7 +646,7 @@ impl Importer for Arc { self.transactions_for_verification.add_transaction( original_tx, validation_account.map(|&account| account), - private_tx.clone(), + private_tx, self.pool_client(&nonce_cache), )?; let provider = Arc::downgrade(self); @@ -650,19 +659,18 @@ impl Importer for Arc { })) { trace!("Error sending NewPrivateTransaction message: {:?}", e); } - Ok(private_tx.hash()) + Ok(private_tx_hash) } fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?; trace!("Signature for private transaction received: {:?}", tx); let private_hash = tx.private_transaction_hash(); - self.imported_signed_transactions.lock().push(tx); let provider = Arc::downgrade(self); if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { - if let Err(e) = provider.process_signatures() { - debug!("Unable to process the queue: {}", e); + if let Err(e) = provider.process_signature(tx.clone()) { + debug!("Unable to process the signature: {}", e); } } })) { diff --git a/ethcore/private-tx/src/messages.rs b/ethcore/private-tx/src/messages.rs index c9bfb2cbfb3..2629ae79b33 100644 --- a/ethcore/private-tx/src/messages.rs +++ b/ethcore/private-tx/src/messages.rs @@ -28,12 +28,28 @@ pub struct PrivateTransaction { pub encrypted: Bytes, /// Address of the contract pub contract: Address, + /// Hash + hash: H256, } impl PrivateTransaction { - /// Compute hash on private transaction + /// Constructor + pub fn new(encrypted: Bytes, contract: Address) -> Self { + PrivateTransaction { + encrypted, + contract, + hash: 0.into(), + }.compute_hash() + } + + fn compute_hash(mut self) -> PrivateTransaction { + self.hash = keccak(&*self.rlp_bytes()); + self + } + + /// Hash of the private transaction pub fn hash(&self) -> H256 { - keccak(&*self.rlp_bytes()) + self.hash } } @@ -49,6 +65,8 @@ pub struct SignedPrivateTransaction { r: U256, /// The S field of the signature s: U256, + /// Hash + hash: H256, } impl SignedPrivateTransaction { @@ -59,7 +77,13 @@ impl SignedPrivateTransaction { r: sig.r().into(), s: sig.s().into(), v: add_chain_replay_protection(sig.v() as u64, chain_id), - } + hash: 0.into(), + }.compute_hash() + } + + fn compute_hash(mut self) -> SignedPrivateTransaction { + self.hash = keccak(&*self.rlp_bytes()); + self } pub fn standard_v(&self) -> u8 { check_replay_protection(self.v) } @@ -76,6 +100,6 @@ impl SignedPrivateTransaction { /// Own hash pub fn hash(&self) -> H256 { - keccak(&*self.rlp_bytes()) + self.hash } } diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 20d63487fe0..21d61d36ad9 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -31,7 +31,7 @@ use txpool; use txpool::{VerifiedTransaction, Verifier}; use error::{Error, ErrorKind}; -type Pool = txpool::Pool; +type Pool = txpool::Pool; /// Maximum length for private transactions queues. const MAX_QUEUE_LEN: usize = 8312; @@ -72,9 +72,9 @@ impl txpool::VerifiedTransaction for VerifiedPrivateTransaction { } #[derive(Debug)] -pub struct PrivateScorying; +pub struct PrivateScoring; -impl txpool::Scoring for PrivateScorying { +impl txpool::Scoring for PrivateScoring { type Score = U256; type Event = (); @@ -182,7 +182,7 @@ impl Default for VerificationStore { verification_pool: RwLock::new( txpool::Pool::new( txpool::NoopListener, - PrivateScorying, + PrivateScoring, pool::Options { max_count: MAX_QUEUE_LEN, max_per_sender: MAX_QUEUE_LEN / 10, @@ -231,22 +231,12 @@ impl VerificationStore { /// Returns only one transaction per sender because several cannot be verified in a row without verification from other peers pub fn drain(&self, client: C) -> Vec> { let ready = PrivateReadyState::new(client); - let mut hashes: Vec = Vec::new(); - let res: Vec> = self.verification_pool.read().pending(ready).collect(); - res - .iter() - .for_each(|tx| { - hashes.push(tx.hash().clone()); - } - ); + let transactions: Vec<_> = self.verification_pool.read().pending(ready).collect(); let mut pool = self.verification_pool.write(); - hashes - .iter() - .for_each(|hash| { - pool.remove(&hash, true); - } - ); - res + for tx in &transactions { + pool.remove(tx.hash(), true); + } + transactions } } diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 686c6d340e7..c8792a702e3 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -36,7 +36,8 @@ use std::net::{SocketAddr, AddrParseError}; use std::str::FromStr; use parking_lot::RwLock; use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62, - PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3}; + PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, + PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; use light::client::AsLightClient; use light::Provider; use light::net::{self as light_net, LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext}; @@ -503,9 +504,9 @@ impl ChainNotify for EthSync { match message_type { ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), ChainMessageType::PrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PRIVATE_TRANSACTION_PACKET, message), ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_signed_private_transaction(&mut sync_io, transaction_hash, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, message), } }); } diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index 1c966cf4fd1..818834de6d4 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -627,7 +627,6 @@ impl SyncHandler { ask_time: Instant::now(), last_sent_transactions: HashSet::new(), last_sent_private_transactions: HashSet::new(), - last_sent_signed_private_transactions: HashSet::new(), expired: false, confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed }, asking_snapshot_data: None, @@ -728,7 +727,7 @@ impl SyncHandler { Ok(transaction_hash) => { //don't send the packet back if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { - peer.last_sent_signed_private_transactions.insert(transaction_hash); + peer.last_sent_private_transactions.insert(transaction_hash); } }, Err(e) => { diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 0510eb8077f..de3c5741da7 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -178,8 +178,8 @@ pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12; pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13; pub const SNAPSHOT_DATA_PACKET: u8 = 0x14; pub const CONSENSUS_DATA_PACKET: u8 = 0x15; -const PRIVATE_TRANSACTION_PACKET: u8 = 0x16; -const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17; +pub const PRIVATE_TRANSACTION_PACKET: u8 = 0x16; +pub const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17; const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; @@ -329,10 +329,8 @@ pub struct PeerInfo { ask_time: Instant, /// Holds a set of transactions recently sent to this peer to avoid spamming. last_sent_transactions: HashSet, - /// Holds a set of private transactions recently sent to this peer to avoid spamming. + /// Holds a set of private transactions and their signatures recently sent to this peer to avoid spamming. last_sent_private_transactions: HashSet, - /// Holds a set of signed private transactions recently sent to this peer to avoid spamming. - last_sent_signed_private_transactions: HashSet, /// Pending request is expired and result should be ignored expired: bool, /// Peer fork confirmation status @@ -365,7 +363,6 @@ impl PeerInfo { fn reset_private_stats(&mut self) { self.last_sent_private_transactions.clear(); - self.last_sent_signed_private_transactions.clear(); } } @@ -1070,24 +1067,6 @@ impl ChainSync { ).collect() } - fn get_signed_private_transaction_peers(&self, transaction_hash: &H256) -> Vec { - self.peers.iter().filter_map( - |(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 - && !p.last_sent_signed_private_transactions.contains(transaction_hash) { - Some(*id) - } else { - None - } - ).collect() - } - - /// Clear private packets stats from peer infos - fn clear_private_stats(&mut self) { - for (_, ref mut peer) in &mut self.peers { - peer.reset_private_stats(); - } - } - /// Maintain other peers. Send out any new blocks and transactions pub fn maintain_sync(&mut self, io: &mut SyncIo) { self.maybe_start_snapshot_sync(io); @@ -1116,10 +1095,12 @@ impl ChainSync { self.peers.values_mut().nth(peer).map(|peer_info| peer_info.last_sent_transactions.clear() ); - } - // reset stats for private transaction packets - self.clear_private_stats(); + //re-broadcast private packets as well + self.peers.values_mut().nth(peer).map(|peer_info| + peer_info.reset_private_stats() + ); + } } /// Dispatch incoming requests and responses @@ -1158,13 +1139,8 @@ impl ChainSync { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { - SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet); - } - - /// Broadcast signed private transaction message to peers. - pub fn propagate_signed_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { - SyncPropagator::propagate_signed_private_transaction(self, io, transaction_hash, packet); + pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { + SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); } } @@ -1288,7 +1264,6 @@ pub mod tests { ask_time: Instant::now(), last_sent_transactions: HashSet::new(), last_sent_private_transactions: HashSet::new(), - last_sent_signed_private_transactions: HashSet::new(), expired: false, confirmation: super::ForkConfirmation::Confirmed, snapshot_number: None, diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 918fd8d234a..19a1798eb01 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -38,8 +38,6 @@ use super::{ CONSENSUS_DATA_PACKET, NEW_BLOCK_HASHES_PACKET, NEW_BLOCK_PACKET, - PRIVATE_TRANSACTION_PACKET, - SIGNED_PRIVATE_TRANSACTION_PACKET, TRANSACTIONS_PACKET, }; @@ -289,26 +287,14 @@ impl SyncPropagator { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { + pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers); for peer_id in lucky_peers { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { peer.last_sent_private_transactions.insert(transaction_hash); } - SyncPropagator::send_packet(io, peer_id, PRIVATE_TRANSACTION_PACKET, packet.clone()); - } - } - - /// Broadcast signed private transaction message to peers. - pub fn propagate_signed_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet: Bytes) { - let lucky_peers = ChainSync::select_random_peers(&sync.get_signed_private_transaction_peers(&transaction_hash)); - trace!(target: "sync", "Sending signed private transaction packet to {:?}", lucky_peers); - for peer_id in lucky_peers { - if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { - peer.last_sent_signed_private_transactions.insert(transaction_hash); - } - SyncPropagator::send_packet(io, peer_id, SIGNED_PRIVATE_TRANSACTION_PACKET, packet.clone()); + SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone()); } } @@ -431,7 +417,6 @@ mod tests { ask_time: Instant::now(), last_sent_transactions: HashSet::new(), last_sent_private_transactions: HashSet::new(), - last_sent_signed_private_transactions: HashSet::new(), expired: false, confirmation: ForkConfirmation::Confirmed, snapshot_number: None, diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 4f8bf0ed06d..f800266193d 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -32,7 +32,7 @@ use ethcore::miner::Miner; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; use api::WARP_SYNC_PROTOCOL_ID; -use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3}; +use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; use SyncConfig; use private_tx::SimplePrivateTxHandler; @@ -230,9 +230,9 @@ impl EthPeer where C: FlushingBlockChainClient { match message { ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), ChainMessageType::PrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PRIVATE_TRANSACTION_PACKET, data), ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_signed_private_transaction(&mut io, transaction_hash, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, data), } } diff --git a/ethcore/sync/src/tests/private.rs b/ethcore/sync/src/tests/private.rs index 06d6e73a1f0..7f817c29f0f 100644 --- a/ethcore/sync/src/tests/private.rs +++ b/ethcore/sync/src/tests/private.rs @@ -24,11 +24,12 @@ use ethcore::CreateContractAddress; use transaction::{Transaction, Action}; use ethcore::executive::{contract_address}; use ethcore::test_helpers::{push_block_with_transactions}; -use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer}; +use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer, SignedPrivateTransaction}; use ethcore::account_provider::AccountProvider; use ethkey::{KeyPair}; use tests::helpers::{TestNet, TestIoHandler}; use rustc_hex::FromHex; +use rlp::Rlp; use SyncConfig; fn seal_spec() -> Spec { @@ -144,7 +145,8 @@ fn send_private_transaction() { //process signed response let signed_private_transaction = received_signed_private_transactions[0].clone(); assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok()); - assert!(pm0.process_signatures().is_ok()); + let signature: SignedPrivateTransaction = Rlp::new(&signed_private_transaction).as_val().unwrap(); + assert!(pm0.process_signature(signature).is_ok()); let local_transactions = net.peer(0).miner.local_transactions(); assert_eq!(local_transactions.len(), 1); } From c8898cde3eb6808140254ea79811cb1d318a489b Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Fri, 13 Jul 2018 16:54:53 +0200 Subject: [PATCH 09/19] Signed transaction taken from verified --- ethcore/private-tx/src/private_transactions.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 21d61d36ad9..623a5a54795 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -213,14 +213,16 @@ impl VerificationStore { let options = self.verification_options.clone(); // Use pool's verifying pipeline for original transaction's verification let verifier = pool::verifier::Verifier::new(client, options, Default::default()); - let _verified_tx = verifier.verify_transaction(pool::verifier::Transaction::Unverified(transaction.clone()))?; - let signed_tx = SignedTransaction::new(transaction)?; + let verified_tx = verifier.verify_transaction(pool::verifier::Transaction::Unverified(transaction.clone()))?; + let signed_tx: SignedTransaction = verified_tx.signed().clone(); + let signed_hash = signed_tx.hash(); + let signed_sender = signed_tx.sender(); let verified = VerifiedPrivateTransaction { private_transaction, validator_account, - transaction: signed_tx.clone(), - transaction_hash: signed_tx.hash(), - transaction_sender: signed_tx.sender(), + transaction: signed_tx, + transaction_hash: signed_hash, + transaction_sender: signed_sender, }; let mut pool = self.verification_pool.write(); pool.import(verified)?; From 59ddce1eed4c04f5d1f2feaa8fb20eaa8a283250 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Tue, 17 Jul 2018 12:25:15 +0200 Subject: [PATCH 10/19] Fix after merge --- ethcore/private-tx/src/private_transactions.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 623a5a54795..670f9a435f7 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -114,15 +114,17 @@ impl txpool::Scoring for PrivateScoring { } } - fn should_replace(&self, old: &VerifiedPrivateTransaction, new: &VerifiedPrivateTransaction) -> bool { + fn should_replace(&self, old: &VerifiedPrivateTransaction, new: &VerifiedPrivateTransaction) -> txpool::scoring::Choice { if old.sender() == new.sender() { // prefer earliest transaction - if new.transaction.nonce < old.transaction.nonce { - return true - } + return match new.transaction.nonce.cmp(&old.transaction.nonce) { + cmp::Ordering::Less => txpool::scoring::Choice::ReplaceOld, + cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew, + cmp::Ordering::Equal => self.choose(old, new), + }; } - self.choose(old, new) == txpool::scoring::Choice::ReplaceOld + self.choose(old, new) } } @@ -212,7 +214,7 @@ impl VerificationStore { let options = self.verification_options.clone(); // Use pool's verifying pipeline for original transaction's verification - let verifier = pool::verifier::Verifier::new(client, options, Default::default()); + let verifier = pool::verifier::Verifier::new(client, options, Default::default(), None); let verified_tx = verifier.verify_transaction(pool::verifier::Transaction::Unverified(transaction.clone()))?; let signed_tx: SignedTransaction = verified_tx.signed().clone(); let signed_hash = signed_tx.hash(); From 7e15ebbd63caa4850add770f9d5e41d523224bf7 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Wed, 18 Jul 2018 13:32:07 +0200 Subject: [PATCH 11/19] Pool scoring generalized in order to use externally --- .../private-tx/src/private_transactions.rs | 66 ++++--------------- miner/src/pool/local_transactions.rs | 2 +- miner/src/pool/mod.rs | 37 +++++++++-- miner/src/pool/scoring.rs | 60 ++++++----------- 4 files changed, 63 insertions(+), 102 deletions(-) diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 670f9a435f7..400511458bd 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -31,13 +31,10 @@ use txpool; use txpool::{VerifiedTransaction, Verifier}; use error::{Error, ErrorKind}; -type Pool = txpool::Pool; +type Pool = txpool::Pool; /// Maximum length for private transactions queues. const MAX_QUEUE_LEN: usize = 8312; -/// Transaction with the same (sender, nonce) can be replaced only if -/// `new_gas_price > old_gas_price + old_gas_price >> SHIFT` -const GAS_PRICE_BUMP_SHIFT: usize = 3; // 2 = 25%, 3 = 12.5%, 4 = 6.25% /// Desriptor for private transaction stored in queue for verification #[derive(Debug, Clone, PartialEq, Eq)] @@ -71,60 +68,19 @@ impl txpool::VerifiedTransaction for VerifiedPrivateTransaction { } } -#[derive(Debug)] -pub struct PrivateScoring; - -impl txpool::Scoring for PrivateScoring { - type Score = U256; - type Event = (); - - fn compare(&self, old: &VerifiedPrivateTransaction, other: &VerifiedPrivateTransaction) -> cmp::Ordering { - old.transaction.nonce.cmp(&other.transaction.nonce) - } - - fn choose(&self, old: &VerifiedPrivateTransaction, new: &VerifiedPrivateTransaction) -> txpool::scoring::Choice { - if old.transaction.nonce != new.transaction.nonce { - return txpool::scoring::Choice::InsertNew - } - - let old_gp = old.transaction.gas_price; - let new_gp = new.transaction.gas_price; - - let min_required_gp = old_gp + (old_gp >> GAS_PRICE_BUMP_SHIFT); - - match min_required_gp.cmp(&new_gp) { - cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew, - _ => txpool::scoring::Choice::ReplaceOld, - } +impl pool::ScoredTransaction for VerifiedPrivateTransaction { + fn priority(&self) -> pool::Priority { + pool::Priority::Regular } - fn update_scores(&self, txs: &[txpool::Transaction], scores: &mut [U256], change: txpool::scoring::Change) { - use self::txpool::scoring::Change; - - match change { - Change::Culled(_) => {}, - Change::RemovedAt(_) => {} - Change::InsertedAt(i) | Change::ReplacedAt(i) => { - assert!(i < txs.len()); - assert!(i < scores.len()); - - scores[i] = txs[i].transaction.transaction.gas_price; - }, - Change::Event(_) => {} - } + /// Gets transaction gas price. + fn gas_price(&self) -> U256 { + self.transaction.gas_price } - fn should_replace(&self, old: &VerifiedPrivateTransaction, new: &VerifiedPrivateTransaction) -> txpool::scoring::Choice { - if old.sender() == new.sender() { - // prefer earliest transaction - return match new.transaction.nonce.cmp(&old.transaction.nonce) { - cmp::Ordering::Less => txpool::scoring::Choice::ReplaceOld, - cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew, - cmp::Ordering::Equal => self.choose(old, new), - }; - } - - self.choose(old, new) + /// Gets transaction nonce. + fn nonce(&self) -> U256 { + self.transaction.nonce } } @@ -184,7 +140,7 @@ impl Default for VerificationStore { verification_pool: RwLock::new( txpool::Pool::new( txpool::NoopListener, - PrivateScoring, + pool::scoring::NonceAndGasPrice(pool::PrioritizationStrategy::GasPriceOnly), pool::Options { max_count: MAX_QUEUE_LEN, max_per_sender: MAX_QUEUE_LEN / 10, diff --git a/miner/src/pool/local_transactions.rs b/miner/src/pool/local_transactions.rs index d69da3347ac..e3287613944 100644 --- a/miner/src/pool/local_transactions.rs +++ b/miner/src/pool/local_transactions.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use ethereum_types::H256; use linked_hash_map::LinkedHashMap; -use pool::VerifiedTransaction as Transaction; +use pool::{VerifiedTransaction as Transaction, ScoredTransaction}; use txpool::{self, VerifiedTransaction}; /// Status of local transaction. diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs index 4a1223226d4..753e558b07a 100644 --- a/miner/src/pool/mod.rs +++ b/miner/src/pool/mod.rs @@ -24,10 +24,10 @@ use txpool; mod listener; mod queue; mod ready; -mod scoring; pub mod client; pub mod local_transactions; +pub mod scoring; pub mod verifier; #[cfg(test)] @@ -84,7 +84,7 @@ impl PendingSettings { /// Transaction priority. #[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)] -pub(crate) enum Priority { +pub enum Priority { /// Regular transactions received over the network. (no priority boost) Regular, /// Transactions from retracted blocks (medium priority) @@ -108,6 +108,18 @@ impl Priority { } } +/// Scoring properties for verified transaction. +pub trait ScoredTransaction { + /// Gets transaction priority. + fn priority(&self) -> Priority; + + /// Gets transaction gas price. + fn gas_price(&self) -> U256; + + /// Gets transaction nonce. + fn nonce(&self) -> U256; +} + /// Verified transaction stored in the pool. #[derive(Debug, PartialEq, Eq)] pub struct VerifiedTransaction { @@ -137,11 +149,6 @@ impl VerifiedTransaction { } } - /// Gets transaction priority. - pub(crate) fn priority(&self) -> Priority { - self.priority - } - /// Gets transaction insertion id. pub(crate) fn insertion_id(&self) -> usize { self.insertion_id @@ -175,3 +182,19 @@ impl txpool::VerifiedTransaction for VerifiedTransaction { &self.sender } } + +impl ScoredTransaction for VerifiedTransaction { + fn priority(&self) -> Priority { + self.priority + } + + /// Gets transaction gas price. + fn gas_price(&self) -> U256 { + self.transaction.gas_price + } + + /// Gets transaction nonce. + fn nonce(&self) -> U256 { + self.transaction.nonce + } +} diff --git a/miner/src/pool/scoring.rs b/miner/src/pool/scoring.rs index 3592204054d..f66305dc712 100644 --- a/miner/src/pool/scoring.rs +++ b/miner/src/pool/scoring.rs @@ -31,7 +31,7 @@ use std::cmp; use ethereum_types::U256; use txpool::{self, scoring}; -use super::{verifier, PrioritizationStrategy, VerifiedTransaction}; +use super::{verifier, PrioritizationStrategy, VerifiedTransaction, ScoredTransaction}; /// Transaction with the same (sender, nonce) can be replaced only if /// `new_gas_price > old_gas_price + old_gas_price >> SHIFT` @@ -67,21 +67,21 @@ impl NonceAndGasPrice { } } -impl txpool::Scoring for NonceAndGasPrice { +impl txpool::Scoring

for NonceAndGasPrice where P: ScoredTransaction + txpool::VerifiedTransaction + Sized { type Score = U256; type Event = (); - fn compare(&self, old: &VerifiedTransaction, other: &VerifiedTransaction) -> cmp::Ordering { - old.transaction.nonce.cmp(&other.transaction.nonce) + fn compare(&self, old: &P, other: &P) -> cmp::Ordering { + old.nonce().cmp(&other.nonce()) } - fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice { - if old.transaction.nonce != new.transaction.nonce { + fn choose(&self, old: &P, new: &P) -> scoring::Choice { + if old.nonce() != new.nonce() { return scoring::Choice::InsertNew } - let old_gp = old.transaction.gas_price; - let new_gp = new.transaction.gas_price; + let old_gp = old.gas_price(); + let new_gp = new.gas_price(); let min_required_gp = bump_gas_price(old_gp); @@ -91,7 +91,7 @@ impl txpool::Scoring for NonceAndGasPrice { } } - fn update_scores(&self, txs: &[txpool::Transaction], scores: &mut [U256], change: scoring::Change) { + fn update_scores(&self, txs: &[txpool::Transaction

], scores: &mut [U256], change: scoring::Change) { use self::scoring::Change; match change { @@ -101,7 +101,7 @@ impl txpool::Scoring for NonceAndGasPrice { assert!(i < txs.len()); assert!(i < scores.len()); - scores[i] = txs[i].transaction.transaction.gas_price; + scores[i] = txs[i].transaction.gas_price(); let boost = match txs[i].priority() { super::Priority::Local => 15, super::Priority::Retracted => 10, @@ -122,10 +122,10 @@ impl txpool::Scoring for NonceAndGasPrice { } } - fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice { - if old.sender == new.sender { + fn should_replace(&self, old: &P, new: &P) -> scoring::Choice { + if old.sender() == new.sender() { // prefer earliest transaction - match new.transaction.nonce.cmp(&old.transaction.nonce) { + match new.nonce().cmp(&old.nonce()) { cmp::Ordering::Less => scoring::Choice::ReplaceOld, cmp::Ordering::Greater => scoring::Choice::RejectNew, cmp::Ordering::Equal => self.choose(old, new), @@ -134,8 +134,8 @@ impl txpool::Scoring for NonceAndGasPrice { // accept local transactions over the limit scoring::Choice::InsertNew } else { - let old_score = (old.priority(), old.transaction.gas_price); - let new_score = (new.priority(), new.transaction.gas_price); + let old_score = (old.priority(), old.gas_price()); + let new_score = (new.priority(), new.gas_price()); if new_score > old_score { scoring::Choice::ReplaceOld } else { @@ -181,12 +181,8 @@ mod tests { }; let keypair = Random.generate().unwrap(); - let txs = vec![tx1, tx2, tx3, tx4].into_iter().enumerate().map(|(i, tx)| { - let verified = tx.unsigned().sign(keypair.secret(), None).verified(); - txpool::Transaction { - insertion_id: i as u64, - transaction: Arc::new(verified), - } + let txs = vec![tx1, tx2, tx3, tx4].into_iter().enumerate().map(|(_, tx)| { + tx.unsigned().sign(keypair.secret(), None).verified() }).collect::>(); assert_eq!(scoring.should_replace(&txs[0], &txs[1]), RejectNew); @@ -209,11 +205,7 @@ mod tests { gas_price: 1, ..Default::default() }; - let verified_tx = tx.signed().verified(); - txpool::Transaction { - insertion_id: 0, - transaction: Arc::new(verified_tx), - } + tx.signed().verified() }; let tx_regular_high_gas = { let tx = Tx { @@ -221,11 +213,7 @@ mod tests { gas_price: 10, ..Default::default() }; - let verified_tx = tx.signed().verified(); - txpool::Transaction { - insertion_id: 1, - transaction: Arc::new(verified_tx), - } + tx.signed().verified() }; let tx_local_low_gas = { let tx = Tx { @@ -235,10 +223,7 @@ mod tests { }; let mut verified_tx = tx.signed().verified(); verified_tx.priority = ::pool::Priority::Local; - txpool::Transaction { - insertion_id: 2, - transaction: Arc::new(verified_tx), - } + verified_tx }; let tx_local_high_gas = { let tx = Tx { @@ -248,10 +233,7 @@ mod tests { }; let mut verified_tx = tx.signed().verified(); verified_tx.priority = ::pool::Priority::Local; - txpool::Transaction { - insertion_id: 3, - transaction: Arc::new(verified_tx), - } + verified_tx }; assert_eq!(scoring.should_replace(&tx_regular_low_gas, &tx_regular_high_gas), ReplaceOld); From a11c5e5d9d6e98412692cd2e80c51b37fbd99c9a Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Thu, 9 Aug 2018 16:00:21 +0200 Subject: [PATCH 12/19] Lib refactored according to the review comments --- ethcore/private-tx/src/lib.rs | 71 +++++++++++++++--------------- ethcore/private-tx/src/messages.rs | 14 +++++- 2 files changed, 47 insertions(+), 38 deletions(-) diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 945ff8c5a65..6a8d63d2566 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -193,7 +193,7 @@ impl Provider where { pub fn create_private_transaction(&self, signed_transaction: SignedTransaction) -> Result { trace!("Creating private transaction from regular transaction: {:?}", signed_transaction); if self.signer_account.is_none() { - trace!("Signing account not set"); + warn!("Signing account not set"); bail!(ErrorKind::SignerAccountNotSet); } let tx_hash = signed_transaction.hash(); @@ -222,7 +222,7 @@ impl Provider where { self.broadcast_private_transaction(private.hash(), private.rlp_bytes().into_vec()); Ok(Receipt { hash: tx_hash, - contract_address: None, + contract_address: Some(contract), status_code: 0, }) } @@ -238,14 +238,6 @@ impl Provider where { keccak(&state_buf.as_ref()) } - /// Extract signed transaction from private transaction - fn extract_original_transaction(&self, private: PrivateTransaction, contract: &Address) -> Result { - let encrypted_transaction = private.encrypted; - let transaction_bytes = self.decrypt(contract, &encrypted_transaction)?; - let original_transaction: UnverifiedTransaction = Rlp::new(&transaction_bytes).as_val()?; - Ok(original_transaction) - } - fn pool_client<'a>(&'a self, nonce_cache: &'a NonceCache) -> miner::pool_client::PoolClient<'a, Client> { let engine = self.client.engine(); let refuse_service_transactions = true; @@ -261,34 +253,31 @@ impl Provider where { /// Retrieve and verify the first available private transaction for every sender fn process_verification_queue(&self) -> Result<(), Error> { let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE); - let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache)); - for transaction in ready_transactions { + let process_transaction = |transaction: &VerifiedPrivateTransaction| -> Result<_, String> { let private_hash = transaction.private_transaction.hash(); match transaction.validator_account { None => { trace!("Propagating transaction further"); self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); - continue; + return Ok(()); } Some(validator_account) => { if !self.validator_accounts.contains(&validator_account) { trace!("Propagating transaction further"); self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); - continue; + return Ok(()); } let tx_action = transaction.transaction.action.clone(); if let Action::Call(contract) = tx_action { // TODO [ToDr] Usage of BlockId::Latest let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest); if let Err(e) = contract_nonce { - warn!("Cannot retrieve contract nonce: {:?}", e); - continue; + bail!("Cannot retrieve contract nonce: {:?}", e); } let contract_nonce = contract_nonce.expect("Error was checked before"); let private_state = self.execute_private_transaction(BlockId::Latest, &transaction.transaction); if let Err(e) = private_state { - warn!("Cannot retrieve private state: {:?}", e); - continue; + bail!("Cannot retrieve private state: {:?}", e); } let private_state = private_state.expect("Error was checked before"); let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); @@ -296,18 +285,24 @@ impl Provider where { let password = find_account_password(&self.passwords, &*self.accounts, &validator_account); let signed_state = self.accounts.sign(validator_account, password, private_state_hash); if let Err(e) = signed_state { - warn!("Cannot sign the state: {:?}", e); - continue; + bail!("Cannot sign the state: {:?}", e); } let signed_state = signed_state.expect("Error was checked before"); let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); trace!("Sending signature for private transaction: {:?}", signed_private_transaction); self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes().into_vec()); } else { - warn!("Incorrect type of action for the transaction"); + bail!("Incorrect type of action for the transaction"); } } } + return Ok(()); + }; + let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache)); + for transaction in ready_transactions { + if let Err(e) = process_transaction(&transaction) { + warn!("Error: {:?}", e); + } } Ok(()) } @@ -350,7 +345,7 @@ impl Provider where { match self.miner.import_own_transaction(&*self.client, signed.into()) { Ok(_) => trace!("Public transaction added to queue"), Err(err) => { - trace!("Failed to add transaction to queue, error: {:?}", err); + warn!("Failed to add transaction to queue, error: {:?}", err); bail!(err); } } @@ -358,7 +353,7 @@ impl Provider where { match self.transactions_for_signing.write().remove(&private_hash) { Ok(_) => {} Err(err) => { - trace!("Failed to remove transaction from signing store, error: {:?}", err); + warn!("Failed to remove transaction from signing store, error: {:?}", err); bail!(err); } } @@ -367,7 +362,7 @@ impl Provider where { match self.transactions_for_signing.write().add_signature(&private_hash, signed_tx.signature()) { Ok(_) => trace!("Signature stored for private transaction"), Err(err) => { - trace!("Failed to add signature to signing store, error: {:?}", err); + warn!("Failed to add signature to signing store, error: {:?}", err); bail!(err); } } @@ -388,13 +383,13 @@ impl Provider where { Ok(desc.received_signatures.len() + 1 == desc.validators.len()) } false => { - trace!("Sender's state doesn't correspond to validator's"); + warn!("Sender's state doesn't correspond to validator's"); bail!(ErrorKind::StateIncorrect); } } } Err(err) => { - trace!("Sender's state doesn't correspond to validator's, error {:?}", err); + warn!("Sender's state doesn't correspond to validator's, error {:?}", err); bail!(err); } } @@ -638,16 +633,19 @@ impl Importer for Arc { trace!("Private transaction received"); let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?; let private_tx_hash = private_tx.hash(); - let contract = private_tx.contract; + let contract = private_tx.contract(); let contract_validators = self.get_validators(BlockId::Latest, &contract)?; let validation_account = contract_validators .iter() .find(|address| self.validator_accounts.contains(address)); - let original_tx = self.extract_original_transaction(private_tx.clone(), &contract)?; - trace!("Original transaction: {:?}", original_tx); + //extract the original transaction + let encrypted_data = private_tx.encrypted(); + let transaction_bytes = self.decrypt(&contract, &encrypted_data)?; + let original_tx: UnverifiedTransaction = Rlp::new(&transaction_bytes).as_val()?; let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE); + //add to the queue for further verification self.transactions_for_verification.add_transaction( original_tx, validation_account.map(|&account| account), @@ -655,14 +653,15 @@ impl Importer for Arc { self.pool_client(&nonce_cache), )?; let provider = Arc::downgrade(self); - if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { + let result = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { if let Err(e) = provider.process_verification_queue() { - debug!("Unable to process the queue: {}", e); + warn!("Unable to process the queue: {}", e); } } - })) { - trace!("Error sending NewPrivateTransaction message: {:?}", e); + })); + if let Err(e) = result { + warn!("Error sending NewPrivateTransaction message: {:?}", e); } Ok(private_tx_hash) } @@ -675,11 +674,11 @@ impl Importer for Arc { if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { if let Err(e) = provider.process_signature(tx.clone()) { - debug!("Unable to process the signature: {}", e); + warn!("Unable to process the signature: {}", e); } } })) { - trace!("Error sending NewSignedPrivateTransaction message: {:?}", e); + warn!("Error sending NewSignedPrivateTransaction message: {:?}", e); } Ok(private_hash) } @@ -700,7 +699,7 @@ impl ChainNotify for Provider { if !imported.is_empty() { trace!("New blocks imported, try to prune the queue"); if let Err(err) = self.process_verification_queue() { - trace!("Cannot prune private transactions queue. error: {:?}", err); + warn!("Cannot prune private transactions queue. error: {:?}", err); } } } diff --git a/ethcore/private-tx/src/messages.rs b/ethcore/private-tx/src/messages.rs index 2629ae79b33..c0825fb59b6 100644 --- a/ethcore/private-tx/src/messages.rs +++ b/ethcore/private-tx/src/messages.rs @@ -25,9 +25,9 @@ use transaction::signature::{add_chain_replay_protection, check_replay_protectio #[derive(Default, Debug, Clone, PartialEq, RlpEncodable, RlpDecodable, Eq)] pub struct PrivateTransaction { /// Encrypted data - pub encrypted: Bytes, + encrypted: Bytes, /// Address of the contract - pub contract: Address, + contract: Address, /// Hash hash: H256, } @@ -51,6 +51,16 @@ impl PrivateTransaction { pub fn hash(&self) -> H256 { self.hash } + + /// Address of the contract + pub fn contract(&self) -> Address { + self.contract + } + + /// Encrypted data + pub fn encrypted(&self) -> Bytes { + self.encrypted.clone() + } } /// Message about private transaction's signing From 32e558c02db91726127ef8811a3e586f005d1cda Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Thu, 9 Aug 2018 19:59:43 +0200 Subject: [PATCH 13/19] Ready state refactored --- .../private-tx/src/private_transactions.rs | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 400511458bd..22ea2f2b5c2 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -16,8 +16,7 @@ use std::sync::Arc; use std::cmp; -use std::collections::HashMap; -use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use bytes::Bytes; use ethcore_miner::pool; @@ -84,11 +83,11 @@ impl pool::ScoredTransaction for VerifiedPrivateTransaction { } } -/// Checks readiness of transactions by comparing the nonce to state nonce. +/// Checks readiness of transactions by looking if the transaction from sender already exists. /// Guarantees only one transaction per sender #[derive(Debug)] pub struct PrivateReadyState { - nonces: HashMap, + senders: HashSet

, state: C, } @@ -98,7 +97,7 @@ impl PrivateReadyState { state: C, ) -> Self { PrivateReadyState { - nonces: Default::default(), + senders: Default::default(), state, } } @@ -109,20 +108,14 @@ impl txpool::Ready for let sender = tx.sender(); let state = &self.state; let state_nonce = state.account_nonce(sender); - match self.nonces.entry(*sender) { - Entry::Vacant(entry) => { - let nonce = entry.insert(state_nonce); - match tx.transaction.nonce.cmp(nonce) { - cmp::Ordering::Greater => txpool::Readiness::Future, - cmp::Ordering::Less => txpool::Readiness::Stale, - cmp::Ordering::Equal => { - *nonce = *nonce + 1.into(); - txpool::Readiness::Ready - }, - } - } - Entry::Occupied(_) => { - txpool::Readiness::Future + if self.senders.contains(sender) { + txpool::Readiness::Future + } else { + self.senders.insert(*sender); + match tx.transaction.nonce.cmp(&state_nonce) { + cmp::Ordering::Greater => txpool::Readiness::Future, + cmp::Ordering::Less => txpool::Readiness::Stale, + cmp::Ordering::Equal => txpool::Readiness::Ready, } } } From 3735cf4cfcea8b1cb16462023fecfd8d44a58da0 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Thu, 9 Aug 2018 20:42:26 +0200 Subject: [PATCH 14/19] Redundant bound and copying removed --- ethcore/private-tx/src/private_transactions.rs | 4 ++-- ethcore/sync/src/chain/mod.rs | 11 ++++------- miner/src/pool/mod.rs | 6 +++--- miner/src/pool/scoring.rs | 6 +++--- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 22ea2f2b5c2..e4b7970aaca 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -73,8 +73,8 @@ impl pool::ScoredTransaction for VerifiedPrivateTransaction { } /// Gets transaction gas price. - fn gas_price(&self) -> U256 { - self.transaction.gas_price + fn gas_price(&self) -> &U256 { + &self.transaction.gas_price } /// Gets transaction nonce. diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index fa3b5beb948..30304d4558e 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -1093,13 +1093,10 @@ impl ChainSync { // Select random peer to re-broadcast transactions to. let peer = random::new().gen_range(0, self.peers.len()); trace!(target: "sync", "Re-broadcasting transactions to a random peer."); - self.peers.values_mut().nth(peer).map(|peer_info| - peer_info.last_sent_transactions.clear() - ); - - //re-broadcast private packets as well - self.peers.values_mut().nth(peer).map(|peer_info| - peer_info.reset_private_stats() + self.peers.values_mut().nth(peer).map(|peer_info| { + peer_info.last_sent_transactions.clear(); + peer_info.reset_private_stats() + } ); } } diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs index 753e558b07a..ccfbba7f800 100644 --- a/miner/src/pool/mod.rs +++ b/miner/src/pool/mod.rs @@ -114,7 +114,7 @@ pub trait ScoredTransaction { fn priority(&self) -> Priority; /// Gets transaction gas price. - fn gas_price(&self) -> U256; + fn gas_price(&self) -> &U256; /// Gets transaction nonce. fn nonce(&self) -> U256; @@ -189,8 +189,8 @@ impl ScoredTransaction for VerifiedTransaction { } /// Gets transaction gas price. - fn gas_price(&self) -> U256 { - self.transaction.gas_price + fn gas_price(&self) -> &U256 { + &self.transaction.gas_price } /// Gets transaction nonce. diff --git a/miner/src/pool/scoring.rs b/miner/src/pool/scoring.rs index f66305dc712..f4e75556cd7 100644 --- a/miner/src/pool/scoring.rs +++ b/miner/src/pool/scoring.rs @@ -67,7 +67,7 @@ impl NonceAndGasPrice { } } -impl txpool::Scoring

for NonceAndGasPrice where P: ScoredTransaction + txpool::VerifiedTransaction + Sized { +impl

txpool::Scoring

for NonceAndGasPrice where P: ScoredTransaction + txpool::VerifiedTransaction { type Score = U256; type Event = (); @@ -83,7 +83,7 @@ impl txpool::Scoring

for NonceAndGasPrice where P: ScoredTransacti let old_gp = old.gas_price(); let new_gp = new.gas_price(); - let min_required_gp = bump_gas_price(old_gp); + let min_required_gp = bump_gas_price(*old_gp); match min_required_gp.cmp(&new_gp) { cmp::Ordering::Greater => scoring::Choice::RejectNew, @@ -101,7 +101,7 @@ impl txpool::Scoring

for NonceAndGasPrice where P: ScoredTransacti assert!(i < txs.len()); assert!(i < scores.len()); - scores[i] = txs[i].transaction.gas_price(); + scores[i] = *txs[i].transaction.gas_price(); let boost = match txs[i].priority() { super::Priority::Local => 15, super::Priority::Retracted => 10, From eb2ec3f019494800ea9819a0eab08ce71655756c Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Fri, 10 Aug 2018 13:36:38 +0200 Subject: [PATCH 15/19] Fixed build after the merge --- Cargo.lock | 8 ++++---- miner/src/pool/scoring.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38deb679ec5..7e62fd7662f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -668,7 +668,7 @@ dependencies = [ "rlp 0.2.1 (git+https://github.com/paritytech/parity-common)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.0", - "transaction-pool 1.12.1", + "transaction-pool 1.12.2", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -758,7 +758,7 @@ dependencies = [ "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "transaction-pool 1.12.1", + "transaction-pool 1.12.2", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2171,7 +2171,7 @@ dependencies = [ "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "transaction-pool 1.12.1", + "transaction-pool 1.12.2", "transient-hashmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "vm 0.1.0", ] @@ -3330,7 +3330,7 @@ dependencies = [ [[package]] name = "transaction-pool" -version = "1.12.1" +version = "1.12.2" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/miner/src/pool/scoring.rs b/miner/src/pool/scoring.rs index 636071c29c7..fab3208dc57 100644 --- a/miner/src/pool/scoring.rs +++ b/miner/src/pool/scoring.rs @@ -144,7 +144,7 @@ impl

txpool::Scoring

for NonceAndGasPrice where P: ScoredTransaction + txp } } - fn should_ignore_sender_limit(&self, new: &VerifiedTransaction) -> bool { + fn should_ignore_sender_limit(&self, new: &P) -> bool { new.priority().is_local() } } From 2778d4e99bb25c8e2df3f15617e57b16156fbc10 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Fri, 10 Aug 2018 13:44:02 +0200 Subject: [PATCH 16/19] Forgotten case reworked --- ethcore/private-tx/src/lib.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index cda54d1c5a3..8364b75bbc4 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -656,12 +656,12 @@ impl Importer for Arc { )?; let provider = Arc::downgrade(self); let result = self.channel.send(ClientIoMessage::execute(move |_| { - if let Some(provider) = provider.upgrade() { - if let Err(e) = provider.process_verification_queue() { - warn!("Unable to process the queue: {}", e); - } - } - })); + if let Some(provider) = provider.upgrade() { + if let Err(e) = provider.process_verification_queue() { + warn!("Unable to process the queue: {}", e); + } + } + })); if let Err(e) = result { warn!("Error sending NewPrivateTransaction message: {:?}", e); } @@ -673,13 +673,14 @@ impl Importer for Arc { trace!("Signature for private transaction received: {:?}", tx); let private_hash = tx.private_transaction_hash(); let provider = Arc::downgrade(self); - if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| { - if let Some(provider) = provider.upgrade() { - if let Err(e) = provider.process_signature(tx.clone()) { - warn!("Unable to process the signature: {}", e); - } - } - })) { + let result = self.channel.send(ClientIoMessage::execute(move |_| { + if let Some(provider) = provider.upgrade() { + if let Err(e) = provider.process_signature(tx.clone()) { + warn!("Unable to process the signature: {}", e); + } + } + })); + if let Err(e) = result { warn!("Error sending NewSignedPrivateTransaction message: {:?}", e); } Ok(private_hash) From af282360f1b0880783dee46a5d5623cec315f516 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Fri, 24 Aug 2018 15:53:27 +0200 Subject: [PATCH 17/19] Review comments fixed --- ethcore/private-tx/src/lib.rs | 15 ++++++--------- ethcore/private-tx/src/private_transactions.rs | 11 ++++++----- ethcore/sync/src/private_tx.rs | 4 ++-- miner/src/pool/scoring.rs | 2 +- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 8364b75bbc4..7702f469a57 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -296,7 +296,7 @@ impl Provider where { } } } - return Ok(()); + Ok(()) }; let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache)); for transaction in ready_transactions { @@ -309,7 +309,7 @@ impl Provider where { /// Add signed private transaction into the store /// Creates corresponding public transaction if last required signature collected and sends it to the chain - pub fn process_signature(&self, signed_tx: SignedPrivateTransaction) -> Result<(), Error> { + pub fn process_signature(&self, signed_tx: &SignedPrivateTransaction) -> Result<(), Error> { trace!("Processing signed private transaction"); let private_hash = signed_tx.private_transaction_hash(); let desc = match self.transactions_for_signing.read().get(&private_hash) { @@ -350,12 +350,9 @@ impl Provider where { } } //Remove from store for signing - match self.transactions_for_signing.write().remove(&private_hash) { - Ok(_) => {} - Err(err) => { - warn!("Failed to remove transaction from signing store, error: {:?}", err); - bail!(err); - } + if let Err(err) = self.transactions_for_signing.write().remove(&private_hash) { + warn!("Failed to remove transaction from signing store, error: {:?}", err); + bail!(err); } } else { //Add signature to the store @@ -675,7 +672,7 @@ impl Importer for Arc { let provider = Arc::downgrade(self); let result = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { - if let Err(e) = provider.process_signature(tx.clone()) { + if let Err(e) = provider.process_signature(&tx) { warn!("Unable to process the signature: {}", e); } } diff --git a/ethcore/private-tx/src/private_transactions.rs b/ethcore/private-tx/src/private_transactions.rs index 60647438b90..a0f58f9cabc 100644 --- a/ethcore/private-tx/src/private_transactions.rs +++ b/ethcore/private-tx/src/private_transactions.rs @@ -35,18 +35,18 @@ type Pool = txpool::Pool, - /// Resulted verified + /// Resulting verified transaction pub transaction: SignedTransaction, - /// Original transaction's hash + /// Original transaction hash pub transaction_hash: H256, - /// Original transaction's sender + /// Original transaction sender pub transaction_sender: Address, } @@ -165,7 +165,8 @@ impl VerificationStore { let options = self.verification_options.clone(); // Use pool's verifying pipeline for original transaction's verification let verifier = pool::verifier::Verifier::new(client, options, Default::default(), None); - let verified_tx = verifier.verify_transaction(pool::verifier::Transaction::Unverified(transaction.clone()))?; + let unverified = pool::verifier::Transaction::Unverified(transaction); + let verified_tx = verifier.verify_transaction(unverified)?; let signed_tx: SignedTransaction = verified_tx.signed().clone(); let signed_hash = signed_tx.hash(); let signed_sender = signed_tx.sender(); diff --git a/ethcore/sync/src/private_tx.rs b/ethcore/sync/src/private_tx.rs index c8c8528ed91..03928c22d6f 100644 --- a/ethcore/sync/src/private_tx.rs +++ b/ethcore/sync/src/private_tx.rs @@ -20,11 +20,11 @@ use ethereum_types::H256; /// Trait which should be implemented by a private transaction handler. pub trait PrivateTxHandler: Send + Sync + 'static { /// Function called on new private transaction received. - /// Returns hash of the imported transaction + /// Returns the hash of the imported transaction fn import_private_transaction(&self, rlp: &[u8]) -> Result; /// Function called on new signed private transaction received. - /// Returns hash of the imported transaction + /// Returns the hash of the imported transaction fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result; } diff --git a/miner/src/pool/scoring.rs b/miner/src/pool/scoring.rs index fab3208dc57..0ea4e258b5c 100644 --- a/miner/src/pool/scoring.rs +++ b/miner/src/pool/scoring.rs @@ -185,7 +185,7 @@ mod tests { }; let keypair = Random.generate().unwrap(); - let txs = vec![tx1, tx2, tx3, tx4].into_iter().enumerate().map(|(_, tx)| { + let txs = vec![tx1, tx2, tx3, tx4].into_iter().map(|(_, tx)| { tx.unsigned().sign(keypair.secret(), None).verified() }).collect::>(); From f6338ebfc3152a8426bfd366505ea7bb7b2f8ec1 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Fri, 24 Aug 2018 16:28:12 +0200 Subject: [PATCH 18/19] Logging reworked, target added --- ethcore/private-tx/src/encryptor.rs | 2 +- ethcore/private-tx/src/lib.rs | 62 ++++++++++++++--------------- ethcore/service/src/service.rs | 16 +++++++- 3 files changed, 46 insertions(+), 34 deletions(-) diff --git a/ethcore/private-tx/src/encryptor.rs b/ethcore/private-tx/src/encryptor.rs index e64917add0a..c1d3d3fb8a6 100644 --- a/ethcore/private-tx/src/encryptor.rs +++ b/ethcore/private-tx/src/encryptor.rs @@ -208,7 +208,7 @@ impl Encryptor for SecretStoreEncryptor { let key = match self.retrieve_key("", false, contract_address, &*accounts) { Ok(key) => Ok(key), Err(Error(ErrorKind::EncryptionKeyNotFound(_), _)) => { - trace!("Key for account wasnt found in sstore. Creating. Address: {:?}", contract_address); + trace!(target: "privatetx", "Key for account wasnt found in sstore. Creating. Address: {:?}", contract_address); self.retrieve_key(&format!("/{}", self.config.threshold), true, contract_address, &*accounts) } Err(err) => Err(err), diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 7702f469a57..24727fe0f5b 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -191,9 +191,9 @@ impl Provider where { /// 3. Save it with state returned on prev step to the queue for signing /// 4. Broadcast corresponding message to the chain pub fn create_private_transaction(&self, signed_transaction: SignedTransaction) -> Result { - trace!("Creating private transaction from regular transaction: {:?}", signed_transaction); + trace!(target: "privatetx", "Creating private transaction from regular transaction: {:?}", signed_transaction); if self.signer_account.is_none() { - warn!("Signing account not set"); + warn!(target: "privatetx", "Signing account not set"); bail!(ErrorKind::SignerAccountNotSet); } let tx_hash = signed_transaction.hash(); @@ -213,11 +213,11 @@ impl Provider where { // in private-tx to avoid such mistakes. let contract_nonce = self.get_contract_nonce(&contract, BlockId::Latest)?; let private_state = self.execute_private_transaction(BlockId::Latest, &signed_transaction)?; - trace!("Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state); + trace!(target: "privatetx", "Private transaction created, encrypted transaction: {:?}, private state: {:?}", private, private_state); let contract_validators = self.get_validators(BlockId::Latest, &contract)?; - trace!("Required validators: {:?}", contract_validators); + trace!(target: "privatetx", "Required validators: {:?}", contract_validators); let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); - trace!("Hashed effective private state for sender: {:?}", private_state_hash); + trace!(target: "privatetx", "Hashed effective private state for sender: {:?}", private_state_hash); self.transactions_for_signing.write().add_transaction(private.hash(), signed_transaction, contract_validators, private_state, contract_nonce)?; self.broadcast_private_transaction(private.hash(), private.rlp_bytes().into_vec()); Ok(Receipt { @@ -257,13 +257,13 @@ impl Provider where { let private_hash = transaction.private_transaction.hash(); match transaction.validator_account { None => { - trace!("Propagating transaction further"); + trace!(target: "privatetx", "Propagating transaction further"); self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); return Ok(()); } Some(validator_account) => { if !self.validator_accounts.contains(&validator_account) { - trace!("Propagating transaction further"); + trace!(target: "privatetx", "Propagating transaction further"); self.broadcast_private_transaction(private_hash, transaction.private_transaction.rlp_bytes().into_vec()); return Ok(()); } @@ -281,7 +281,7 @@ impl Provider where { } let private_state = private_state.expect("Error was checked before"); let private_state_hash = self.calculate_state_hash(&private_state, contract_nonce); - trace!("Hashed effective private state for validator: {:?}", private_state_hash); + trace!(target: "privatetx", "Hashed effective private state for validator: {:?}", private_state_hash); let password = find_account_password(&self.passwords, &*self.accounts, &validator_account); let signed_state = self.accounts.sign(validator_account, password, private_state_hash); if let Err(e) = signed_state { @@ -289,7 +289,7 @@ impl Provider where { } let signed_state = signed_state.expect("Error was checked before"); let signed_private_transaction = SignedPrivateTransaction::new(private_hash, signed_state, None); - trace!("Sending signature for private transaction: {:?}", signed_private_transaction); + trace!(target: "privatetx", "Sending signature for private transaction: {:?}", signed_private_transaction); self.broadcast_signed_private_transaction(signed_private_transaction.hash(), signed_private_transaction.rlp_bytes().into_vec()); } else { bail!("Incorrect type of action for the transaction"); @@ -301,7 +301,7 @@ impl Provider where { let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache)); for transaction in ready_transactions { if let Err(e) = process_transaction(&transaction) { - warn!("Error: {:?}", e); + warn!(target: "privatetx", "Error: {:?}", e); } } Ok(()) @@ -310,7 +310,7 @@ impl Provider where { /// Add signed private transaction into the store /// Creates corresponding public transaction if last required signature collected and sends it to the chain pub fn process_signature(&self, signed_tx: &SignedPrivateTransaction) -> Result<(), Error> { - trace!("Processing signed private transaction"); + trace!(target: "privatetx", "Processing signed private transaction"); let private_hash = signed_tx.private_transaction_hash(); let desc = match self.transactions_for_signing.read().get(&private_hash) { None => { @@ -334,7 +334,7 @@ impl Provider where { desc.original_transaction.nonce, desc.original_transaction.gas_price )?; - trace!("Last required signature received, public transaction created: {:?}", public_tx); + trace!(target: "privatetx", "Last required signature received, public transaction created: {:?}", public_tx); //Sign and add it to the queue let chain_id = desc.original_transaction.chain_id(); let hash = public_tx.hash(chain_id); @@ -343,23 +343,23 @@ impl Provider where { let signature = self.accounts.sign(signer_account, password, hash)?; let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; match self.miner.import_own_transaction(&*self.client, signed.into()) { - Ok(_) => trace!("Public transaction added to queue"), + Ok(_) => trace!(target: "privatetx", "Public transaction added to queue"), Err(err) => { - warn!("Failed to add transaction to queue, error: {:?}", err); + warn!(target: "privatetx", "Failed to add transaction to queue, error: {:?}", err); bail!(err); } } //Remove from store for signing if let Err(err) = self.transactions_for_signing.write().remove(&private_hash) { - warn!("Failed to remove transaction from signing store, error: {:?}", err); + warn!(target: "privatetx", "Failed to remove transaction from signing store, error: {:?}", err); bail!(err); } } else { //Add signature to the store match self.transactions_for_signing.write().add_signature(&private_hash, signed_tx.signature()) { - Ok(_) => trace!("Signature stored for private transaction"), + Ok(_) => trace!(target: "privatetx", "Signature stored for private transaction"), Err(err) => { - warn!("Failed to add signature to signing store, error: {:?}", err); + warn!(target: "privatetx", "Failed to add signature to signing store, error: {:?}", err); bail!(err); } } @@ -380,13 +380,13 @@ impl Provider where { Ok(desc.received_signatures.len() + 1 == desc.validators.len()) } false => { - warn!("Sender's state doesn't correspond to validator's"); + warn!(target: "privatetx", "Sender's state doesn't correspond to validator's"); bail!(ErrorKind::StateIncorrect); } } } Err(err) => { - warn!("Sender's state doesn't correspond to validator's, error {:?}", err); + warn!(target: "privatetx", "Sender's state doesn't correspond to validator's, error {:?}", err); bail!(err); } } @@ -415,12 +415,12 @@ impl Provider where { } fn encrypt(&self, contract_address: &Address, initialisation_vector: &H128, data: &[u8]) -> Result { - trace!("Encrypt data using key(address): {:?}", contract_address); + trace!(target: "privatetx", "Encrypt data using key(address): {:?}", contract_address); Ok(self.encryptor.encrypt(contract_address, &*self.accounts, initialisation_vector, data)?) } fn decrypt(&self, contract_address: &Address, data: &[u8]) -> Result { - trace!("Decrypt data using key(address): {:?}", contract_address); + trace!(target: "privatetx", "Decrypt data using key(address): {:?}", contract_address); Ok(self.encryptor.decrypt(contract_address, &*self.accounts, data)?) } @@ -485,7 +485,7 @@ impl Provider where { Action::Call(ref contract_address) => { let contract_code = Arc::new(self.get_decrypted_code(contract_address, block)?); let contract_state = self.get_decrypted_state(contract_address, block)?; - trace!("Patching contract at {:?}, code: {:?}, state: {:?}", contract_address, contract_code, contract_state); + trace!(target: "privatetx", "Patching contract at {:?}, code: {:?}, state: {:?}", contract_address, contract_code, contract_state); state.patch_account(contract_address, contract_code, Self::snapshot_to_storage(contract_state))?; Some(*contract_address) }, @@ -513,7 +513,7 @@ impl Provider where { (enc_code, self.encrypt(&address, &Self::iv_from_transaction(transaction), &Self::snapshot_from_storage(&storage))?) }, }; - trace!("Private contract executed. code: {:?}, state: {:?}, result: {:?}", encrypted_code, encrypted_storage, result.output); + trace!(target: "privatetx", "Private contract executed. code: {:?}, state: {:?}, result: {:?}", encrypted_code, encrypted_storage, result.output); Ok(PrivateExecutionResult { code: encrypted_code, state: encrypted_storage, @@ -629,7 +629,7 @@ pub trait Importer { impl Importer for Arc { fn import_private_transaction(&self, rlp: &[u8]) -> Result { - trace!("Private transaction received"); + trace!(target: "privatetx", "Private transaction received"); let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?; let private_tx_hash = private_tx.hash(); let contract = private_tx.contract(); @@ -655,30 +655,30 @@ impl Importer for Arc { let result = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { if let Err(e) = provider.process_verification_queue() { - warn!("Unable to process the queue: {}", e); + warn!(target: "privatetx", "Unable to process the queue: {}", e); } } })); if let Err(e) = result { - warn!("Error sending NewPrivateTransaction message: {:?}", e); + warn!(target: "privatetx", "Error sending NewPrivateTransaction message: {:?}", e); } Ok(private_tx_hash) } fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?; - trace!("Signature for private transaction received: {:?}", tx); + trace!(target: "privatetx", "Signature for private transaction received: {:?}", tx); let private_hash = tx.private_transaction_hash(); let provider = Arc::downgrade(self); let result = self.channel.send(ClientIoMessage::execute(move |_| { if let Some(provider) = provider.upgrade() { if let Err(e) = provider.process_signature(&tx) { - warn!("Unable to process the signature: {}", e); + warn!(target: "privatetx", "Unable to process the signature: {}", e); } } })); if let Err(e) = result { - warn!("Error sending NewSignedPrivateTransaction message: {:?}", e); + warn!(target: "privatetx", "Error sending NewSignedPrivateTransaction message: {:?}", e); } Ok(private_hash) } @@ -697,9 +697,9 @@ fn find_account_password(passwords: &Vec, account_provider: &AccountPr impl ChainNotify for Provider { fn new_blocks(&self, imported: Vec, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { if !imported.is_empty() { - trace!("New blocks imported, try to prune the queue"); + trace!(target: "privatetx", "New blocks imported, try to prune the queue"); if let Err(err) = self.process_verification_queue() { - warn!("Cannot prune private transactions queue. error: {:?}", err); + warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err); } } } diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index c247507f218..fb01f30ec04 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -56,11 +56,23 @@ impl PrivateTxService { impl PrivateTxHandler for PrivateTxService { fn import_private_transaction(&self, rlp: &[u8]) -> Result { - self.provider.import_private_transaction(rlp).map_err(|e| e.to_string()) + match self.provider.import_private_transaction(rlp) { + Ok(import_result) => Ok(import_result), + Err(err) => { + warn!(target: "privatetx", "Unable to import private transaction packet: {}", err); + bail!(err.to_string()) + } + } } fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result { - self.provider.import_signed_private_transaction(rlp).map_err(|e| e.to_string()) + match self.provider.import_signed_private_transaction(rlp) { + Ok(import_result) => Ok(import_result), + Err(err) => { + warn!(target: "privatetx", "Unable to import signed private transaction packet: {}", err); + bail!(err.to_string()) + } + } } } From 064d81441dd1aadc0fa01a6729fb3ed5f8407f48 Mon Sep 17 00:00:00 2001 From: Anton Gavrilov Date: Fri, 24 Aug 2018 17:35:32 +0200 Subject: [PATCH 19/19] Fix after merge --- Cargo.lock | 2 +- ethcore/sync/src/tests/private.rs | 2 +- miner/src/pool/scoring.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c24f5cbd7f..efdc7214ef5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -760,7 +760,7 @@ dependencies = [ "serde_derive 1.0.37 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "transaction-pool 1.12.2", + "transaction-pool 1.13.1", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/ethcore/sync/src/tests/private.rs b/ethcore/sync/src/tests/private.rs index d7f2ad943f2..9b39aed76d7 100644 --- a/ethcore/sync/src/tests/private.rs +++ b/ethcore/sync/src/tests/private.rs @@ -146,7 +146,7 @@ fn send_private_transaction() { let signed_private_transaction = received_signed_private_transactions[0].clone(); assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok()); let signature: SignedPrivateTransaction = Rlp::new(&signed_private_transaction).as_val().unwrap(); - assert!(pm0.process_signature(signature).is_ok()); + assert!(pm0.process_signature(&signature).is_ok()); let local_transactions = net.peer(0).miner.local_transactions(); assert_eq!(local_transactions.len(), 1); } diff --git a/miner/src/pool/scoring.rs b/miner/src/pool/scoring.rs index 0ea4e258b5c..61fcf4e418a 100644 --- a/miner/src/pool/scoring.rs +++ b/miner/src/pool/scoring.rs @@ -185,7 +185,7 @@ mod tests { }; let keypair = Random.generate().unwrap(); - let txs = vec![tx1, tx2, tx3, tx4].into_iter().map(|(_, tx)| { + let txs = vec![tx1, tx2, tx3, tx4].into_iter().map(|tx| { tx.unsigned().sign(keypair.secret(), None).verified() }).collect::>();