4 changes: 1 addition & 3 deletions substrate/client/db/src/lib.rs
@@ -243,9 +243,7 @@ impl client::blockchain::Backend for BlockchainDb {
}

fn hash(&self, number: block::Number) -> Result<Option<block::HeaderHash>, client::error::Error> {
Ok(self.db.get(columns::BLOCK_INDEX, &number_to_db_key(number))
.map_err(db_err)?
.map(|hash| block::HeaderHash::from_slice(&hash)))
Ok(self.header(BlockId::Number(number))?.map(|hdr| hdr.blake2_256().into()))
Contributor:
this is fixed and tested in #152

Member Author:
reverted

}
}

2 changes: 1 addition & 1 deletion substrate/network/src/blocks.rs
@@ -125,7 +125,7 @@ impl BlockCollection {
};

// crop to peers best
if range.start >= peer_best {
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", peer_id, range.start, peer_best);
return None;
}
64 changes: 43 additions & 21 deletions substrate/network/src/consensus.rs
@@ -18,18 +18,14 @@

use std::collections::{HashMap, HashSet};
use futures::sync::{oneshot, mpsc};
use std::time::{Instant, Duration};
use std::collections::hash_map::Entry;
use io::SyncIo;
use protocol::Protocol;
use network::PeerId;
use primitives::Hash;
use primitives::{Hash, block::HeaderHash, block::Id as BlockId};
use client::BlockStatus;
use message::{self, Message};
use runtime_support::Hashable;

// TODO: Add additional spam/DoS attack protection.
const MESSAGE_LIFETIME_SECONDS: u64 = 600;

struct CandidateRequest {
id: message::RequestId,
completion: oneshot::Sender<Vec<u8>>,
@@ -47,7 +43,8 @@ pub struct Consensus {
our_candidate: Option<(Hash, Vec<u8>)>,
statement_sink: Option<mpsc::UnboundedSender<message::Statement>>,
bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
messages: HashMap<Hash, (Instant, message::Message)>,
messages: Vec<(Hash, message::Message)>,
message_hashes: HashSet<Hash>,
}

impl Consensus {
@@ -59,6 +56,7 @@ impl Consensus {
statement_sink: None,
bft_message_sink: None,
messages: Default::default(),
message_hashes: Default::default(),
}
}

@@ -75,9 +73,9 @@ impl Consensus {
// Send out all known messages.
// TODO: limit by size
let mut known_messages = HashSet::new();
for (hash, &(_, ref m)) in self.messages.iter() {
for &(ref hash, ref message) in self.messages.iter() {
known_messages.insert(hash.clone());
protocol.send_message(io, peer_id, m.clone());
protocol.send_message(io, peer_id, message.clone());
}
self.peers.insert(peer_id, PeerConsensus {
candidate_fetch: None,
@@ -96,13 +94,13 @@
}

fn register_message(&mut self, hash: Hash, message: message::Message) {
if let Entry::Vacant(entry) = self.messages.entry(hash) {
entry.insert((Instant::now(), message));
if self.message_hashes.insert(hash) {
self.messages.push((hash, message));
}
}

pub fn on_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, statement: message::Statement, hash: Hash) {
if self.messages.contains_key(&hash) {
if self.message_hashes.contains(&hash) {
trace!(target:"sync", "Ignored already known statement from {}", peer_id);
}
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
@@ -137,11 +135,24 @@ impl Consensus {
}

pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) {
if self.messages.contains_key(&hash) {
if self.message_hashes.contains(&hash) {
trace!(target:"sync", "Ignored already known BFT message from {}", peer_id);
return;
}

match protocol.chain().block_status(&BlockId::Hash(message.parent_hash)) {
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
return;
},
Ok(status) => {
if status != BlockStatus::InChain {
trace!(target:"sync", "Ignored unknown parent BFT message from {}, hash={}", peer_id, message.parent_hash);
Contributor @rphmeier commented on May 11, 2018:
this doesn't seem right.

  • we are building on parent A
  • our peers produce B and begin building on B. Due to latency we still think the chain head is A.
  • we shouldn't ignore messages about that consensus just because there was latency in delivering B to us.

Or imagine if we're syncing and well behind the chain head. Our peers might think we have those messages, but we just ignored them! Then things will stall again.

Note that the property for correctness of the BFT implementation is that messages eventually reach and are processed by the other validators. In practice, "eventually" isn't possible, but we can at least make a best-effort of 10 minutes or so.

Contributor:
what we want to keep:

  • unrecognized parent_hash messages (for some time)
  • parent_hash == chain head messages

what we don't want to keep

  • parent_hash in chain but has children already
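
As a rough illustration of the retention policy sketched in the two comments above (this is not the code in this PR; `Msg`, `has_children`, and the ten-minute window are assumptions made only for the example):

use std::time::{Duration, Instant};

// Hypothetical message record used only for this sketch.
struct Msg {
    parent_hash: [u8; 32],
    received: Instant,
}

// "Best-effort of 10 minutes or so" for messages whose parent we do not know yet.
const MAX_MESSAGE_AGE: Duration = Duration::from_secs(600);

fn keep(
    msg: &Msg,
    chain_head: [u8; 32],
    in_chain: impl Fn(&[u8; 32]) -> bool,
    has_children: impl Fn(&[u8; 32]) -> bool,
) -> bool {
    if msg.parent_hash == chain_head {
        // parent_hash == chain head: always keep.
        return true;
    }
    if !in_chain(&msg.parent_hash) {
        // Unrecognised parent_hash: keep for a bounded time in case we are lagging.
        return msg.received.elapsed() < MAX_MESSAGE_AGE;
    }
    // parent_hash is in the chain but already has children: that round is over, drop it.
    !has_children(&msg.parent_hash)
}
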

return;
}
},
}

if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.known_messages.insert(hash);
// TODO: validate signature?
@@ -168,9 +179,9 @@ impl Consensus {
pub fn bft_messages(&mut self, parent_hash: Hash) -> mpsc::UnboundedReceiver<message::LocalizedBftMessage>{
let (sink, stream) = mpsc::unbounded();

for (_, message) in self.messages.iter() {
for &(_, ref message) in self.messages.iter() {
let bft_message = match *message {
(_, Message::BftMessage(ref msg)) => msg,
Message::BftMessage(ref msg) => msg,
_ => continue,
};

@@ -266,17 +277,28 @@ impl Consensus {
self.peers.remove(&peer_id);
}

pub fn collect_garbage(&mut self) {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now();
pub fn collect_garbage(&mut self, best_block_parent: Option<HeaderHash>) {
let before = self.messages.len();
self.messages.retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
let hashes = &mut self.message_hashes;
self.messages.retain(|&(ref hash, ref message)| {
best_block_parent.map_or(true, |parent_hash| {
Contributor:
is this not a no-op when None is passed?

if match *message {
Message::BftMessage(ref msg) => msg.parent_hash != parent_hash,
Message::Statement(ref msg) => msg.parent_hash != parent_hash,
_ => true,
} {
hashes.remove(hash);
true
} else {
false
}
})
});
if self.messages.len() != before {
trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len());
}
let messages = &self.messages;
for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| messages.contains_key(h));
peer.known_messages.retain(|h| hashes.contains(h));
}
}
}
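
On the question above about passing `None`, a tiny self-contained illustration (the `u64` stand-ins for hashes are made up for the example): `Option::map_or(true, …)` returns its default `true` when the option is `None`, so the `retain` pass keeps every message and the call removes nothing.

fn main() {
    // No best-block parent supplied, as in `collect_garbage(None)`.
    let best_block_parent: Option<u64> = None;
    // Parent "hashes" of pending messages.
    let mut messages: Vec<u64> = vec![1, 2, 3];

    messages.retain(|&parent| {
        // With `None`, `map_or(true, …)` yields `true` for every element.
        best_block_parent.map_or(true, |best| parent != best)
    });

    assert_eq!(messages.len(), 3); // nothing was removed
}
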
1 change: 1 addition & 0 deletions substrate/network/src/message.rs
@@ -225,6 +225,7 @@ pub enum SignedConsensusMessage {
/// A vote.
Vote(SignedConsensusVote),
}

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// A network message.
pub enum Message {
12 changes: 8 additions & 4 deletions substrate/network/src/protocol.rs
@@ -36,7 +36,7 @@ use io::SyncIo;
use error;
use super::header_hash;

const REQUEST_TIMEOUT_SEC: u64 = 15;
const REQUEST_TIMEOUT_SEC: u64 = 40;
const PROTOCOL_VERSION: u32 = 0;

// Maximum allowed entries in `BlockResponse`
@@ -344,7 +344,7 @@ impl Protocol {
/// Perform time based maintenance.
pub fn tick(&self, io: &mut SyncIo) {
self.maintain_peers(io);
self.consensus.lock().collect_garbage();
self.consensus.lock().collect_garbage(None);
}

fn maintain_peers(&self, io: &mut SyncIo) {
@@ -387,6 +387,8 @@ impl Protocol {
return;
}

let mut sync = self.sync.write();
let mut consensus = self.consensus.lock();
{
let mut peers = self.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
@@ -420,8 +422,8 @@
handshaking_peers.remove(&peer_id);
debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id));
}
self.sync.write().new_peer(io, self, peer_id);
self.consensus.lock().new_peer(io, self, peer_id, &status.roles);
sync.new_peer(io, self, peer_id);
consensus.new_peer(io, self, peer_id, &status.roles);
}

/// Called when peer sends us new transactions
@@ -511,6 +513,8 @@
}));
}
}

self.consensus.lock().collect_garbage(Some(header.parent_hash));
Contributor:
if this doesn't fire every block then we will have a lot of garbage piling up.

Contributor:
seems better to keep track of the header hash of the last notification and pass all the parent headers since then.

Member Author:
That list would be huge if this is called at block 0 before connecting to any peers and then at the end of the full sync. For now I'll just clear all messages if there is a gap.

Contributor:
fairly dangerous but fine if we can assume there are always enough validators at the head of the chain to complete consensus.
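
As a rough sketch of the idea discussed in this thread (remember which block we last collected for and, if the next notification does not follow on from it, clear everything rather than replaying every intermediate parent); the struct and field names are invented for illustration and are not the code merged here:

struct ConsensusGc {
    // Number of the block we last ran garbage collection for, if any.
    last_collected: Option<u64>,
    // Pending consensus messages, tagged with the number of their parent block.
    messages: Vec<(u64, Vec<u8>)>,
}

impl ConsensusGc {
    fn on_block_imported(&mut self, number: u64) {
        match self.last_collected {
            // Contiguous notification: drop only messages built on older parents.
            Some(last) if number == last + 1 => {
                self.messages.retain(|&(parent, _)| parent + 1 >= number);
            }
            // First notification, or a gap (e.g. after a long sync): clear everything.
            _ => self.messages.clear(),
        }
        self.last_collected = Some(number);
    }
}
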

}

pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
11 changes: 7 additions & 4 deletions substrate/network/src/sync.rs
@@ -114,13 +114,13 @@ impl ChainSync {
io.disable_peer(peer_id);
},
(Ok(BlockStatus::Unknown), 0) => {
debug!(target:"sync", "New peer with unkown genesis hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
io.disable_peer(peer_id);
},
(Ok(BlockStatus::Unknown), _) => {
let our_best = self.best_queued_number;
if our_best > 0 {
debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
self.peers.insert(peer_id, PeerSync {
common_hash: self.genesis_hash,
common_number: 0,
@@ -186,13 +186,15 @@ impl ChainSync {
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n);
vec![]
},
Ok(_) if n > 0 => {
Ok(our_best) if n > 0 => {
trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", peer_id, block.hash, n, our_best);
let n = n - 1;
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(io, protocol, peer_id, n);
return;
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", peer_id);
io.disable_peer(peer_id);
return;
},
@@ -325,7 +327,7 @@ impl ChainSync {
let stale = header.number <= self.best_queued_number;
if stale {
if !self.is_known_or_already_downloading(protocol, &header.parent_hash) {
trace!(target: "sync", "Ignoring unkown stale block announce from {}: {} {:?}", peer_id, hash, header);
trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", peer_id, hash, header);
} else {
trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header);
self.download_stale(io, protocol, peer_id, &hash);
@@ -423,6 +425,7 @@ impl ChainSync {
}

fn request_ancestry(io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, block: BlockNumber) {
trace!(target: "sync", "Requesting potential common ancestor {} from {}", block, peer_id);
let request = message::BlockRequest {
id: 0,
fields: vec![message::BlockAttribute::Header, message::BlockAttribute::Justification],