Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ pub struct Params {
/// Snapshot service.
pub snapshot_service: Arc<SnapshotService>,
/// Private tx service.
pub private_tx_handler: Arc<PrivateTxHandler>,
pub private_tx_handler: Option<Arc<PrivateTxHandler>>,
/// Light data provider.
pub provider: Arc<::light::Provider>,
/// Network layer configuration.
Expand Down Expand Up @@ -349,7 +349,7 @@ impl EthSync {
let sync = ChainSyncApi::new(
params.config,
&*params.chain,
params.private_tx_handler.clone(),
params.private_tx_handler.as_ref().cloned(),
priority_tasks_rx,
);
let service = NetworkService::new(params.network_config.clone().into_basic()?, connection_filter)?;
Expand Down
45 changes: 37 additions & 8 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,9 @@ impl SyncHandler {
fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
sync.handshaking_peers.remove(&peer_id);
let protocol_version: u8 = r.val_at(0)?;
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id);
let warp_protocol = warp_protocol_version != 0;
let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0;
let peer = PeerInfo {
protocol_version: protocol_version,
network_id: r.val_at(1)?,
Expand All @@ -576,10 +578,26 @@ impl SyncHandler {
snapshot_hash: if warp_protocol { Some(r.val_at(5)?) } else { None },
snapshot_number: if warp_protocol { Some(r.val_at(6)?) } else { None },
block_set: None,
private_tx_enabled: if private_tx_protocol { r.val_at(7).unwrap_or(false) } else { false },
};

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})",
peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number);
trace!(target: "sync", "New peer {} (\
protocol: {}, \
network: {:?}, \
difficulty: {:?}, \
latest:{}, \
genesis:{}, \
snapshot:{:?}, \
private_tx_enabled:{})",
peer_id,
peer.protocol_version,
peer.network_id,
peer.difficulty,
peer.latest_hash,
peer.genesis,
peer.snapshot_number,
peer.private_tx_enabled
);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
Expand Down Expand Up @@ -654,9 +672,15 @@ impl SyncHandler {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(());
}

let private_handler = match sync.private_tx_handler {
Some(ref handler) => handler,
None => {
trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id);
return Ok(());
}
};
trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id);
match sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) {
match private_handler.import_signed_private_transaction(r.as_raw()) {
Ok(transaction_hash) => {
//don't send the packet back
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
Expand All @@ -676,10 +700,15 @@ impl SyncHandler {
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
return Ok(());
}

let private_handler = match sync.private_tx_handler {
Some(ref handler) => handler,
None => {
trace!(target: "sync", "{} Ignoring private tx packet from peer", peer_id);
return Ok(());
}
};
trace!(target: "sync", "Received private transaction packet from {:?}", peer_id);

match sync.private_tx_handler.import_private_transaction(r.as_raw()) {
match private_handler.import_private_transaction(r.as_raw()) {
Ok(transaction_hash) => {
//don't send the packet back
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
Expand Down
23 changes: 16 additions & 7 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ pub struct PeerInfo {
last_sent_private_transactions: H256FastSet,
/// Pending request is expired and result should be ignored
expired: bool,
/// Private transactions enabled
private_tx_enabled: bool,
/// Peer fork confirmation status
confirmation: ForkConfirmation,
/// Best snapshot hash
Expand Down Expand Up @@ -395,7 +397,7 @@ impl ChainSyncApi {
pub fn new(
config: SyncConfig,
chain: &BlockChainClient,
private_tx_handler: Arc<PrivateTxHandler>,
private_tx_handler: Option<Arc<PrivateTxHandler>>,
priority_tasks: mpsc::Receiver<PriorityTask>,
) -> Self {
ChainSyncApi {
Expand Down Expand Up @@ -626,7 +628,7 @@ pub struct ChainSync {
/// Enable ancient block downloading
download_old_blocks: bool,
/// Shared private tx service.
private_tx_handler: Arc<PrivateTxHandler>,
private_tx_handler: Option<Arc<PrivateTxHandler>>,
/// Enable warp sync.
warp_sync: WarpSync,
}
Expand All @@ -636,7 +638,7 @@ impl ChainSync {
pub fn new(
config: SyncConfig,
chain: &BlockChainClient,
private_tx_handler: Arc<PrivateTxHandler>,
private_tx_handler: Option<Arc<PrivateTxHandler>>,
) -> Self {
let chain_info = chain.chain_info();
let best_block = chain.chain_info().best_block_number;
Expand Down Expand Up @@ -1120,9 +1122,11 @@ impl ChainSync {
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
let warp_protocol = warp_protocol_version != 0;
let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0;
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63.0 };
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 });
let mut packet = RlpStream::new();
packet.begin_unbounded_list();
let chain = io.chain().chain_info();
packet.append(&(protocol as u32));
packet.append(&self.network_id);
Expand All @@ -1135,7 +1139,11 @@ impl ChainSync {
let manifest_hash = manifest.map_or(H256::new(), |m| keccak(m.into_rlp()));
packet.append(&manifest_hash);
packet.append(&block_number);
if private_tx_protocol {
packet.append(&self.private_tx_handler.is_some());
}
}
packet.complete_unbounded_list();
io.respond(STATUS_PACKET, packet.out())
}

Expand Down Expand Up @@ -1246,7 +1254,8 @@ impl ChainSync {
fn get_private_transaction_peers(&self, transaction_hash: &H256) -> Vec<PeerId> {
self.peers.iter().filter_map(
|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0
&& !p.last_sent_private_transactions.contains(transaction_hash) {
&& !p.last_sent_private_transactions.contains(transaction_hash)
&& p.private_tx_enabled {
Some(*id)
} else {
None
Expand Down Expand Up @@ -1342,7 +1351,6 @@ pub mod tests {
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo};
use ethcore::miner::{MinerService, PendingOrdering};
use types::header::Header;
use private_tx::NoopPrivateTxHandler;

pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
let mut header = Header::new();
Expand Down Expand Up @@ -1426,7 +1434,7 @@ pub mod tests {
}

pub fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync {
let mut sync = ChainSync::new(SyncConfig::default(), client, Arc::new(NoopPrivateTxHandler));
let mut sync = ChainSync::new(SyncConfig::default(), client, None);
insert_dummy_peer(&mut sync, 0, peer_latest_hash);
sync
}
Expand All @@ -1446,6 +1454,7 @@ pub mod tests {
last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(),
expired: false,
private_tx_enabled: false,
confirmation: super::ForkConfirmation::Confirmed,
snapshot_number: None,
snapshot_hash: None,
Expand Down
24 changes: 14 additions & 10 deletions ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,16 @@ impl SyncPropagator {
/// Broadcast private transaction message to peers.
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) {
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers);
for peer_id in lucky_peers {
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
peer.last_sent_private_transactions.insert(transaction_hash);
if lucky_peers.is_empty() {
error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
} else {
trace!(target: "privatetx", "Sending private transaction packet to {:?}", lucky_peers);
for peer_id in lucky_peers {
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
peer.last_sent_private_transactions.insert(transaction_hash);
}
SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
}
SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
}
}

Expand Down Expand Up @@ -350,7 +354,6 @@ impl SyncPropagator {
mod tests {
use ethcore::client::{BlockInfo, ChainInfo, EachBlockWith, TestBlockChainClient};
use parking_lot::RwLock;
use private_tx::NoopPrivateTxHandler;
use rlp::{Rlp};
use std::collections::{VecDeque};
use tests::helpers::{TestIo};
Expand Down Expand Up @@ -426,7 +429,7 @@ mod tests {
client.add_blocks(2, EachBlockWith::Uncle);
let queue = RwLock::new(VecDeque::new());
let block = client.block(BlockId::Latest).unwrap().into_inner();
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
sync.peers.insert(0,
PeerInfo {
// Messaging protocol
Expand All @@ -442,6 +445,7 @@ mod tests {
last_sent_transactions: Default::default(),
last_sent_private_transactions: Default::default(),
expired: false,
private_tx_enabled: false,
confirmation: ForkConfirmation::Confirmed,
snapshot_number: None,
snapshot_hash: None,
Expand Down Expand Up @@ -514,7 +518,7 @@ mod tests {
client.add_blocks(100, EachBlockWith::Uncle);
client.insert_transaction_to_queue();
// Sync with no peers
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
Expand Down Expand Up @@ -584,7 +588,7 @@ mod tests {
let mut client = TestBlockChainClient::new();
client.insert_transaction_with_gas_price_to_queue(U256::zero());
let block_hash = client.block_hash_delta_minus(1);
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
Expand Down Expand Up @@ -617,7 +621,7 @@ mod tests {
let tx1_hash = client.insert_transaction_to_queue();
let tx2_hash = client.insert_transaction_with_gas_price_to_queue(U256::zero());
let block_hash = client.block_hash_delta_minus(1);
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
let mut sync = ChainSync::new(SyncConfig::default(), &client, None);
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &queue, None);
Expand Down
4 changes: 2 additions & 2 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl TestNet<EthPeer<TestBlockChainClient>> {
let chain = TestBlockChainClient::new();
let ss = Arc::new(TestSnapshotService::new());
let private_tx_handler = Arc::new(SimplePrivateTxHandler::default());
let sync = ChainSync::new(config.clone(), &chain, private_tx_handler.clone());
let sync = ChainSync::new(config.clone(), &chain, Some(private_tx_handler.clone()));
net.peers.push(Arc::new(EthPeer {
sync: RwLock::new(sync),
snapshot_service: ss,
Expand Down Expand Up @@ -395,7 +395,7 @@ impl TestNet<EthPeer<EthcoreClient>> {

let private_tx_handler = Arc::new(SimplePrivateTxHandler::default());
let ss = Arc::new(TestSnapshotService::new());
let sync = ChainSync::new(config, &*client, private_tx_handler.clone());
let sync = ChainSync::new(config, &*client, Some(private_tx_handler.clone()));
let peer = Arc::new(EthPeer {
sync: RwLock::new(sync),
snapshot_service: ss,
Expand Down
2 changes: 1 addition & 1 deletion parity/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub fn sync(
network_config: NetworkConfiguration,
chain: Arc<BlockChainClient>,
snapshot_service: Arc<SnapshotService>,
private_tx_handler: Arc<PrivateTxHandler>,
private_tx_handler: Option<Arc<PrivateTxHandler>>,
provider: Arc<Provider>,
_log_settings: &LogConfig,
attached_protos: Vec<AttachedProtocol>,
Expand Down
9 changes: 7 additions & 2 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use ethcore::verification::queue::VerifierSettings;
use ethcore_logger::{Config as LogConfig, RotatingLogger};
use ethcore_service::ClientService;
use ethereum_types::Address;
use sync::{self, SyncConfig};
use sync::{self, SyncConfig, PrivateTxHandler};
use miner::work_notify::WorkPoster;
use futures::IntoFuture;
use hash_fetch::{self, fetch};
Expand Down Expand Up @@ -666,13 +666,18 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
None
};

let private_tx_sync: Option<Arc<PrivateTxHandler>> = match cmd.private_tx_enabled {
true => Some(private_tx_service.clone() as Arc<PrivateTxHandler>),
false => None,
};

// create sync object
let (sync_provider, manage_network, chain_notify, priority_tasks) = modules::sync(
sync_config,
net_conf.clone().into(),
client.clone(),
snapshot_service.clone(),
private_tx_service.clone(),
private_tx_sync,
client.clone(),
&cmd.logger_config,
attached_protos,
Expand Down