Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@ impl ClientService {
info!("Configured for {} using {} engine", Colour::White.bold().paint(spec.name.clone()), Colour::Yellow.bold().paint(spec.engine.name()));

let pruning = config.pruning;
let client = Client::new(config, &spec, blockchain_db.clone(), miner.clone(), io_service.channel())?;
let client = Client::new(
config,
&spec,
blockchain_db.clone(),
miner.clone(),
io_service.channel(),
)?;
miner.set_io_channel(io_service.channel());
miner.set_in_chain_checker(&client.clone());

Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,8 +1187,8 @@ impl BlockChain {
let mut pending_block_details = self.pending_block_details.write();
let mut pending_write_txs = self.pending_transaction_addresses.write();

let mut best_ancient_block = self.best_ancient_block.write();
let mut best_block = self.best_block.write();
let mut best_ancient_block = self.best_ancient_block.write();
let mut write_block_details = self.block_details.write();
let mut write_hashes = self.block_hashes.write();
let mut write_txs = self.transaction_addresses.write();
Expand Down
12 changes: 10 additions & 2 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use bytes::Bytes;
use ethereum_types::H256;
use ethereum_types::{H256, U256};
use transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration;
Expand Down Expand Up @@ -141,7 +141,15 @@ pub trait ChainNotify : Send + Sync {
}

/// fires when chain broadcasts a message
fn broadcast(&self, _message_type: ChainMessageType) {}
fn broadcast(&self, _message_type: ChainMessageType) {
// does nothing by default
}

/// fires when new block is about to be imported
/// implementations should be light
fn block_pre_import(&self, _bytes: &Bytes, _hash: &H256, _difficulty: &U256) {
// does nothing by default
}

/// fires when new transactions are received from a peer
fn transactions_received(&self,
Expand Down
36 changes: 28 additions & 8 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ impl Client {
/// Flush the block import queue.
pub fn flush_queue(&self) {
self.importer.block_queue.flush();
while !self.importer.block_queue.queue_info().is_empty() {
while !self.importer.block_queue.is_empty() {
self.import_verified_blocks();
}
}
Expand Down Expand Up @@ -1423,8 +1423,21 @@ impl ImportBlock for Client {
bail!(EthcoreErrorKind::Block(BlockError::UnknownParent(unverified.parent_hash())));
}

let raw = if self.importer.block_queue.is_empty() {
Some((
unverified.bytes.clone(),
unverified.header.hash(),
*unverified.header.difficulty(),
))
} else { None };

match self.importer.block_queue.import(unverified) {
Ok(res) => Ok(res),
Ok(hash) => {
if let Some((raw, hash, difficulty)) = raw {
self.notify(move |n| n.block_pre_import(&raw, &hash, &difficulty));
}
Ok(hash)
},
// we only care about block errors (not import errors)
Err((block, EthcoreError(EthcoreErrorKind::Block(err), _))) => {
self.importer.bad_blocks.report(block.bytes, format!("{:?}", err));
Expand Down Expand Up @@ -1878,6 +1891,10 @@ impl BlockChainClient for Client {
self.importer.block_queue.queue_info()
}

fn is_queue_empty(&self) -> bool {
self.importer.block_queue.is_empty()
}

fn clear_queue(&self) {
self.importer.block_queue.clear();
}
Expand Down Expand Up @@ -2288,7 +2305,11 @@ impl ScheduleInfo for Client {
impl ImportSealedBlock for Client {
fn import_sealed_block(&self, block: SealedBlock) -> EthcoreResult<H256> {
let start = Instant::now();
let raw = block.rlp_bytes();
let header = block.header().clone();
let hash = header.hash();
self.notify(|n| n.block_pre_import(&raw, &hash, header.difficulty()));

let route = {
// Do a super duper basic verification to detect potential bugs
if let Err(e) = self.engine.verify_block_basic(&header) {
Expand All @@ -2306,32 +2327,31 @@ impl ImportSealedBlock for Client {
let block_data = block.rlp_bytes();

let route = self.importer.commit_block(block, &header, encoded::Block::new(block_data), self);
trace!(target: "client", "Imported sealed block #{} ({})", header.number(), header.hash());
trace!(target: "client", "Imported sealed block #{} ({})", header.number(), hash);
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
route
};
let h = header.hash();
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(
self,
&[h],
&[hash],
&[],
route.enacted(),
route.retracted(),
self.engine.seals_internally().is_some(),
);
self.notify(|notify| {
notify.new_blocks(
vec![h],
vec![hash],
vec![],
route.clone(),
vec![h],
vec![hash],
vec![],
start.elapsed(),
);
});
self.db.read().key_value().flush().expect("DB flush failed.");
Ok(h)
Ok(hash)
}
}

Expand Down
5 changes: 5 additions & 0 deletions ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
/// Get block queue information.
fn queue_info(&self) -> BlockQueueInfo;

/// Returns true if block queue is empty.
fn is_queue_empty(&self) -> bool {
self.queue_info().is_empty()
}

/// Clear block queue and abort all import activity.
fn clear_queue(&self);

Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ impl Miner {
trace!(target: "miner", "requires_reseal: sealing enabled");

// Disable sealing if there were no requests for SEALING_TIMEOUT_IN_BLOCKS
let had_requests = sealing.last_request.map(|last_request|
let had_requests = sealing.last_request.map(|last_request|
best_block.saturating_sub(last_request) <= SEALING_TIMEOUT_IN_BLOCKS
).unwrap_or(false);

Expand Down
7 changes: 7 additions & 0 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ impl<K: Kind> VerificationQueue<K> {
result
}

/// Returns true if there is nothing currently in the queue.
/// TODO [ToDr] Optimize to avoid locking
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind to create a new issue and include an issue number for this TODO item?

pub fn is_empty(&self) -> bool {
let v = &self.verification;
v.unverified.lock().is_empty() && v.verifying.lock().is_empty() && v.verified.lock().is_empty()
}

/// Get queue status.
pub fn queue_info(&self) -> QueueInfo {
use std::mem::size_of;
Expand Down
95 changes: 76 additions & 19 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::sync::{Arc, mpsc, atomic};
use std::collections::{HashMap, BTreeMap};
use std::io;
use std::ops::Range;
Expand All @@ -33,10 +33,10 @@ use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageTyp
use ethcore::snapshot::SnapshotService;
use ethcore::header::BlockNumber;
use sync_io::NetSyncIo;
use chain::{ChainSync, SyncStatus as EthSyncStatus};
use chain::{ChainSyncApi, SyncStatus as EthSyncStatus};
use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
use parking_lot::RwLock;
use parking_lot::{RwLock, Mutex};
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
Expand Down Expand Up @@ -228,6 +228,37 @@ impl AttachedProtocol {
}
}

/// A prioritized tasks run in a specialised timer.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// A prioritized tasks run in a specialised timer.
/// A prioritized task runs in a specialised timer.

/// Every task should be completed within a hard deadline,
/// if it's not it's either cancelled or split into multiple tasks.
/// NOTE These tasks might not complete at all, so anything
/// that happens here should work even if the task is cancelled.
#[derive(Debug)]
pub enum PriorityTask {
/// Propagate given block
PropagateBlock {
/// When the task was initiated
started: ::std::time::Instant,
/// Raw block RLP to propagate
block: Bytes,
/// Block hash
hash: H256,
/// Blocks difficulty
difficulty: U256,
},
/// Propagate a list of transactions
PropagateTransactions(::std::time::Instant, Arc<atomic::AtomicBool>),
}
impl PriorityTask {
/// Mark the task as being processed, right after it's retrieved from the queue.
pub fn starting(&self) {
match *self {
PriorityTask::PropagateTransactions(_, ref is_ready) => is_ready.store(true, atomic::Ordering::SeqCst),
_ => {},
}
}
}

/// EthSync initialization parameters.
pub struct Params {
/// Configuration.
Expand Down Expand Up @@ -260,6 +291,8 @@ pub struct EthSync {
subprotocol_name: [u8; 3],
/// Light subprotocol name.
light_subprotocol_name: [u8; 3],
/// Priority tasks notification channel
priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
}

fn light_params(
Expand Down Expand Up @@ -312,13 +345,19 @@ impl EthSync {
})
};

let chain_sync = ChainSync::new(params.config, &*params.chain, params.private_tx_handler.clone());
let (priority_tasks_tx, priority_tasks_rx) = mpsc::channel();
let sync = ChainSyncApi::new(
params.config,
&*params.chain,
params.private_tx_handler.clone(),
priority_tasks_rx,
);
let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;

let sync = Arc::new(EthSync {
network: service,
eth_handler: Arc::new(SyncProtocolHandler {
sync: RwLock::new(chain_sync),
sync,
chain: params.chain,
snapshot_service: params.snapshot_service,
overlay: RwLock::new(HashMap::new()),
Expand All @@ -327,26 +366,32 @@ impl EthSync {
subprotocol_name: params.config.subprotocol_name,
light_subprotocol_name: params.config.light_subprotocol_name,
attached_protos: params.attached_protos,
priority_tasks: Mutex::new(priority_tasks_tx),
});

Ok(sync)
}

/// Priority tasks producer
pub fn priority_tasks(&self) -> mpsc::Sender<PriorityTask> {
self.priority_tasks.lock().clone()
}
}

impl SyncProvider for EthSync {
/// Get sync status
fn status(&self) -> EthSyncStatus {
self.eth_handler.sync.read().status()
self.eth_handler.sync.status()
}

/// Get sync peers
fn peers(&self) -> Vec<PeerInfo> {
self.network.with_context_eval(self.subprotocol_name, |ctx| {
let peer_ids = self.network.connected_peers();
let eth_sync = self.eth_handler.sync.read();
let light_proto = self.light_proto.as_ref();

peer_ids.into_iter().filter_map(|peer_id| {
let peer_info = self.eth_handler.sync.peer_info(&peer_ids);
peer_ids.into_iter().zip(peer_info).filter_map(|(peer_id, peer_info)| {
let session_info = match ctx.session_info(peer_id) {
None => return None,
Some(info) => info,
Expand All @@ -358,7 +403,7 @@ impl SyncProvider for EthSync {
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
remote_address: session_info.remote_address,
local_address: session_info.local_address,
eth_info: eth_sync.peer_info(&peer_id),
eth_info: peer_info,
pip_info: light_proto.as_ref().and_then(|lp| lp.peer_status(peer_id)).map(Into::into),
})
}).collect()
Expand All @@ -370,25 +415,24 @@ impl SyncProvider for EthSync {
}

fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
let sync = self.eth_handler.sync.read();
sync.transactions_stats()
.iter()
.map(|(hash, stats)| (*hash, stats.into()))
.collect()
self.eth_handler.sync.transactions_stats()
}
}

const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;
const PRIORITY_TIMER: TimerToken = 3;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be careful when merging this PR with #9967 because if it doesn't conflict we end up with duplicate tokens.


pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);

struct SyncProtocolHandler {
/// Shared blockchain client.
chain: Arc<BlockChainClient>,
/// Shared snapshot service.
snapshot_service: Arc<SnapshotService>,
/// Sync strategy
sync: RwLock<ChainSync>,
sync: ChainSyncApi,
/// Chain overlay used to cache data such as fork block.
overlay: RwLock<HashMap<BlockNumber, Bytes>>,
}
Expand All @@ -399,11 +443,13 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");

io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
}
}

fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
self.sync.dispatch_packet(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}

fn connected(&self, io: &NetworkContext, peer: &PeerId) {
Expand All @@ -429,15 +475,26 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
TX_TIMER => {
self.sync.write().propagate_new_transactions(&mut io);
},
TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
_ => warn!("Unknown timer {} triggered.", timer),
}
}
}

impl ChainNotify for EthSync {
fn block_pre_import(&self, bytes: &Bytes, hash: &H256, difficulty: &U256) {
let task = PriorityTask::PropagateBlock {
started: ::std::time::Instant::now(),
block: bytes.clone(),
hash: *hash,
difficulty: *difficulty,
};
if let Err(e) = self.priority_tasks.lock().send(task) {
warn!(target: "sync", "Unexpected error during priority block propagation: {:?}", e);
}
}

fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
Expand Down
Loading