Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions ethcore/light/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use common_types::blockchain_info::BlockChainInfo;
use common_types::encoded;
use common_types::header::Header;
use common_types::ids::BlockId;
use common_types::verification_queue_info::VerificationQueueInfo as BlockQueueInfo;

use kvdb::KeyValueDB;

Expand Down Expand Up @@ -91,6 +92,9 @@ pub trait LightChainClient: Send + Sync {
/// Attempt to get a block hash by block id.
fn block_hash(&self, id: BlockId) -> Option<H256>;

/// Get block queue information.
fn queue_info(&self) -> BlockQueueInfo;

/// Attempt to get block header by block id.
fn block_header(&self, id: BlockId) -> Option<encoded::Header>;

Expand Down Expand Up @@ -125,9 +129,6 @@ pub trait LightChainClient: Send + Sync {
/// Flush the queue.
fn flush_queue(&self);

/// Get queue info.
fn queue_info(&self) -> queue::QueueInfo;

/// Get the `i`th CHT root.
fn cht_root(&self, i: usize) -> Option<H256>;

Expand Down Expand Up @@ -534,13 +535,18 @@ impl<T: ChainDataFetcher> Client<T> {
}
}


impl<T: ChainDataFetcher> LightChainClient for Client<T> {
fn add_listener(&self, listener: Weak<LightChainNotify>) {
Client::add_listener(self, listener)
}

fn chain_info(&self) -> BlockChainInfo { Client::chain_info(self) }

fn queue_info(&self) -> queue::QueueInfo {
self.queue.queue_info()
}

fn queue_header(&self, header: Header) -> EthcoreResult<H256> {
self.import_header(header)
}
Expand Down Expand Up @@ -600,10 +606,6 @@ impl<T: ChainDataFetcher> LightChainClient for Client<T> {
Client::flush_queue(self);
}

fn queue_info(&self) -> queue::QueueInfo {
self.queue.queue_info()
}

fn cht_root(&self, i: usize) -> Option<H256> {
Client::cht_root(self, i)
}
Expand Down
8 changes: 4 additions & 4 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,10 @@ impl BlockChainClient for Client {
r
}

fn queue_info(&self) -> BlockQueueInfo {
self.importer.block_queue.queue_info()
}

fn disable(&self) {
self.set_mode(Mode::Off);
self.enabled.store(false, AtomicOrdering::Relaxed);
Expand Down Expand Up @@ -1934,10 +1938,6 @@ impl BlockChainClient for Client {
self.chain.read().block_receipts(hash)
}

fn queue_info(&self) -> BlockQueueInfo {
self.importer.block_queue.queue_info()
}

fn is_queue_empty(&self) -> bool {
self.importer.block_queue.is_empty()
}
Expand Down
26 changes: 13 additions & 13 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use client::{
TransactionId, UncleId, TraceId, TraceFilter, LastHashes, CallAnalytics,
ProvingBlockChainClient, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock, StateOrBlock,
Call, StateClient, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter, IoClient,
BadBlocks,
BadBlocks
};
use engines::EthEngine;
use error::{Error, EthcoreResult};
Expand All @@ -68,7 +68,7 @@ use spec::Spec;
use state::StateInfo;
use state_db::StateDB;
use trace::LocalizedTrace;
use verification::queue::QueueInfo;
use verification::queue::QueueInfo as BlockQueueInfo;
use verification::queue::kind::blocks::Unverified;

/// Test client.
Expand Down Expand Up @@ -649,6 +649,17 @@ impl BlockChainClient for TestBlockChainClient {
self.execution_result.read().clone().unwrap()
}

fn queue_info(&self) -> BlockQueueInfo {
BlockQueueInfo {
verified_queue_size: self.queue_size.load(AtomicOrder::Relaxed),
unverified_queue_size: 0,
verifying_queue_size: 0,
max_queue_size: 0,
max_mem_use: 0,
mem_used: 0,
}
}

fn replay_block_transactions(&self, _block: BlockId, _analytics: CallAnalytics) -> Result<Box<Iterator<Item = (H256, Executed)>>, CallError> {
Ok(Box::new(self.traces.read().clone().unwrap().into_iter().map(|t| t.transaction_hash.unwrap_or(H256::new())).zip(self.execution_result.read().clone().unwrap().into_iter())))
}
Expand Down Expand Up @@ -817,17 +828,6 @@ impl BlockChainClient for TestBlockChainClient {
None
}

fn queue_info(&self) -> QueueInfo {
QueueInfo {
verified_queue_size: self.queue_size.load(AtomicOrder::Relaxed),
unverified_queue_size: 0,
verifying_queue_size: 0,
max_queue_size: 0,
max_mem_use: 0,
mem_used: 0,
}
}

fn clear_queue(&self) {
}

Expand Down
6 changes: 3 additions & 3 deletions ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
.expect("code will return Some if given BlockId::Latest; qed")
}

/// Get block queue information.
fn queue_info(&self) -> BlockQueueInfo;

/// Get address code hash at given block's state.

/// Get value of the storage at given position at the given block's state.
Expand Down Expand Up @@ -285,9 +288,6 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
/// Get block receipts data by block header hash.
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts>;

/// Get block queue information.
fn queue_info(&self) -> BlockQueueInfo;

/// Returns true if block queue is empty.
fn is_queue_empty(&self) -> bool {
self.queue_info().is_empty()
Expand Down
2 changes: 2 additions & 0 deletions ethcore/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ rand = "0.4"
rlp = { version = "0.3.0", features = ["ethereum"] }
Comment thread
seunlanlege marked this conversation as resolved.
trace-time = "0.1"
triehash-ethereum = {version = "0.2", path = "../../util/triehash-ethereum" }
futures = "0.1"
parity-runtime = { path = "../../util/runtime" }

[dev-dependencies]
env_logger = "0.5"
Expand Down
52 changes: 51 additions & 1 deletion ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use network::client_version::ClientVersion;

use types::pruning_info::PruningInfo;
use ethereum_types::{H256, H512, U256};
use futures::sync::mpsc as futures_mpsc;
use futures::Stream;
use io::{TimerToken};
use ethkey::Secret;
use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainMessageType};
Expand All @@ -39,14 +41,16 @@ use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
use parking_lot::{RwLock, Mutex};
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3, SyncState};
use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket};
use light::client::AsLightClient;
use light::Provider;
use light::net::{
self as light_net, LightProtocol, Params as LightParams,
Capabilities, Handler as LightHandler, EventContext, SampleStore,
};
use parity_runtime::Executor;
use std::sync::atomic::{AtomicBool, Ordering};
use network::IpFilter;
use private_tx::PrivateTxHandler;
use types::transaction::UnverifiedTransaction;
Expand Down Expand Up @@ -131,6 +135,9 @@ impl Default for SyncConfig {
}
}

/// receiving end of a futures::mpsc channel
pub type Notification<T> = futures_mpsc::UnboundedReceiver<T>;
Comment thread
seunlanlege marked this conversation as resolved.

/// Current sync status
pub trait SyncProvider: Send + Sync {
/// Get sync status
Expand All @@ -142,8 +149,14 @@ pub trait SyncProvider: Send + Sync {
/// Get the enode if available.
fn enode(&self) -> Option<String>;

/// gets sync status notifications
fn sync_notification(&self) -> Notification<SyncState>;

/// Returns propagation count for pending transactions.
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats>;

/// are we in the middle of a major sync?
fn is_major_syncing(&self) -> bool;
}

/// Transaction stats
Expand Down Expand Up @@ -266,6 +279,8 @@ impl PriorityTask {
pub struct Params {
/// Configuration.
pub config: SyncConfig,
/// Runtime executor
pub executor: Executor,
/// Blockchain client.
pub chain: Arc<BlockChainClient>,
/// Snapshot service.
Expand Down Expand Up @@ -296,6 +311,8 @@ pub struct EthSync {
light_subprotocol_name: [u8; 3],
/// Priority tasks notification channel
priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
/// for state tracking
is_major_syncing: Arc<AtomicBool>
}

fn light_params(
Expand Down Expand Up @@ -355,6 +372,30 @@ impl EthSync {
params.private_tx_handler.as_ref().cloned(),
priority_tasks_rx,
);

let is_major_syncing = Arc::new(AtomicBool::new(false));

{
// spawn task that constantly updates EthSync.is_major_sync
let notifications = sync.write().sync_notifications();
let moved_client = Arc::downgrade(&params.chain);
let moved_is_major_syncing = is_major_syncing.clone();

params.executor.spawn(notifications.for_each(move |sync_status| {
if let Some(queue_info) = moved_client.upgrade().map(|client| client.queue_info()) {
let is_syncing_state = match sync_status {
SyncState::Idle | SyncState::NewBlocks => false,
_ => true
};
let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3;
moved_is_major_syncing.store(is_verifying || is_syncing_state, Ordering::SeqCst);
Comment thread
seunlanlege marked this conversation as resolved.
return Ok(())
}

// client has been dropped
return Err(())
}));
}
let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;

let sync = Arc::new(EthSync {
Expand All @@ -370,6 +411,7 @@ impl EthSync {
light_subprotocol_name: params.config.light_subprotocol_name,
attached_protos: params.attached_protos,
priority_tasks: Mutex::new(priority_tasks_tx),
is_major_syncing
});

Ok(sync)
Expand Down Expand Up @@ -420,6 +462,14 @@ impl SyncProvider for EthSync {
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
self.eth_handler.sync.transactions_stats()
}

fn sync_notification(&self) -> Notification<SyncState> {
self.eth_handler.sync.write().sync_notifications()
}

fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::SeqCst)
Comment thread
seunlanlege marked this conversation as resolved.
}
}

const PEERS_TIMER: TimerToken = 0;
Expand Down
Loading