Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
315f784
Pause GetBlockHeaders requests while importing blocks with possible fork
grbIzl Nov 15, 2019
7a062eb
Lookup for the route instead of direct parent
grbIzl Feb 17, 2020
b5d25ec
Check fork status in cycle
grbIzl Feb 17, 2020
7b71b69
Add requests to queue instead of waiting on mutex
grbIzl Feb 19, 2020
e95f4f3
Store amount of children for the hash in order optimize traverse of q…
grbIzl Feb 19, 2020
cfa0ad3
Do not process empty queues
grbIzl Feb 21, 2020
811bc0d
Use send method instead of respond
grbIzl Feb 24, 2020
6852123
Revert of route usage and children counting
grbIzl Mar 30, 2020
465a9ca
Retrieve parent hash and difficulty when new block comes
grbIzl Mar 30, 2020
4f3b917
Reset block sync to the initial state, if non recovable issue occured
grbIzl Mar 30, 2020
8d7eb48
Fix after merge with head
grbIzl Mar 31, 2020
8e654f6
Proper derive_more used
grbIzl Mar 31, 2020
c5ebc29
Compute route instead of simple check
grbIzl Apr 2, 2020
2d10873
Forgotten chain parameter added
grbIzl Apr 2, 2020
7753087
Checked sub and not public method
grbIzl May 29, 2020
0bb5860
Replace set with vec
grbIzl Jun 2, 2020
98d2885
Response handling re-written
grbIzl Jun 2, 2020
1d239b2
Response handling re-written
grbIzl Jun 2, 2020
01f9bd7
Proper destructuring of items
grbIzl Jun 2, 2020
2e598ac
Comment wording changed
grbIzl Jun 2, 2020
4ec07f3
Method renamed
grbIzl Jun 2, 2020
d093b83
Remove delayed requests for disconnected peer
grbIzl Jun 2, 2020
f7b709b
Use the correct default if difficulty is none
grbIzl Jun 8, 2020
65c92d4
Add packets ids array for the lookup
grbIzl Jun 18, 2020
4968d14
Fix incorrect import after merge
grbIzl Jun 18, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ethcore/client-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ pub trait BlockChainClient:
/// Schedule state-altering transaction to be executed on the next pending
/// block with the given gas and nonce parameters.
fn transact(&self, tx_request: TransactionRequest) -> Result<(), transaction::Error>;

/// Returns true, if underlying import queue is processing possible fork at the moment
fn is_processing_fork(&self) -> bool;
}

/// The data required for a `Client` to create a transaction.
Expand Down
5 changes: 5 additions & 0 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,11 @@ impl BlockChainClient for Client {
}
}

fn is_processing_fork(&self) -> bool {
let chain = self.chain.read();
self.importer.block_queue.is_processing_fork(&chain.best_block_hash(), &chain)
}

fn block_total_difficulty(&self, id: BlockId) -> Option<U256> {
let chain = self.chain.read();

Expand Down
2 changes: 2 additions & 0 deletions ethcore/src/test_helpers/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,8 @@ impl BlockChainClient for TestBlockChainClient {
}
}

fn is_processing_fork(&self) -> bool { false }

// works only if blocks are one after another 1 -> 2 -> 3
fn tree_route(&self, from: &H256, to: &H256) -> Option<TreeRoute> {
Some(TreeRoute {
Expand Down
1 change: 1 addition & 0 deletions ethcore/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytes = { package = "parity-bytes", version = "0.1" }
client-traits = { path = "../client-traits" }
common-types = { path = "../types" }
devp2p = { package = "ethcore-network-devp2p", path = "../../util/network-devp2p" }
derive_more = "0.99"
enum-primitive-derive = "0.2"
ethcore-io = { path = "../../util/io" }
ethcore-private-tx = { path = "../private-tx" }
Expand Down
3 changes: 3 additions & 0 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ const MAINTAIN_SYNC_TIMER: TimerToken = 1;
const CONTINUE_SYNC_TIMER: TimerToken = 2;
const TX_TIMER: TimerToken = 3;
const PRIORITY_TIMER: TimerToken = 4;
const DELAYED_PROCESSING_TIMER: TimerToken = 5;

pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);

Expand All @@ -489,6 +490,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
io.register_timer(MAINTAIN_SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2500)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
io.register_timer(DELAYED_PROCESSING_TIMER, Duration::from_millis(2100)).expect("Error registering delayed processing timer");

io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
}
Expand Down Expand Up @@ -539,6 +541,7 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io),
TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
DELAYED_PROCESSING_TIMER => self.sync.process_delayed_requests(&mut io),
_ => warn!("Unknown timer {} triggered.", timer),
}
}
Expand Down
41 changes: 30 additions & 11 deletions ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,19 @@ impl BlockDownloader {
self.state = State::Blocks;
}

/// Reset sync to the specified block
fn reset_to_block(&mut self, start_hash: &H256, start_number: BlockNumber) {
self.reset();
self.last_imported_block = start_number;
self.last_imported_hash = start_hash.clone();
self.last_round_start = start_number;
self.last_round_start_hash = start_hash.clone();
self.imported_this_round = None;
self.round_parents = VecDeque::new();
self.target_hash = None;
self.retract_step = 1;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about self.blocks? It seems that reset_to does reset them too.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's being reset inside self.reset() call.

}

/// Returns best imported block number.
pub fn last_imported_block_number(&self) -> BlockNumber {
self.last_imported_block
Expand Down Expand Up @@ -439,22 +452,28 @@ impl BlockDownloader {
trace_sync!(self, "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
} else {
let best = io.chain().chain_info().best_block_number;
let best_hash = io.chain().chain_info().best_block_hash;
let oldest_reorg = io.chain().pruning_info().earliest_state;
if self.block_set == BlockSet::NewBlocks && best > start && start < oldest_reorg {
debug_sync!(self, "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
self.reset();
self.reset_to_block(&best_hash, best);
} else {
let n = start - cmp::min(self.retract_step, start);
self.retract_step *= 2;
match io.chain().block_hash(BlockId::Number(n)) {
Some(h) => {
self.last_imported_block = n;
self.last_imported_hash = h;
trace_sync!(self, "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
}
None => {
debug_sync!(self, "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
self.reset();
if n == 0 {
debug_sync!(self, "Header not found, bottom line reached, resetting, last imported: {}", self.last_imported_hash);
self.reset_to_block(&best_hash, best);
Copy link
Copy Markdown
Collaborator Author

@grbIzl grbIzl Mar 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, here the calling of simple reset is not enough, because it resets counters (last_imported_block) to the current round. In this case the rounds went too far and simple rollback inside round doesn't help. reset_to_block here actually resets block sync to its initial state

} else {
self.retract_step *= 2;
match io.chain().block_hash(BlockId::Number(n)) {
Some(h) => {
self.last_imported_block = n;
self.last_imported_hash = h;
trace_sync!(self, "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
}
None => {
debug_sync!(self, "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
self.reset_to_block(&best_hash, best);
}
}
}
}
Expand Down
23 changes: 14 additions & 9 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
SnapshotDataPacket, SnapshotManifestPacket, StatusPacket,
}
},
BlockSet, ChainSync, ForkConfirmation, PacketDecodeError, PeerAsking, PeerInfo, SyncRequester,
BlockSet, ChainSync, ForkConfirmation, PacketProcessError, PeerAsking, PeerInfo, SyncRequester,
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4,
}
Expand Down Expand Up @@ -114,6 +114,7 @@ impl SyncHandler {
debug!(target: "sync", "Disconnected {}", peer_id);
sync.clear_peer_download(peer_id);
sync.peers.remove(&peer_id);
sync.delayed_requests.retain(|(request_peer_id, _, _)| *request_peer_id != peer_id);
sync.active_peers.remove(&peer_id);

if sync.state == SyncState::SnapshotManifest {
Expand Down Expand Up @@ -149,23 +150,27 @@ impl SyncHandler {
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
return Ok(());
}
let difficulty: U256 = r.val_at(1)?;
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
peer.difficulty = Some(difficulty);
}
}
let block = Unverified::from_rlp(r.at(0)?.as_raw().to_vec())?;
let hash = block.header.hash();
let number = block.header.number();
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, hash);
if number > sync.highest_block.unwrap_or(0) {
sync.highest_block = Some(number);
}
let parent_hash = block.header.parent_hash();
let difficulty: U256 = r.val_at(1)?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why read that from RLP again if we have decoded block already? Isn't that simply part of the block.header?
EDIT: Ah, seems it's expected to be Total Difficulty, isn't that part of the decode struct anyway?

Copy link
Copy Markdown
Collaborator Author

@grbIzl grbIzl Jun 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not. There is the usual Header there only. Also see some details in the the next question

// Most probably the sent block is being imported by peer right now
Copy link
Copy Markdown
Collaborator Author

@grbIzl grbIzl Mar 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See item 2 in the addressed issues in PR's description

// Use td and hash, that peer must have for now
let parent_td = difficulty.checked_sub(*block.header.difficulty());
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
if peer.difficulty.map_or(true, |pd| parent_td.map_or(false, |td| td > pd)) {
peer.difficulty = parent_td;
}
}
let mut unknown = false;

if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
peer.latest_hash = hash;
peer.latest_hash = *parent_hash;
}

let last_imported_number = sync.new_blocks.last_imported_block_number();
Expand Down Expand Up @@ -675,7 +680,7 @@ impl SyncHandler {
}

/// Called when peer sends us new transactions
pub fn on_peer_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: Rlp) -> Result<(), PacketDecodeError> {
pub fn on_peer_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: Rlp) -> Result<(), PacketProcessError> {
// Accept transactions only when fully synced
if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
Expand Down
58 changes: 54 additions & 4 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,14 @@ use crate::{

use bytes::Bytes;
use client_traits::BlockChainClient;
use derive_more::Display;
use ethereum_types::{H256, U256};
use fastmap::{H256FastMap, H256FastSet};
use futures::sync::mpsc as futures_mpsc;
use keccak_hash::keccak;
use log::{error, trace, debug, warn};
use network::client_version::ClientVersion;
use network::{self, PeerId, PacketId};
use network::{self, PeerId};
use parity_util_mem::{MallocSizeOfExt, malloc_size_of_is_0};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use rand::{Rng, seq::SliceRandom};
Expand Down Expand Up @@ -147,7 +148,23 @@ pub(crate) use self::supplier::SyncSupplier;

malloc_size_of_is_0!(PeerInfo);

pub type PacketDecodeError = DecoderError;
/// Possible errors during packet's processing
#[derive(Debug, Display)]
pub enum PacketProcessError {
/// Error of RLP decoder
#[display(fmt = "Decoder Error: {}", _0)]
Decoder(DecoderError),
/// Underlying client is busy and cannot process the packet
/// The packet should be postponed for later response
#[display(fmt = "Underlying client is busy")]
ClientBusy,
}

impl From<DecoderError> for PacketProcessError {
fn from(err: DecoderError) -> Self {
PacketProcessError::Decoder(err).into()
}
}

/// Version 64 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11);
Expand Down Expand Up @@ -411,7 +428,7 @@ pub mod random {
}
}

pub type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
pub type RlpResponseResult = Result<Option<(SyncPacket, RlpStream)>, PacketProcessError>;
pub type Peers = HashMap<PeerId, PeerInfo>;

/// Thread-safe wrapper for `ChainSync`.
Expand Down Expand Up @@ -468,6 +485,17 @@ impl ChainSyncApi {
SyncSupplier::dispatch_packet(&self.sync, io, peer, packet_id, data)
}

/// Process the queue with requests, that were delayed with response.
pub fn process_delayed_requests(&self, io: &mut dyn SyncIo) {
let requests = self.sync.write().retrieve_delayed_requests();
if !requests.is_empty() {
debug!(target: "sync", "Processing {} delayed requests", requests.len());
for (peer_id, packet_id, packet_data) in requests {
SyncSupplier::dispatch_delayed_request(&self.sync, io, peer_id, packet_id, &packet_data);
}
}
}

/// Process a priority propagation queue.
/// This task is run from a timer and should be time constrained.
/// Hence we set up a deadline for the execution and cancel the task if the deadline is exceeded.
Expand Down Expand Up @@ -672,6 +700,10 @@ pub struct ChainSync {
/// Connected peers pending Status message.
/// Value is request timestamp.
handshaking_peers: HashMap<PeerId, Instant>,
/// Requests, that can not be processed at the moment
delayed_requests: Vec<(PeerId, u8, Vec<u8>)>,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit worried that this is unbounded. How do we know it doesn't grow too big?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed this by cleaning requests for disconnected peer (in terms of the other your question)

/// Ids of delayed requests, used for lookup, id is composed from peer id and packet id
delayed_requests_ids: HashSet<(PeerId, u8)>,
/// Sync start timestamp. Measured when first peer is connected
sync_start_time: Option<Instant>,
/// Transactions propagation statistics
Expand Down Expand Up @@ -707,6 +739,8 @@ impl ChainSync {
peers: HashMap::new(),
handshaking_peers: HashMap::new(),
active_peers: HashSet::new(),
delayed_requests: Vec::new(),
delayed_requests_ids: HashSet::new(),
new_blocks: BlockDownloader::new(BlockSet::NewBlocks, &chain_info.best_block_hash, chain_info.best_block_number),
old_blocks: None,
last_sent_block_number: 0,
Expand Down Expand Up @@ -821,6 +855,22 @@ impl ChainSync {
self.active_peers = self.peers.keys().cloned().collect();
}

/// Add a request for later processing
pub fn add_delayed_request(&mut self, peer: PeerId, packet_id: u8, data: &[u8]) {
// Ignore the request, if there is a request already in queue with the same id
if !self.delayed_requests_ids.contains(&(peer, packet_id)) {
self.delayed_requests_ids.insert((peer, packet_id));
self.delayed_requests.push((peer, packet_id, data.to_vec()));
debug!(target: "sync", "Delayed request with packet id {} from peer {} added", packet_id, peer);
}
}

/// Drain and return all delayed requests
pub fn retrieve_delayed_requests(&mut self) -> Vec<(PeerId, u8, Vec<u8>)> {
self.delayed_requests_ids.clear();
self.delayed_requests.drain(..).collect()
}

/// Restart sync
pub fn reset_and_continue(&mut self, io: &mut dyn SyncIo) {
trace!(target: "sync", "Restarting");
Expand Down Expand Up @@ -1261,7 +1311,7 @@ impl ChainSync {
packet.append(&chain.total_difficulty);
packet.append(&chain.best_block_hash);
packet.append(&chain.genesis_hash);
if eth_protocol_version >= ETH_PROTOCOL_VERSION_64.0 {
if eth_protocol_version >= ETH_PROTOCOL_VERSION_64.0 {
packet.append(&self.fork_filter.current(io.chain()));
}
if warp_protocol {
Expand Down
Loading