# Retry service maintenance (#811)
This PR changes the service maintenance component to retry updating to the latest block after some delay instead of waiting until a new block is observed.

This is done for two reasons:
1. To reduce alert noise caused by maintenance update delays (I made sure to track successes and failures so we can set up dashboards and alerts for them).
2. Because we want maintenance to keep us at the "latest" block at all times. Without retrying, we sit for ~12 seconds knowing that there is a new block and that our data is stale before trying to update again. So, conceptually, retrying here is not much of a sin (see the sketch below).
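
Concretely, the retry races the next incoming block against a short delay, preferring a fresh block when one arrives in time. Below is a minimal, self-contained sketch of the pattern (assumptions: the real code in `maintenance.rs` operates on `Block` values and records metrics; `do_maintenance` and the `u64` block type here are illustrative stand-ins):

```rust
use futures::{Stream, StreamExt as _};
use std::time::Duration;
use tokio::time;

/// After a failed run, wait up to `retry_delay` for a newer block; if none
/// arrives, retry the stale block instead of idling until the next one
/// (~12 seconds on mainnet).
async fn run(blocks: impl Stream<Item = u64>, retry_delay: Duration) {
    let blocks = blocks.fuse();
    futures::pin_mut!(blocks);

    let mut retry_block = None;
    while let Some(block) = match retry_block.take() {
        Some(stale) => time::timeout(retry_delay, blocks.next())
            .await
            .unwrap_or(Some(stale)), // delay elapsed: reuse the stale block
        None => blocks.next().await,
    } {
        if do_maintenance(block).await.is_err() {
            retry_block = Some(block); // queue a retry for this block
        }
    }
}

async fn do_maintenance(_block: u64) -> Result<(), ()> {
    Ok(()) // stand-in for the real per-component maintenance
}
```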

Furthermore, regarding the first point, simply tuning the alert parameters is not really an option. The issue is that the current alert measures how stale our data is (i.e. how far the last updated block lags behind the latest block). This PR proposes actively making our data less stale (by retrying updates to newer blocks when one fails) and measuring how often those updates fail instead.
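
The success/failure tracking boils down to a counter with a `result` label (see the `Metrics` struct in `maintenance.rs` below). Here is a minimal sketch of that idea using the `prometheus` crate directly; the metric name `maintenance_runs` is an assumption for illustration, since the real name is generated by the `#[metric]` macro:

```rust
use prometheus::{IntCounterVec, Opts, Registry};

fn main() {
    // One time series per result, so alerts can fire on the failure *rate*
    // of maintenance runs rather than on how stale the data currently is.
    let runs = IntCounterVec::new(
        Opts::new("maintenance_runs", "Service maintenance runs by result"),
        &["result"],
    )
    .unwrap();
    let registry = Registry::new();
    registry.register(Box::new(runs.clone())).unwrap();

    runs.with_label_values(&["success"]).inc(); // after a successful run
    runs.with_label_values(&["failure"]).inc(); // after a failed run
}
```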

In general, I hate retry logic, as it tends to hide more problematic underlying issues... I'm not 100% convinced this is the right approach to take.

### Test Plan

Added a unit test.
Nicholas Rodrigues Lordello committed Nov 19, 2022
1 parent e01194c commit 22ebd7a
Showing 6 changed files with 130 additions and 47 deletions.
10 changes: 5 additions & 5 deletions crates/autopilot/src/lib.rs
@@ -35,7 +35,7 @@ use shared::{
},
gas_price::InstrumentedGasEstimator,
http_client::HttpClientFactory,
maintenance::Maintaining,
maintenance::{Maintaining, ServiceMaintenance},
metrics::LivenessChecking,
oneinch_api::OneInchClientImpl,
order_quoting::OrderQuoter,
@@ -475,14 +475,14 @@ pub async fn main(args: arguments::Arguments) {
));
maintainers.push(broadcaster_event_updater);
}
let mut service_maintainer = shared::maintenance::ServiceMaintenance { maintainers };

if let Some(balancer) = balancer_pool_fetcher {
service_maintainer.maintainers.push(balancer);
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
service_maintainer.maintainers.push(uniswap_v3);
maintainers.push(uniswap_v3);
}

let service_maintainer = ServiceMaintenance::new(maintainers);
let maintenance_task = tokio::task::spawn(
service_maintainer.run_maintenance_on_new_block(current_block_stream.clone()),
);
6 changes: 3 additions & 3 deletions crates/driver/src/main.rs
@@ -409,14 +409,14 @@ async fn build_auction_converter(
(None, None)
};

let maintainer = ServiceMaintenance {
maintainers: pool_caches
let maintainer = ServiceMaintenance::new(
pool_caches
.into_iter()
.map(|(_, cache)| cache as Arc<dyn Maintaining>)
.chain(balancer_pool_maintainer)
.chain(uniswap_v3_maintainer)
.collect(),
};
);
tokio::task::spawn(
maintainer.run_maintenance_on_new_block(common.current_block_stream.clone()),
);
12 changes: 5 additions & 7 deletions crates/e2e/tests/e2e/services.rs
@@ -199,13 +199,11 @@ impl OrderbookServices {
api_db.as_ref().clone(),
order_validator.clone(),
));
let maintenance = ServiceMaintenance {
maintainers: vec![
Arc::new(autopilot_db.clone()),
ethflow_event_updater,
gpv2_event_updater,
],
};
let maintenance = ServiceMaintenance::new(vec![
Arc::new(autopilot_db.clone()),
ethflow_event_updater,
gpv2_event_updater,
]);
let quotes = Arc::new(QuoteHandler::new(order_validator, quoter.clone()));
orderbook::serve_api(
api_db.clone(),
14 changes: 8 additions & 6 deletions crates/orderbook/src/main.rs
@@ -24,7 +24,7 @@ use shared::{
},
gas_price::InstrumentedGasEstimator,
http_client::HttpClientFactory,
maintenance::ServiceMaintenance,
maintenance::{Maintaining, ServiceMaintenance},
metrics::{serve_metrics, DEFAULT_METRICS_PORT},
network::network_name,
oneinch_api::OneInchClientImpl,
@@ -423,15 +423,15 @@ async fn main() {
database.as_ref().clone(),
order_validator.clone(),
));
let mut service_maintainer = ServiceMaintenance {
maintainers: vec![pool_fetcher],
};

let mut maintainers = vec![pool_fetcher as Arc<dyn Maintaining>];
if let Some(balancer) = balancer_pool_fetcher {
service_maintainer.maintainers.push(balancer);
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
service_maintainer.maintainers.push(uniswap_v3);
maintainers.push(uniswap_v3);
}

check_database_connection(orderbook.as_ref()).await;
let quotes =
Arc::new(QuoteHandler::new(order_validator, optimal_quoter).with_fast_quoter(fast_quoter));
@@ -448,6 +448,8 @@
args.shared.solver_competition_auth,
native_price_estimator,
);

let service_maintainer = ServiceMaintenance::new(maintainers);
let maintenance_task =
task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));

129 changes: 106 additions & 23 deletions crates/shared/src/maintenance.rs
@@ -1,12 +1,23 @@
use crate::current_block::{self, Block, CurrentBlockStream};
use anyhow::{ensure, Result};
use futures::{future::join_all, Stream, StreamExt};
use std::sync::Arc;
use tracing::Instrument;
use futures::{future::join_all, Stream, StreamExt as _};
use std::{sync::Arc, time::Duration};
use tokio::time;
use tracing::Instrument as _;

/// Collects all service components requiring maintenance on each new block
pub struct ServiceMaintenance {
pub maintainers: Vec<Arc<dyn Maintaining>>,
maintainers: Vec<Arc<dyn Maintaining>>,
retry_delay: Duration,
}

impl ServiceMaintenance {
pub fn new(maintainers: Vec<Arc<dyn Maintaining>>) -> Self {
Self {
maintainers,
retry_delay: Duration::from_secs(1),
}
}
}

#[cfg_attr(test, mockall::automock)]
@@ -32,36 +32,59 @@ impl Maintaining for ServiceMaintenance {
}

impl ServiceMaintenance {
async fn run_maintenance_for_block_stream(self, block_stream: impl Stream<Item = Block>) {
futures::pin_mut!(block_stream);

pub async fn run_maintenance_for_blocks(self, blocks: impl Stream<Item = Block>) {
let metrics = Metrics::instance(global_metrics::get_metric_storage_registry()).unwrap();
for label in ["success", "failure"] {
metrics.runs.with_label_values(&[label]).reset();
}

let blocks = blocks.fuse();
futures::pin_mut!(blocks);

while let Some(block) = block_stream.next().await {
let mut retry_block = None;

while let Some(block) = match retry_block.take() {
// We have a pending retry to process. First see if there is a new
// block that becomes available within a certain retry delay, and if
// there is, prefer that over the old outdated block.
Some(block) => time::timeout(self.retry_delay, blocks.next())
.await
.unwrap_or(Some(block)),
None => blocks.next().await,
} {
tracing::debug!(
"running maintenance on block number {:?} hash {:?}",
block.number,
block.hash
?block.number, ?block.hash,
"running maintenance",
);

let block = block.number.unwrap_or_default().as_u64();
metrics.last_seen_block.set(block as _);
let block_number = block.number.unwrap_or_default().as_u64();

if self
metrics.last_seen_block.set(block_number as _);

if let Err(err) = self
.run_maintenance()
.instrument(tracing::debug_span!("maintenance", block))
.instrument(tracing::debug_span!("maintenance", block = block_number))
.await
.is_ok()
{
metrics.last_updated_block.set(block as _);
tracing::debug!(
?block.number, ?block.hash, ?err,
"maintenance failed; queuing retry",
);

metrics.runs.with_label_values(&["failure"]).inc();
retry_block = Some(block);
continue;
}

metrics.last_updated_block.set(block_number as _);
metrics.runs.with_label_values(&["success"]).inc();
}
}

pub async fn run_maintenance_on_new_block(self, current_block_stream: CurrentBlockStream) -> ! {
self.run_maintenance_for_block_stream(current_block::into_stream(current_block_stream))
self.run_maintenance_for_blocks(current_block::into_stream(current_block_stream))
.await;
unreachable!()
panic!("block stream unexpectedly dropped");
}
}

@@ -75,12 +75,18 @@ struct Metrics {
/// Service maintenance last successfully updated block.
#[metric()]
last_updated_block: prometheus::IntGauge,

/// Service maintenance error counter
#[metric(labels("result"))]
runs: prometheus::IntCounterVec,
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::bail;
use futures::stream;
use mockall::Sequence;

#[tokio::test]
async fn run_maintenance_no_early_exit_on_error() {
@@ -106,28 +146,71 @@ mod tests {
Arc::new(err_mock_maintenance),
Arc::new(ok2_mock_maintenance),
],
retry_delay: Duration::default(),
};

assert!(service_maintenance.run_maintenance().await.is_err());
}

#[tokio::test]
async fn block_stream_maintenance() {
let block_count = 2;
let mut mock_maintenance = MockMaintaining::new();
let block_count = 5;

// Mock interface is responsible for assertions here.
// Will panic if run_maintenance is not called exactly `block_count` times.
let mut mock_maintenance = MockMaintaining::new();
mock_maintenance
.expect_run_maintenance()
.times(block_count)
.returning(|| Ok(()));

let service_maintenance = ServiceMaintenance {
maintainers: vec![Arc::new(mock_maintenance)],
retry_delay: Duration::default(),
};

let block_stream = stream::repeat(Block::default()).take(block_count);
service_maintenance
.run_maintenance_for_blocks(block_stream)
.await;
}

#[tokio::test]
async fn block_stream_retries_failed_blocks() {
crate::tracing::initialize("debug", tracing::Level::ERROR.into());

let mut mock_maintenance = MockMaintaining::new();
let mut sequence = Sequence::new();
mock_maintenance
.expect_run_maintenance()
.return_once(|| bail!("test"))
.times(1)
.in_sequence(&mut sequence);
mock_maintenance
.expect_run_maintenance()
.return_once(|| Ok(()))
.times(1)
.in_sequence(&mut sequence);
mock_maintenance
.expect_run_maintenance()
.return_once(|| Ok(()))
.times(1)
.in_sequence(&mut sequence);

let service_maintenance = ServiceMaintenance {
maintainers: vec![Arc::new(mock_maintenance)],
retry_delay: Duration::default(),
};

let block_stream = futures::stream::repeat(Block::default()).take(block_count);
let block_stream = async_stream::stream! {
yield Block::default();

// Wait a bit to trigger a retry and not just go to the next block
time::sleep(Duration::from_millis(10)).await;
yield Block::default();
};
service_maintenance
.run_maintenance_for_block_stream(block_stream)
.run_maintenance_for_blocks(block_stream)
.await;
}
}
6 changes: 3 additions & 3 deletions crates/solver/src/main.rs
@@ -459,14 +459,14 @@ async fn main() {
args.solution_comparison_decimal_cutoff,
);

let maintainer = ServiceMaintenance {
maintainers: pool_caches
let maintainer = ServiceMaintenance::new(
pool_caches
.into_iter()
.map(|(_, cache)| cache as Arc<dyn Maintaining>)
.chain(balancer_pool_maintainer)
.chain(uniswap_v3_maintainer)
.collect(),
};
);
tokio::task::spawn(maintainer.run_maintenance_on_new_block(current_block_stream));

serve_metrics(metrics, ([0, 0, 0, 0], args.metrics_port).into());
