Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethcore/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ parity-bytes = "0.1"
parking_lot = "0.7"
rand = "0.4"
rlp = { version = "0.3.0", features = ["ethereum"] }
syncpacketid_derive = { path = "../../util/syncpacketid_derive" }
trace-time = "0.1"
triehash-ethereum = {version = "0.2", path = "../../util/triehash-ethereum" }

Expand Down
8 changes: 4 additions & 4 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
use parking_lot::{RwLock, Mutex};
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket};
use light::client::AsLightClient;
use light::Provider;
use light::net::{
Expand Down Expand Up @@ -579,9 +579,9 @@ impl ChainNotify for EthSync {
match message_type {
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
ChainMessageType::PrivateTransaction(transaction_hash, message) =>
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, message),
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PrivateTransactionPacket, message),
ChainMessageType::SignedPrivateTransaction(transaction_hash, message) =>
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, message),
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SignedPrivateTransactionPacket, message),
}
});
}
Expand Down
86 changes: 47 additions & 39 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ use types::BlockNumber;
use types::block_status::BlockStatus;
use types::ids::BlockId;

use super::sync_packet::{PacketInfo, SyncPacket};
use super::sync_packet::SyncPacket::{
StatusPacket,
NewBlockHashesPacket,
BlockHeadersPacket,
BlockBodiesPacket,
NewBlockPacket,
ReceiptsPacket,
SnapshotManifestPacket,
SnapshotDataPacket,
PrivateTransactionPacket,
SignedPrivateTransactionPacket,
};

use super::{
BlockSet,
ChainSync,
Expand All @@ -48,16 +62,6 @@ use super::{
MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1,
PAR_PROTOCOL_VERSION_3,
BLOCK_BODIES_PACKET,
BLOCK_HEADERS_PACKET,
NEW_BLOCK_HASHES_PACKET,
NEW_BLOCK_PACKET,
PRIVATE_TRANSACTION_PACKET,
RECEIPTS_PACKET,
SIGNED_PRIVATE_TRANSACTION_PACKET,
SNAPSHOT_DATA_PACKET,
SNAPSHOT_MANIFEST_PACKET,
STATUS_PACKET,
};

/// The Chain Sync Handler: handles responses from peers
Expand All @@ -67,36 +71,40 @@ impl SyncHandler {
/// Handle incoming packet from peer
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = Rlp::new(data);
let result = match packet_id {
STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp),
BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
NEW_BLOCK_PACKET => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
NEW_BLOCK_HASHES_PACKET => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
SNAPSHOT_MANIFEST_PACKET => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
SNAPSHOT_DATA_PACKET => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
PRIVATE_TRANSACTION_PACKET => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
SIGNED_PRIVATE_TRANSACTION_PACKET => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
_ => {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
Ok(())
}
};
if let Some(packet_id) = SyncPacket::from_u8(packet_id) {
let result = match packet_id {
StatusPacket => SyncHandler::on_peer_status(sync, io, peer, &rlp),
BlockHeadersPacket => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
BlockBodiesPacket => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
_ => {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
Ok(())
}
};

match result {
Err(DownloaderImportError::Invalid) => {
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id);
io.disable_peer(peer);
sync.deactivate_peer(io, peer);
},
Err(DownloaderImportError::Useless) => {
sync.deactivate_peer(io, peer);
},
Ok(()) => {
// give a task to the same peer first
sync.sync_peer(io, peer, false);
},
match result {
Err(DownloaderImportError::Invalid) => {
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
io.disable_peer(peer);
sync.deactivate_peer(io, peer);
},
Err(DownloaderImportError::Useless) => {
sync.deactivate_peer(io, peer);
},
Ok(()) => {
// give a task to the same peer first
sync.sync_peer(io, peer, false);
},
}
} else {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
}
}

Expand Down
41 changes: 13 additions & 28 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
//! All other messages are ignored.

mod handler;
pub mod sync_packet;
mod propagator;
mod requester;
mod supplier;
Expand All @@ -103,7 +104,7 @@ use fastmap::{H256FastMap, H256FastSet};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use bytes::Bytes;
use rlp::{RlpStream, DecoderError};
use network::{self, PeerId, PacketId, ProtocolId};
use network::{self, PeerId, PacketId};
use network::client_version::ClientVersion;
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
use ethcore::snapshot::{RestorationStatus};
Expand All @@ -112,13 +113,19 @@ use super::{WarpSync, SyncConfig};
use block_sync::{BlockDownloader, DownloadAction};
use rand::Rng;
use snapshot::{Snapshot};
use api::{EthProtocolInfo as PeerInfoDigest, ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID, PriorityTask};
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask};
use private_tx::PrivateTxHandler;
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
use types::transaction::UnverifiedTransaction;
use types::BlockNumber;

use self::handler::SyncHandler;
use self::sync_packet::{PacketInfo, SyncPacket};
use self::sync_packet::SyncPacket::{
NewBlockPacket,
StatusPacket,
};

use self::propagator::SyncPropagator;
use self::requester::SyncRequester;
pub(crate) use self::supplier::SyncSupplier;
Expand Down Expand Up @@ -154,28 +161,6 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024;
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000;
const SNAPSHOT_MIN_PEERS: usize = 3;

pub const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
const TRANSACTIONS_PACKET: u8 = 0x02;
pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03;
pub const BLOCK_HEADERS_PACKET: u8 = 0x04;
pub const GET_BLOCK_BODIES_PACKET: u8 = 0x05;
const BLOCK_BODIES_PACKET: u8 = 0x06;
const NEW_BLOCK_PACKET: u8 = 0x07;

pub const GET_NODE_DATA_PACKET: u8 = 0x0d;
pub const NODE_DATA_PACKET: u8 = 0x0e;
pub const GET_RECEIPTS_PACKET: u8 = 0x0f;
pub const RECEIPTS_PACKET: u8 = 0x10;

pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
pub const SNAPSHOT_DATA_PACKET: u8 = 0x14;
pub const CONSENSUS_DATA_PACKET: u8 = 0x15;
pub const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
pub const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;

const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;

const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -484,7 +469,7 @@ impl ChainSyncApi {
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
check_deadline(deadline)?;
for peer in peers {
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer, NEW_BLOCK_PACKET, rlp.clone());
SyncPropagator::send_packet(io, *peer, NewBlockPacket, rlp.clone());
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
peer.latest_hash = hash;
}
Expand Down Expand Up @@ -1146,7 +1131,7 @@ impl ChainSync {
}
}
packet.complete_unbounded_list();
io.respond(STATUS_PACKET, packet.out())
io.respond(StatusPacket.id(), packet.out())
}

pub fn maintain_peers(&mut self, io: &mut SyncIo) {
Expand Down Expand Up @@ -1331,8 +1316,8 @@ impl ChainSync {
}

/// Broadcast private transaction message to peers.
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, protocol, packet_id, packet);
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet);
}
}

Expand Down
34 changes: 19 additions & 15 deletions ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,33 @@
use std::cmp;
use std::collections::HashSet;

use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
use bytes::Bytes;
use ethereum_types::H256;
use fastmap::H256FastSet;
use network::{PeerId, PacketId, ProtocolId};
use network::client_version::ClientCapabilities;
use network::PeerId;
use rand::Rng;
use rlp::{Encodable, RlpStream};
use sync_io::SyncIo;
use types::transaction::SignedTransaction;
use types::BlockNumber;
use types::blockchain_info::BlockChainInfo;

use super::sync_packet::SyncPacket;
use super::sync_packet::SyncPacket::{
NewBlockHashesPacket,
TransactionsPacket,
NewBlockPacket,
ConsensusDataPacket,
};

use super::{
random,
ChainSync,
MAX_TRANSACTION_PACKET_SIZE,
MAX_PEER_LAG_PROPAGATION,
MAX_PEERS_PROPAGATION,
MIN_PEERS_PROPAGATION,
CONSENSUS_DATA_PACKET,
NEW_BLOCK_HASHES_PACKET,
NEW_BLOCK_PACKET,
TRANSACTIONS_PACKET,
};

/// The Chain Sync Propagator: propagates data to peers
Expand All @@ -53,7 +56,8 @@ impl SyncPropagator {
let sent = peers.len();
let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
for peer_id in peers {
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());

if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
peer.latest_hash = chain_info.best_block_hash.clone();
}
Expand Down Expand Up @@ -88,7 +92,7 @@ impl SyncPropagator {
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
peer.latest_hash = best_block_hash;
}
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone());
SyncPropagator::send_packet(io, *peer_id, NewBlockHashesPacket, rlp.clone());
}
sent
}
Expand Down Expand Up @@ -156,7 +160,7 @@ impl SyncPropagator {

let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
let size = rlp.len();
SyncPropagator::send_packet(io, ETH_PROTOCOL, peer_id, TRANSACTIONS_PACKET, rlp);
SyncPropagator::send_packet(io, peer_id, TransactionsPacket, rlp);
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
};

Expand Down Expand Up @@ -275,7 +279,7 @@ impl SyncPropagator {
io.chain().chain_info().total_difficulty
);
for peer_id in &peers {
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
}
}
}
Expand All @@ -285,12 +289,12 @@ impl SyncPropagator {
let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers());
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
for peer_id in lucky_peers {
SyncPropagator::send_packet(io, WARP_SYNC_PROTOCOL_ID, peer_id, CONSENSUS_DATA_PACKET, packet.clone());
SyncPropagator::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
}
}

/// Broadcast private transaction message to peers.
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
if lucky_peers.is_empty() {
error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
Expand All @@ -300,7 +304,7 @@ impl SyncPropagator {
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
peer.last_sent_private_transactions.insert(transaction_hash);
}
SyncPropagator::send_packet(io, protocol, peer_id, packet_id, packet.clone());
SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
}
}
}
Expand All @@ -321,8 +325,8 @@ impl SyncPropagator {
}

/// Generic packet sender
pub fn send_packet(sync: &mut SyncIo, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
if let Err(e) = sync.send_protocol(protocol, peer_id, packet_id, packet) {
pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacket, packet: Bytes) {
if let Err(e) = sync.send(peer_id, packet_id, packet) {
debug!(target:"sync", "Error sending packet: {:?}", e);
sync.disconnect_peer(peer_id);
}
Expand Down
Loading