diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index c4bd44011dc75..c0802faf88b78 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -15,13 +15,88 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; +use std::{fmt, sync::Arc}; +use async_trait::async_trait; use datafusion::execution::object_store::ObjectStoreRegistry; -use object_store::ObjectStore; +use futures::stream::BoxStream; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, +}; use url::Url; -/// Provides access to wrapped [`ObjectStore`] instances that record requests for reporting +/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the +/// inner [`ObjectStore`] +#[derive(Debug)] +struct InstrumentedObjectStore { + inner: Arc, +} + +impl InstrumentedObjectStore { + /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`] + fn new(object_store: Arc) -> Self { + Self { + inner: object_store, + } + } +} + +impl fmt::Display for InstrumentedObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Instrumented Object Store: {}", self.inner) + } +} + +#[async_trait] +impl ObjectStore for InstrumentedObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + self.inner.get_opts(location, options).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn head(&self, location: &Path) -> Result { + self.inner.head(location).await + } +} + +/// Provides access to [`ObjectStore`] instances that record requests for reporting #[derive(Debug)] pub struct InstrumentedObjectStoreRegistry { inner: Arc, @@ -41,7 +116,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { url: &Url, store: Arc, ) -> Option> { - self.inner.register_store(url, store) + let instrumented = Arc::new(InstrumentedObjectStore::new(store)); + self.inner.register_store(url, instrumented) } fn get_store(&self, url: &Url) -> datafusion::common::Result> {