From 60b16bec3ede5f4001af61d62ad13463b4a84b6c Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Mon, 26 Nov 2018 17:54:09 +0100 Subject: [PATCH 1/4] Don't sync all peers after each response --- Cargo.lock | 4 +-- ethcore/sync/src/api.rs | 11 +++++--- ethcore/sync/src/chain/handler.rs | 31 ++++++++++----------- ethcore/sync/src/chain/mod.rs | 46 +++++++++++++++---------------- 4 files changed, 46 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 623a5967e4c..b2fbf7f78c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -685,8 +685,8 @@ dependencies = [ "rlp_compress 0.1.0", "rlp_derive 0.1.0", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.78 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.78 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "stats 0.1.0", "stop-guard 0.1.0", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 4296a4c2d07..d7998dd5ada 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -379,8 +379,9 @@ impl SyncProvider for EthSync { } const PEERS_TIMER: TimerToken = 0; -const SYNC_TIMER: TimerToken = 1; -const TX_TIMER: TimerToken = 2; +const MAINTAIN_SYNC_TIMER: TimerToken = 1; +const CONTINUE_SYNC_TIMER: TimerToken = 2; +const TX_TIMER: TimerToken = 3; struct SyncProtocolHandler { /// Shared blockchain client. @@ -397,7 +398,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler { fn initialize(&self, io: &NetworkContext) { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); - io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); + io.register_timer(MAINTAIN_SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); + io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2_500)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); } } @@ -428,7 +430,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler { let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); match timer { PEERS_TIMER => self.sync.write().maintain_peers(&mut io), - SYNC_TIMER => self.sync.write().maintain_sync(&mut io), + MAINTAIN_SYNC_TIMER => self.sync.write().maintain_sync(&mut io), + CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io), TX_TIMER => { self.sync.write().propagate_new_transactions(&mut io); }, diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs index e6702974349..254de62a681 100644 --- a/ethcore/sync/src/chain/handler.rs +++ b/ethcore/sync/src/chain/handler.rs @@ -72,21 +72,21 @@ impl SyncHandler { return; } let rlp = Rlp::new(data); - let result = match packet_id { - STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp), - TRANSACTIONS_PACKET => SyncHandler::on_peer_transactions(sync, io, peer, &rlp), - BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp), - BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp), - RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp), - NEW_BLOCK_PACKET => SyncHandler::on_peer_new_block(sync, io, peer, &rlp), - NEW_BLOCK_HASHES_PACKET => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp), - SNAPSHOT_MANIFEST_PACKET => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp), - SNAPSHOT_DATA_PACKET => SyncHandler::on_snapshot_data(sync, io, peer, &rlp), - PRIVATE_TRANSACTION_PACKET => SyncHandler::on_private_transaction(sync, io, peer, &rlp), - SIGNED_PRIVATE_TRANSACTION_PACKET => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp), + let (sync_peer, result) = match packet_id { + STATUS_PACKET => (true, SyncHandler::on_peer_status(sync, io, peer, &rlp)), + TRANSACTIONS_PACKET => (false, SyncHandler::on_peer_transactions(sync, io, peer, &rlp)), + BLOCK_HEADERS_PACKET => (true, SyncHandler::on_peer_block_headers(sync, io, peer, &rlp)), + BLOCK_BODIES_PACKET => (true, SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp)), + RECEIPTS_PACKET => (true, SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp)), + NEW_BLOCK_PACKET => (true, SyncHandler::on_peer_new_block(sync, io, peer, &rlp)), + NEW_BLOCK_HASHES_PACKET => (true, SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp)), + SNAPSHOT_MANIFEST_PACKET => (true, SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp)), + SNAPSHOT_DATA_PACKET => (true, SyncHandler::on_snapshot_data(sync, io, peer, &rlp)), + PRIVATE_TRANSACTION_PACKET => (true, SyncHandler::on_private_transaction(sync, io, peer, &rlp)), + SIGNED_PRIVATE_TRANSACTION_PACKET => (true, SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp)), _ => { debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id); - Ok(()) + (false, Ok(())) } }; @@ -99,13 +99,12 @@ impl SyncHandler { Err(DownloaderImportError::Useless) => { sync.deactivate_peer(io, peer); }, - Ok(()) => { + Ok(()) if sync_peer => { // give a task to the same peer first sync.sync_peer(io, peer, false); }, + _ => (), } - // give tasks to other peers - sync.continue_sync(io); } /// Called when peer sends us new consensus packet diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index e0fc8ecddb6..e88517c16fb 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -655,37 +655,35 @@ impl ChainSync { } /// Resume downloading - fn continue_sync(&mut self, io: &mut SyncIo) { - // Collect active peers that can sync - let confirmed_peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)| - if peer.can_sync() { - Some((*peer_id, peer.protocol_version)) - } else { - None - } - ).collect(); - - trace!( - target: "sync", - "Syncing with peers: {} active, {} confirmed, {} total", - self.active_peers.len(), confirmed_peers.len(), self.peers.len() - ); - + pub fn continue_sync(&mut self, io: &mut SyncIo) { if self.state == SyncState::Waiting { trace!(target: "sync", "Waiting for the block queue"); } else if self.state == SyncState::SnapshotWaiting { trace!(target: "sync", "Waiting for the snapshot restoration"); } else { - let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)| - self.active_peers.contains(&peer_id) - ).map(|v| *v).collect(); + // Collect active peers that can sync + let mut peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)| + if peer.can_sync() && peer.asking == PeerAsking::Nothing && self.active_peers.contains(&peer_id) { + Some((*peer_id, peer.protocol_version)) + } else { + None + } + ).collect(); - random::new().shuffle(&mut peers); //TODO: sort by rating - // prefer peers with higher protocol version - peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); + if peers.len() > 0 { + trace!( + target: "sync", + "Syncing with peers: {} active, {} available, {} total", + self.active_peers.len(), peers.len(), self.peers.len() + ); - for (peer_id, _) in peers { - self.sync_peer(io, peer_id, false); + random::new().shuffle(&mut peers); //TODO: sort by rating + // prefer peers with higher protocol version + peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); + + for (peer_id, _) in peers { + self.sync_peer(io, peer_id, false); + } } } From 6b86de918ec2e6e49c4c526911c53cd36ba7a39c Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Tue, 27 Nov 2018 15:44:20 +0100 Subject: [PATCH 2/4] Update formating --- ethcore/sync/src/api.rs | 2 +- ethcore/sync/src/chain/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index d7998dd5ada..f9b4635b97a 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -399,7 +399,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); io.register_timer(MAINTAIN_SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); - io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2_500)).expect("Error registering sync timer"); + io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2500)).expect("Error registering sync timer"); io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index e88517c16fb..fb05cf9fc0d 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -677,7 +677,7 @@ impl ChainSync { self.active_peers.len(), peers.len(), self.peers.len() ); - random::new().shuffle(&mut peers); //TODO: sort by rating + random::new().shuffle(&mut peers); // TODO: sort by rating // prefer peers with higher protocol version peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2)); From a33b54aa3611e25ebfd6bde3ab1eb3cf54bb8b23 Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Tue, 27 Nov 2018 15:55:43 +0100 Subject: [PATCH 3/4] Fix tests: add `continue_sync` to `Sync_step` --- ethcore/sync/src/tests/helpers.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index d75d71ea90a..e97fd331bd0 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -286,10 +286,12 @@ impl Peer for EthPeer { } fn sync_step(&self) { + let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None); self.chain.flush(); - self.sync.write().maintain_peers(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); - self.sync.write().maintain_sync(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); - self.sync.write().propagate_new_transactions(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None)); + self.sync.write().maintain_peers(&mut io); + self.sync.write().maintain_sync(&mut io); + self.sync.write().continue_sync(&mut io); + self.sync.write().propagate_new_transactions(&mut io); } fn restart_sync(&self) { From e3fd6b06d69e4c5a2dc24a44c1bacc6e3bdd31db Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Wed, 28 Nov 2018 11:33:49 +0100 Subject: [PATCH 4/4] Update ethcore/sync/src/chain/mod.rs Co-Authored-By: ngotchac --- ethcore/sync/src/chain/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index fb05cf9fc0d..b10842c3e38 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -677,7 +677,7 @@ impl ChainSync { self.active_peers.len(), peers.len(), self.peers.len() ); - random::new().shuffle(&mut peers); // TODO: sort by rating + random::new().shuffle(&mut peers); // TODO (#646): sort by rating // prefer peers with higher protocol version peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));