From 096dca999dce8ffe52a9dd14272848be3bec59a5 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 15 Oct 2018 17:17:51 +0200 Subject: [PATCH 1/3] fix(light::response) : handle bad responses This PR adds the following: * Registers empty responses as a `bad` * Register bad responses and only takes a decision when the majority of the peers have come to `agreement`. --- ethcore/light/src/net/mod.rs | 7 +- ethcore/light/src/on_demand/mod.rs | 127 +++++++++++++++++++++++---- ethcore/light/src/on_demand/tests.rs | 98 ++++++++++++++++++++- 3 files changed, 215 insertions(+), 17 deletions(-) diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 590be038e21..34d04f28dbb 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -567,7 +567,12 @@ impl LightProtocol { None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. }; - res.map(|_| IdGuard::new(peers, peer, req_id)) + if let Err(err) = res { + trace!(target: "on_demand", "pre_verify_response_failed: {:?}", err); + Err(err) + } else { + Ok(IdGuard::new(peers, peer, req_id)) + } } /// Handle a packet using the given io context. diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index 5d39e0cc4d0..c8b29961213 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -19,7 +19,7 @@ //! will take the raw data received here and extract meaningful results from it. use std::cmp; -use std::collections::{HashMap, BTreeSet}; +use std::collections::{HashMap, HashSet, BTreeSet}; use std::marker::PhantomData; use std::sync::Arc; @@ -42,6 +42,9 @@ use self::request::CheckedRequest; pub use self::request::{Request, Response, HeaderRef}; +use types::request::ResponseError; +use self::request::Error as ValidityError; + #[cfg(test)] mod tests; @@ -69,6 +72,12 @@ pub mod error { } errors { + #[doc = "Request was faulty"] + FaultyRequest(req_id: super::ReqId, bad_responses: usize, num_providers: usize) { + description("Faulty request found") + display("The request: {} was determined as faulty, {}/{} peer(s) gave bad response", req_id, bad_responses, num_providers) + } + #[doc = "Max number of on-demand query attempts reached without result."] MaxAttemptReach(query_index: usize) { description("On-demand query limit reached") @@ -124,6 +133,10 @@ struct Pending { required_capabilities: Capabilities, responses: Vec, sender: oneshot::Sender, + /// This will collect how many bad responses we get from each peer per request + // When we get `|bad_responses| > peers / 2` then regard the request as `faulty` + // This, can happen for several reasons such as a request for a hash that doesn't exist + bad_responses: HashSet, base_query_index: usize, remaining_query_count: usize, query_id_history: BTreeSet, @@ -173,8 +186,9 @@ impl Pending { // supply a response. fn supply_response(&mut self, cache: &Mutex, response: &basic_request::Response) - -> Result<(), basic_request::ResponseError> - { + -> Result<(), basic_request::ResponseError> { + + println!("{:?}", response); match self.requests.supply_response(&cache, response) { Ok(response) => { let idx = self.responses.len(); @@ -227,7 +241,15 @@ impl Pending { self.required_capabilities = capabilities; } - // returning no reponse, it will result in an error. + fn add_bad_response(&mut self, peer: PeerId) { + self.bad_responses.insert(peer); + } + + fn is_bad_response(&self, total_peers: usize) -> bool { + self.bad_responses.len() > total_peers / 2 + } + + // returning no response, it will result in an error. // self is consumed on purpose. fn no_response(self) { trace!(target: "on_demand", "Dropping a pending query (no reply) at query #{}", self.query_id_history.len()); @@ -237,7 +259,7 @@ impl Pending { } } - // returning a peer discovery timeout during query attempts + // Returning a peer discovery timeout during query attempts fn time_out(self) { trace!(target: "on_demand", "Dropping a pending query (no new peer time out) at query #{}", self.query_id_history.len()); let err = self::error::ErrorKind::TimeoutOnNewPeers(self.requests.num_answered(), self.query_id_history.len()); @@ -245,6 +267,16 @@ impl Pending { debug!(target: "on_demand", "Dropped oneshot channel receiver on time out"); } } + + // The given request is determined as faulty and drop it accordingly + fn set_as_faulty_request(self, total_peers: usize, req_id: ReqId) { + let bad_peers = self.bad_responses.len(); + warn!(target: "on_demand", "The request: {} was determined as faulty, {}/{} peer(s) gave bad response", req_id, bad_peers, total_peers); + let err = self::error::ErrorKind::FaultyRequest(req_id, bad_peers, total_peers); + if self.sender.send(Err(err.into())).is_err() { + debug!(target: "on_demand", "Dropped oneshot channel receiver on time out"); + } + } } // helper to guess capabilities required for a given batch of network requests. @@ -403,6 +435,7 @@ impl OnDemand { required_capabilities: capabilities, responses, sender, + bad_responses: HashSet::new(), base_query_index: 0, remaining_query_count: 0, query_id_history: BTreeSet::new(), @@ -443,12 +476,15 @@ impl OnDemand { // iterate over all pending requests, and check them for hang-up. // then, try and find a peer who can serve it. let peers = self.peers.read(); - *pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter() + *pending = ::std::mem::replace(&mut *pending, Vec::new()) + .into_iter() .filter(|pending| !pending.sender.is_canceled()) .filter_map(|mut pending| { - // the peer we dispatch to is chosen randomly + let num_peers = peers.len(); let history_len = pending.query_id_history.len(); + + // The first peer to dispatch the request is chosen at random let offset = if history_len == 0 { pending.remaining_query_count = self.base_retry_count; let rand = rand::random::(); @@ -457,15 +493,21 @@ impl OnDemand { } else { pending.base_query_index + history_len } % cmp::max(num_peers, 1); + let init_remaining_query_count = pending.remaining_query_count; // to fail in case of big reduction of nb of peers - for (peer_id, peer) in peers.iter().chain(peers.iter()) - .skip(offset).take(num_peers) { + + for (peer_id, peer) in peers + .iter() + .cycle() + .skip(offset) + .take(num_peers) + { // TODO: see which requests can be answered by the cache? if pending.remaining_query_count == 0 { break } - if pending.query_id_history.insert(peer_id.clone()) { + if pending.query_id_history.insert(*peer_id) { if !peer.can_fulfill(&pending.required_capabilities) { trace!(target: "on_demand", "Peer {} without required capabilities, skipping, {} remaining attempts", peer_id, pending.remaining_query_count); @@ -588,12 +630,26 @@ impl Handler for OnDemand { } fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) { + // the req_id was not found in the pending request let mut pending = match self.in_transit.write().remove(&req_id) { Some(req) => req, None => return, }; + // Handle the case if the response is empty if responses.is_empty() { + let total_peers = self.peers.read().len(); + + // Register the response as bad for that `peer` + pending.add_bad_response(ctx.peer()); + + // Majority of responses are bad, drop the request + if pending.is_bad_response(total_peers) { + pending.set_as_faulty_request(total_peers, req_id); + return; + } + + // no remaning queries attempts left on the response if pending.remaining_query_count == 0 { pending.no_response(); return; @@ -608,12 +664,53 @@ impl Handler for OnDemand { // 2. pending.requests.supply_response // 3. if extracted on-demand response, keep it for later. for response in responses { - if let Err(e) = pending.supply_response(&*self.cache, response) { - let peer = ctx.peer(); - debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e); - ctx.disable_peer(peer); + match pending.supply_response(&*self.cache, response) { + Err(ResponseError::Validity(err)) => { + match err { + // This can be a malformed request or bad response but we can't determine which at the moment! + // Thus, register the response as bad and wait for more responses in order to take a decision + ValidityError::BadProof | ValidityError::Empty => { + let peer = ctx.peer(); + trace!(target: "on_demand", "Peer {} gave a potential bad response on req_id: {} - can't determine whether + it was bad request or response yet", peer, req_id); + pending.add_bad_response(peer); + } - break; + // Bad response, punish the peer + ValidityError::Decoder(_) + | ValidityError::HeaderByNumber + | ValidityError::Trie(_) + | ValidityError::TooFewResults(_, _) + | ValidityError::TooManyResults(_, _) + | ValidityError::UnresolvedHeader(_) + | ValidityError::WrongTrieRoot(_, _) + | ValidityError::WrongKind + | ValidityError::WrongNumber(_, _) + | ValidityError::WrongHash(_, _) + | ValidityError::WrongHeaderSequence => { + let peer = ctx.peer(); + pending.add_bad_response(peer); + debug!(target: "on_demand", "Peer {} gave bad response", peer); + ctx.disable_peer(peer); + } + } + } + + Err(ResponseError::Unexpected) => { + let peer = ctx.peer(); + pending.add_bad_response(peer); + debug!(target: "on_demand", "Peer {} gave bad response", peer); + ctx.disable_peer(peer); + } + // `Good` response continue + _ => (), + } + + // Majority of responses are bad, drop the request + let total_peers = self.peers.read().len(); + if pending.is_bad_response(total_peers) { + pending.set_as_faulty_request(total_peers, req_id); + return; } } diff --git a/ethcore/light/src/on_demand/tests.rs b/ethcore/light/src/on_demand/tests.rs index d3cd137ec05..c9cca92705d 100644 --- a/ethcore/light/src/on_demand/tests.rs +++ b/ethcore/light/src/on_demand/tests.rs @@ -18,6 +18,7 @@ use cache::Cache; use ethcore::header::Header; +use ethcore::encoded::Header as EncodedHeader; use futures::Future; use network::{PeerId, NodeId}; use net::*; @@ -28,7 +29,7 @@ use ::request::{self as basic_request, Response}; use std::sync::Arc; -use super::{request, OnDemand, Peer, HeaderRef}; +use super::{request, OnDemand, Peer, HeaderRef, ResponseError, ValidityError}; // useful contexts to give the service. enum Context { @@ -367,6 +368,101 @@ fn part_bad_part_good() { assert!(recv.wait().is_ok()); } +#[test] +fn determine_faulty_response_by_majority() { + let mut harness = Harness::create(); + + // peer[0] - requester ("light") + // peer[1..9] - responders ("providers" + let peers: Vec = (1..=10).map(|id| id).collect(); + + let req_id = ReqId(14426); + + for peer in &peers { + harness.inject_peer( + *peer, + Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + } + ); + } + + // make sure query limit doesn't interfer + harness.service.base_retry_count = usize::max_value(); + + let _recv = harness.service.request_raw( + &Context::RequestFrom(peers[0], req_id), + vec![ + request::HeaderByHash(Header::default().encoded().hash().into()).into(), + ], + ).unwrap(); + + harness.service.dispatch_pending(&Context::RequestFrom(peers[0], req_id)); + + for i in 1..=6 { + harness.service.on_responses( + &Context::RequestFrom(peers[i], req_id), + req_id, + &[] + ); + // Until 6 bad responses have been received the pending will be refilled + if i < 6 { + assert_eq!(harness.service.pending.read().len(), 1, "The request should be pending"); + let pend = harness.service.pending.write().remove(0); + harness.service.in_transit.write().insert(req_id, pend); + } + } + + // the request has been dropped reached `bad_responses` > |peers| / 2` + assert_eq!(harness.service.pending.read().len(), 0); + assert!(harness.service.in_transit.read().get(&req_id).is_none()); +} + +#[test] +fn faulty_by_response_minority_many_times() { + let mut harness = Harness::create(); + + let peers: Vec = (1..=10).map(|id| id).collect(); + + let req_id = ReqId(14426); + + for peer in &peers { + harness.inject_peer( + *peer, + Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + } + ); + } + + // make sure query limit doesn't interfer + harness.service.base_retry_count = usize::max_value(); + + let _recv = harness.service.request_raw( + &Context::RequestFrom(peers[0], req_id), + vec![ + request::HeaderByHash(Header::default().encoded().hash().into()).into(), + ], + ).unwrap(); + + harness.service.dispatch_pending(&Context::RequestFrom(peers[0], req_id)); + + for i in 1..=1000 { + harness.service.on_responses( + &Context::RequestFrom(peers[i % 2], req_id), + req_id, + &[] + ); + assert_eq!(harness.service.pending.read().len(), 1, "The request should be pending"); + let pend = harness.service.pending.write().remove(0); + harness.service.in_transit.write().insert(req_id, pend); + } + + assert!(harness.service.in_transit.read().get(&req_id).is_some(), "The request should be pending"); +} + #[test] fn wrong_kind() { let harness = Harness::create(); From 3e50f39ea50d67538d599d656d130006db93a03d Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 15 Oct 2018 18:52:31 +0200 Subject: [PATCH 2/3] fix(test) - make sure request don't dropped --- ethcore/light/src/on_demand/tests.rs | 32 ++++++++++++++++++---------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/ethcore/light/src/on_demand/tests.rs b/ethcore/light/src/on_demand/tests.rs index c9cca92705d..f7aac2eddf7 100644 --- a/ethcore/light/src/on_demand/tests.rs +++ b/ethcore/light/src/on_demand/tests.rs @@ -18,7 +18,6 @@ use cache::Cache; use ethcore::header::Header; -use ethcore::encoded::Header as EncodedHeader; use futures::Future; use network::{PeerId, NodeId}; use net::*; @@ -29,7 +28,7 @@ use ::request::{self as basic_request, Response}; use std::sync::Arc; -use super::{request, OnDemand, Peer, HeaderRef, ResponseError, ValidityError}; +use super::{request, OnDemand, Peer, HeaderRef}; // useful contexts to give the service. enum Context { @@ -307,10 +306,16 @@ fn part_bad_part_good() { let peer_id = 111; let req_ids = (ReqId(14426), ReqId(555)); - harness.inject_peer(peer_id, Peer { - status: dummy_status(), - capabilities: dummy_capabilities(), - }); + // Enough peers so that the requests don't get dropped + for peer in peer_id..=peer_id+5 { + harness.inject_peer( + peer, + Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + } + ); + } let make = |num| { let mut hdr = Header::default(); @@ -467,13 +472,18 @@ fn faulty_by_response_minority_many_times() { fn wrong_kind() { let harness = Harness::create(); - let peer_id = 10101; + let peer_id = 3; let req_id = ReqId(14426); - harness.inject_peer(peer_id, Peer { - status: dummy_status(), - capabilities: dummy_capabilities(), - }); + for peer_id in 0..10 { + harness.inject_peer( + peer_id, + Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + } + ); + } let _recv = harness.service.request_raw( &Context::NoOp, From 56779ff34c0403086b090678de3fdd9f12a46497 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 16 Oct 2018 14:12:42 +0200 Subject: [PATCH 3/3] fix(nits) --- ethcore/light/src/net/mod.rs | 2 +- ethcore/light/src/on_demand/mod.rs | 8 +++----- ethcore/light/src/on_demand/tests.rs | 6 +++--- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 34d04f28dbb..c2c5083c9e5 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -248,7 +248,7 @@ pub trait Handler: Send + Sync { fn tick(&self, _ctx: &BasicContext) { } /// Called on abort. This signals to handlers that they should clean up /// and ignore peers. - // TODO: coreresponding `on_activate`? + // TODO: corresponding `on_activate`? fn on_abort(&self) { } } diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index c8b29961213..8c9fb9fa7f8 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -84,7 +84,7 @@ pub mod error { display("On-demand query limit reached on query #{}", query_index) } - #[doc = "No reply with current peer set, time out occured while waiting for new peers for additional query attempt."] + #[doc = "No reply with current peer set, time out occurred while waiting for new peers for additional query attempt."] TimeoutOnNewPeers(query_index: usize, remaining_attempts: usize) { description("Timeout for On-demand query") display("Timeout for On-demand query; {} query attempts remain for query #{}", remaining_attempts, query_index) @@ -187,8 +187,6 @@ impl Pending { // supply a response. fn supply_response(&mut self, cache: &Mutex, response: &basic_request::Response) -> Result<(), basic_request::ResponseError> { - - println!("{:?}", response); match self.requests.supply_response(&cache, response) { Ok(response) => { let idx = self.responses.len(); @@ -271,7 +269,7 @@ impl Pending { // The given request is determined as faulty and drop it accordingly fn set_as_faulty_request(self, total_peers: usize, req_id: ReqId) { let bad_peers = self.bad_responses.len(); - warn!(target: "on_demand", "The request: {} was determined as faulty, {}/{} peer(s) gave bad response", req_id, bad_peers, total_peers); + trace!(target: "on_demand", "The request: {} was determined as faulty, {}/{} peer(s) gave bad response", req_id, bad_peers, total_peers); let err = self::error::ErrorKind::FaultyRequest(req_id, bad_peers, total_peers); if self.sender.send(Err(err.into())).is_err() { debug!(target: "on_demand", "Dropped oneshot channel receiver on time out"); @@ -649,7 +647,7 @@ impl Handler for OnDemand { return; } - // no remaning queries attempts left on the response + // no remaining queries attempts left on the response if pending.remaining_query_count == 0 { pending.no_response(); return; diff --git a/ethcore/light/src/on_demand/tests.rs b/ethcore/light/src/on_demand/tests.rs index f7aac2eddf7..accf56dd8a5 100644 --- a/ethcore/light/src/on_demand/tests.rs +++ b/ethcore/light/src/on_demand/tests.rs @@ -374,11 +374,11 @@ fn part_bad_part_good() { } #[test] -fn determine_faulty_response_by_majority() { +fn determine_response_as_faulty_by_majority() { let mut harness = Harness::create(); // peer[0] - requester ("light") - // peer[1..9] - responders ("providers" + // peer[1..9] - responders ("providers") let peers: Vec = (1..=10).map(|id| id).collect(); let req_id = ReqId(14426); @@ -425,7 +425,7 @@ fn determine_faulty_response_by_majority() { } #[test] -fn faulty_by_response_minority_many_times() { +fn response_by_minority_many_times() { let mut harness = Harness::create(); let peers: Vec = (1..=10).map(|id| id).collect();