diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 590be038e21..c2c5083c9e5 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -248,7 +248,7 @@ pub trait Handler: Send + Sync { fn tick(&self, _ctx: &BasicContext) { } /// Called on abort. This signals to handlers that they should clean up /// and ignore peers. - // TODO: coreresponding `on_activate`? + // TODO: corresponding `on_activate`? fn on_abort(&self) { } } @@ -567,7 +567,12 @@ impl LightProtocol { None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. }; - res.map(|_| IdGuard::new(peers, peer, req_id)) + if let Err(err) = res { + trace!(target: "on_demand", "pre_verify_response_failed: {:?}", err); + Err(err) + } else { + Ok(IdGuard::new(peers, peer, req_id)) + } } /// Handle a packet using the given io context. diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index 5d39e0cc4d0..8c9fb9fa7f8 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -19,7 +19,7 @@ //! will take the raw data received here and extract meaningful results from it. 
use std::cmp; -use std::collections::{HashMap, BTreeSet}; +use std::collections::{HashMap, HashSet, BTreeSet}; use std::marker::PhantomData; use std::sync::Arc; @@ -42,6 +42,9 @@ use self::request::CheckedRequest; pub use self::request::{Request, Response, HeaderRef}; +use types::request::ResponseError; +use self::request::Error as ValidityError; + #[cfg(test)] mod tests; @@ -69,13 +72,19 @@ pub mod error { } errors { + #[doc = "Request was faulty"] + FaultyRequest(req_id: super::ReqId, bad_responses: usize, num_providers: usize) { + description("Faulty request found") + display("The request: {} was determined as faulty, {}/{} peer(s) gave bad response", req_id, bad_responses, num_providers) + } + #[doc = "Max number of on-demand query attempts reached without result."] MaxAttemptReach(query_index: usize) { description("On-demand query limit reached") display("On-demand query limit reached on query #{}", query_index) } - #[doc = "No reply with current peer set, time out occured while waiting for new peers for additional query attempt."] + #[doc = "No reply with current peer set, time out occurred while waiting for new peers for additional query attempt."] TimeoutOnNewPeers(query_index: usize, remaining_attempts: usize) { description("Timeout for On-demand query") display("Timeout for On-demand query; {} query attempts remain for query #{}", remaining_attempts, query_index) @@ -124,6 +133,10 @@ struct Pending { required_capabilities: Capabilities, responses: Vec, sender: oneshot::Sender, + /// The set of peers that have given a bad response to this request + // When `|bad_responses| > peers / 2` the request is regarded as `faulty` + // This can happen for several reasons, such as a request for a hash that doesn't exist + bad_responses: HashSet, base_query_index: usize, remaining_query_count: usize, query_id_history: BTreeSet, @@ -173,8 +186,7 @@ impl Pending { // supply a response. 
fn supply_response(&mut self, cache: &Mutex, response: &basic_request::Response) - -> Result<(), basic_request::ResponseError> - { + -> Result<(), basic_request::ResponseError> { match self.requests.supply_response(&cache, response) { Ok(response) => { let idx = self.responses.len(); @@ -227,7 +239,15 @@ impl Pending { self.required_capabilities = capabilities; } - // returning no reponse, it will result in an error. + fn add_bad_response(&mut self, peer: PeerId) { + self.bad_responses.insert(peer); + } + + fn is_bad_response(&self, total_peers: usize) -> bool { + self.bad_responses.len() > total_peers / 2 + } + + // returning no response, it will result in an error. // self is consumed on purpose. fn no_response(self) { trace!(target: "on_demand", "Dropping a pending query (no reply) at query #{}", self.query_id_history.len()); @@ -237,7 +257,7 @@ impl Pending { } } - // returning a peer discovery timeout during query attempts + // Returning a peer discovery timeout during query attempts fn time_out(self) { trace!(target: "on_demand", "Dropping a pending query (no new peer time out) at query #{}", self.query_id_history.len()); let err = self::error::ErrorKind::TimeoutOnNewPeers(self.requests.num_answered(), self.query_id_history.len()); @@ -245,6 +265,16 @@ impl Pending { debug!(target: "on_demand", "Dropped oneshot channel receiver on time out"); } } + + // The request was determined to be faulty: send a `FaultyRequest` error to the requester and drop it (self is consumed) + fn set_as_faulty_request(self, total_peers: usize, req_id: ReqId) { + let bad_peers = self.bad_responses.len(); + trace!(target: "on_demand", "The request: {} was determined as faulty, {}/{} peer(s) gave bad response", req_id, bad_peers, total_peers); + let err = self::error::ErrorKind::FaultyRequest(req_id, bad_peers, total_peers); + if self.sender.send(Err(err.into())).is_err() { + debug!(target: "on_demand", "Dropped oneshot channel receiver on faulty request"); + } + } } // helper to guess capabilities required for a given batch of network 
requests. @@ -403,6 +433,7 @@ impl OnDemand { required_capabilities: capabilities, responses, sender, + bad_responses: HashSet::new(), base_query_index: 0, remaining_query_count: 0, query_id_history: BTreeSet::new(), @@ -443,12 +474,15 @@ impl OnDemand { // iterate over all pending requests, and check them for hang-up. // then, try and find a peer who can serve it. let peers = self.peers.read(); - *pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter() + *pending = ::std::mem::replace(&mut *pending, Vec::new()) + .into_iter() .filter(|pending| !pending.sender.is_canceled()) .filter_map(|mut pending| { - // the peer we dispatch to is chosen randomly + let num_peers = peers.len(); let history_len = pending.query_id_history.len(); + + // The first peer to dispatch the request is chosen at random let offset = if history_len == 0 { pending.remaining_query_count = self.base_retry_count; let rand = rand::random::(); @@ -457,15 +491,21 @@ impl OnDemand { } else { pending.base_query_index + history_len } % cmp::max(num_peers, 1); + let init_remaining_query_count = pending.remaining_query_count; // to fail in case of big reduction of nb of peers - for (peer_id, peer) in peers.iter().chain(peers.iter()) - .skip(offset).take(num_peers) { + + for (peer_id, peer) in peers + .iter() + .cycle() + .skip(offset) + .take(num_peers) + { // TODO: see which requests can be answered by the cache? 
if pending.remaining_query_count == 0 { break } - if pending.query_id_history.insert(peer_id.clone()) { + if pending.query_id_history.insert(*peer_id) { if !peer.can_fulfill(&pending.required_capabilities) { trace!(target: "on_demand", "Peer {} without required capabilities, skipping, {} remaining attempts", peer_id, pending.remaining_query_count); @@ -588,12 +628,26 @@ impl Handler for OnDemand { } fn on_responses(&self, ctx: &EventContext, req_id: ReqId, responses: &[basic_request::Response]) { + // the req_id was not found in the pending request let mut pending = match self.in_transit.write().remove(&req_id) { Some(req) => req, None => return, }; + // Handle the case if the response is empty if responses.is_empty() { + let total_peers = self.peers.read().len(); + + // Register the response as bad for that `peer` + pending.add_bad_response(ctx.peer()); + + // Majority of responses are bad, drop the request + if pending.is_bad_response(total_peers) { + pending.set_as_faulty_request(total_peers, req_id); + return; + } + + // no remaining queries attempts left on the response if pending.remaining_query_count == 0 { pending.no_response(); return; @@ -608,12 +662,53 @@ impl Handler for OnDemand { // 2. pending.requests.supply_response // 3. if extracted on-demand response, keep it for later. for response in responses { - if let Err(e) = pending.supply_response(&*self.cache, response) { - let peer = ctx.peer(); - debug!(target: "on_demand", "Peer {} gave bad response: {:?}", peer, e); - ctx.disable_peer(peer); + match pending.supply_response(&*self.cache, response) { + Err(ResponseError::Validity(err)) => { + match err { + // This can be a malformed request or bad response but we can't determine which at the moment! 
+ // Thus, register the response as bad and wait for more responses in order to take a decision + ValidityError::BadProof | ValidityError::Empty => { + let peer = ctx.peer(); + trace!(target: "on_demand", "Peer {} gave a potential bad response on req_id: {} - can't determine whether + it was bad request or response yet", peer, req_id); + pending.add_bad_response(peer); + } - break; + // Bad response, punish the peer + ValidityError::Decoder(_) + | ValidityError::HeaderByNumber + | ValidityError::Trie(_) + | ValidityError::TooFewResults(_, _) + | ValidityError::TooManyResults(_, _) + | ValidityError::UnresolvedHeader(_) + | ValidityError::WrongTrieRoot(_, _) + | ValidityError::WrongKind + | ValidityError::WrongNumber(_, _) + | ValidityError::WrongHash(_, _) + | ValidityError::WrongHeaderSequence => { + let peer = ctx.peer(); + pending.add_bad_response(peer); + debug!(target: "on_demand", "Peer {} gave bad response", peer); + ctx.disable_peer(peer); + } + } + } + + Err(ResponseError::Unexpected) => { + let peer = ctx.peer(); + pending.add_bad_response(peer); + debug!(target: "on_demand", "Peer {} gave bad response", peer); + ctx.disable_peer(peer); + } + // `Good` response continue + _ => (), + } + + // Majority of responses are bad, drop the request + let total_peers = self.peers.read().len(); + if pending.is_bad_response(total_peers) { + pending.set_as_faulty_request(total_peers, req_id); + return; } } diff --git a/ethcore/light/src/on_demand/tests.rs b/ethcore/light/src/on_demand/tests.rs index d3cd137ec05..accf56dd8a5 100644 --- a/ethcore/light/src/on_demand/tests.rs +++ b/ethcore/light/src/on_demand/tests.rs @@ -306,10 +306,16 @@ fn part_bad_part_good() { let peer_id = 111; let req_ids = (ReqId(14426), ReqId(555)); - harness.inject_peer(peer_id, Peer { - status: dummy_status(), - capabilities: dummy_capabilities(), - }); + // Enough peers so that the requests don't get dropped + for peer in peer_id..=peer_id+5 { + harness.inject_peer( + peer, + Peer { + status: 
dummy_status(), + capabilities: dummy_capabilities(), + } + ); + } let make = |num| { let mut hdr = Header::default(); @@ -367,17 +373,117 @@ fn part_bad_part_good() { assert!(recv.wait().is_ok()); } +#[test] +fn determine_response_as_faulty_by_majority() { + let mut harness = Harness::create(); + + // peer[0] - requester ("light") + // peer[1..9] - responders ("providers") + let peers: Vec = (1..=10).map(|id| id).collect(); + + let req_id = ReqId(14426); + + for peer in &peers { + harness.inject_peer( + *peer, + Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + } + ); + } + + // make sure the query limit doesn't interfere + harness.service.base_retry_count = usize::max_value(); + + let _recv = harness.service.request_raw( + &Context::RequestFrom(peers[0], req_id), + vec![ + request::HeaderByHash(Header::default().encoded().hash().into()).into(), + ], + ).unwrap(); + + harness.service.dispatch_pending(&Context::RequestFrom(peers[0], req_id)); + + for i in 1..=6 { + harness.service.on_responses( + &Context::RequestFrom(peers[i], req_id), + req_id, + &[] + ); + // Until 6 bad responses have been received the pending will be refilled + if i < 6 { + assert_eq!(harness.service.pending.read().len(), 1, "The request should be pending"); + let pend = harness.service.pending.write().remove(0); + harness.service.in_transit.write().insert(req_id, pend); + } + } + + // the request has been dropped because `|bad_responses| > |peers| / 2` was reached + assert_eq!(harness.service.pending.read().len(), 0); + assert!(harness.service.in_transit.read().get(&req_id).is_none()); +} + +#[test] +fn response_by_minority_many_times() { + let mut harness = Harness::create(); + + let peers: Vec = (1..=10).map(|id| id).collect(); + + let req_id = ReqId(14426); + + for peer in &peers { + harness.inject_peer( + *peer, + Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + } + ); + } + + // make sure the query limit doesn't interfere + harness.service.base_retry_count = 
usize::max_value(); + + let _recv = harness.service.request_raw( + &Context::RequestFrom(peers[0], req_id), + vec![ + request::HeaderByHash(Header::default().encoded().hash().into()).into(), + ], + ).unwrap(); + + harness.service.dispatch_pending(&Context::RequestFrom(peers[0], req_id)); + + for i in 1..=1000 { + harness.service.on_responses( + &Context::RequestFrom(peers[i % 2], req_id), + req_id, + &[] + ); + assert_eq!(harness.service.pending.read().len(), 1, "The request should be pending"); + let pend = harness.service.pending.write().remove(0); + harness.service.in_transit.write().insert(req_id, pend); + } + + assert!(harness.service.in_transit.read().get(&req_id).is_some(), "The request should be pending"); +} + #[test] fn wrong_kind() { let harness = Harness::create(); - let peer_id = 10101; + let peer_id = 3; let req_id = ReqId(14426); - harness.inject_peer(peer_id, Peer { - status: dummy_status(), - capabilities: dummy_capabilities(), - }); + for peer_id in 0..10 { + harness.inject_peer( + peer_id, + Peer { + status: dummy_status(), + capabilities: dummy_capabilities(), + } + ); + } let _recv = harness.service.request_raw( &Context::NoOp,