diff --git a/Cargo.lock b/Cargo.lock index 65ef61e9a1596..a19a9a8cbf26b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1161,9 +1161,9 @@ dependencies = [ [[package]] name = "bon" -version = "3.7.1" +version = "3.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "537c317ddf588aab15c695bf92cf55dec159b93221c074180ca3e0e5a94da415" +checksum = "c2529c31017402be841eb45892278a6c21a000c0a17643af326c73a73f83f0fb" dependencies = [ "bon-macros", "rustversion", @@ -1171,9 +1171,9 @@ dependencies = [ [[package]] name = "bon-macros" -version = "3.7.1" +version = "3.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca5abbf2d4a4c6896197c9de13d6d7cb7eff438c63dacde1dde980569cb00248" +checksum = "d82020dadcb845a345591863adb65d74fa8dc5c18a0b6d408470e13b7adc7005" dependencies = [ "darling 0.21.3", "ident_case", @@ -1989,6 +1989,7 @@ dependencies = [ "async-trait", "aws-config", "aws-credential-types", + "chrono", "clap 4.5.47", "ctor", "datafusion", @@ -4057,9 +4058,9 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libmimalloc-sys" -version = "0.1.44" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870" +checksum = "bf88cd67e9de251c1781dbe2f641a1a3ad66eaae831b8a2c38fbdc5ddae16d4d" dependencies = [ "cc", "cty", @@ -4191,9 +4192,9 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.48" +version = "0.1.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8" +checksum = "b1791cbe101e95af5764f06f20f6760521f7158f69dbf9d6baf941ee1bf6bc40" dependencies = [ "libmimalloc-sys", ] @@ -4294,11 +4295,12 @@ dependencies = [ [[package]] name = "nu-ansi-term" -version = "0.50.1" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" dependencies = [ - "windows-sys 0.52.0", + "overload", + "winapi", ] [[package]] @@ -4492,6 +4494,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "owo-colors" version = "4.2.2" @@ -5344,9 +5352,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.2" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", @@ -5673,9 +5681,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "rustyline" -version = "17.0.1" +version = "17.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6614df0b6d4cfb20d1d5e295332921793ce499af3ebc011bf1e393380e1e492" +checksum = "ed34fbd08950d17f8297e738d5b76acd4baab50c8d45008d498b4327feb43ea1" dependencies = [ "bitflags 2.9.1", "cfg-if", @@ -5857,9 +5865,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.143" +version = "1.0.142" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" +checksum = "030fedb782600dcbd6f02d479bf0d817ac3bb40d644745b769d6a96bc3afc5a7" dependencies = [ "itoa", "memchr", @@ -6769,9 +6777,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.20" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "nu-ansi-term", "sharded-slab", @@ -6974,9 +6982,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.1" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" dependencies = [ "getrandom 0.3.3", "js-sys", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 394adbb6105f3..5b36d38ba5c87 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -39,6 +39,7 @@ arrow = { workspace = true } async-trait = { workspace = true } aws-config = "1.8.6" aws-credential-types = "1.2.6" +chrono = { workspace = true } clap = { version = "4.5.47", features = ["derive", "cargo"] } datafusion = { workspace = true, features = [ "avro", diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index 77bc8d3d20003..3583622d5e502 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -46,6 +46,7 @@ pub enum Command { SearchFunctions(String), QuietMode(Option), OutputFormat(Option), + ObjectStoreProfileMode(Option), } pub enum OutputFormat { @@ -124,6 +125,29 @@ impl Command { Self::OutputFormat(_) => exec_err!( "Unexpected change output format, this should be handled outside" ), + Self::ObjectStoreProfileMode(mode) => { + if let Some(mode) = mode { + print_options.object_store_profile_mode = mode + .parse() + .map_err(|_| DataFusionError::Execution( + format!("Failed to parse input: {mode}. Valid options are disabled, summary, trace") + ))?; + print_options + .instrumented_registry + .set_instrument_mode(print_options.object_store_profile_mode); + println!( + "ObjectStore Profile mode set to {:?}", + print_options.object_store_profile_mode + ); + } else { + println!( + "ObjectStore Profile mode is {:?}", + print_options.object_store_profile_mode + ); + } + + Ok(()) + } } } @@ -142,11 +166,15 @@ impl Command { Self::OutputFormat(_) => { ("\\pset [NAME [VALUE]]", "set table output option\n(format)") } + Self::ObjectStoreProfileMode(_) => ( + "\\object_store_profiling (disabled|summary|trace)", + "print or set object store profile mode", + ), } } } -const ALL_COMMANDS: [Command; 9] = [ +const ALL_COMMANDS: [Command; 10] = [ Command::ListTables, Command::DescribeTableStmt(String::new()), Command::Quit, @@ -156,6 +184,7 @@ const ALL_COMMANDS: [Command; 9] = [ Command::SearchFunctions(String::new()), Command::QuietMode(None), Command::OutputFormat(None), + Command::ObjectStoreProfileMode(None), ]; fn all_commands_info() -> RecordBatch { @@ -206,6 +235,10 @@ impl FromStr for Command { Self::OutputFormat(Some(subcommand.to_string())) } ("pset", None) => Self::OutputFormat(None), + ("object_store_profiling", Some(mode)) => { + Self::ObjectStoreProfileMode(Some(mode.to_string())) + } + ("object_store_profiling", None) => Self::ObjectStoreProfileMode(None), _ => return Err(()), }) } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index a28e97a9f88ec..31a70315a17dc 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -27,10 +27,14 @@ use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; +use datafusion::execution::object_store::DefaultObjectStoreRegistry; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc}; +use datafusion_cli::object_storage::instrumented::{ + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, +}; use datafusion_cli::{ exec, pool_type::PoolType, @@ -144,6 +148,13 @@ struct Args { value_parser(extract_disk_limit) )] disk_limit: Option, + + #[clap( + long, + help = "Specify the default object_store_profiling mode, defaults to 'disabled'.\n[possible values: disabled, summary, trace]", + default_value_t = InstrumentedObjectStoreMode::Disabled + )] + object_store_profiling: InstrumentedObjectStoreMode, } #[tokio::main] @@ -205,6 +216,12 @@ async fn main_inner() -> Result<()> { rt_builder = rt_builder.with_disk_manager_builder(builder); } + let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( + Arc::new(DefaultObjectStoreRegistry::new()), + args.object_store_profiling, + )); + rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone()); + let runtime_env = rt_builder.build_arc()?; // enable dynamic file query @@ -232,6 +249,8 @@ async fn main_inner() -> Result<()> { quiet: args.quiet, maxrows: args.maxrows, color: args.color, + object_store_profile_mode: args.object_store_profiling, + instrumented_registry: Arc::clone(&instrumented_registry), }; let commands = args.command; diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index de33e11fe0100..839f90b3d743c 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -563,6 +563,593 @@ pub(crate) async fn get_object_store( Ok(store) } +pub mod instrumented { + use core::fmt; + use std::{ + cmp, default, + ops::AddAssign, + str::FromStr, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, + time::Duration, + }; + + use async_trait::async_trait; + use chrono::Utc; + use datafusion::{ + common::{instant::Instant, HashMap}, + error::DataFusionError, + execution::object_store::ObjectStoreRegistry, + }; + use futures::stream::BoxStream; + use object_store::{ + path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result, + }; + use parking_lot::{Mutex, RwLock}; + use url::Url; + + #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] + enum Operation { + Copy, + Delete, + Get, + Head, + List, + Put, + } + + #[derive(Debug)] + struct RequestDetails { + op: Operation, + path: Path, + timestamp: chrono::DateTime, + duration: Option, + size: Option, + range: Option, + extra_display: Option, + } + + impl fmt::Display for RequestDetails { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut output_parts = vec![format!( + "{} operation={:?}", + self.timestamp.to_rfc3339(), + self.op + )]; + + if let Some(d) = self.duration { + output_parts.push(format!("duration={:.6}s", d.as_secs_f32())); + } + if let Some(s) = self.size { + output_parts.push(format!("size={}", s)); + } + if let Some(r) = &self.range { + output_parts.push(format!("range: {}", r)); + } + output_parts.push(format!("path={}", self.path)); + + if let Some(ed) = &self.extra_display { + output_parts.push(ed.clone()); + } + + write!(f, "{}", output_parts.join(" ")) + } + } + + #[derive(Default)] + struct RequestSummary { + count: usize, + duration_stats: Option>, + size_stats: Option>, + } + + impl RequestSummary { + fn push(&mut self, request: &RequestDetails) { + self.count += 1; + if let Some(dur) = request.duration { + self.duration_stats.get_or_insert_default().push(dur) + } + if let Some(size) = request.size { + self.size_stats.get_or_insert_default().push(size) + } + } + } + + impl fmt::Display for RequestSummary { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, " count: {}", self.count)?; + + if let Some(dur_stats) = &self.duration_stats { + writeln!(f, " duration min: {:.6}s", dur_stats.min.as_secs_f32())?; + writeln!(f, " duration max: {:.6}s", dur_stats.max.as_secs_f32())?; + let avg = dur_stats.sum.as_secs_f32() / (self.count as f32); + writeln!(f, " duration avg: {:.6}s", avg)?; + } + + if let Some(size_stats) = &self.size_stats { + writeln!(f, " size min: {} B", size_stats.min)?; + writeln!(f, " size max: {} B", size_stats.max)?; + let avg = size_stats.sum / self.count; + writeln!(f, " size avg: {} B", avg)?; + writeln!(f, " size sum: {} B", size_stats.sum)?; + } + + Ok(()) + } + } + + struct Stats> { + min: T, + max: T, + sum: T, + } + + impl> Stats { + fn push(&mut self, val: T) { + self.min = cmp::min(val, self.min); + self.max = cmp::max(val, self.max); + self.sum += val; + } + } + + impl default::Default for Stats { + fn default() -> Self { + Self { + min: Duration::MAX, + max: Duration::ZERO, + sum: Duration::ZERO, + } + } + } + + impl default::Default for Stats { + fn default() -> Self { + Self { + min: usize::MAX, + max: usize::MIN, + sum: 0, + } + } + } + + #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] + pub enum InstrumentedObjectStoreMode { + #[default] + Disabled, + Summary, + Trace, + } + + impl fmt::Display for InstrumentedObjectStoreMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } + } + + impl FromStr for InstrumentedObjectStoreMode { + type Err = DataFusionError; + + fn from_str(s: &str) -> std::result::Result { + match s.to_lowercase().as_str() { + "disabled" => Ok(Self::Disabled), + "summary" => Ok(Self::Summary), + "trace" => Ok(Self::Trace), + _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))), + } + } + } + + #[derive(Debug)] + pub struct InstrumentedObjectStore { + inner: Arc, + instrument_mode: AtomicU8, + requests: Mutex>, + } + + impl fmt::Display for InstrumentedObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut summaries = HashMap::new(); + let mut reqs = self.requests.lock(); + for rd in reqs.drain(..) { + match summaries.get_mut(&rd.op) { + None => { + let mut summary = RequestSummary::default(); + summary.push(&rd); + summaries.insert(rd.op, summary); + } + Some(summary) => summary.push(&rd), + } + + if self.instrument_mode.load(Ordering::Relaxed) + == InstrumentedObjectStoreMode::Trace as u8 + { + writeln!(f, "{rd}")?; + } + } + + if self.instrument_mode.load(Ordering::Relaxed) + >= InstrumentedObjectStoreMode::Summary as u8 + { + for (op, summary) in summaries.iter() { + writeln!(f, "{:?} Summary:", op)?; + writeln!(f, "{summary}")?; + } + } + + Ok(()) + } + } + + impl InstrumentedObjectStore { + pub fn new( + object_store: Arc, + instrument_mode: AtomicU8, + ) -> Self { + Self { + inner: object_store, + instrument_mode, + requests: Mutex::new(Vec::new()), + } + } + + pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) { + self.instrument_mode.store(mode as u8, Ordering::Relaxed) + } + + async fn inst_put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + let timestamp = Utc::now(); + let start = Instant::now(); + let size = payload.content_length(); + let ret = self.inner.put_opts(location, payload, opts).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Put, + path: location.clone(), + timestamp, + duration: Some(elapsed), + size: Some(size), + range: None, + extra_display: None, + }); + + Ok(ret) + } + + async fn inst_put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + let timestamp = Utc::now(); + let start = Instant::now(); + let ret = self.inner.put_multipart_opts(location, opts).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Put, + path: location.clone(), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: None, + }); + + Ok(ret) + } + + async fn inst_get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> Result { + let timestamp = Utc::now(); + let range = options.range.clone(); + let start = Instant::now(); + let ret = self.inner.get_opts(location, options).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Get, + path: location.clone(), + timestamp, + duration: Some(elapsed), + size: Some((ret.range.end - ret.range.start) as usize), + range, + extra_display: None, + }); + + Ok(ret) + } + + async fn inst_delete(&self, location: &Path) -> Result<()> { + let timestamp = Utc::now(); + let start = Instant::now(); + self.inner.delete(location).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Delete, + path: location.clone(), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: None, + }); + + Ok(()) + } + + fn inst_list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, Result> { + let timestamp = Utc::now(); + let ret = self.inner.list(prefix); + + self.requests.lock().push(RequestDetails { + op: Operation::List, + path: prefix.cloned().unwrap_or_else(|| Path::from("")), + timestamp, + duration: None, // list returns a future, so the duration isn't meaningful + size: None, + range: None, + extra_display: None, + }); + + ret + } + + async fn inst_list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> Result { + let timestamp = Utc::now(); + let start = Instant::now(); + let ret = self.inner.list_with_delimiter(prefix).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::List, + path: prefix.cloned().unwrap_or_else(|| Path::from("")), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: None, + }); + + Ok(ret) + } + + async fn inst_copy(&self, from: &Path, to: &Path) -> Result<()> { + let timestamp = Utc::now(); + let start = Instant::now(); + self.inner.copy(from, to).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Copy, + path: from.clone(), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: Some(format!("copy_to: {to}")), + }); + + Ok(()) + } + + async fn inst_copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let timestamp = Utc::now(); + let start = Instant::now(); + self.inner.copy_if_not_exists(from, to).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Copy, + path: from.clone(), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: Some(format!("copy_to: {to}")), + }); + + Ok(()) + } + + async fn inst_head(&self, location: &Path) -> Result { + let timestamp = Utc::now(); + let start = Instant::now(); + let ret = self.inner.head(location).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Head, + path: location.clone(), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: None, + }); + + Ok(ret) + } + } + + #[async_trait] + impl ObjectStore for InstrumentedObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_put_opts(location, payload, opts).await; + } + + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_put_multipart_opts(location, opts).await; + } + + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> Result { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_get_opts(location, options).await; + } + + self.inner.get_opts(location, options).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_delete(location).await; + } + + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_list(prefix); + } + + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_list_with_delimiter(prefix).await; + } + + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_copy(from, to).await; + } + + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_copy_if_not_exists(from, to).await; + } + + self.inner.copy_if_not_exists(from, to).await + } + + async fn head(&self, location: &Path) -> Result { + if self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + { + return self.inst_head(location).await; + } + + self.inner.head(location).await + } + } + + #[derive(Debug)] + pub struct InstrumentedObjectStoreRegistry { + inner: Arc, + instrument_mode: AtomicU8, + stores: RwLock>>, + } + + impl InstrumentedObjectStoreRegistry { + pub fn new( + registry: Arc, + default_mode: InstrumentedObjectStoreMode, + ) -> Self { + Self { + inner: registry, + instrument_mode: AtomicU8::new(default_mode as u8), + stores: RwLock::new(Vec::new()), + } + } + + pub fn stores(&self) -> Vec> { + self.stores.read().clone() + } + + pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) { + self.instrument_mode.store(mode as u8, Ordering::Relaxed); + for s in self.stores.read().iter() { + s.set_instrument_mode(mode) + } + } + } + + impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { + fn register_store( + &self, + url: &Url, + store: Arc, + ) -> Option> { + let mode = self.instrument_mode.load(Ordering::Relaxed); + let instrumented = + Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode))); + self.stores.write().push(Arc::clone(&instrumented)); + self.inner.register_store(url, instrumented) + } + + fn get_store( + &self, + url: &Url, + ) -> datafusion::common::Result> { + self.inner.get_store(url) + } + } +} + #[cfg(test)] mod tests { use crate::cli_context::CliSessionContext; diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 56d787b0fe087..bb4b09c46e3ff 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -16,10 +16,14 @@ // under the License. use std::fmt::{Display, Formatter}; -use std::io::Write; +use std::io; use std::pin::Pin; use std::str::FromStr; +use std::sync::Arc; +use crate::object_storage::instrumented::{ + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, +}; use crate::print_format::PrintFormat; use arrow::datatypes::SchemaRef; @@ -73,6 +77,8 @@ pub struct PrintOptions { pub quiet: bool, pub maxrows: MaxRows, pub color: bool, + pub object_store_profile_mode: InstrumentedObjectStoreMode, + pub instrumented_registry: Arc, } // Returns the query execution details formatted @@ -128,11 +134,7 @@ impl PrintOptions { query_start_time, ); - if !self.quiet { - writeln!(writer, "{formatted_exec_details}")?; - } - - Ok(()) + self.write_output(&mut writer, formatted_exec_details) } /// Print the stream to stdout using the specified format @@ -174,8 +176,23 @@ impl PrintOptions { query_start_time, ); + self.write_output(&mut writer, formatted_exec_details) + } + + fn write_output( + &self, + writer: &mut W, + formatted_exec_details: String, + ) -> Result<()> { if !self.quiet { writeln!(writer, "{formatted_exec_details}")?; + + if self.object_store_profile_mode != InstrumentedObjectStoreMode::Disabled { + writeln!(writer, "Object Store Profiling")?; + for store in self.instrumented_registry.stores() { + write!(writer, "{store}")?; + } + } } Ok(())