diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 39aca4cd13206..04a8c2a0f1d84 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -33,7 +33,9 @@ use datafusion::logical_expr::ExplainFormat; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc}; -use datafusion_cli::object_storage::instrumented::InstrumentedObjectStoreRegistry; +use datafusion_cli::object_storage::instrumented::{ + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, +}; use datafusion_cli::{ exec, pool_type::PoolType, @@ -208,9 +210,10 @@ async fn main_inner() -> Result<()> { rt_builder = rt_builder.with_disk_manager_builder(builder); } - let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new( - DefaultObjectStoreRegistry::new(), - ))); + let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( + Arc::new(DefaultObjectStoreRegistry::new()), + InstrumentedObjectStoreMode::default(), + )); rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone()); let runtime_env = rt_builder.build_arc()?; diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index c0802faf88b78..f0313da3a3795 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -15,10 +15,17 @@ // specific language governing permissions and limitations // under the License. -use std::{fmt, sync::Arc}; +use std::{ + fmt, + str::FromStr, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, +}; use async_trait::async_trait; -use datafusion::execution::object_store::ObjectStoreRegistry; +use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry}; use futures::stream::BoxStream; use object_store::{ path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, @@ -26,25 +33,72 @@ use object_store::{ }; use url::Url; +/// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect +/// profiling data. Collecting profiling data will have a small negative impact on both CPU and +/// memory usage. Default is `Disabled` +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub enum InstrumentedObjectStoreMode { + /// Disable collection of profiling data + #[default] + Disabled, + /// Enable collection of profiling data + Enabled, +} + +impl fmt::Display for InstrumentedObjectStoreMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl FromStr for InstrumentedObjectStoreMode { + type Err = DataFusionError; + + fn from_str(s: &str) -> std::result::Result { + match s.to_lowercase().as_str() { + "disabled" => Ok(Self::Disabled), + "enabled" => Ok(Self::Enabled), + _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))), + } + } +} + +impl From for InstrumentedObjectStoreMode { + fn from(value: u8) -> Self { + match value { + 1 => InstrumentedObjectStoreMode::Enabled, + _ => InstrumentedObjectStoreMode::Disabled, + } + } +} + /// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the /// inner [`ObjectStore`] #[derive(Debug)] struct InstrumentedObjectStore { inner: Arc, + instrument_mode: AtomicU8, } impl InstrumentedObjectStore { /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`] - fn new(object_store: Arc) -> Self { + fn new(object_store: Arc, instrument_mode: AtomicU8) -> Self { Self { inner: object_store, + instrument_mode, } } } impl fmt::Display for InstrumentedObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Instrumented Object Store: {}", self.inner) + let mode: InstrumentedObjectStoreMode = + self.instrument_mode.load(Ordering::Relaxed).into(); + write!( + f, + "Instrumented Object Store: instrument_mode: {mode}, inner: {}", + self.inner + ) } } @@ -100,13 +154,20 @@ impl ObjectStore for InstrumentedObjectStore { #[derive(Debug)] pub struct InstrumentedObjectStoreRegistry { inner: Arc, + instrument_mode: InstrumentedObjectStoreMode, } impl InstrumentedObjectStoreRegistry { /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided /// [`ObjectStoreRegistry`] - pub fn new(registry: Arc) -> Self { - Self { inner: registry } + pub fn new( + registry: Arc, + default_mode: InstrumentedObjectStoreMode, + ) -> Self { + Self { + inner: registry, + instrument_mode: default_mode, + } } } @@ -116,7 +177,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { url: &Url, store: Arc, ) -> Option> { - let instrumented = Arc::new(InstrumentedObjectStore::new(store)); + let mode = AtomicU8::new(self.instrument_mode as u8); + let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode)); self.inner.register_store(url, instrumented) } @@ -131,11 +193,36 @@ mod tests { use super::*; + #[test] + fn instrumented_mode() { + assert!(matches!( + InstrumentedObjectStoreMode::default(), + InstrumentedObjectStoreMode::Disabled + )); + + assert!(matches!( + "dIsABleD".parse().unwrap(), + InstrumentedObjectStoreMode::Disabled + )); + assert!(matches!( + "EnABlEd".parse().unwrap(), + InstrumentedObjectStoreMode::Enabled + )); + assert!("does_not_exist" + .parse::() + .is_err()); + + assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled)); + assert!(matches!(1.into(), InstrumentedObjectStoreMode::Enabled)); + assert!(matches!(2.into(), InstrumentedObjectStoreMode::Disabled)); + } + #[test] fn instrumented_registry() { - let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new( - DefaultObjectStoreRegistry::new(), - ))); + let reg = Arc::new(InstrumentedObjectStoreRegistry::new( + Arc::new(DefaultObjectStoreRegistry::new()), + InstrumentedObjectStoreMode::default(), + )); let store = object_store::memory::InMemory::new(); let url = "mem://test".parse().unwrap();