Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 115 additions & 12 deletions core/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1587,24 +1587,28 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo
}
}

match self.block_status(&BlockId::Hash(parent_hash))
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
BlockStatus::InChainWithState | BlockStatus::Queued => {},
BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
BlockStatus::InChainPruned if allow_missing_state => {},
BlockStatus::InChainPruned => return Ok(ImportResult::MissingState),
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}

// Own status must be checked first. If the block and ancestry is pruned
// this function must return `AlreadyInChain` rather than `MissingState`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment seems incongruous to the code that follows it:

BlockStatus::InChainPruned => {}

so it doesn't return AlreadyInChain ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

match self.block_status(&BlockId::Hash(hash))
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
BlockStatus::InChainWithState | BlockStatus::Queued => return Ok(ImportResult::AlreadyInChain),
BlockStatus::Unknown | BlockStatus::InChainPruned => {},
BlockStatus::InChainPruned => return Ok(ImportResult::AlreadyInChain),
BlockStatus::Unknown => {},
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}

match self.block_status(&BlockId::Hash(parent_hash))
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
BlockStatus::InChainWithState | BlockStatus::Queued => {},
BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
BlockStatus::InChainPruned if allow_missing_state => {},
BlockStatus::InChainPruned => return Ok(ImportResult::MissingState),
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}


Ok(ImportResult::imported(false))
}
}
Expand Down Expand Up @@ -1919,7 +1923,7 @@ pub(crate) mod tests {
use super::*;
use primitives::blake2_256;
use sr_primitives::DigestItem;
use consensus::{BlockOrigin, SelectChain};
use consensus::{BlockOrigin, SelectChain, BlockImport};
use test_client::{
prelude::*,
client_db::{Backend, DatabaseSettings, DatabaseSettingsSrc, PruningMode},
Expand Down Expand Up @@ -2889,4 +2893,103 @@ pub(crate) mod tests {
expected_err.to_string(),
);
}

#[test]
fn returns_status_for_pruned_blocks() {
let _ = env_logger::try_init();
let tmp = tempfile::tempdir().unwrap();

// set to prune after 1 block
// states
let backend = Arc::new(Backend::new(
DatabaseSettings {
state_cache_size: 1 << 20,
state_cache_child_ratio: None,
pruning: PruningMode::keep_blocks(1),
source: DatabaseSettingsSrc::Path {
path: tmp.path().into(),
cache_size: None,
}
},
u64::max_value(),
).unwrap());

let mut client = TestClientBuilder::with_backend(backend).build();

let a1 = client.new_block_at(&BlockId::Number(0), Default::default())
.unwrap().bake().unwrap();

let mut b1 = client.new_block_at(&BlockId::Number(0), Default::default()).unwrap();

// b1 is created, but not imported
b1.push_transfer(Transfer {
from: AccountKeyring::Alice.into(),
to: AccountKeyring::Ferdie.into(),
amount: 1,
nonce: 0,
}).unwrap();
let b1 = b1.bake().unwrap();

let check_block_a1 = BlockCheckParams {
hash: a1.hash().clone(),
number: 0,
parent_hash: a1.header().parent_hash().clone(),
allow_missing_state: false
};

assert_eq!(client.check_block(check_block_a1.clone()).unwrap(), ImportResult::imported(false));
assert_eq!(client.block_status(&BlockId::hash(check_block_a1.hash)).unwrap(), BlockStatus::Unknown);

client.import_as_final(BlockOrigin::Own, a1.clone()).unwrap();

assert_eq!(client.check_block(check_block_a1.clone()).unwrap(), ImportResult::AlreadyInChain);
assert_eq!(client.block_status(&BlockId::hash(check_block_a1.hash)).unwrap(), BlockStatus::InChainWithState);

let a2 = client.new_block_at(&BlockId::Hash(a1.hash()), Default::default())
.unwrap().bake().unwrap();
client.import_as_final(BlockOrigin::Own, a2.clone()).unwrap();

let check_block_a2 = BlockCheckParams {
hash: a2.hash().clone(),
number: 1,
parent_hash: a1.header().parent_hash().clone(),
allow_missing_state: false
};

assert_eq!(client.check_block(check_block_a1.clone()).unwrap(), ImportResult::AlreadyInChain);
assert_eq!(client.block_status(&BlockId::hash(check_block_a1.hash)).unwrap(), BlockStatus::InChainPruned);
assert_eq!(client.check_block(check_block_a2.clone()).unwrap(), ImportResult::AlreadyInChain);
assert_eq!(client.block_status(&BlockId::hash(check_block_a2.hash)).unwrap(), BlockStatus::InChainWithState);

let a3 = client.new_block_at(&BlockId::Hash(a2.hash()), Default::default())
.unwrap().bake().unwrap();

client.import_as_final(BlockOrigin::Own, a3.clone()).unwrap();
let check_block_a3 = BlockCheckParams {
hash: a3.hash().clone(),
number: 2,
parent_hash: a2.header().parent_hash().clone(),
allow_missing_state: false
};

// a1 and a2 are both pruned at this point
assert_eq!(client.check_block(check_block_a1.clone()).unwrap(), ImportResult::AlreadyInChain);
assert_eq!(client.block_status(&BlockId::hash(check_block_a1.hash)).unwrap(), BlockStatus::InChainPruned);
assert_eq!(client.check_block(check_block_a2.clone()).unwrap(), ImportResult::AlreadyInChain);
assert_eq!(client.block_status(&BlockId::hash(check_block_a2.hash)).unwrap(), BlockStatus::InChainPruned);
assert_eq!(client.check_block(check_block_a3.clone()).unwrap(), ImportResult::AlreadyInChain);
assert_eq!(client.block_status(&BlockId::hash(check_block_a3.hash)).unwrap(), BlockStatus::InChainWithState);

let mut check_block_b1 = BlockCheckParams {
hash: b1.hash().clone(),
number: 0,
parent_hash: b1.header().parent_hash().clone(),
allow_missing_state: false
};
assert_eq!(client.check_block(check_block_b1.clone()).unwrap(), ImportResult::MissingState);
check_block_b1.allow_missing_state = true;
assert_eq!(client.check_block(check_block_b1.clone()).unwrap(), ImportResult::imported(false));
check_block_b1.parent_hash = H256::random();
assert_eq!(client.check_block(check_block_b1.clone()).unwrap(), ImportResult::UnknownParent);
}
}
1 change: 1 addition & 0 deletions core/consensus/common/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub enum ForkChoiceStrategy {
}

/// Data required to check validity of a Block.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BlockCheckParams<Block: BlockT> {
/// Hash of the block that we verify.
pub hash: Block::Hash,
Expand Down
4 changes: 3 additions & 1 deletion core/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ pub enum BlockImportError {
VerificationFailed(Option<Origin>, String),
/// Block is known to be Bad
BadBlock(Option<Origin>),
/// Parent state is missing.
MissingState,
/// Block has an unknown parent
UnknownParent,
/// Block import has been cancelled. This can happen if the parent block fails to be imported.
Expand Down Expand Up @@ -207,7 +209,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer.clone())),
Ok(ImportResult::MissingState) => {
debug!(target: "sync", "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash);
Err(BlockImportError::UnknownParent)
Err(BlockImportError::MissingState)
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash);
Expand Down
16 changes: 8 additions & 8 deletions core/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,12 +1009,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who.clone(), status.roles, status.best_number);
match self.sync.new_peer(who.clone(), info) {
Ok(None) => (),
Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)),
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
if info.roles.is_full() {
match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
Ok(None) => (),
Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)),
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
}
}
}
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
Expand Down Expand Up @@ -1341,12 +1343,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
count: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {
let peers = self.context_data.peers.clone();
let results = self.sync.on_blocks_processed(
imported,
count,
results,
|peer_id| peers.get(peer_id).map(|i| i.info.clone())
);
for result in results {
match result {
Expand Down
68 changes: 34 additions & 34 deletions core/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::{
config::{Roles, BoxFinalityProofRequestBuilder},
message::{self, generic::FinalityProofRequest, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse,
FinalityProofResponse},
protocol
};
use either::Either;
use extra_requests::ExtraRequests;
Expand Down Expand Up @@ -347,23 +346,22 @@ impl<B: BlockT> ChainSync<B> {
/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
pub fn new_peer(&mut self, who: PeerId, info: protocol::PeerInfo<B>) -> Result<Option<BlockRequest<B>>, BadPeer> {
pub fn new_peer(&mut self, who: PeerId, best_hash: B::Hash, best_number: NumberFor<B>)
-> Result<Option<BlockRequest<B>>, BadPeer>
{
// There is nothing sync can get from the node that has no blockchain data.
if !info.roles.is_full() {
return Ok(None)
}
match self.block_status(&info.best_hash) {
match self.block_status(&best_hash) {
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
Err(BadPeer(who, BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE))
}
Ok(BlockStatus::KnownBad) => {
info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
info!("New peer with known bad best block {} ({}).", best_hash, best_number);
Err(BadPeer(who, i32::min_value()))
}
Ok(BlockStatus::Unknown) => {
if info.best_number.is_zero() {
info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
if best_number.is_zero() {
info!("New peer with unknown genesis hash {} ({}).", best_hash, best_number);
return Err(BadPeer(who, i32::min_value()))
}
// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
Expand All @@ -378,8 +376,8 @@ impl<B: BlockT> ChainSync<B> {
);
self.peers.insert(who, PeerSync {
common_number: self.best_queued_number,
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default()
});
Expand All @@ -388,30 +386,30 @@ impl<B: BlockT> ChainSync<B> {

// If we are at genesis, just start downloading.
if self.best_queued_number.is_zero() {
debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with best hash {} ({}).", best_hash, best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
self.is_idle = false;
return Ok(None)
}

let common_best = std::cmp::min(self.best_queued_number, info.best_number);
let common_best = std::cmp::min(self.best_queued_number, best_number);

debug!(target:"sync",
"New peer with unknown best hash {} ({}), searching for common ancestor.",
info.best_hash,
info.best_number
best_hash,
best_number
);

self.peers.insert(who, PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::AncestorSearch(
common_best,
AncestorSearchState::ExponentialBackoff(One::one())
Expand All @@ -423,11 +421,11 @@ impl<B: BlockT> ChainSync<B> {
Ok(Some(ancestry_request::<B>(common_best)))
}
Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with known best hash {} ({}).", best_hash, best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: info.best_number,
best_hash: info.best_hash,
best_number: info.best_number,
common_number: best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
Expand Down Expand Up @@ -849,7 +847,6 @@ impl<B: BlockT> ChainSync<B> {
imported: usize,
count: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>,
mut peer_info: impl FnMut(&PeerId) -> Option<protocol::PeerInfo<B>>
) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
trace!(target: "sync", "Imported {} of {}", imported, count);

Expand Down Expand Up @@ -902,27 +899,33 @@ impl<B: BlockT> ChainSync<B> {
if let Some(peer) = who {
info!("Peer sent block with incomplete header to import");
output.push(Err(BadPeer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
info!("Verification failed from peer: {}", e);
output.push(Err(BadPeer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
info!("Bad block");
output.push(Err(BadPeer(peer, BAD_BLOCK_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::MissingState) => {
// This may happen if the chain we were requesting upon has been discarded
// in the meantime becasue other chain has been finalized.
// Don't mark it as bad as it still may be synced if explicitly requested.
trace!(target: "sync", "Obsolete block");
},
Err(BlockImportError::UnknownParent) |
Err(BlockImportError::Cancelled) |
Err(BlockImportError::Other(_)) => {
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
},
};
}
Expand Down Expand Up @@ -1121,9 +1124,7 @@ impl<B: BlockT> ChainSync<B> {
}

/// Restart the sync process.
fn restart<'a, F>
(&'a mut self, mut peer_info: F) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a
where F: FnMut(&PeerId) -> Option<protocol::PeerInfo<B>> + 'a
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a
{
self.queue_blocks.clear();
self.best_importing_number = Zero::zero();
Expand All @@ -1134,9 +1135,8 @@ impl<B: BlockT> ChainSync<B> {
self.is_idle = false;
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::replace(&mut self.peers, HashMap::new());
old_peers.into_iter().filter_map(move |(id, _)| {
let info = peer_info(&id)?;
match self.new_peer(id.clone(), info) {
old_peers.into_iter().filter_map(move |(id, p)| {
match self.new_peer(id.clone(), p.best_hash, p.best_number) {
Ok(None) => None,
Ok(Some(x)) => Some(Ok((id, x))),
Err(e) => Some(Err(e))
Expand Down
Loading