Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): display scan rows/bytes, write rows/bytes in global counter metrics #17266

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/base/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use net::get_free_udp_port;
pub use ordered_float::OrderedFloat;
pub use profiling::Profiling;
pub use progress::Progress;
pub use progress::ProgressHook;
pub use progress::ProgressValues;
pub use progress::SpillProgress;
pub use select::select3;
Expand Down
19 changes: 19 additions & 0 deletions src/common/base/src/base/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,43 @@ pub struct ProgressValues {
pub bytes: usize,
}

/// [`ProgressHook`] can be used to hook the progress to update the global metrics
/// whenever the progress is updated.
pub trait ProgressHook: std::fmt::Debug + Send + Sync {
fn incr(&self, progress_values: &ProgressValues);
}

#[derive(Debug)]
pub struct Progress {
rows: AtomicUsize,
bytes: AtomicUsize,
hook: Option<Box<dyn ProgressHook>>,
}

impl Progress {
pub fn create() -> Self {
Self {
rows: AtomicUsize::new(0),
bytes: AtomicUsize::new(0),
hook: None,
}
}

pub fn with_hook(self, hook: Box<dyn ProgressHook>) -> Self {
Self {
rows: self.rows,
bytes: self.bytes,
hook: Some(hook),
}
}

pub fn incr(&self, progress_values: &ProgressValues) {
self.rows.fetch_add(progress_values.rows, Ordering::Relaxed);
self.bytes
.fetch_add(progress_values.bytes, Ordering::Relaxed);
if let Some(hook) = &self.hook {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might result in double counting if cluster mode.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you help explain more about this?

hook.incr(progress_values);
}
}

pub fn set(&self, progress_values: &ProgressValues) {
Expand Down
3 changes: 3 additions & 0 deletions src/common/base/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ mod registry;
mod sample;

pub use counter::Counter;
pub use family_metrics::FamilyCounter as InnerFamilyCounter;
pub use family_metrics::FamilyGauge as InnerFamilyGauge;
pub use family_metrics::FamilyHistogram as InnerFamilyHistogram;
pub use gauge::Gauge;
pub use histogram::Histogram;
pub use histogram::BUCKET_MILLISECONDS;
Expand Down
13 changes: 13 additions & 0 deletions src/common/metrics/src/metrics/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const METRIC_QUERY_SCAN_PARTITIONS: &str = "query_scan_partitions";
const METRIC_QUERY_TOTAL_PARTITIONS: &str = "query_total_partitions";
const METRIC_QUERY_RESULT_ROWS: &str = "query_result_rows";
const METRIC_QUERY_RESULT_BYTES: &str = "query_result_bytes";
const METRIC_QUERY_PROGRESS_SCAN_ROWS: &str = "query_progress_scan_rows";
const METRIC_QUERY_PROGRESS_SCAN_BYTES: &str = "query_progress_scan_bytes";
const METRIC_QUERY_PROGRESS_WRITE_ROWS: &str = "query_progress_write_rows";
const METRIC_QUERY_PROGRESS_WRITE_BYTES: &str = "query_progress_write_bytes";

pub static QUERY_START: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_QUERY_START));
Expand Down Expand Up @@ -73,3 +77,12 @@ pub static QUERY_RESULT_ROWS: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_QUERY_RESULT_ROWS));
pub static QUERY_RESULT_BYTES: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_QUERY_RESULT_BYTES));

pub static QUERY_PROGRESS_SCAN_ROWS: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_QUERY_PROGRESS_SCAN_ROWS));
pub static QUERY_PROGRESS_SCAN_BYTES: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_QUERY_PROGRESS_SCAN_BYTES));
pub static QUERY_PROGRESS_WRITE_ROWS: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_QUERY_PROGRESS_WRITE_ROWS));
pub static QUERY_PROGRESS_WRITE_BYTES: LazyLock<FamilyCounter<VecLabels>> =
LazyLock::new(|| register_counter_family(METRIC_QUERY_PROGRESS_WRITE_BYTES));
86 changes: 84 additions & 2 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use std::time::SystemTime;
use dashmap::DashMap;
use databend_common_base::base::short_sql;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressHook;
use databend_common_base::base::ProgressValues;
use databend_common_base::base::SpillProgress;
use databend_common_base::runtime::drop_guard;
use databend_common_base::runtime::metrics::InnerFamilyCounter;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::catalog::CatalogManager;
Expand Down Expand Up @@ -154,6 +157,10 @@ impl QueryContextShared {
session: Arc<Session>,
cluster_cache: Arc<Cluster>,
) -> Result<Arc<QueryContextShared>> {
let progress_metrics = QueryProgressMetrics::new(
session.get_current_tenant().tenant_name(),
&cluster_cache.local_id,
);
Ok(Arc::new(QueryContextShared {
query_settings: Settings::create(session.get_current_tenant()),
catalog_manager: CatalogManager::instance(),
Expand All @@ -162,9 +169,13 @@ impl QueryContextShared {
data_operator: DataOperator::instance(),
init_query_id: Arc::new(RwLock::new(Uuid::new_v4().to_string())),
total_scan_values: Arc::new(Progress::create()),
scan_progress: Arc::new(Progress::create()),
scan_progress: Arc::new(
Progress::create().with_hook(progress_metrics.scan_progress_hook()),
),
result_progress: Arc::new(Progress::create()),
write_progress: Arc::new(Progress::create()),
write_progress: Arc::new(
Progress::create().with_hook(progress_metrics.write_progress_hook()),
),
error: Arc::new(Mutex::new(None)),
warnings: Arc::new(Mutex::new(vec![])),
runtime: Arc::new(RwLock::new(None)),
Expand Down Expand Up @@ -661,3 +672,74 @@ impl Drop for QueryContextShared {
})
}
}

struct QueryProgressMetrics {
scan_rows: Arc<InnerFamilyCounter<Vec<(&'static str, String)>>>,
scan_bytes: Arc<InnerFamilyCounter<Vec<(&'static str, String)>>>,
write_rows: Arc<InnerFamilyCounter<Vec<(&'static str, String)>>>,
write_bytes: Arc<InnerFamilyCounter<Vec<(&'static str, String)>>>,
}

impl QueryProgressMetrics {
fn new(tenant: &str, cluster: &str) -> Self {
let common_labels = vec![
("tenant", tenant.to_string()),
("cluster", cluster.to_string()),
];

let scan_rows = databend_common_metrics::interpreter::QUERY_PROGRESS_SCAN_ROWS
.get_or_create(&common_labels);
let scan_bytes = databend_common_metrics::interpreter::QUERY_PROGRESS_SCAN_BYTES
.get_or_create(&common_labels);
let write_rows = databend_common_metrics::interpreter::QUERY_PROGRESS_WRITE_ROWS
.get_or_create(&common_labels);
let write_bytes = databend_common_metrics::interpreter::QUERY_PROGRESS_WRITE_BYTES
.get_or_create(&common_labels);

Self {
scan_rows,
scan_bytes,
write_rows,
write_bytes,
}
}

fn scan_progress_hook(&self) -> Box<QueryProgressMetricsHook> {
Box::new(QueryProgressMetricsHook::new(
self.scan_rows.clone(),
self.scan_bytes.clone(),
))
}

fn write_progress_hook(&self) -> Box<QueryProgressMetricsHook> {
Box::new(QueryProgressMetricsHook::new(
self.write_rows.clone(),
self.write_bytes.clone(),
))
}
}

#[derive(Debug)]
pub struct QueryProgressMetricsHook {
rows_metrics: Arc<InnerFamilyCounter<Vec<(&'static str, String)>>>,
bytes_metrics: Arc<InnerFamilyCounter<Vec<(&'static str, String)>>>,
}

impl QueryProgressMetricsHook {
pub fn new(
rows_metrics: Arc<InnerFamilyCounter<Vec<(&'static str, String)>>>,
bytes_metrics: Arc<InnerFamilyCounter<Vec<(&'static str, String)>>>,
) -> Self {
Self {
rows_metrics,
bytes_metrics,
}
}
}

impl ProgressHook for QueryProgressMetricsHook {
fn incr(&self, progress_values: &ProgressValues) {
self.rows_metrics.inc_by(progress_values.rows as u64);
self.bytes_metrics.inc_by(progress_values.bytes as u64);
}
}
Loading