Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify the method Relayer::accept_block to return StatusCode::BlockIsInvalid when shared.insert_new_block() produces an error. #4333

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ ckb-chain = { path = "../chain", version = "= 0.114.0-pre", features = ["mock"]
faux = "^0.1"
once_cell = "1.8.0"
ckb-systemtime = { path = "../util/systemtime", version = "= 0.114.0-pre" , features = ["enable_faketime"]}
ckb-proposal-table = { path = "../util/proposal-table", version = "= 0.114.0-pre" }

[features]
default = []
Expand Down
5 changes: 3 additions & 2 deletions sync/src/relayer/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ impl<'a> BlockTransactionsProcess<'a> {
match ret {
ReconstructionResult::Block(block) => {
pending.remove();
self.relayer
let status = self
.relayer
.accept_block(self.nc.as_ref(), self.peer, block);
return Status::ok();
return status;
}
ReconstructionResult::Missing(transactions, uncles) => {
// We need to get all transactions and uncles that do not exist locally
Expand Down
5 changes: 3 additions & 2 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,16 @@ impl<'a> CompactBlockProcess<'a> {
>= block.epoch().number()
});
shrink_to_fit!(pending_compact_blocks, 20);
self.relayer
let status = self
.relayer
.accept_block(self.nc.as_ref(), self.peer, block);

if let Some(metrics) = ckb_metrics::handle() {
metrics
.ckb_relay_cb_verify_duration
.observe(instant.elapsed().as_secs_f64());
}
Status::ok()
status
}
ReconstructionResult::Missing(transactions, uncles) => {
let missing_transactions: Vec<u32> =
Expand Down
166 changes: 95 additions & 71 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use self::transaction_hashes_process::TransactionHashesProcess;
use self::transactions_process::TransactionsProcess;
use crate::block_status::BlockStatus;
use crate::types::{ActiveChain, BlockNumberAndHash, SyncShared};
use crate::utils::{metric_ckb_message_bytes, send_message_to, MetricDirection};
use crate::utils::{
is_internal_db_error, metric_ckb_message_bytes, send_message_to, MetricDirection,
};
use crate::{Status, StatusCode};
use ckb_chain::chain::ChainController;
use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
Expand Down Expand Up @@ -284,99 +286,121 @@ impl Relayer {
nc: &dyn CKBProtocolContext,
peer: PeerIndex,
block: core::BlockView,
) {
) -> Status {
if self
.shared()
.active_chain()
.contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
{
return;
return Status::ok();
}

let boxed = Arc::new(block);
if self
let boxed: Arc<BlockView> = Arc::new(block);
match self
.shared()
.insert_new_block(&self.chain, Arc::clone(&boxed))
.unwrap_or(false)
{
Ok(true) => self.broadcast_compact_block(nc, peer, &boxed),
Ok(false) => debug_target!(
crate::LOG_TARGET_RELAY,
"Relayer accept_block received an uncle block, don't broadcast compact block"
),
Err(err) => {
if !is_internal_db_error(&err) {
return StatusCode::BlockIsInvalid.with_context(format!(
"{}, error: {}",
boxed.hash(),
err,
));
}
}
}
Status::ok()
}

fn broadcast_compact_block(
&self,
nc: &dyn CKBProtocolContext,
peer: PeerIndex,
boxed: &Arc<BlockView>,
) {
debug_target!(
crate::LOG_TARGET_RELAY,
"[block_relay] relayer accept_block {} {}",
boxed.header().hash(),
unix_time_as_millis()
);
let block_hash = boxed.hash();
self.shared().state().remove_header_view(&block_hash);
let cb = packed::CompactBlock::build_from_block(boxed, &HashSet::new());
let message = packed::RelayMessage::new_builder().set(cb).build();

let selected_peers: Vec<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter(|target_peer| peer != *target_peer)
.take(MAX_RELAY_PEERS)
.collect();
if let Err(err) = nc.quick_filter_broadcast(
TargetSession::Multi(Box::new(selected_peers.into_iter())),
message.as_bytes(),
) {
debug_target!(
crate::LOG_TARGET_RELAY,
"[block_relay] relayer accept_block {} {}",
boxed.header().hash(),
unix_time_as_millis()
"relayer send block when accept block error: {:?}",
err,
);
let block_hash = boxed.hash();
self.shared().state().remove_header_view(&block_hash);
let cb = packed::CompactBlock::build_from_block(&boxed, &HashSet::new());
let message = packed::RelayMessage::new_builder().set(cb).build();
}

if let Some(p2p_control) = nc.p2p_control() {
let snapshot = self.shared.shared().snapshot();
let parent_chain_root = {
let mmr = snapshot.chain_root_mmr(boxed.header().number() - 1);
match mmr.get_root() {
Ok(root) => root,
Err(err) => {
error_target!(
crate::LOG_TARGET_RELAY,
"Generate last state to light client failed: {:?}",
err
);
return;
}
}
};

let selected_peers: Vec<PeerIndex> = nc
let tip_header = packed::VerifiableHeader::new_builder()
.header(boxed.header().data())
.uncles_hash(boxed.calc_uncles_hash())
.extension(Pack::pack(&boxed.extension()))
.parent_chain_root(parent_chain_root)
.build();
let light_client_message = {
let content = packed::SendLastState::new_builder()
.last_header(tip_header)
.build();
packed::LightClientMessage::new_builder()
.set(content)
.build()
};
let light_client_peers: HashSet<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter(|target_peer| peer != *target_peer)
.take(MAX_RELAY_PEERS)
.filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
.filter(|(_id, peer)| peer.if_lightclient_subscribed)
.map(|(id, _)| id)
.collect();
if let Err(err) = nc.quick_filter_broadcast(
TargetSession::Multi(Box::new(selected_peers.into_iter())),
message.as_bytes(),
if let Err(err) = p2p_control.filter_broadcast(
TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
SupportProtocols::LightClient.protocol_id(),
light_client_message.as_bytes(),
) {
debug_target!(
crate::LOG_TARGET_RELAY,
"relayer send block when accept block error: {:?}",
"relayer send last state to light client when accept block, error: {:?}",
err,
);
}

if let Some(p2p_control) = nc.p2p_control() {
let snapshot = self.shared.shared().snapshot();
let parent_chain_root = {
let mmr = snapshot.chain_root_mmr(boxed.header().number() - 1);
match mmr.get_root() {
Ok(root) => root,
Err(err) => {
error_target!(
crate::LOG_TARGET_RELAY,
"Generate last state to light client failed: {:?}",
err
);
return;
}
}
};

let tip_header = packed::VerifiableHeader::new_builder()
.header(boxed.header().data())
.uncles_hash(boxed.calc_uncles_hash())
.extension(Pack::pack(&boxed.extension()))
.parent_chain_root(parent_chain_root)
.build();
let light_client_message = {
let content = packed::SendLastState::new_builder()
.last_header(tip_header)
.build();
packed::LightClientMessage::new_builder()
.set(content)
.build()
};
let light_client_peers: HashSet<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
.filter(|(_id, peer)| peer.if_lightclient_subscribed)
.map(|(id, _)| id)
.collect();
if let Err(err) = p2p_control.filter_broadcast(
TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
SupportProtocols::LightClient.protocol_id(),
light_client_message.as_bytes(),
) {
debug_target!(
crate::LOG_TARGET_RELAY,
"relayer send last state to light client when accept block, error: {:?}",
err,
);
}
}
}
}

Expand Down
78 changes: 46 additions & 32 deletions sync/src/relayer/tests/compact_block_process.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use crate::block_status::BlockStatus;
use crate::relayer::compact_block_process::CompactBlockProcess;
use crate::relayer::tests::helper::{build_chain, new_header_builder, MockProtocolContext};
use crate::relayer::tests::helper::{
build_chain, gen_block, new_header_builder, MockProtocolContext,
};
use crate::{Status, StatusCode};
use ckb_chain::chain::ChainService;
use ckb_network::{PeerIndex, SupportProtocols};
use ckb_store::ChainStore;
use ckb_systemtime::unix_time_as_millis;
use ckb_tx_pool::{PlugTarget, TxEntry};
use ckb_types::core::{BlockView, HeaderView};
use ckb_types::prelude::*;
use ckb_types::{
bytes::Bytes,
Expand Down Expand Up @@ -152,25 +156,36 @@ fn test_unknow_parent() {
#[test]
fn test_accept_not_a_better_block() {
let (relayer, _) = build_chain(5);
let header = {
let tip_header = {
let active_chain = relayer.shared.active_chain();
active_chain.tip_header()
};
let second_to_last_header: HeaderView = {
let tip_header: HeaderView = relayer.shared().store().get_tip_header().unwrap();
let second_to_last_header = relayer
.shared()
.store()
.get_block_header(&tip_header.data().raw().parent_hash())
.unwrap();
second_to_last_header
};

// The timestamp is random, so it may be not a better block.
let not_sure_a_better_header = header
.as_advanced_builder()
.timestamp((header.timestamp() + 1).pack())
.build();

let block = BlockBuilder::default()
.header(not_sure_a_better_header)
.transaction(TransactionBuilder::default().build())
.build();
let uncle_block: BlockView = gen_block(
&second_to_last_header,
relayer.shared().shared(),
1,
1,
None,
);
//uncle_block's block_hash must not equal to tip's block_hash
assert_ne!(uncle_block.header().hash(), tip_header.hash());
// uncle_block's difficulty must less than tip's difficulty
assert!(uncle_block.difficulty().lt(&tip_header.difficulty()));

let mut prefilled_transactions_indexes = HashSet::new();
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);
let compact_block =
CompactBlock::build_from_block(&uncle_block, &prefilled_transactions_indexes);

let mock_protocol_context = MockProtocolContext::new(SupportProtocols::RelayV2);
let nc = Arc::new(mock_protocol_context);
Expand All @@ -182,7 +197,7 @@ fn test_accept_not_a_better_block() {
Arc::<MockProtocolContext>::clone(&nc),
peer_index,
);
assert_eq!(compact_block_process.execute(), Status::ok(),);
assert_eq!(compact_block_process.execute(), Status::ok());
}

#[test]
Expand Down Expand Up @@ -323,18 +338,15 @@ fn test_accept_block() {
active_chain.tip_header()
};

let header = new_header_builder(relayer.shared.shared(), &parent).build();

let uncle = BlockBuilder::default().build();
let ext = packed::BlockExtBuilder::default()
.verified(Some(true).pack())
.build();
let uncle = gen_block(&parent, relayer.shared().shared(), 0, 1, None);

let block = BlockBuilder::default()
.header(header)
.transaction(TransactionBuilder::default().build())
.uncle(uncle.as_uncle())
.build();
let block = gen_block(
&parent,
relayer.shared().shared(),
0,
1,
Some(uncle.as_uncle()),
);

let mock_block_1 = BlockBuilder::default()
.number(4.pack())
Expand Down Expand Up @@ -365,16 +377,18 @@ fn test_accept_block() {
);
}

let uncle_hash = uncle.hash();
{
let db_txn = relayer.shared().shared().store().begin_transaction();
db_txn.insert_block(&uncle).unwrap();
db_txn.attach_block(&uncle).unwrap();
db_txn.insert_block_ext(&uncle_hash, &ext.unpack()).unwrap();
db_txn.commit().unwrap();
let chain_controller = {
let proposal_window = ckb_proposal_table::ProposalTable::new(
relayer.shared().shared().consensus().tx_proposal_window(),
);
let chain_service =
ChainService::new(relayer.shared().shared().to_owned(), proposal_window);
chain_service.start::<&str>(None)
};
chain_controller.process_block(Arc::new(uncle)).unwrap();
}

relayer.shared().shared().refresh_snapshot();
let mut prefilled_transactions_indexes = HashSet::new();
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);
Expand Down
Loading
Loading