Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ keccak-hasher = { path = "../util/keccak-hasher" }
kvdb = "0.1"
kvdb-memorydb = "0.1"
kvdb-rocksdb = { version = "0.1.3", optional = true }
lazy_static = "1.0"
lazy_static = "1.2.0"
len-caching-lock = { path = "../util/len-caching-lock" }
log = "0.4"
lru-cache = "0.1"
Expand Down
153 changes: 84 additions & 69 deletions ethcore/light/src/on_demand/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@ pub mod error {
}
}

/// Public interface for performing network requests `OnDemand`
pub trait OnDemandRequester: Send + Sync {
/// Submit a strongly-typed batch of requests.
///
/// Fails if back-reference are not coherent.
fn request<T>(&self, ctx: &BasicContext, requests: T) -> Result<OnResponses<T>, basic_request::NoSuchOutput>
where
T: request::RequestAdapter;

/// Submit a vector of requests to be processed together.
///
/// Fails if back-references are not coherent.
/// The returned vector of responses will correspond to the requests exactly.
fn request_raw(&self, ctx: &BasicContext, requests: Vec<Request>)
-> Result<Receiver<PendingResponse>, basic_request::NoSuchOutput>;
}


// relevant peer info.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Peer {
Expand Down Expand Up @@ -355,71 +373,8 @@ pub struct OnDemand {
request_number_of_consecutive_errors: usize
}

impl OnDemand {

/// Create a new `OnDemand` service with the given cache.
pub fn new(
cache: Arc<Mutex<Cache>>,
response_time_window: Duration,
request_backoff_start: Duration,
request_backoff_max: Duration,
request_backoff_rounds_max: usize,
request_number_of_consecutive_errors: usize,
) -> Self {

Self {
pending: RwLock::new(Vec::new()),
peers: RwLock::new(HashMap::new()),
in_transit: RwLock::new(HashMap::new()),
cache,
no_immediate_dispatch: false,
response_time_window: Self::sanitize_circuit_breaker_input(response_time_window, "Response time window"),
request_backoff_start: Self::sanitize_circuit_breaker_input(request_backoff_start, "Request initial backoff time window"),
request_backoff_max: Self::sanitize_circuit_breaker_input(request_backoff_max, "Request maximum backoff time window"),
request_backoff_rounds_max,
request_number_of_consecutive_errors,
}
}

fn sanitize_circuit_breaker_input(dur: Duration, name: &'static str) -> Duration {
if dur.as_secs() < 1 {
warn!(target: "on_demand",
"{} is too short must be at least 1 second, configuring it to 1 second", name);
Duration::from_secs(1)
} else {
dur
}
}

// make a test version: this doesn't dispatch pending requests
// until you trigger it manually.
#[cfg(test)]
fn new_test(
cache: Arc<Mutex<Cache>>,
request_ttl: Duration,
request_backoff_start: Duration,
request_backoff_max: Duration,
request_backoff_rounds_max: usize,
request_number_of_consecutive_errors: usize,
) -> Self {
let mut me = OnDemand::new(
cache,
request_ttl,
request_backoff_start,
request_backoff_max,
request_backoff_rounds_max,
request_number_of_consecutive_errors,
);
me.no_immediate_dispatch = true;

me
}

/// Submit a vector of requests to be processed together.
///
/// Fails if back-references are not coherent.
/// The returned vector of responses will correspond to the requests exactly.
pub fn request_raw(&self, ctx: &BasicContext, requests: Vec<Request>)
impl OnDemandRequester for OnDemand {
fn request_raw(&self, ctx: &BasicContext, requests: Vec<Request>)
-> Result<Receiver<PendingResponse>, basic_request::NoSuchOutput>
{
let (sender, receiver) = oneshot::channel();
Expand Down Expand Up @@ -475,10 +430,7 @@ impl OnDemand {
Ok(receiver)
}

/// Submit a strongly-typed batch of requests.
///
/// Fails if back-reference are not coherent.
pub fn request<T>(&self, ctx: &BasicContext, requests: T) -> Result<OnResponses<T>, basic_request::NoSuchOutput>
fn request<T>(&self, ctx: &BasicContext, requests: T) -> Result<OnResponses<T>, basic_request::NoSuchOutput>
where T: request::RequestAdapter
{
self.request_raw(ctx, requests.make_requests()).map(|recv| OnResponses {
Expand All @@ -487,6 +439,69 @@ impl OnDemand {
})
}

}

impl OnDemand {

/// Create a new `OnDemand` service with the given cache.
pub fn new(
cache: Arc<Mutex<Cache>>,
response_time_window: Duration,
request_backoff_start: Duration,
request_backoff_max: Duration,
request_backoff_rounds_max: usize,
request_number_of_consecutive_errors: usize,
) -> Self {

Self {
pending: RwLock::new(Vec::new()),
peers: RwLock::new(HashMap::new()),
in_transit: RwLock::new(HashMap::new()),
cache,
no_immediate_dispatch: false,
response_time_window: Self::sanitize_circuit_breaker_input(response_time_window, "Response time window"),
request_backoff_start: Self::sanitize_circuit_breaker_input(request_backoff_start, "Request initial backoff time window"),
request_backoff_max: Self::sanitize_circuit_breaker_input(request_backoff_max, "Request maximum backoff time window"),
request_backoff_rounds_max,
request_number_of_consecutive_errors,
}
}

fn sanitize_circuit_breaker_input(dur: Duration, name: &'static str) -> Duration {
if dur.as_secs() < 1 {
warn!(target: "on_demand",
"{} is too short must be at least 1 second, configuring it to 1 second", name);
Duration::from_secs(1)
} else {
dur
}
}

// make a test version: this doesn't dispatch pending requests
// until you trigger it manually.
#[cfg(test)]
fn new_test(
cache: Arc<Mutex<Cache>>,
request_ttl: Duration,
request_backoff_start: Duration,
request_backoff_max: Duration,
request_backoff_rounds_max: usize,
request_number_of_consecutive_errors: usize,
) -> Self {
let mut me = OnDemand::new(
cache,
request_ttl,
request_backoff_start,
request_backoff_max,
request_backoff_rounds_max,
request_number_of_consecutive_errors,
);
me.no_immediate_dispatch = true;

me
}


// maybe dispatch pending requests.
// sometimes
fn attempt_dispatch(&self, ctx: &BasicContext) {
Expand Down
2 changes: 1 addition & 1 deletion ethcore/light/src/on_demand/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

use super::{request, OnDemand, Peer, HeaderRef};
use super::{request, OnDemand, OnDemandRequester, Peer, HeaderRef};

// useful contexts to give the service.
enum Context {
Expand Down
Loading