Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry service maintenance #811

Merged
merged 3 commits into from
Nov 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/autopilot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use shared::{
},
gas_price::InstrumentedGasEstimator,
http_client::HttpClientFactory,
maintenance::Maintaining,
maintenance::{Maintaining, ServiceMaintenance},
metrics::LivenessChecking,
oneinch_api::OneInchClientImpl,
order_quoting::OrderQuoter,
Expand Down Expand Up @@ -475,14 +475,14 @@ pub async fn main(args: arguments::Arguments) {
));
maintainers.push(broadcaster_event_updater);
}
let mut service_maintainer = shared::maintenance::ServiceMaintenance { maintainers };

if let Some(balancer) = balancer_pool_fetcher {
service_maintainer.maintainers.push(balancer);
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
service_maintainer.maintainers.push(uniswap_v3);
maintainers.push(uniswap_v3);
}

let service_maintainer = ServiceMaintenance::new(maintainers);
let maintenance_task = tokio::task::spawn(
service_maintainer.run_maintenance_on_new_block(current_block_stream.clone()),
);
Expand Down
6 changes: 3 additions & 3 deletions crates/driver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,14 @@ async fn build_auction_converter(
(None, None)
};

let maintainer = ServiceMaintenance {
maintainers: pool_caches
let maintainer = ServiceMaintenance::new(
pool_caches
.into_iter()
.map(|(_, cache)| cache as Arc<dyn Maintaining>)
.chain(balancer_pool_maintainer)
.chain(uniswap_v3_maintainer)
.collect(),
};
);
tokio::task::spawn(
maintainer.run_maintenance_on_new_block(common.current_block_stream.clone()),
);
Expand Down
12 changes: 5 additions & 7 deletions crates/e2e/tests/e2e/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,11 @@ impl OrderbookServices {
api_db.as_ref().clone(),
order_validator.clone(),
));
let maintenance = ServiceMaintenance {
maintainers: vec![
Arc::new(autopilot_db.clone()),
ethflow_event_updater,
gpv2_event_updater,
],
};
let maintenance = ServiceMaintenance::new(vec![
Arc::new(autopilot_db.clone()),
ethflow_event_updater,
gpv2_event_updater,
]);
let quotes = Arc::new(QuoteHandler::new(order_validator, quoter.clone()));
orderbook::serve_api(
api_db.clone(),
Expand Down
14 changes: 8 additions & 6 deletions crates/orderbook/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use shared::{
},
gas_price::InstrumentedGasEstimator,
http_client::HttpClientFactory,
maintenance::ServiceMaintenance,
maintenance::{Maintaining, ServiceMaintenance},
metrics::{serve_metrics, DEFAULT_METRICS_PORT},
network::network_name,
oneinch_api::OneInchClientImpl,
Expand Down Expand Up @@ -423,15 +423,15 @@ async fn main() {
database.as_ref().clone(),
order_validator.clone(),
));
let mut service_maintainer = ServiceMaintenance {
maintainers: vec![pool_fetcher],
};

let mut maintainers = vec![pool_fetcher as Arc<dyn Maintaining>];
if let Some(balancer) = balancer_pool_fetcher {
service_maintainer.maintainers.push(balancer);
maintainers.push(balancer);
}
if let Some(uniswap_v3) = uniswap_v3_pool_fetcher {
service_maintainer.maintainers.push(uniswap_v3);
maintainers.push(uniswap_v3);
}

check_database_connection(orderbook.as_ref()).await;
let quotes =
Arc::new(QuoteHandler::new(order_validator, optimal_quoter).with_fast_quoter(fast_quoter));
Expand All @@ -448,6 +448,8 @@ async fn main() {
args.shared.solver_competition_auth,
native_price_estimator,
);

let service_maintainer = ServiceMaintenance::new(maintainers);
let maintenance_task =
task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));

Expand Down
129 changes: 106 additions & 23 deletions crates/shared/src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
use crate::current_block::{self, Block, CurrentBlockStream};
use anyhow::{ensure, Result};
use futures::{future::join_all, Stream, StreamExt};
use std::sync::Arc;
use tracing::Instrument;
use futures::{future::join_all, Stream, StreamExt as _};
use std::{sync::Arc, time::Duration};
use tokio::time;
use tracing::Instrument as _;

/// Collects all service components requiring maintenance on each new block
pub struct ServiceMaintenance {
pub maintainers: Vec<Arc<dyn Maintaining>>,
maintainers: Vec<Arc<dyn Maintaining>>,
retry_delay: Duration,
}

impl ServiceMaintenance {
/// Creates a `ServiceMaintenance` that runs the given `maintainers`.
///
/// The retry delay is set to one second. After a failed maintenance run,
/// the block-driven loop waits up to this long for a newer block before
/// running maintenance again for the block that failed.
pub fn new(maintainers: Vec<Arc<dyn Maintaining>>) -> Self {
Self {
maintainers,
// Not configurable through this constructor.
retry_delay: Duration::from_secs(1),
}
}
}

#[cfg_attr(test, mockall::automock)]
Expand All @@ -32,36 +43,59 @@ impl Maintaining for ServiceMaintenance {
}

impl ServiceMaintenance {
async fn run_maintenance_for_block_stream(self, block_stream: impl Stream<Item = Block>) {
futures::pin_mut!(block_stream);

pub async fn run_maintenance_for_blocks(self, blocks: impl Stream<Item = Block>) {
let metrics = Metrics::instance(global_metrics::get_metric_storage_registry()).unwrap();
for label in ["success", "failure"] {
metrics.runs.with_label_values(&[label]).reset();
}

let blocks = blocks.fuse();
futures::pin_mut!(blocks);

while let Some(block) = block_stream.next().await {
let mut retry_block = None;

while let Some(block) = match retry_block.take() {
// We have a pending retry to process. First see if there is a new
// block that becomes available within a certain retry delay, and if
// there is, prefer that over the old outdated block.
Some(block) => time::timeout(self.retry_delay, blocks.next())
.await
.unwrap_or(Some(block)),
None => blocks.next().await,
} {
tracing::debug!(
"running maintenance on block number {:?} hash {:?}",
block.number,
block.hash
?block.number, ?block.hash,
"running maintenance",
);

let block = block.number.unwrap_or_default().as_u64();
metrics.last_seen_block.set(block as _);
let block_number = block.number.unwrap_or_default().as_u64();

if self
metrics.last_seen_block.set(block_number as _);

if let Err(err) = self
.run_maintenance()
.instrument(tracing::debug_span!("maintenance", block))
.instrument(tracing::debug_span!("maintenance", block = block_number))
.await
.is_ok()
{
metrics.last_updated_block.set(block as _);
tracing::debug!(
?block.number, ?block.hash, ?err,
"maintenance failed; queuing retry",
);

metrics.runs.with_label_values(&["failure"]).inc();
nlordell marked this conversation as resolved.
Show resolved Hide resolved
nlordell marked this conversation as resolved.
Show resolved Hide resolved
retry_block = Some(block);
continue;
}

metrics.last_updated_block.set(block_number as _);
metrics.runs.with_label_values(&["success"]).inc();
}
}

pub async fn run_maintenance_on_new_block(self, current_block_stream: CurrentBlockStream) -> ! {
self.run_maintenance_for_block_stream(current_block::into_stream(current_block_stream))
self.run_maintenance_for_blocks(current_block::into_stream(current_block_stream))
.await;
unreachable!()
panic!("block stream unexpectedly dropped");
}
}

Expand All @@ -75,12 +109,18 @@ struct Metrics {
/// Service maintenance last successfully updated block.
#[metric()]
last_updated_block: prometheus::IntGauge,

/// Service maintenance run counter, labelled by result ("success" or "failure").
#[metric(labels("result"))]
runs: prometheus::IntCounterVec,
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::bail;
use futures::stream;
use mockall::Sequence;

#[tokio::test]
async fn run_maintenance_no_early_exit_on_error() {
Expand All @@ -106,28 +146,71 @@ mod tests {
Arc::new(err_mock_maintenance),
Arc::new(ok2_mock_maintenance),
],
retry_delay: Duration::default(),
};

assert!(service_maintenance.run_maintenance().await.is_err());
}

#[tokio::test]
async fn block_stream_maintenance() {
let block_count = 2;
let mut mock_maintenance = MockMaintaining::new();
let block_count = 5;

// Mock interface is responsible for assertions here.
// Will panic if run_maintenance is not called exactly `block_count` times.
let mut mock_maintenance = MockMaintaining::new();
mock_maintenance
.expect_run_maintenance()
.times(block_count)
.returning(|| Ok(()));

let service_maintenance = ServiceMaintenance {
maintainers: vec![Arc::new(mock_maintenance)],
retry_delay: Duration::default(),
};

let block_stream = stream::repeat(Block::default()).take(block_count);
service_maintenance
.run_maintenance_for_blocks(block_stream)
.await;
}

#[tokio::test]
async fn block_stream_retries_failed_blocks() {
crate::tracing::initialize("debug", tracing::Level::ERROR.into());

let mut mock_maintenance = MockMaintaining::new();
let mut sequence = Sequence::new();
mock_maintenance
.expect_run_maintenance()
.return_once(|| bail!("test"))
.times(1)
.in_sequence(&mut sequence);
mock_maintenance
.expect_run_maintenance()
.return_once(|| Ok(()))
.times(1)
.in_sequence(&mut sequence);
MartinquaXD marked this conversation as resolved.
Show resolved Hide resolved
mock_maintenance
.expect_run_maintenance()
.return_once(|| Ok(()))
.times(1)
.in_sequence(&mut sequence);

let service_maintenance = ServiceMaintenance {
maintainers: vec![Arc::new(mock_maintenance)],
retry_delay: Duration::default(),
};

let block_stream = futures::stream::repeat(Block::default()).take(block_count);
let block_stream = async_stream::stream! {
yield Block::default();

// Wait a bit to trigger a retry and not just go to the next block
time::sleep(Duration::from_millis(10)).await;
yield Block::default();
};
service_maintenance
.run_maintenance_for_block_stream(block_stream)
.run_maintenance_for_blocks(block_stream)
.await;
}
}
6 changes: 3 additions & 3 deletions crates/solver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,14 +459,14 @@ async fn main() {
args.solution_comparison_decimal_cutoff,
);

let maintainer = ServiceMaintenance {
maintainers: pool_caches
let maintainer = ServiceMaintenance::new(
pool_caches
.into_iter()
.map(|(_, cache)| cache as Arc<dyn Maintaining>)
.chain(balancer_pool_maintainer)
.chain(uniswap_v3_maintainer)
.collect(),
};
);
tokio::task::spawn(maintainer.run_maintenance_on_new_block(current_block_stream));

serve_metrics(metrics, ([0, 0, 0, 0], args.metrics_port).into());
Expand Down