diff --git a/crates/autopilot/src/lib.rs b/crates/autopilot/src/lib.rs
index c19971b695..8dd7284bb3 100644
--- a/crates/autopilot/src/lib.rs
+++ b/crates/autopilot/src/lib.rs
@@ -35,7 +35,7 @@ use shared::{
     },
     gas_price::InstrumentedGasEstimator,
     http_client::HttpClientFactory,
-    maintenance::Maintaining,
+    maintenance::{Maintaining, ServiceMaintenance},
     metrics::LivenessChecking,
     oneinch_api::OneInchClientImpl,
     order_quoting::OrderQuoter,
@@ -475,14 +475,14 @@ pub async fn main(args: arguments::Arguments) {
         ));
         maintainers.push(broadcaster_event_updater);
     }
-    let mut service_maintainer = shared::maintenance::ServiceMaintenance { maintainers };
-
     if let Some(balancer) = balancer_pool_fetcher {
-        service_maintainer.maintainers.push(balancer);
+        maintainers.push(balancer);
     }
     if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
-        service_maintainer.maintainers.push(uniswap_v3);
+        maintainers.push(uniswap_v3);
     }
+
+    let service_maintainer = ServiceMaintenance::new(maintainers);
     let maintenance_task = tokio::task::spawn(
         service_maintainer.run_maintenance_on_new_block(current_block_stream.clone()),
     );
diff --git a/crates/driver/src/main.rs b/crates/driver/src/main.rs
index cfdb2c40c8..0a2d90e066 100644
--- a/crates/driver/src/main.rs
+++ b/crates/driver/src/main.rs
@@ -409,14 +409,14 @@ async fn build_auction_converter(
         (None, None)
     };
 
-    let maintainer = ServiceMaintenance {
-        maintainers: pool_caches
+    let maintainer = ServiceMaintenance::new(
+        pool_caches
             .into_iter()
             .map(|(_, cache)| cache as Arc<dyn Maintaining>)
             .chain(balancer_pool_maintainer)
             .chain(uniswap_v3_maintainer)
             .collect(),
-    };
+    );
     tokio::task::spawn(
         maintainer.run_maintenance_on_new_block(common.current_block_stream.clone()),
     );
diff --git a/crates/e2e/tests/e2e/services.rs b/crates/e2e/tests/e2e/services.rs
index a55648838f..bf4ad7cd71 100644
--- a/crates/e2e/tests/e2e/services.rs
+++ b/crates/e2e/tests/e2e/services.rs
@@ -199,13 +199,11 @@ impl OrderbookServices {
             api_db.as_ref().clone(),
             order_validator.clone(),
         ));
-        let maintenance = ServiceMaintenance {
-            maintainers: vec![
-                Arc::new(autopilot_db.clone()),
-                ethflow_event_updater,
-                gpv2_event_updater,
-            ],
-        };
+        let maintenance = ServiceMaintenance::new(vec![
+            Arc::new(autopilot_db.clone()),
+            ethflow_event_updater,
+            gpv2_event_updater,
+        ]);
         let quotes = Arc::new(QuoteHandler::new(order_validator, quoter.clone()));
         orderbook::serve_api(
             api_db.clone(),
diff --git a/crates/orderbook/src/main.rs b/crates/orderbook/src/main.rs
index b9e55ca577..4081ea6586 100644
--- a/crates/orderbook/src/main.rs
+++ b/crates/orderbook/src/main.rs
@@ -24,7 +24,7 @@ use shared::{
     },
     gas_price::InstrumentedGasEstimator,
     http_client::HttpClientFactory,
-    maintenance::ServiceMaintenance,
+    maintenance::{Maintaining, ServiceMaintenance},
     metrics::{serve_metrics, DEFAULT_METRICS_PORT},
     network::network_name,
     oneinch_api::OneInchClientImpl,
@@ -423,15 +423,15 @@ async fn main() {
         database.as_ref().clone(),
         order_validator.clone(),
     ));
-    let mut service_maintainer = ServiceMaintenance {
-        maintainers: vec![pool_fetcher],
-    };
+
+    let mut maintainers = vec![pool_fetcher as Arc<dyn Maintaining>];
     if let Some(balancer) = balancer_pool_fetcher {
-        service_maintainer.maintainers.push(balancer);
+        maintainers.push(balancer);
     }
     if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
-        service_maintainer.maintainers.push(uniswap_v3);
+        maintainers.push(uniswap_v3);
     }
+
     check_database_connection(orderbook.as_ref()).await;
     let quotes =
         Arc::new(QuoteHandler::new(order_validator, optimal_quoter).with_fast_quoter(fast_quoter));
@@ -448,6 +448,8 @@ async fn main() {
         args.shared.solver_competition_auth,
         native_price_estimator,
     );
+
+    let service_maintainer = ServiceMaintenance::new(maintainers);
     let maintenance_task =
         task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));
 
diff --git a/crates/shared/src/maintenance.rs b/crates/shared/src/maintenance.rs
index 0c56350428..2961639b57 100644
--- a/crates/shared/src/maintenance.rs
+++ b/crates/shared/src/maintenance.rs
@@ -1,12 +1,23 @@
 use crate::current_block::{self, Block, CurrentBlockStream};
 use anyhow::{ensure, Result};
-use futures::{future::join_all, Stream, StreamExt};
-use std::sync::Arc;
-use tracing::Instrument;
+use futures::{future::join_all, Stream, StreamExt as _};
+use std::{sync::Arc, time::Duration};
+use tokio::time;
+use tracing::Instrument as _;
 
 /// Collects all service components requiring maintenance on each new block
 pub struct ServiceMaintenance {
-    pub maintainers: Vec<Arc<dyn Maintaining>>,
+    maintainers: Vec<Arc<dyn Maintaining>>,
+    retry_delay: Duration,
+}
+
+impl ServiceMaintenance {
+    pub fn new(maintainers: Vec<Arc<dyn Maintaining>>) -> Self {
+        Self {
+            maintainers,
+            retry_delay: Duration::from_secs(1),
+        }
+    }
 }
 
 #[cfg_attr(test, mockall::automock)]
@@ -32,36 +43,59 @@ impl Maintaining for ServiceMaintenance {
     }
 }
 
 impl ServiceMaintenance {
-    async fn run_maintenance_for_block_stream(self, block_stream: impl Stream<Item = Block>) {
-        futures::pin_mut!(block_stream);
-
+    pub async fn run_maintenance_for_blocks(self, blocks: impl Stream<Item = Block>) {
         let metrics = Metrics::instance(global_metrics::get_metric_storage_registry()).unwrap();
+        for label in ["success", "failure"] {
+            metrics.runs.with_label_values(&[label]).reset();
+        }
+
+        let blocks = blocks.fuse();
+        futures::pin_mut!(blocks);
 
-        while let Some(block) = block_stream.next().await {
+        let mut retry_block = None;
+
+        while let Some(block) = match retry_block.take() {
+            // We have a pending retry to process. First see if there is a new
+            // block that becomes available within a certain retry delay, and if
+            // there is, prefer that over the old outdated block.
+            Some(block) => time::timeout(self.retry_delay, blocks.next())
+                .await
+                .unwrap_or(Some(block)),
+            None => blocks.next().await,
+        } {
             tracing::debug!(
-                "running maintenance on block number {:?} hash {:?}",
-                block.number,
-                block.hash
+                ?block.number, ?block.hash,
+                "running maintenance",
             );
 
-            let block = block.number.unwrap_or_default().as_u64();
-            metrics.last_seen_block.set(block as _);
+            let block_number = block.number.unwrap_or_default().as_u64();
 
-            if self
+            metrics.last_seen_block.set(block_number as _);
+
+            if let Err(err) = self
                 .run_maintenance()
-                .instrument(tracing::debug_span!("maintenance", block))
+                .instrument(tracing::debug_span!("maintenance", block = block_number))
                 .await
-                .is_ok()
             {
-                metrics.last_updated_block.set(block as _);
+                tracing::debug!(
+                    ?block.number, ?block.hash, ?err,
+                    "maintenance failed; queuing retry",
+                );
+
+                metrics.runs.with_label_values(&["failure"]).inc();
+                retry_block = Some(block);
+                continue;
             }
+
+            metrics.last_updated_block.set(block_number as _);
+            metrics.runs.with_label_values(&["success"]).inc();
         }
     }
 
     pub async fn run_maintenance_on_new_block(self, current_block_stream: CurrentBlockStream) -> ! {
-        self.run_maintenance_for_block_stream(current_block::into_stream(current_block_stream))
+        self.run_maintenance_for_blocks(current_block::into_stream(current_block_stream))
             .await;
-        unreachable!()
+        panic!("block stream unexpectedly dropped");
     }
 }
@@ -75,12 +109,18 @@ struct Metrics {
     /// Service maintenance last successfully updated block.
     #[metric()]
     last_updated_block: prometheus::IntGauge,
+
+    /// Service maintenance error counter
+    #[metric(labels("result"))]
+    runs: prometheus::IntCounterVec,
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
     use anyhow::bail;
+    use futures::stream;
+    use mockall::Sequence;
 
     #[tokio::test]
     async fn run_maintenance_no_early_exit_on_error() {
@@ -106,6 +146,7 @@ mod tests {
                 Arc::new(err_mock_maintenance),
                 Arc::new(ok2_mock_maintenance),
             ],
+            retry_delay: Duration::default(),
         };
 
         assert!(service_maintenance.run_maintenance().await.is_err());
@@ -113,21 +154,63 @@ mod tests {
 
     #[tokio::test]
     async fn block_stream_maintenance() {
-        let block_count = 2;
-        let mut mock_maintenance = MockMaintaining::new();
+        let block_count = 5;
+
         // Mock interface is responsible for assertions here.
         // Will panic if run_maintenance is not called exactly `block_count` times.
+        let mut mock_maintenance = MockMaintaining::new();
         mock_maintenance
             .expect_run_maintenance()
             .times(block_count)
             .returning(|| Ok(()));
+
+        let service_maintenance = ServiceMaintenance {
+            maintainers: vec![Arc::new(mock_maintenance)],
+            retry_delay: Duration::default(),
+        };
+
+        let block_stream = stream::repeat(Block::default()).take(block_count);
+        service_maintenance
+            .run_maintenance_for_blocks(block_stream)
+            .await;
+    }
+
+    #[tokio::test]
+    async fn block_stream_retries_failed_blocks() {
+        crate::tracing::initialize("debug", tracing::Level::ERROR.into());
+
+        let mut mock_maintenance = MockMaintaining::new();
+        let mut sequence = Sequence::new();
+        mock_maintenance
+            .expect_run_maintenance()
+            .return_once(|| bail!("test"))
+            .times(1)
+            .in_sequence(&mut sequence);
+        mock_maintenance
+            .expect_run_maintenance()
+            .return_once(|| Ok(()))
+            .times(1)
+            .in_sequence(&mut sequence);
+        mock_maintenance
+            .expect_run_maintenance()
+            .return_once(|| Ok(()))
+            .times(1)
+            .in_sequence(&mut sequence);
+
         let service_maintenance = ServiceMaintenance {
             maintainers: vec![Arc::new(mock_maintenance)],
+            retry_delay: Duration::default(),
         };
 
-        let block_stream = futures::stream::repeat(Block::default()).take(block_count);
+        let block_stream = async_stream::stream! {
+            yield Block::default();
+
+            // Wait a bit to trigger a retry and not just go to the next block
+            time::sleep(Duration::from_millis(10)).await;
+            yield Block::default();
+        };
+
         service_maintenance
-            .run_maintenance_for_block_stream(block_stream)
+            .run_maintenance_for_blocks(block_stream)
             .await;
     }
 }
diff --git a/crates/solver/src/main.rs b/crates/solver/src/main.rs
index c220d6cfc1..5d6b9f25bd 100644
--- a/crates/solver/src/main.rs
+++ b/crates/solver/src/main.rs
@@ -459,14 +459,14 @@ async fn main() {
         args.solution_comparison_decimal_cutoff,
     );
 
-    let maintainer = ServiceMaintenance {
-        maintainers: pool_caches
+    let maintainer = ServiceMaintenance::new(
+        pool_caches
             .into_iter()
             .map(|(_, cache)| cache as Arc<dyn Maintaining>)
             .chain(balancer_pool_maintainer)
             .chain(uniswap_v3_maintainer)
             .collect(),
-    };
+    );
     tokio::task::spawn(maintainer.run_maintenance_on_new_block(current_block_stream));
 
     serve_metrics(metrics, ([0, 0, 0, 0], args.metrics_port).into());
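For reference, below is a minimal, self-contained sketch of the retry pattern that `run_maintenance_for_blocks` introduces: a block whose maintenance failed is kept for a retry, but a fresher block arriving within `retry_delay` preempts it. The names `process_blocks` and `run_maintenance`, the simulated first-attempt failure, and the `unfold`-based block stream are illustrative assumptions for this sketch, not part of the diff.

    use std::time::Duration;

    use futures::{Stream, StreamExt as _};
    use tokio::time;

    // Illustrative stand-in for the crate's `current_block::Block`.
    #[derive(Clone, Debug, Default)]
    struct Block {
        number: u64,
    }

    // Simulated maintenance: the very first attempt fails, later ones succeed.
    async fn run_maintenance(block: &Block, attempts: &mut u32) -> Result<(), &'static str> {
        *attempts += 1;
        if *attempts == 1 {
            Err("simulated failure")
        } else {
            println!("maintained block {}", block.number);
            Ok(())
        }
    }

    // The core of the loop from the diff: take a queued retry block, but give
    // a newer block up to `retry_delay` to preempt it.
    async fn process_blocks(blocks: impl Stream<Item = Block>, retry_delay: Duration) {
        let blocks = blocks.fuse();
        futures::pin_mut!(blocks);

        let mut attempts = 0;
        let mut retry_block: Option<Block> = None;
        while let Some(block) = match retry_block.take() {
            // On timeout, fall back to retrying the stale block.
            Some(block) => time::timeout(retry_delay, blocks.next())
                .await
                .unwrap_or(Some(block)),
            None => blocks.next().await,
        } {
            if run_maintenance(&block, &mut attempts).await.is_err() {
                // Queue the failed block; the next iteration retries it unless
                // a new block shows up within `retry_delay`.
                retry_block = Some(block);
            }
        }
    }

    #[tokio::main]
    async fn main() {
        // Block 1 arrives immediately; block 2 only after 50ms, so the 10ms
        // retry delay elapses first and block 1 is retried successfully.
        let blocks = futures::stream::unfold(0u64, |n| async move {
            if n >= 2 {
                return None;
            }
            if n > 0 {
                time::sleep(Duration::from_millis(50)).await;
            }
            Some((Block { number: n + 1 }, n + 1))
        });
        process_blocks(blocks, Duration::from_millis(10)).await;
    }

This prints "maintained block 1" (the retried block) before "maintained block 2", the same behavior the new `block_stream_retries_failed_blocks` test exercises with mocks.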