Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node incorrectly suppresses header broadcast for full blocks (after failing to hydrate a compact block) #3089

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ edition = "2018"
bitflags = "1"
bytes = "0.4"
enum_primitive = "0.1"
lru-cache = "0.1"
net2 = "0.2"
num = "0.1"
rand = "0.6"
Expand Down
59 changes: 26 additions & 33 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use lru_cache::LruCache;

use crate::chain;
use crate::conn;
use crate::core::core::hash::{Hash, Hashed};
Expand Down Expand Up @@ -364,10 +366,11 @@ impl Peer {
self.send(&h, msg::Type::GetTransaction)
}

/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
/// Sends a request for a specific block by hash.
/// Takes opts so we can track if this request was due to our node syncing or otherwise.
pub fn send_block_request(&self, h: Hash, opts: chain::Options) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h);
self.tracking_adapter.push_req(h, opts);
self.send(&h, msg::Type::GetBlock)
}

Expand Down Expand Up @@ -429,51 +432,35 @@ impl Peer {
#[derive(Clone)]
struct TrackingAdapter {
adapter: Arc<dyn NetAdapter>,
known: Arc<RwLock<Vec<Hash>>>,
requested: Arc<RwLock<Vec<Hash>>>,
received: Arc<RwLock<LruCache<Hash, ()>>>,
requested: Arc<RwLock<LruCache<Hash, chain::Options>>>,
}

impl TrackingAdapter {
fn new(adapter: Arc<dyn NetAdapter>) -> TrackingAdapter {
TrackingAdapter {
adapter: adapter,
known: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))),
requested: Arc::new(RwLock::new(Vec::with_capacity(MAX_TRACK_SIZE))),
received: Arc::new(RwLock::new(LruCache::new(MAX_TRACK_SIZE))),
requested: Arc::new(RwLock::new(LruCache::new(MAX_TRACK_SIZE))),
}
}

fn has_recv(&self, hash: Hash) -> bool {
let known = self.known.read();
// may become too slow, an ordered set (by timestamp for eviction) may
// end up being a better choice
known.contains(&hash)
self.received.write().contains_key(&hash)
}

fn push_recv(&self, hash: Hash) {
let mut known = self.known.write();
if known.len() > MAX_TRACK_SIZE {
known.truncate(MAX_TRACK_SIZE);
}
if !known.contains(&hash) {
known.insert(0, hash);
}
self.received.write().insert(hash, ());
}

fn has_req(&self, hash: Hash) -> bool {
let requested = self.requested.read();
// may become too slow, an ordered set (by timestamp for eviction) may
// end up being a better choice
requested.contains(&hash)
/// Track a block or transaction hash requested by us.
/// Track the opts alongside the hash so we know if this was due to us syncing or not.
fn push_req(&self, hash: Hash, opts: chain::Options) {
self.requested.write().insert(hash, opts);
}

fn push_req(&self, hash: Hash) {
let mut requested = self.requested.write();
if requested.len() > MAX_TRACK_SIZE {
requested.truncate(MAX_TRACK_SIZE);
}
if !requested.contains(&hash) {
requested.insert(0, hash);
}
fn req_opts(&self, hash: Hash) -> Option<chain::Options> {
self.requested.write().get_mut(&hash).cloned()
}
}

Expand Down Expand Up @@ -518,11 +505,17 @@ impl ChainAdapter for TrackingAdapter {
&self,
b: core::Block,
peer_info: &PeerInfo,
_was_requested: bool,
opts: chain::Options,
) -> Result<bool, chain::Error> {
let bh = b.hash();
self.push_recv(bh);
self.adapter.block_received(b, peer_info, self.has_req(bh))

// If we are currently tracking a request for this block then
// use the opts specified when we made the request.
// If we requested this block as part of sync then we want to
// let our adapter know this when we receive it.
let req_opts = self.req_opts(bh).unwrap_or(opts);
self.adapter.block_received(b, peer_info, req_opts)
}

fn compact_block_received(
Expand Down
4 changes: 2 additions & 2 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,10 @@ impl ChainAdapter for Peers {
&self,
b: core::Block,
peer_info: &PeerInfo,
was_requested: bool,
opts: chain::Options,
) -> Result<bool, chain::Error> {
let hash = b.hash();
if !self.adapter.block_received(b, peer_info, was_requested)? {
if !self.adapter.block_received(b, peer_info, opts)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
Expand Down
9 changes: 6 additions & 3 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::chain;
use crate::conn::{Message, MessageHandler, Tracker};
use crate::core::core::{self, hash::Hash, hash::Hashed, CompactBlock};

Expand Down Expand Up @@ -162,9 +163,11 @@ impl MessageHandler for Protocol {
);
let b: core::Block = msg.body()?;

// we can't know at this level whether we requested the block or not,
// the boolean should be properly set in higher level adapter
adapter.block_received(b, &self.peer_info, false)?;
// We default to NONE opts here as we do not know know yet why this block was
// received.
// If we requested this block from a peer due to our node syncing then
// the peer adapter will override opts to reflect this.
adapter.block_received(b, &self.peer_info, chain::Options::NONE)?;
Ok(None)
}

Expand Down
7 changes: 6 additions & 1 deletion p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,12 @@ impl ChainAdapter for DummyAdapter {
) -> Result<bool, chain::Error> {
Ok(true)
}
fn block_received(&self, _: core::Block, _: &PeerInfo, _: bool) -> Result<bool, chain::Error> {
fn block_received(
&self,
_: core::Block,
_: &PeerInfo,
_: chain::Options,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn headers_received(
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ pub trait ChainAdapter: Sync + Send {
&self,
b: core::Block,
peer_info: &PeerInfo,
was_requested: bool,
opts: chain::Options,
) -> Result<bool, chain::Error>;

fn compact_block_received(
Expand Down
47 changes: 15 additions & 32 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
&self,
b: core::Block,
peer_info: &PeerInfo,
was_requested: bool,
opts: chain::Options,
) -> Result<bool, chain::Error> {
if self.chain().block_exists(b.hash())? {
return Ok(true);
Expand All @@ -132,7 +132,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
b.outputs().len(),
b.kernels().len(),
);
self.process_block(b, peer_info, was_requested)
self.process_block(b, peer_info, opts)
}

fn compact_block_received(
Expand Down Expand Up @@ -165,7 +165,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
hook.on_block_received(&block, &peer_info.addr);
}
}
self.process_block(block, peer_info, false)
self.process_block(block, peer_info, chain::Options::NONE)
}
Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb_hash, e);
Expand All @@ -176,7 +176,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// check at least the header is valid before hydrating
if let Err(e) = self
.chain()
.process_block_header(&cb.header, self.chain_opts(false))
.process_block_header(&cb.header, chain::Options::NONE)
{
debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind());
return Ok(!e.is_bad_data());
Expand Down Expand Up @@ -220,11 +220,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
.is_ok()
{
debug!("successfully hydrated block from tx pool!");
self.process_block(block, peer_info, false)
self.process_block(block, peer_info, chain::Options::NONE)
} else {
if self.sync_state.status() == SyncStatus::NoSync {
debug!("adapter: block invalid after hydration, requesting full block");
self.request_block(&cb.header, peer_info);
self.request_block(&cb.header, peer_info, chain::Options::NONE);
Ok(true)
} else {
debug!("block invalid after hydration, ignoring it, cause still syncing");
Expand Down Expand Up @@ -255,9 +255,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {

// pushing the new block header through the header chain pipeline
// we will go ask for the block if this is a new header
let res = self
.chain()
.process_block_header(&bh, self.chain_opts(false));
let res = self.chain().process_block_header(&bh, chain::Options::NONE);

if let Err(e) = res {
debug!(
Expand Down Expand Up @@ -298,7 +296,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}

// try to add headers to our header chain
match self.chain().sync_block_headers(bhs, self.chain_opts(true)) {
match self.chain().sync_block_headers(bhs, chain::Options::SYNC) {
Ok(_) => Ok(true),
Err(e) => {
debug!("Block headers refused by chain: {:?}", e);
Expand Down Expand Up @@ -533,7 +531,7 @@ impl NetToChainAdapter {
&self,
b: core::Block,
peer_info: &PeerInfo,
was_requested: bool,
opts: chain::Options,
) -> Result<bool, chain::Error> {
// We cannot process blocks earlier than the horizon so check for this here.
{
Expand All @@ -549,10 +547,7 @@ impl NetToChainAdapter {
let bhash = b.hash();
let previous = self.chain().get_previous_header(&b.header);

match self
.chain()
.process_block(b, self.chain_opts(was_requested))
{
match self.chain().process_block(b, opts) {
Ok(_) => {
self.validate_chain(bhash);
self.check_compact();
Expand All @@ -571,7 +566,7 @@ impl NetToChainAdapter {
&& !self.sync_state.is_syncing()
{
debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash());
self.request_block_by_hash(previous.hash(), peer_info)
self.request_block(&previous, peer_info, chain::Options::NONE)
}
}
Ok(true)
Expand Down Expand Up @@ -646,12 +641,10 @@ impl NetToChainAdapter {
// it into a full block then fallback to requesting the full block
// from the same peer that gave us the compact block
// consider additional peers for redundancy?
fn request_block(&self, bh: &BlockHeader, peer_info: &PeerInfo) {
self.request_block_by_hash(bh.hash(), peer_info)
}

fn request_block_by_hash(&self, h: Hash, peer_info: &PeerInfo) {
self.send_block_request_to_peer(h, peer_info, |peer, h| peer.send_block_request(h))
fn request_block(&self, bh: &BlockHeader, peer_info: &PeerInfo, opts: Options) {
self.send_block_request_to_peer(bh.hash(), peer_info, |peer, h| {
peer.send_block_request(h, opts)
})
}

// After we have received a block header in "header first" propagation
Expand Down Expand Up @@ -703,16 +696,6 @@ impl NetToChainAdapter {
),
}
}

/// Prepare options for the chain pipeline
fn chain_opts(&self, was_requested: bool) -> chain::Options {
let opts = if was_requested {
chain::Options::SYNC
} else {
chain::Options::NONE
};
opts
}
}

/// Implementation of the ChainAdapter for the network. Gets notified when the
Expand Down
2 changes: 1 addition & 1 deletion servers/src/grin/sync/body_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl BodySync {
let mut peers_iter = peers.iter().cycle();
for hash in hashes_to_get.clone() {
if let Some(peer) = peers_iter.next() {
if let Err(e) = peer.send_block_request(*hash) {
if let Err(e) = peer.send_block_request(*hash, chain::Options::SYNC) {
debug!("Skipped request to {}: {:?}", peer.info.addr, e);
peer.stop();
} else {
Expand Down