Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Paraswap rate limiting #168

Merged
merged 13 commits into from
Apr 29, 2022
Merged
1 change: 1 addition & 0 deletions crates/orderbook/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ async fn main() {
Arc::new(DefaultParaswapApi {
client: client.clone(),
partner: args.shared.paraswap_partner.clone().unwrap_or_default(),
rate_limiter: args.shared.paraswap_rate_limiter.clone().map(Into::into),
}),
token_info_fetcher.clone(),
args.shared.disabled_paraswap_dexs.clone(),
Expand Down
39 changes: 38 additions & 1 deletion crates/shared/src/arguments.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Contains command line arguments and related helpers that are shared between the binaries.
use crate::{
gas_price_estimation::GasEstimatorType,
http_client::RateLimitingStrategy,
sources::{balancer_v2::BalancerFactoryKind, BaselineSource},
};
use anyhow::{ensure, Result};
use anyhow::{ensure, Context, Result};
use ethcontract::{H160, U256};
use std::{
num::{NonZeroU64, ParseFloatError},
Expand Down Expand Up @@ -101,6 +102,15 @@ pub struct Arguments {
#[clap(long, env, default_value = "ParaSwapPool4", use_value_delimiter = true)]
pub disabled_paraswap_dexs: Vec<String>,

/// Configures the back off strategy for the paraswap API when our requests get rate limited.
/// Requests issued while back off is active get dropped entirely.
/// Needs to be passed as "<back_off_growth_factor>,<min_back_off>,<max_back_off>".
/// back_off_growth_factor: f64 > 1.0
/// min_back_off: f64 in seconds
/// max_back_off: f64 in seconds
#[clap(long, env, verbatim_doc_comment)]
pub paraswap_rate_limiter: Option<RateLimitingStrategy>,

#[clap(long, env)]
pub zeroex_url: Option<String>,

Expand Down Expand Up @@ -165,3 +175,30 @@ pub fn wei_from_gwei(s: &str) -> anyhow::Result<f64> {
let in_gwei: f64 = s.parse()?;
Ok(in_gwei * 1e9)
}

impl FromStr for RateLimitingStrategy {
type Err = anyhow::Error;

fn from_str(config: &str) -> Result<Self> {
let mut parts = config.split(',');
let back_off_growth_factor = parts
.next()
.ok_or_else(|| anyhow::anyhow!("missing back_off_growth_factor"))?;
let min_back_off = parts
.next()
.ok_or_else(|| anyhow::anyhow!("missing min_back_off"))?;
let max_back_off = parts
.next()
.ok_or_else(|| anyhow::anyhow!("missing max_back_off"))?;
MartinquaXD marked this conversation as resolved.
Show resolved Hide resolved
ensure!(
parts.next().is_none(),
"extraneous rate limiting parameters"
);
let back_off_growth_factor: f64 = back_off_growth_factor
.parse()
.context("parsing back_off_growth_factor")?;
let min_back_off = duration_from_seconds(min_back_off).context("parsing min_back_off")?;
let max_back_off = duration_from_seconds(max_back_off).context("parsing max_back_off")?;
Self::try_new(back_off_growth_factor, min_back_off, max_back_off)
}
}
175 changes: 171 additions & 4 deletions crates/shared/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use anyhow::{anyhow, Result};
use reqwest::Response;
use anyhow::{anyhow, ensure, Result};
use reqwest::{RequestBuilder, Response};
use std::{
sync::{Mutex, MutexGuard},
time::{Duration, Instant},
};

/// Extracts the bytes of the response up to some size limit.
///
Expand All @@ -19,11 +23,141 @@ pub async fn response_body_with_size_limit(
Ok(bytes)
}

#[derive(Debug, Clone)]
pub struct RateLimitingStrategy {
drop_requests_until: Instant,
/// How many requests got rate limited in a row.
times_rate_limited: u64,
back_off_growth_factor: f64,
min_back_off: Duration,
max_back_off: Duration,
}

impl RateLimitingStrategy {
pub fn try_new(
back_off_growth_factor: f64,
min_back_off: Duration,
max_back_off: Duration,
) -> Result<Self> {
ensure!(
back_off_growth_factor.is_normal(),
"back_off_growth_factor must be a normal f64"
);
ensure!(
back_off_growth_factor > 1.0,
"back_off_growth_factor needs to be greater than 1.0"
);
ensure!(
min_back_off <= max_back_off,
"min_back_off needs to be <= max_back_off"
);
Ok(Self {
drop_requests_until: Instant::now(),
times_rate_limited: 0,
back_off_growth_factor,
min_back_off,
max_back_off,
})
}
}

impl RateLimitingStrategy {
MartinquaXD marked this conversation as resolved.
Show resolved Hide resolved
/// Resets back off and stops rate limiting requests.
pub fn response_ok(&mut self) {
self.times_rate_limited = 0;
self.drop_requests_until = Instant::now();
}

/// Calculates back off based on how often we got rate limited in a row.
fn get_current_back_off(&self) -> Duration {
let mut back_off = self.min_back_off;
for _ in 0..self.times_rate_limited {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use f64::powi or f64::powf instead of a loop. You can't get overflow panics with f64 because worst case it will stay at +INF.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to avoid an overflow with Duration::mul_f64().
Being able to compute the correct factor to call mul_f64() only once wouldn't help us much if that would cause mul_f64() to panic because the result would overflow Duration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a checked_mul_f64?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be like

let factor =  back_off_growth_factor.pow(times_rate_limited);
back_off *= factor;

I see now that Duration for some reason doesn't have a non panicking version of mul_f64 which is weird. In this case I feel it is reasonable to extract the f64 out of the duration, do the math, put it back.

Copy link
Contributor Author

@MartinquaXD MartinquaXD Apr 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately there is only checked_mul and saturating_mul which both require a u32 as the argument.
I could have made the growth_factor argument an u32 but that felt overly restrictive.
But I don't feel strongly about that so I could change it if you'd like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I feel it is reasonable to extract the f64 out of the duration, do the math, put it back.

I hadn't considered that before. Good idea. 👍

// do this iteratively to reasonably prevent panics on overflow
back_off = back_off.mul_f64(self.back_off_growth_factor);
if back_off >= self.max_back_off {
return self.max_back_off;
}
}
back_off
}

/// Returns updated back off if no other thread increased it in the mean time.
pub fn response_rate_limited(&mut self, previous_rate_limits: u64) -> Option<Duration> {
if self.times_rate_limited != previous_rate_limits {
// Don't increase back off if somebody else already updated it in the meantime.
return None;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of returning None, should we return the current backoff here, even if we don't increment the counter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line with the comment above, we need some way to avoid printing duplicated messages.
In the first version of this implementation I logged messages while holding the Mutex lock. The code for that was way cleaner and easier to understand but also made the critical section super long.
Now I only return Some(Duration) in case we actually increased the back off and therefore need a log message.

}

self.times_rate_limited += 1;
let new_back_off = self.get_current_back_off();
self.drop_requests_until = Instant::now() + new_back_off;
Some(new_back_off)
}

/// Returns number of times we got rate limited in a row if we are currently allowing requests.
pub fn times_rate_limited(&self, now: Instant) -> Option<u64> {
if self.drop_requests_until > now {
nlordell marked this conversation as resolved.
Show resolved Hide resolved
return None;
}

Some(self.times_rate_limited)
}
}

#[derive(Debug)]
pub struct RateLimiter {
pub strategy: Mutex<RateLimitingStrategy>,
}

impl RateLimiter {
fn strategy(&self) -> MutexGuard<RateLimitingStrategy> {
self.strategy.lock().unwrap()
}
}

impl From<RateLimitingStrategy> for RateLimiter {
fn from(strategy: RateLimitingStrategy) -> Self {
Self {
strategy: Mutex::new(strategy),
}
}
}

impl RateLimiter {
/// If a request receives the response "Too many requests" (status code 429) future requests
/// will get dropped for some time. Every successive 429 response increases that time exponentially.
/// When a request eventually returns a normal result again future requests will no longer get
/// dropped until the next 429 response occurs.
pub async fn request(&self, request: RequestBuilder) -> Result<Response> {
let times_rate_limited = match self.strategy().times_rate_limited(Instant::now()) {
None => {
tracing::warn!("dropping request because API is currently rate limited");
anyhow::bail!("backing off rate limit");
}
Some(times_rate_limited) => times_rate_limited,
};

let response = request.send().await?;

if response.status() == 429 {
if let Some(new_back_off) = self.strategy().response_rate_limited(times_rate_limited) {
tracing::warn!("extended rate limiting for {:?}", new_back_off);
}
anyhow::bail!("rate limited");
} else {
self.strategy().response_ok();
tracing::debug!("reset rate limit");
Ok(response)
}
}
}

#[cfg(test)]
mod tests {
use reqwest::Client;

use super::*;
use futures::stream::{self, StreamExt};
use reqwest::Client;
use tokio::time::{sleep_until, Instant as TokioInstant};

#[tokio::test]
#[ignore]
Expand All @@ -43,4 +177,37 @@ mod tests {
let text = std::str::from_utf8(&bytes).unwrap();
dbg!(text);
}

#[tokio::test]
#[ignore]
async fn rate_limited_requests() {
let client = Client::default();

let url = "https://apiv5.paraswap.io/prices?srcToken=0x99d8a9c45b2eca8864373a26d1459e3dff1e17f3&destToken=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48&srcDecimals=18&destDecimals=6&amount=100000000&side=BUY&network=1&excludeDEXS=ParaSwapPool4";
let strategy = RateLimitingStrategy::try_new(
2.0,
Duration::from_millis(16),
Duration::from_millis(20_000),
)
.unwrap();
let rate_limiter = RateLimiter::from(strategy);
// note that 1_000 requests will not always trigger a rate limit
let mut stream = stream::iter(0..1_000).map(|_| async {}).buffer_unordered(2);
while stream.next().await.is_some() {
let request = client.get(url);
let response = rate_limiter.request(request).await;
match &response {
Ok(response) => println!("{}", response.status()),
Err(e) => {
println!("error: {}", e);
let instant = rate_limiter.strategy.lock().unwrap().drop_requests_until;
println!(
"sleeping for {} milliseconds",
instant.duration_since(Instant::now()).as_millis()
);
sleep_until(TokioInstant::from_std(instant)).await;
}
}
}
}
}
26 changes: 16 additions & 10 deletions crates/shared/src/paraswap_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::debug_bytes;
use crate::{debug_bytes, http_client::RateLimiter};
use anyhow::Result;
use derivative::Derivative;
use ethcontract::{H160, U256};
Expand Down Expand Up @@ -28,14 +28,19 @@ pub trait ParaswapApi: Send + Sync {
pub struct DefaultParaswapApi {
pub client: Client,
pub partner: String,
pub rate_limiter: Option<RateLimiter>,
}

#[async_trait::async_trait]
impl ParaswapApi for DefaultParaswapApi {
async fn price(&self, query: PriceQuery) -> Result<PriceResponse, ParaswapResponseError> {
let url = query.into_url(&self.partner);
tracing::debug!("Querying Paraswap price API: {}", url);
let response = self.client.get(url).send().await?;
let request = self.client.get(url);
let response = match &self.rate_limiter {
Some(limiter) => limiter.request(request).await?,
None => request.send().await?,
};
let status = response.status();
let text = response.text().await?;
tracing::debug!(%status, %text, "Response from Paraswap price API");
Expand All @@ -49,12 +54,12 @@ impl ParaswapApi for DefaultParaswapApi {
query,
partner: &self.partner,
};
let response_text = query
.into_request(&self.client)
.send()
.await?
.text()
.await?;
let request = query.into_request(&self.client);
let response = match &self.rate_limiter {
Some(limiter) => limiter.request(request).await?,
None => request.send().await?,
};
let response_text = response.text().await?;
parse_paraswap_response_text(&response_text)
}
}
Expand Down Expand Up @@ -84,7 +89,7 @@ pub enum ParaswapResponseError {
Retryable(String),

#[error("other ParaSwap error: {0}")]
Other(String),
Other(#[from] anyhow::Error),
MartinquaXD marked this conversation as resolved.
Show resolved Hide resolved
}

impl ParaswapResponseError {
Expand Down Expand Up @@ -119,7 +124,7 @@ where
| "Too much slippage on quote, please try again" => {
Err(ParaswapResponseError::InsufficientLiquidity(message))
}
_ => Err(ParaswapResponseError::Other(message)),
_ => Err(ParaswapResponseError::Other(anyhow::anyhow!(message))),
},
}
}
Expand Down Expand Up @@ -727,6 +732,7 @@ mod tests {
let api = DefaultParaswapApi {
client: Client::new(),
partner: "Test".into(),
rate_limiter: None,
};

let good_query = TransactionBuilderQuery {
Expand Down
1 change: 1 addition & 0 deletions crates/shared/src/price_estimation/paraswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ mod tests {
let paraswap = DefaultParaswapApi {
client: Client::new(),
partner: "".to_string(),
rate_limiter: None,
};
let estimator = ParaswapPriceEstimator {
paraswap: Arc::new(paraswap),
Expand Down
1 change: 1 addition & 0 deletions crates/solver/src/solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ pub fn create(
disabled_paraswap_dexs.clone(),
client.clone(),
paraswap_partner.clone(),
None,
),
solver_metrics.clone(),
))),
Expand Down
4 changes: 4 additions & 0 deletions crates/solver/src/solver/paraswap_solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use ethcontract::{Account, Bytes, H160, U256};
use maplit::hashmap;
use model::order::OrderKind;
use reqwest::Client;
use shared::http_client::RateLimiter;
use shared::paraswap_api::{
DefaultParaswapApi, ParaswapApi, ParaswapResponseError, PriceQuery, PriceResponse, Side,
TradeAmount, TransactionBuilderQuery, TransactionBuilderResponse,
Expand Down Expand Up @@ -53,6 +54,7 @@ impl ParaswapSolver {
disabled_paraswap_dexs: Vec<String>,
client: Client,
partner: Option<String>,
rate_limiter: Option<RateLimiter>,
) -> Self {
let allowance_fetcher = AllowanceManager::new(web3, settlement_contract.address());

Expand All @@ -64,6 +66,7 @@ impl ParaswapSolver {
client: Box::new(DefaultParaswapApi {
client,
partner: partner.unwrap_or_else(|| REFERRER.into()),
rate_limiter,
}),
slippage_bps,
disabled_paraswap_dexs,
Expand Down Expand Up @@ -539,6 +542,7 @@ mod tests {
vec![],
Client::new(),
None,
None,
);

let settlement = solver
Expand Down