-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Adds Instrument Mode for InstrumentedObjectStore in datafusion-cli #18000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,36 +15,90 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use std::{fmt, sync::Arc}; | ||
| use std::{ | ||
| fmt, | ||
| str::FromStr, | ||
| sync::{ | ||
| atomic::{AtomicU8, Ordering}, | ||
| Arc, | ||
| }, | ||
| }; | ||
|
|
||
| use async_trait::async_trait; | ||
| use datafusion::execution::object_store::ObjectStoreRegistry; | ||
| use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry}; | ||
| use futures::stream::BoxStream; | ||
| use object_store::{ | ||
| path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, | ||
| ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, | ||
| }; | ||
| use url::Url; | ||
|
|
||
| /// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect | ||
| /// profiling data. Collecting profiling data will have a small negative impact on both CPU and | ||
| /// memory usage. Default is `Disabled` | ||
| #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] | ||
| pub enum InstrumentedObjectStoreMode { | ||
| /// Disable collection of profiling data | ||
| #[default] | ||
| Disabled, | ||
| /// Enable collection of profiling data | ||
| Enabled, | ||
| } | ||
|
|
||
| impl fmt::Display for InstrumentedObjectStoreMode { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| write!(f, "{:?}", self) | ||
| } | ||
| } | ||
|
|
||
| impl FromStr for InstrumentedObjectStoreMode { | ||
| type Err = DataFusionError; | ||
|
|
||
| fn from_str(s: &str) -> std::result::Result<Self, Self::Err> { | ||
| match s.to_lowercase().as_str() { | ||
| "disabled" => Ok(Self::Disabled), | ||
| "enabled" => Ok(Self::Enabled), | ||
| _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl From<u8> for InstrumentedObjectStoreMode { | ||
| fn from(value: u8) -> Self { | ||
| match value { | ||
| 1 => InstrumentedObjectStoreMode::Enabled, | ||
| _ => InstrumentedObjectStoreMode::Disabled, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the | ||
| /// inner [`ObjectStore`] | ||
| #[derive(Debug)] | ||
| struct InstrumentedObjectStore { | ||
| inner: Arc<dyn ObjectStore>, | ||
| instrument_mode: AtomicU8, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This type likely seems like an odd decision in isolation, but it becomes necessary when the mode actually needs to switch behavior in the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe an Minor detail however
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes! ...but -- The actual intent here is to have this actually end up looking like: pub enum InstrumentedObjectStoreMode {
/// Disable collection of profiling data
#[default]
Disabled,
/// Enable collection of profiling data and output a summary
Summary,
/// Enable collection of profiling data and output all profiling details
Trace,
}Thus why it's an |
||
| } | ||
|
|
||
| impl InstrumentedObjectStore { | ||
| /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`] | ||
| fn new(object_store: Arc<dyn ObjectStore>) -> Self { | ||
| fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self { | ||
| Self { | ||
| inner: object_store, | ||
| instrument_mode, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl fmt::Display for InstrumentedObjectStore { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| write!(f, "Instrumented Object Store: {}", self.inner) | ||
| let mode: InstrumentedObjectStoreMode = | ||
| self.instrument_mode.load(Ordering::Relaxed).into(); | ||
| write!( | ||
| f, | ||
| "Instrumented Object Store: instrument_mode: {mode}, inner: {}", | ||
| self.inner | ||
| ) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -100,13 +154,20 @@ impl ObjectStore for InstrumentedObjectStore { | |
| #[derive(Debug)] | ||
| pub struct InstrumentedObjectStoreRegistry { | ||
| inner: Arc<dyn ObjectStoreRegistry>, | ||
| instrument_mode: InstrumentedObjectStoreMode, | ||
| } | ||
|
|
||
| impl InstrumentedObjectStoreRegistry { | ||
| /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided | ||
| /// [`ObjectStoreRegistry`] | ||
| pub fn new(registry: Arc<dyn ObjectStoreRegistry>) -> Self { | ||
| Self { inner: registry } | ||
| pub fn new( | ||
| registry: Arc<dyn ObjectStoreRegistry>, | ||
| default_mode: InstrumentedObjectStoreMode, | ||
| ) -> Self { | ||
| Self { | ||
| inner: registry, | ||
| instrument_mode: default_mode, | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -116,7 +177,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { | |
| url: &Url, | ||
| store: Arc<dyn ObjectStore>, | ||
| ) -> Option<Arc<dyn ObjectStore>> { | ||
| let instrumented = Arc::new(InstrumentedObjectStore::new(store)); | ||
| let mode = AtomicU8::new(self.instrument_mode as u8); | ||
| let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode)); | ||
| self.inner.register_store(url, instrumented) | ||
| } | ||
|
|
||
|
|
@@ -131,11 +193,36 @@ mod tests { | |
|
|
||
| use super::*; | ||
|
|
||
| #[test] | ||
| fn instrumented_mode() { | ||
| assert!(matches!( | ||
| InstrumentedObjectStoreMode::default(), | ||
| InstrumentedObjectStoreMode::Disabled | ||
| )); | ||
|
|
||
| assert!(matches!( | ||
| "dIsABleD".parse().unwrap(), | ||
| InstrumentedObjectStoreMode::Disabled | ||
| )); | ||
| assert!(matches!( | ||
| "EnABlEd".parse().unwrap(), | ||
| InstrumentedObjectStoreMode::Enabled | ||
| )); | ||
| assert!("does_not_exist" | ||
| .parse::<InstrumentedObjectStoreMode>() | ||
| .is_err()); | ||
|
|
||
| assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled)); | ||
| assert!(matches!(1.into(), InstrumentedObjectStoreMode::Enabled)); | ||
| assert!(matches!(2.into(), InstrumentedObjectStoreMode::Disabled)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn instrumented_registry() { | ||
| let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new( | ||
| DefaultObjectStoreRegistry::new(), | ||
| ))); | ||
| let reg = Arc::new(InstrumentedObjectStoreRegistry::new( | ||
| Arc::new(DefaultObjectStoreRegistry::new()), | ||
| InstrumentedObjectStoreMode::default(), | ||
| )); | ||
| let store = object_store::memory::InMemory::new(); | ||
|
|
||
| let url = "mem://test".parse().unwrap(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that while this is functionally "dead code" in the current implementation, it's important for parsing cli arguments/commands which will come in a follow-on PR "soon" (tm)