Skip to content

Commit

Permalink
[Inspection Service] Add simple consensus health check endpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and danielxiangzl committed Dec 12, 2024
1 parent 69f331c commit 96464b4
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 29 deletions.
6 changes: 4 additions & 2 deletions crates/aptos-inspection-service/src/server/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
server::utils::CONTENT_TYPE_TEXT, CONFIGURATION_PATH, FORGE_METRICS_PATH, JSON_METRICS_PATH,
METRICS_PATH, PEER_INFORMATION_PATH, SYSTEM_INFORMATION_PATH,
server::utils::CONTENT_TYPE_TEXT, CONFIGURATION_PATH, CONSENSUS_HEALTH_CHECK_PATH,
FORGE_METRICS_PATH, JSON_METRICS_PATH, METRICS_PATH, PEER_INFORMATION_PATH,
SYSTEM_INFORMATION_PATH,
};
use hyper::{Body, StatusCode};

Expand All @@ -25,6 +26,7 @@ fn get_index_response() -> String {
index_response.push("Welcome to the Aptos Inspection Service!".into());
index_response.push("The following endpoints are available:".into());
index_response.push(format!("\t- {}", CONFIGURATION_PATH));
index_response.push(format!("\t- {}", CONSENSUS_HEALTH_CHECK_PATH));
index_response.push(format!("\t- {}", FORGE_METRICS_PATH));
index_response.push(format!("\t- {}", JSON_METRICS_PATH));
index_response.push(format!("\t- {}", METRICS_PATH));
Expand Down
38 changes: 38 additions & 0 deletions crates/aptos-inspection-service/src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,47 @@ use crate::server::{
utils,
utils::{CONTENT_TYPE_JSON, CONTENT_TYPE_TEXT},
};
use aptos_config::config::NodeConfig;
use hyper::{Body, StatusCode};
use prometheus::TextEncoder;

// The metric key for the consensus execution gauge
const CONSENSUS_EXECUTION_GAUGE: &str = "aptos_state_sync_consensus_executing_gauge{}";

/// Handles a consensus health check request. This method returns
/// 200 if the node is currently participating in consensus.
///
/// Note: we assume that this endpoint will only be used every few seconds.
pub async fn handle_consensus_health_check(node_config: &NodeConfig) -> (StatusCode, Body, String) {
// Verify the node is a validator. If not, return an error.
if !node_config.base.role.is_validator() {
return (
StatusCode::BAD_REQUEST,
Body::from("This node is not a validator!"),
CONTENT_TYPE_TEXT.into(),
);
}

// Check the value of the consensus execution gauge
let metrics = utils::get_all_metrics();
if let Some(gauge_value) = metrics.get(CONSENSUS_EXECUTION_GAUGE) {
if gauge_value == "1" {
return (
StatusCode::OK,
Body::from("Consensus health check passed!"),
CONTENT_TYPE_TEXT.into(),
);
}
}

// Otherwise, consensus is not executing
(
StatusCode::INTERNAL_SERVER_ERROR,
Body::from("Consensus health check failed! Consensus is not executing!"),
CONTENT_TYPE_TEXT.into(),
)
}

/// Handles a new forge metrics request
pub fn handle_forge_metrics() -> (StatusCode, Body, String) {
// Get and encode the metrics
Expand Down
6 changes: 6 additions & 0 deletions crates/aptos-inspection-service/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod tests;

// The list of endpoints offered by the inspection service
pub const CONFIGURATION_PATH: &str = "/configuration";
pub const CONSENSUS_HEALTH_CHECK_PATH: &str = "/consensus_health_check";
pub const FORGE_METRICS_PATH: &str = "/forge_metrics";
pub const INDEX_PATH: &str = "/";
pub const JSON_METRICS_PATH: &str = "/json_metrics";
Expand Down Expand Up @@ -111,6 +112,11 @@ async fn serve_requests(
// Exposes the node configuration
configuration::handle_configuration_request(&node_config)
},
CONSENSUS_HEALTH_CHECK_PATH => {
// /consensus_health_check
// Exposes the consensus health check
metrics::handle_consensus_health_check(&node_config).await
},
FORGE_METRICS_PATH => {
// /forge_metrics
// Exposes forge encoded metrics
Expand Down
63 changes: 38 additions & 25 deletions state-sync/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ impl<

/// Checks that state sync is making progress
async fn drive_progress(&mut self) {
// Update the executing component metrics
self.update_executing_component_metrics();

// Fetch the global data summary and verify we have active peers
let global_data_summary = self.aptos_data_client.get_global_data_summary();
if global_data_summary.is_empty() {
Expand All @@ -673,15 +676,6 @@ impl<

// If consensus or consensus observer is executing, there's nothing to do
if self.check_if_consensus_or_observer_executing() {
let executing_component = if self.driver_configuration.role.is_validator() {
ExecutingComponent::Consensus
} else {
ExecutingComponent::ConsensusObserver
};
metrics::increment_counter(
&metrics::EXECUTING_COMPONENT,
executing_component.get_label(),
);
return;
}

Expand All @@ -691,10 +685,6 @@ impl<
let consensus_sync_request = self.consensus_notification_handler.get_sync_request();

// Attempt to continuously sync
metrics::increment_counter(
&metrics::EXECUTING_COMPONENT,
ExecutingComponent::ContinuousSyncer.get_label(),
);
if let Err(error) = self
.continuous_syncer
.drive_progress(consensus_sync_request)
Expand All @@ -708,20 +698,43 @@ impl<
);
metrics::increment_counter(&metrics::CONTINUOUS_SYNCER_ERRORS, error.get_label());
}
} else {
metrics::increment_counter(
&metrics::EXECUTING_COMPONENT,
ExecutingComponent::Bootstrapper.get_label(),
} else if let Err(error) = self.bootstrapper.drive_progress(&global_data_summary).await {
sample!(
SampleRate::Duration(Duration::from_secs(DRIVER_ERROR_LOG_FREQ_SECS)),
warn!(LogSchema::new(LogEntry::Driver)
.error(&error)
.message("Error found when checking the bootstrapper progress!"));
);
if let Err(error) = self.bootstrapper.drive_progress(&global_data_summary).await {
sample!(
SampleRate::Duration(Duration::from_secs(DRIVER_ERROR_LOG_FREQ_SECS)),
warn!(LogSchema::new(LogEntry::Driver)
.error(&error)
.message("Error found when checking the bootstrapper progress!"));
);
metrics::increment_counter(&metrics::BOOTSTRAPPER_ERRORS, error.get_label());
metrics::increment_counter(&metrics::BOOTSTRAPPER_ERRORS, error.get_label());
};
}

/// Updates the executing component metrics for the driver
fn update_executing_component_metrics(&self) {
// Determine the executing component
let executing_component = if self.check_if_consensus_or_observer_executing() {
if self.driver_configuration.role.is_validator() {
ExecutingComponent::Consensus
} else {
ExecutingComponent::ConsensusObserver
}
} else if self.bootstrapper.is_bootstrapped() {
ExecutingComponent::ContinuousSyncer
} else {
ExecutingComponent::Bootstrapper
};

// Increment the executing component counter
metrics::increment_counter(
&metrics::EXECUTING_COMPONENT,
executing_component.get_label(),
);

// Set the consensus executing gauge
if executing_component == ExecutingComponent::Consensus {
metrics::CONSENSUS_EXECUTING_GAUGE.set(1);
} else {
metrics::CONSENSUS_EXECUTING_GAUGE.set(0);
}
}
}
15 changes: 13 additions & 2 deletions state-sync/state-sync-driver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

use aptos_metrics_core::{
exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
register_int_gauge_vec, HistogramTimer, HistogramVec, IntCounterVec, IntGaugeVec,
register_int_gauge, register_int_gauge_vec, HistogramTimer, HistogramVec, IntCounterVec,
IntGauge, IntGaugeVec,
};
use once_cell::sync::Lazy;
use std::time::Instant;
Expand Down Expand Up @@ -42,6 +43,7 @@ pub const STORAGE_SYNCHRONIZER_COMMIT_POST_PROCESSOR: &str = "commit_post_proces
pub const STORAGE_SYNCHRONIZER_STATE_SNAPSHOT_RECEIVER: &str = "state_snapshot_receiver";

/// An enum representing the component currently executing
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutingComponent {
Bootstrapper,
Consensus,
Expand Down Expand Up @@ -105,6 +107,15 @@ pub static BOOTSTRAPPER_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Gauge indicating whether consensus is currently executing
pub static CONSENSUS_EXECUTING_GAUGE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"aptos_state_sync_consensus_executing_gauge",
"Gauge indicating whether consensus is currently executing"
)
.unwrap()
});

/// Gauge for state sync continuous syncer fallback mode
pub static CONTINUOUS_SYNCER_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down Expand Up @@ -146,7 +157,7 @@ pub static DRIVER_FALLBACK_MODE: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});

/// Counters related to the currently executing component
/// Counters related to the currently executing component in the main driver loop
pub static EXECUTING_COMPONENT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_state_sync_executing_component_counters",
Expand Down

0 comments on commit 96464b4

Please sign in to comment.