Skip to content

Commit

Permalink
Retry service maintenance
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicholas Rodrigues Lordello committed Nov 17, 2022
1 parent 30b6512 commit 69c8a51
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 44 deletions.
10 changes: 5 additions & 5 deletions crates/autopilot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use shared::{
},
gas_price::InstrumentedGasEstimator,
http_client::HttpClientFactory,
maintenance::Maintaining,
maintenance::{Maintaining, ServiceMaintenance},
metrics::LivenessChecking,
oneinch_api::OneInchClientImpl,
order_quoting::OrderQuoter,
Expand Down Expand Up @@ -475,14 +475,14 @@ pub async fn main(args: arguments::Arguments) {
));
maintainers.push(broadcaster_event_updater);
}
let mut service_maintainer = shared::maintenance::ServiceMaintenance { maintainers };

if let Some(balancer) = balancer_pool_fetcher {
service_maintainer.maintainers.push(balancer);
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
service_maintainer.maintainers.push(uniswap_v3);
maintainers.push(uniswap_v3);
}

let service_maintainer = ServiceMaintenance::new(maintainers);
let maintenance_task = tokio::task::spawn(
service_maintainer.run_maintenance_on_new_block(current_block_stream.clone()),
);
Expand Down
6 changes: 3 additions & 3 deletions crates/driver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,14 @@ async fn build_auction_converter(
(None, None)
};

let maintainer = ServiceMaintenance {
maintainers: pool_caches
let maintainer = ServiceMaintenance::new(
pool_caches
.into_iter()
.map(|(_, cache)| cache as Arc<dyn Maintaining>)
.chain(balancer_pool_maintainer)
.chain(uniswap_v3_maintainer)
.collect(),
};
);
tokio::task::spawn(
maintainer.run_maintenance_on_new_block(common.current_block_stream.clone()),
);
Expand Down
12 changes: 5 additions & 7 deletions crates/e2e/tests/e2e/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,11 @@ impl OrderbookServices {
api_db.as_ref().clone(),
order_validator.clone(),
));
let maintenance = ServiceMaintenance {
maintainers: vec![
Arc::new(autopilot_db.clone()),
ethflow_event_updater,
gpv2_event_updater,
],
};
let maintenance = ServiceMaintenance::new(vec![
Arc::new(autopilot_db.clone()),
ethflow_event_updater,
gpv2_event_updater,
]);
let quotes = Arc::new(QuoteHandler::new(order_validator, quoter.clone()));
orderbook::serve_api(
api_db.clone(),
Expand Down
14 changes: 8 additions & 6 deletions crates/orderbook/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use shared::{
},
gas_price::InstrumentedGasEstimator,
http_client::HttpClientFactory,
maintenance::ServiceMaintenance,
maintenance::{Maintaining, ServiceMaintenance},
metrics::{serve_metrics, DEFAULT_METRICS_PORT},
network::network_name,
oneinch_api::OneInchClientImpl,
Expand Down Expand Up @@ -423,15 +423,15 @@ async fn main() {
database.as_ref().clone(),
order_validator.clone(),
));
let mut service_maintainer = ServiceMaintenance {
maintainers: vec![pool_fetcher],
};

let mut maintainers = vec![pool_fetcher as Arc<dyn Maintaining>];
if let Some(balancer) = balancer_pool_fetcher {
service_maintainer.maintainers.push(balancer);
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
service_maintainer.maintainers.push(uniswap_v3);
maintainers.push(uniswap_v3);
}

check_database_connection(orderbook.as_ref()).await;
let quotes =
Arc::new(QuoteHandler::new(order_validator, optimal_quoter).with_fast_quoter(fast_quoter));
Expand All @@ -448,6 +448,8 @@ async fn main() {
args.shared.solver_competition_auth,
native_price_estimator,
);

let service_maintainer = ServiceMaintenance::new(maintainers);
let maintenance_task =
task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));

Expand Down
120 changes: 100 additions & 20 deletions crates/shared/src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
use crate::current_block::{self, Block, CurrentBlockStream};
use anyhow::{ensure, Result};
use futures::{future::join_all, Stream, StreamExt};
use std::sync::Arc;
use tracing::Instrument;
use futures::{future::join_all, Stream, StreamExt as _};
use std::{sync::Arc, time::Duration};
use tokio::time;
use tracing::Instrument as _;

/// Collects all service components requiring maintenance on each new block
pub struct ServiceMaintenance {
pub maintainers: Vec<Arc<dyn Maintaining>>,
maintainers: Vec<Arc<dyn Maintaining>>,
retry_delay: Duration,
}

impl ServiceMaintenance {
pub fn new(maintainers: Vec<Arc<dyn Maintaining>>) -> Self {
Self {
maintainers,
retry_delay: Duration::from_secs(1),
}
}
}

#[cfg_attr(test, mockall::automock)]
Expand All @@ -32,36 +43,56 @@ impl Maintaining for ServiceMaintenance {
}

impl ServiceMaintenance {
async fn run_maintenance_for_block_stream(self, block_stream: impl Stream<Item = Block>) {
futures::pin_mut!(block_stream);

pub async fn run_maintenance_for_blocks(self, blocks: impl Stream<Item = Block>) {
let metrics = Metrics::instance(global_metrics::get_metric_storage_registry()).unwrap();
for label in ["success", "failure"] {
metrics.runs.with_label_values(&[label]).reset();
}

let blocks = blocks.fuse();
futures::pin_mut!(blocks);

while let Some(block) = block_stream.next().await {
let mut retry_block = None;

while let Some(block) = match retry_block.take() {
// We have a pending retry to process. First see if there is a new
// block that becomes available within a certain retry delay, and if
// there is, prefer that over the old outdated block.
Some(block) => time::timeout(self.retry_delay, blocks.next())
.await
.unwrap_or(Some(block)),
None => blocks.next().await,
} {
tracing::debug!(
"running maintenance on block number {:?} hash {:?}",
block.number,
block.hash
block.hash,
);

let block = block.number.unwrap_or_default().as_u64();
metrics.last_seen_block.set(block as _);
let block_number = block.number.unwrap_or_default().as_u64();

metrics.last_seen_block.set(block_number as _);

if self
.run_maintenance()
.instrument(tracing::debug_span!("maintenance", block))
.instrument(tracing::debug_span!("maintenance", block = block_number))
.await
.is_ok()
.is_err()
{
metrics.last_updated_block.set(block as _);
metrics.runs.with_label_values(&["failure"]).inc();
retry_block = Some(block);
continue;
}

metrics.last_updated_block.set(block_number as _);
metrics.runs.with_label_values(&["success"]).inc();
}
}

pub async fn run_maintenance_on_new_block(self, current_block_stream: CurrentBlockStream) -> ! {
self.run_maintenance_for_block_stream(current_block::into_stream(current_block_stream))
self.run_maintenance_for_blocks(current_block::into_stream(current_block_stream))
.await;
unreachable!()
panic!("block stream unexpectedly dropped");
}
}

Expand All @@ -75,12 +106,18 @@ struct Metrics {
/// Service maintenance last successfully updated block.
#[metric()]
last_updated_block: prometheus::IntGauge,

/// Service maintenance error counter
#[metric(labels("result"))]
runs: prometheus::IntCounterVec,
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::bail;
use futures::stream;
use mockall::Sequence;

#[tokio::test]
async fn run_maintenance_no_early_exit_on_error() {
Expand All @@ -106,28 +143,71 @@ mod tests {
Arc::new(err_mock_maintenance),
Arc::new(ok2_mock_maintenance),
],
retry_delay: Duration::default(),
};

assert!(service_maintenance.run_maintenance().await.is_err());
}

#[tokio::test]
async fn block_stream_maintenance() {
let block_count = 2;
let mut mock_maintenance = MockMaintaining::new();
let block_count = 5;

// Mock interface is responsible for assertions here.
// Will panic if run_maintenance is not called exactly `block_count` times.
let mut mock_maintenance = MockMaintaining::new();
mock_maintenance
.expect_run_maintenance()
.times(block_count)
.returning(|| Ok(()));

let service_maintenance = ServiceMaintenance {
maintainers: vec![Arc::new(mock_maintenance)],
retry_delay: Duration::default(),
};

let block_stream = stream::repeat(Block::default()).take(block_count);
service_maintenance
.run_maintenance_for_blocks(block_stream)
.await;
}

#[tokio::test]
async fn block_stream_retries_failed_blocks() {
crate::tracing::initialize("debug", tracing::Level::ERROR.into());

let mut mock_maintenance = MockMaintaining::new();
let mut sequence = Sequence::new();
mock_maintenance
.expect_run_maintenance()
.return_once(|| bail!("test"))
.times(1)
.in_sequence(&mut sequence);
mock_maintenance
.expect_run_maintenance()
.return_once(|| Ok(()))
.times(1)
.in_sequence(&mut sequence);
mock_maintenance
.expect_run_maintenance()
.return_once(|| Ok(()))
.times(1)
.in_sequence(&mut sequence);

let service_maintenance = ServiceMaintenance {
maintainers: vec![Arc::new(mock_maintenance)],
retry_delay: Duration::default(),
};

let block_stream = futures::stream::repeat(Block::default()).take(block_count);
let block_stream = async_stream::stream! {
yield Block::default();

// Wait a bit to trigger a retry and not just go to the next block
time::sleep(Duration::from_millis(10)).await;
yield Block::default();
};
service_maintenance
.run_maintenance_for_block_stream(block_stream)
.run_maintenance_for_blocks(block_stream)
.await;
}
}
6 changes: 3 additions & 3 deletions crates/solver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,14 +459,14 @@ async fn main() {
args.solution_comparison_decimal_cutoff,
);

let maintainer = ServiceMaintenance {
maintainers: pool_caches
let maintainer = ServiceMaintenance::new(
pool_caches
.into_iter()
.map(|(_, cache)| cache as Arc<dyn Maintaining>)
.chain(balancer_pool_maintainer)
.chain(uniswap_v3_maintainer)
.collect(),
};
);
tokio::task::spawn(maintainer.run_maintenance_on_new_block(current_block_stream));

serve_metrics(metrics, ([0, 0, 0, 0], args.metrics_port).into());
Expand Down

0 comments on commit 69c8a51

Please sign in to comment.