diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs
index 1763b8fd5ed..77429a4e56e 100644
--- a/ethcore/service/src/service.rs
+++ b/ethcore/service/src/service.rs
@@ -106,7 +106,13 @@ impl ClientService {
info!("Configured for {} using {} engine", Colour::White.bold().paint(spec.name.clone()), Colour::Yellow.bold().paint(spec.engine.name()));
let pruning = config.pruning;
- let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?;
+ let client = Client::new(
+ config,
+ &spec,
+ blockchain_db.clone(),
+ miner.clone(),
+ io_service.channel(),
+ )?;
miner.set_io_channel(io_service.channel());
miner.set_in_chain_checker(&client.clone());
diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs
index e5af6420861..4286dc41482 100644
--- a/ethcore/src/blockchain/blockchain.rs
+++ b/ethcore/src/blockchain/blockchain.rs
@@ -1187,8 +1187,8 @@ impl BlockChain {
let mut pending_block_details = self.pending_block_details.write();
let mut pending_write_txs = self.pending_transaction_addresses.write();
- let mut best_ancient_block = self.best_ancient_block.write();
let mut best_block = self.best_block.write();
+ let mut best_ancient_block = self.best_ancient_block.write();
let mut write_block_details = self.block_details.write();
let mut write_hashes = self.block_hashes.write();
let mut write_txs = self.transaction_addresses.write();
diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs
index ebfe7bdef5c..3d576ae12aa 100644
--- a/ethcore/src/client/chain_notify.rs
+++ b/ethcore/src/client/chain_notify.rs
@@ -15,7 +15,7 @@
// along with Parity. If not, see .
use bytes::Bytes;
-use ethereum_types::H256;
+use ethereum_types::{H256, U256};
use transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration;
@@ -141,7 +141,15 @@ pub trait ChainNotify : Send + Sync {
}
/// fires when chain broadcasts a message
- fn broadcast(&self, _message_type: ChainMessageType) {}
+ fn broadcast(&self, _message_type: ChainMessageType) {
+ // does nothing by default
+ }
+
+ /// fires when new block is about to be imported
+ /// implementations should be light
+ fn block_pre_import(&self, _bytes: &Bytes, _hash: &H256, _difficulty: &U256) {
+ // does nothing by default
+ }
/// fires when new transactions are received from a peer
fn transactions_received(&self,
diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs
index d5ae9d80f6a..00dc9cab53e 100644
--- a/ethcore/src/client/client.rs
+++ b/ethcore/src/client/client.rs
@@ -881,7 +881,7 @@ impl Client {
/// Flush the block import queue.
pub fn flush_queue(&self) {
self.importer.block_queue.flush();
- while !self.importer.block_queue.queue_info().is_empty() {
+ while !self.importer.block_queue.is_empty() {
self.import_verified_blocks();
}
}
@@ -1423,8 +1423,21 @@ impl ImportBlock for Client {
bail!(EthcoreErrorKind::Block(BlockError::UnknownParent(unverified.parent_hash())));
}
+ let raw = if self.importer.block_queue.is_empty() {
+ Some((
+ unverified.bytes.clone(),
+ unverified.header.hash(),
+ *unverified.header.difficulty(),
+ ))
+ } else { None };
+
match self.importer.block_queue.import(unverified) {
- Ok(res) => Ok(res),
+ Ok(hash) => {
+ if let Some((raw, hash, difficulty)) = raw {
+ self.notify(move |n| n.block_pre_import(&raw, &hash, &difficulty));
+ }
+ Ok(hash)
+ },
// we only care about block errors (not import errors)
Err((block, EthcoreError(EthcoreErrorKind::Block(err), _))) => {
self.importer.bad_blocks.report(block.bytes, format!("{:?}", err));
@@ -1878,6 +1891,10 @@ impl BlockChainClient for Client {
self.importer.block_queue.queue_info()
}
+ fn is_queue_empty(&self) -> bool {
+ self.importer.block_queue.is_empty()
+ }
+
fn clear_queue(&self) {
self.importer.block_queue.clear();
}
@@ -2288,7 +2305,11 @@ impl ScheduleInfo for Client {
impl ImportSealedBlock for Client {
fn import_sealed_block(&self, block: SealedBlock) -> EthcoreResult {
let start = Instant::now();
+ let raw = block.rlp_bytes();
let header = block.header().clone();
+ let hash = header.hash();
+ self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty()));
+
let route = {
// Do a super duper basic verification to detect potential bugs
if let Err(e) = self.engine.verify_block_basic(&header) {
@@ -2306,15 +2327,14 @@ impl ImportSealedBlock for Client {
let block_data = block.rlp_bytes();
let route = self.importer.commit_block(block, &header, encoded::Block::new(block_data), self);
- trace!(target: "client", "Imported sealed block #{} ({})", header.number(), header.hash());
+ trace!(target: "client", "Imported sealed block #{} ({})", header.number(), hash);
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
route
};
- let h = header.hash();
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(
self,
- &[h],
+ &[hash],
&[],
route.enacted(),
route.retracted(),
@@ -2322,16 +2342,16 @@ impl ImportSealedBlock for Client {
);
self.notify(|notify| {
notify.new_blocks(
- vec![h],
+ vec![hash],
vec![],
route.clone(),
- vec![h],
+ vec![hash],
vec![],
start.elapsed(),
);
});
self.db.read().key_value().flush().expect("DB flush failed.");
- Ok(h)
+ Ok(hash)
}
}
diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs
index 5b78a54b34e..55d527013ec 100644
--- a/ethcore/src/client/traits.rs
+++ b/ethcore/src/client/traits.rs
@@ -300,6 +300,11 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
/// Get block queue information.
fn queue_info(&self) -> BlockQueueInfo;
+ /// Returns true if block queue is empty.
+ fn is_queue_empty(&self) -> bool {
+ self.queue_info().is_empty()
+ }
+
/// Clear block queue and abort all import activity.
fn clear_queue(&self);
diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs
index 39dac1f2b21..c73887800c8 100644
--- a/ethcore/src/miner/miner.rs
+++ b/ethcore/src/miner/miner.rs
@@ -576,7 +576,7 @@ impl Miner {
trace!(target: "miner", "requires_reseal: sealing enabled");
// Disable sealing if there were no requests for SEALING_TIMEOUT_IN_BLOCKS
- let had_requests = sealing.last_request.map(|last_request|
+ let had_requests = sealing.last_request.map(|last_request|
best_block.saturating_sub(last_request) <= SEALING_TIMEOUT_IN_BLOCKS
).unwrap_or(false);
diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs
index 9b1597439b0..b9242f47b06 100644
--- a/ethcore/src/verification/queue/mod.rs
+++ b/ethcore/src/verification/queue/mod.rs
@@ -583,6 +583,13 @@ impl VerificationQueue {
result
}
+ /// Returns true if there is nothing currently in the queue.
+ /// TODO [ToDr] Optimize to avoid locking
+ pub fn is_empty(&self) -> bool {
+ let v = &self.verification;
+ v.unverified.lock().is_empty() && v.verifying.lock().is_empty() && v.verified.lock().is_empty()
+ }
+
/// Get queue status.
pub fn queue_info(&self) -> QueueInfo {
use std::mem::size_of;
diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs
index 4296a4c2d07..6fef887a0fe 100644
--- a/ethcore/sync/src/api.rs
+++ b/ethcore/sync/src/api.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
-use std::sync::Arc;
+use std::sync::{Arc, mpsc, atomic};
use std::collections::{HashMap, BTreeMap};
use std::io;
use std::ops::Range;
@@ -33,10 +33,10 @@ use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageTyp
use ethcore::snapshot::SnapshotService;
use ethcore::header::BlockNumber;
use sync_io::NetSyncIo;
-use chain::{ChainSync, SyncStatus as EthSyncStatus};
+use chain::{ChainSyncApi, SyncStatus as EthSyncStatus};
use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
-use parking_lot::RwLock;
+use parking_lot::{RwLock, Mutex};
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
@@ -228,6 +228,37 @@ impl AttachedProtocol {
}
}
+/// A prioritized tasks run in a specialised timer.
+/// Every task should be completed within a hard deadline,
+/// if it's not it's either cancelled or split into multiple tasks.
+/// NOTE These tasks might not complete at all, so anything
+/// that happens here should work even if the task is cancelled.
+#[derive(Debug)]
+pub enum PriorityTask {
+ /// Propagate given block
+ PropagateBlock {
+ /// When the task was initiated
+ started: ::std::time::Instant,
+ /// Raw block RLP to propagate
+ block: Bytes,
+ /// Block hash
+ hash: H256,
+ /// Blocks difficulty
+ difficulty: U256,
+ },
+ /// Propagate a list of transactions
+ PropagateTransactions(::std::time::Instant, Arc),
+}
+impl PriorityTask {
+ /// Mark the task as being processed, right after it's retrieved from the queue.
+ pub fn starting(&self) {
+ match *self {
+ PriorityTask::PropagateTransactions(_, ref is_ready) => is_ready.store(true, atomic::Ordering::SeqCst),
+ _ => {},
+ }
+ }
+}
+
/// EthSync initialization parameters.
pub struct Params {
/// Configuration.
@@ -260,6 +291,8 @@ pub struct EthSync {
subprotocol_name: [u8; 3],
/// Light subprotocol name.
light_subprotocol_name: [u8; 3],
+ /// Priority tasks notification channel
+ priority_tasks: Mutex>,
}
fn light_params(
@@ -312,13 +345,19 @@ impl EthSync {
})
};
- let chain_sync = ChainSync::new(params.config, &*params.chain, params.private_tx_handler.clone());
+ let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel();
+ let sync = ChainSyncApi::new(
+ params.config,
+ &*params.chain,
+ params.private_tx_handler.clone(),
+ priority_tasks_rx,
+ );
let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;
let sync = Arc::new(EthSync {
network: service,
eth_handler: Arc::new(SyncProtocolHandler {
- sync: RwLock::new(chain_sync),
+ sync,
chain: params.chain,
snapshot_service: params.snapshot_service,
overlay: RwLock::new(HashMap::new()),
@@ -327,26 +366,32 @@ impl EthSync {
subprotocol_name: params.config.subprotocol_name,
light_subprotocol_name: params.config.light_subprotocol_name,
attached_protos: params.attached_protos,
+ priority_tasks: Mutex::new(priority_tasks_tx),
});
Ok(sync)
}
+
+ /// Priority tasks producer
+ pub fn priority_tasks(&self) -> mpsc::Sender {
+ self.priority_tasks.lock().clone()
+ }
}
impl SyncProvider for EthSync {
/// Get sync status
fn status(&self) -> EthSyncStatus {
- self.eth_handler.sync.read().status()
+ self.eth_handler.sync.status()
}
/// Get sync peers
fn peers(&self) -> Vec {
self.network.with_context_eval(self.subprotocol_name, |ctx| {
let peer_ids = self.network.connected_peers();
- let eth_sync = self.eth_handler.sync.read();
let light_proto = self.light_proto.as_ref();
- peer_ids.into_iter().filter_map(|peer_id| {
+ let peer_info = self.eth_handler.sync.peer_info(&peer_ids);
+ peer_ids.into_iter().zip(peer_info).filter_map(|(peer_id, peer_info)| {
let session_info = match ctx.session_info(peer_id) {
None => return None,
Some(info) => info,
@@ -358,7 +403,7 @@ impl SyncProvider for EthSync {
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
remote_address: session_info.remote_address,
local_address: session_info.local_address,
- eth_info: eth_sync.peer_info(&peer_id),
+ eth_info: peer_info,
pip_info: light_proto.as_ref().and_then(|lp| lp.peer_status(peer_id)).map(Into::into),
})
}).collect()
@@ -370,17 +415,16 @@ impl SyncProvider for EthSync {
}
fn transactions_stats(&self) -> BTreeMap {
- let sync = self.eth_handler.sync.read();
- sync.transactions_stats()
- .iter()
- .map(|(hash, stats)| (*hash, stats.into()))
- .collect()
+ self.eth_handler.sync.transactions_stats()
}
}
const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;
+const PRIORITY_TIMER: TimerToken = 3;
+
+pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);
struct SyncProtocolHandler {
/// Shared blockchain client.
@@ -388,7 +432,7 @@ struct SyncProtocolHandler {
/// Shared snapshot service.
snapshot_service: Arc,
/// Sync strategy
- sync: RwLock,
+ sync: ChainSyncApi,
/// Chain overlay used to cache data such as fork block.
overlay: RwLock>,
}
@@ -399,11 +443,13 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
+
+ io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
}
}
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
- ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
+ self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
@@ -429,15 +475,26 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
- TX_TIMER => {
- self.sync.write().propagate_new_transactions(&mut io);
- },
+ TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
+ PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
_ => warn!("Unknown timer {} triggered.", timer),
}
}
}
impl ChainNotify for EthSync {
+ fn block_pre_import(&self, bytes: &Bytes, hash: &H256, difficulty: &U256) {
+ let task = PriorityTask::PropagateBlock {
+ started: ::std::time::Instant::now(),
+ block: bytes.clone(),
+ hash: *hash,
+ difficulty: *difficulty,
+ };
+ if let Err(e) = self.priority_tasks.lock().send(task) {
+ warn!(target: "sync", "Unexpected error during priority block propagation: {:?}", e);
+ }
+ }
+
fn new_blocks(&self,
imported: Vec,
invalid: Vec,
diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs
index e6702974349..104a80320e9 100644
--- a/ethcore/sync/src/chain/handler.rs
+++ b/ethcore/sync/src/chain/handler.rs
@@ -29,7 +29,6 @@ use rlp::Rlp;
use snapshot::ChunkType;
use std::cmp;
use std::mem;
-use std::collections::HashSet;
use std::time::Instant;
use sync_io::SyncIo;
@@ -58,7 +57,6 @@ use super::{
SNAPSHOT_DATA_PACKET,
SNAPSHOT_MANIFEST_PACKET,
STATUS_PACKET,
- TRANSACTIONS_PACKET,
};
/// The Chain Sync Handler: handles responses from peers
@@ -67,14 +65,9 @@ pub struct SyncHandler;
impl SyncHandler {
/// Handle incoming packet from peer
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
- if packet_id != STATUS_PACKET && !sync.peers.contains_key(&peer) {
- debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
- return;
- }
let rlp = Rlp::new(data);
let result = match packet_id {
STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp),
- TRANSACTIONS_PACKET => SyncHandler::on_peer_transactions(sync, io, peer, &rlp),
BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
@@ -109,10 +102,9 @@ impl SyncHandler {
}
/// Called when peer sends us new consensus packet
- pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
+ pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) {
trace!(target: "sync", "Received consensus packet from {:?}", peer_id);
io.chain().queue_consensus_message(r.as_raw().to_vec());
- Ok(())
}
/// Called by peer when it is disconnecting
@@ -578,8 +570,8 @@ impl SyncHandler {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: Instant::now(),
- last_sent_transactions: HashSet::new(),
- last_sent_private_transactions: HashSet::new(),
+ last_sent_transactions: Default::default(),
+ last_sent_private_transactions: Default::default(),
expired: false,
confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
asking_snapshot_data: None,
@@ -635,7 +627,7 @@ impl SyncHandler {
}
/// Called when peer sends us new transactions
- fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
+ pub fn on_peer_transactions(sync: &ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
// Accept transactions only when fully synced
if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs
index e0fc8ecddb6..cdedd56303c 100644
--- a/ethcore/sync/src/chain/mod.rs
+++ b/ethcore/sync/src/chain/mod.rs
@@ -92,17 +92,17 @@ mod propagator;
mod requester;
mod supplier;
-use std::sync::Arc;
-use std::collections::{HashSet, HashMap};
+use std::sync::{Arc, mpsc};
+use std::collections::{HashSet, HashMap, BTreeMap};
use std::cmp;
use std::time::{Duration, Instant};
use hash::keccak;
use heapsize::HeapSizeOf;
use ethereum_types::{H256, U256};
-use fastmap::H256FastMap;
-use parking_lot::RwLock;
+use fastmap::{H256FastMap, H256FastSet};
+use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use bytes::Bytes;
-use rlp::{Rlp, RlpStream, DecoderError};
+use rlp::{RlpStream, DecoderError};
use network::{self, PeerId, PacketId};
use ethcore::header::{BlockNumber};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
@@ -112,7 +112,7 @@ use super::{WarpSync, SyncConfig};
use block_sync::{BlockDownloader, DownloadAction};
use rand::Rng;
use snapshot::{Snapshot};
-use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
+use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask};
use private_tx::PrivateTxHandler;
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
use transaction::UnverifiedTransaction;
@@ -120,7 +120,7 @@ use transaction::UnverifiedTransaction;
use self::handler::SyncHandler;
use self::propagator::SyncPropagator;
use self::requester::SyncRequester;
-use self::supplier::SyncSupplier;
+pub(crate) use self::supplier::SyncSupplier;
known_heap_size!(0, PeerInfo);
@@ -187,6 +187,11 @@ const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120);
+/// Defines how much time we have to complete priority transaction or block propagation.
+/// after the deadline is reached the task is considered finished
+/// (so we might sent only to some part of the peers we originally intended to send to)
+const PRIORITY_TASK_DEADLINE: Duration = Duration::from_millis(100);
+
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
pub enum SyncState {
@@ -323,9 +328,9 @@ pub struct PeerInfo {
/// Request timestamp
ask_time: Instant,
/// Holds a set of transactions recently sent to this peer to avoid spamming.
- last_sent_transactions: HashSet,
+ last_sent_transactions: H256FastSet,
/// Holds a set of private transactions and their signatures recently sent to this peer to avoid spamming.
- last_sent_private_transactions: HashSet,
+ last_sent_private_transactions: H256FastSet,
/// Pending request is expired and result should be ignored
expired: bool,
/// Peer fork confirmation status
@@ -375,6 +380,217 @@ pub mod random {
pub type RlpResponseResult = Result