diff --git a/Cargo.lock b/Cargo.lock
index 2831b36390391..0b5089d5056a7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2806,6 +2806,7 @@ dependencies = [
  "parking_lot 0.12.1",
  "pin-project-lite",
  "pprof",
+ "procfs 0.16.0",
  "prometheus-client",
  "prometheus-parse",
  "rand 0.8.5",
@@ -11113,6 +11114,32 @@ dependencies = [
  "rustix 0.36.17",
 ]
 
+[[package]]
+name = "procfs"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4"
+dependencies = [
+ "bitflags 2.4.2",
+ "chrono",
+ "flate2",
+ "hex",
+ "lazy_static",
+ "procfs-core",
+ "rustix 0.38.31",
+]
+
+[[package]]
+name = "procfs-core"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29"
+dependencies = [
+ "bitflags 2.4.2",
+ "chrono",
+ "hex",
+]
+
 [[package]]
 name = "prodash"
 version = "28.0.0"
diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml
index 7d294c83525d8..824ef6a0277da 100644
--- a/src/common/base/Cargo.toml
+++ b/src/common/base/Cargo.toml
@@ -58,6 +58,9 @@ tikv-jemalloc-sys = "0.5.2"
 tokio = { workspace = true }
 uuid = { workspace = true }
 
+[target.'cfg(target_os = "linux")'.dependencies]
+procfs = { version = "^0.16" }
+
 [dev-dependencies]
 anyerror = { workspace = true }
 anyhow = { workspace = true }
diff --git a/src/common/base/src/runtime/metrics/mod.rs b/src/common/base/src/runtime/metrics/mod.rs
index e6c915037f471..3d1563c22be55 100644
--- a/src/common/base/src/runtime/metrics/mod.rs
+++ b/src/common/base/src/runtime/metrics/mod.rs
@@ -17,6 +17,7 @@ mod family;
 mod family_metrics;
 mod gauge;
 mod histogram;
+mod process_collector;
 mod registry;
 mod sample;
 
@@ -25,6 +26,8 @@ pub use gauge::Gauge;
 pub use histogram::Histogram;
 pub use histogram::BUCKET_MILLISECONDS;
 pub use histogram::BUCKET_SECONDS;
+pub use process_collector::dump_process_stat;
+pub use process_collector::ProcessStat;
 pub use registry::register_counter;
 pub use registry::register_counter_family;
 pub use registry::register_gauge;
diff --git a/src/common/base/src/runtime/metrics/process_collector.rs b/src/common/base/src/runtime/metrics/process_collector.rs
new file mode 100644
index 0000000000000..3da0e2779950c
--- /dev/null
+++ b/src/common/base/src/runtime/metrics/process_collector.rs
@@ -0,0 +1,190 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use prometheus_client::collector::Collector;
+use prometheus_client::encoding::DescriptorEncoder;
+use prometheus_client::encoding::EncodeMetric;
+use prometheus_client::metrics::counter::ConstCounter;
+use prometheus_client::metrics::gauge::ConstGauge;
+
+/// Prometheus collector exposing the standard `process_*` metrics of the
+/// current process.
+///
+/// Every scrape takes a fresh [`ProcessStat`] snapshot via
+/// [`dump_process_stat`]; when no snapshot is available the collector emits
+/// nothing instead of failing the scrape.
+#[derive(Debug)]
+pub struct ProcessCollector {}
+
+impl ProcessCollector {
+    /// Creates a boxed collector ready for `Registry::register_collector`.
+    pub fn new() -> Box<Self> {
+        Box::new(ProcessCollector {})
+    }
+}
+
+/// Encodes a single metric: its descriptor first, then its value.
+fn encode_metric(
+    encoder: &mut DescriptorEncoder<'_>,
+    name: &str,
+    help: &str,
+    metric: &impl EncodeMetric,
+) -> Result<(), std::fmt::Error> {
+    let metric_encoder = encoder.encode_descriptor(name, help, None, metric.metric_type())?;
+    metric.encode(metric_encoder)
+}
+
+impl Collector for ProcessCollector {
+    /// Takes a process snapshot and encodes it as `process_*` metrics.
+    fn encode(&self, mut encoder: DescriptorEncoder<'_>) -> Result<(), std::fmt::Error> {
+        let stat = match dump_process_stat() {
+            Some(stat) => stat,
+            None => return Ok(()),
+        };
+
+        // Counters get the `_total` suffix appended by the encoder itself, so
+        // the registered name must not carry it.
+        encode_metric(
+            &mut encoder,
+            "process_cpu_seconds",
+            "Total user and system CPU time spent in seconds.",
+            &ConstCounter::new(stat.cpu_secs),
+        )?;
+        encode_metric(
+            &mut encoder,
+            "process_open_fds",
+            "Number of open file descriptors.",
+            &ConstGauge::new(stat.open_fds as f64),
+        )?;
+        encode_metric(
+            &mut encoder,
+            "process_max_fds",
+            "Maximum number of open file descriptors.",
+            &ConstGauge::new(stat.max_fds as f64),
+        )?;
+        encode_metric(
+            &mut encoder,
+            "process_virtual_memory_bytes",
+            "Virtual memory size in bytes.",
+            &ConstGauge::new(stat.vsize as f64),
+        )?;
+        encode_metric(
+            &mut encoder,
+            "process_resident_memory_bytes",
+            "Resident memory size in bytes.",
+            &ConstGauge::new(stat.rss as f64),
+        )?;
+        encode_metric(
+            &mut encoder,
+            "process_start_time_seconds",
+            "Start time of the process since unix epoch in seconds.",
+            &ConstGauge::new(stat.start_time as f64),
+        )?;
+        encode_metric(
+            &mut encoder,
+            "process_threads",
+            "Number of OS threads in the process.",
+            &ConstGauge::new(stat.threads_num as f64),
+        )?;
+
+        Ok(())
+    }
+}
+
+/// Point-in-time resource usage of the current process.
+#[derive(Clone, Debug, Default)]
+pub struct ProcessStat {
+    /// Total user and system CPU time, in whole seconds.
+    pub cpu_secs: u64,
+    /// Number of open file descriptors (0 when it cannot be read).
+    pub open_fds: u64,
+    /// Soft limit on open file descriptors: `u64::MAX` when unlimited, 0 when unknown.
+    pub max_fds: u64,
+    /// Virtual memory size in bytes.
+    pub vsize: u64,
+    /// Resident memory size in bytes.
+    pub rss: u64,
+    /// Start time of the process, in seconds since the unix epoch.
+    pub start_time: i64,
+    /// Number of OS threads in the process.
+    pub threads_num: usize,
+}
+
+/// Returns a snapshot of the current process.
+///
+/// Yields `None` when the snapshot cannot be collected, which is always the
+/// case on non-Linux targets.
+pub fn dump_process_stat() -> Option<ProcessStat> {
+    #[cfg(target_os = "linux")]
+    {
+        dump_linux_process_stat()
+    }
+
+    #[cfg(not(target_os = "linux"))]
+    {
+        None
+    }
+}
+
+/// Collects the snapshot from `/proc/self` through the `procfs` crate.
+#[cfg(target_os = "linux")]
+fn dump_linux_process_stat() -> Option<ProcessStat> {
+    let proc = procfs::process::Process::myself().ok()?;
+    let stat = proc.stat().ok()?;
+
+    // constants
+    let clk_tck = procfs::ticks_per_second();
+    let page_size = procfs::page_size();
+    if clk_tck == 0 {
+        // Tick counts cannot be converted to seconds without a valid rate.
+        return None;
+    }
+
+    // fds
+    let open_fds = proc.fd_count().unwrap_or(0) as u64;
+    let max_fds = match proc.limits() {
+        Ok(limits) => match limits.max_open_files.soft_limit {
+            procfs::process::LimitValue::Value(v) => v,
+            // Report "no limit" as the largest value; 0 would read as "no
+            // descriptors allowed".
+            procfs::process::LimitValue::Unlimited => u64::MAX,
+        },
+        Err(_) => 0,
+    };
+
+    // memory: `rss` is counted in pages
+    let vsize = stat.vsize;
+    let rss = stat.rss.saturating_mul(page_size);
+
+    // cpu time: `utime` and `stime` are counted in clock ticks
+    let cpu_secs = (stat.utime + stat.stime) / clk_tck;
+
+    // start time: `starttime` is counted in clock ticks since boot, so it
+    // has to be offset by the boot time to become a unix timestamp
+    let boot_time = procfs::boot_time_secs().ok()?;
+    let start_time = (boot_time + stat.starttime / clk_tck) as i64;
+
+    // threads
+    let threads_num = stat.num_threads as usize;
+
+    Some(ProcessStat {
+        open_fds,
+        max_fds,
+        vsize,
+        rss,
+        cpu_secs,
+        start_time,
+        threads_num,
+    })
+}
diff --git a/src/common/base/src/runtime/metrics/registry.rs b/src/common/base/src/runtime/metrics/registry.rs
index 2249d120dc240..c071f98021888 100644
--- a/src/common/base/src/runtime/metrics/registry.rs
+++ b/src/common/base/src/runtime/metrics/registry.rs
@@ -39,6 +39,7 @@ use crate::runtime::metrics::gauge::Gauge;
 use crate::runtime::metrics::histogram::Histogram;
 use crate::runtime::metrics::histogram::BUCKET_MILLISECONDS;
 use crate::runtime::metrics::histogram::BUCKET_SECONDS;
+use crate::runtime::metrics::process_collector::ProcessCollector;
 use crate::runtime::metrics::sample::MetricSample;
 use crate::runtime::ThreadTracker;
 
@@ -85,10 +86,12 @@ unsafe impl Sync for GlobalRegistry {}
 
 impl GlobalRegistry {
     pub fn create() -> GlobalRegistry {
+        let mut registry = Registry::with_prefix("databend");
+        registry.register_collector(ProcessCollector::new());
         GlobalRegistry {
             inner: Mutex::new(GlobalRegistryInner {
                 metrics: vec![],
-                registry: Registry::with_prefix("databend"),
+                registry,
             }),
         }
     }
diff --git a/src/query/service/tests/it/metrics.rs b/src/query/service/tests/it/metrics.rs
index 0218f2b4abaa8..b4382b9b8968e 100644
--- a/src/query/service/tests/it/metrics.rs
+++ b/src/query/service/tests/it/metrics.rs
@@ -15,6 +15,7 @@
 use std::net::SocketAddr;
 
 use databend_common_base::base::tokio;
+use databend_common_base::runtime::metrics::dump_process_stat;
 use databend_common_base::runtime::metrics::register_counter;
 use databend_query::servers::metrics::MetricService;
 use databend_query::servers::Server;
@@ -47,3 +48,22 @@ async fn test_metric_server() -> databend_common_exception::Result<()> {
 
     Ok(())
 }
+
+#[cfg(target_os = "linux")]
+#[test]
+fn test_process_collector() {
+    let stat = dump_process_stat().unwrap();
+
+    assert!(stat.max_fds > 0);
+    assert!(stat.vsize > 0);
+    assert!(stat.rss > 0);
+    assert!(stat.threads_num > 0);
+
+    // The start time is an absolute unix timestamp, so for a freshly started
+    // test process it must be close to the current wall-clock time.
+    let now = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap()
+        .as_secs() as i64;
+    assert!((now - stat.start_time).abs() < 3600);
+}