diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index c4bd44011dc75..4c930f1d8631b 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -44,6 +44,13 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { self.inner.register_store(url, store) } + fn deregister_store( + &self, + url: &Url, + ) -> datafusion::common::Result> { + self.inner.deregister_store(url) + } + fn get_store(&self, url: &Url) -> datafusion::common::Result> { self.inner.get_store(url) } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index a8148b80495e6..eace5610e117d 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -505,6 +505,13 @@ impl SessionContext { self.runtime_env().register_object_store(url, object_store) } + /// Deregisters an [`ObjectStore`] associated with the specific URL prefix. + /// + /// See [`RuntimeEnv::deregister_object_store`] for more details. + pub fn deregister_object_store(&self, url: &Url) -> Result> { + self.runtime_env().deregister_object_store(url) + } + /// Registers the [`RecordBatch`] as the specified table name pub fn register_batch( &self, diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs index ef83128ac6818..aedee7d44460d 100644 --- a/datafusion/execution/src/object_store.rs +++ b/datafusion/execution/src/object_store.rs @@ -20,7 +20,9 @@ //! and query data inside these systems. use dashmap::DashMap; -use datafusion_common::{exec_err, internal_datafusion_err, DataFusionError, Result}; +use datafusion_common::{ + exec_err, internal_datafusion_err, not_impl_err, DataFusionError, Result, +}; #[cfg(not(target_arch = "wasm32"))] use object_store::local::LocalFileSystem; use object_store::ObjectStore; @@ -154,6 +156,13 @@ pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static { store: Arc, ) -> Option>; + /// Deregister the store previously registered with the same key. Returns the + /// deregistered store if it existed. + #[allow(unused_variables)] + fn deregister_store(&self, url: &Url) -> Result> { + not_impl_err!("ObjectStoreRegistry::deregister_store is not implemented for this ObjectStoreRegistry") + } + /// Get a suitable store for the provided URL. For example: /// /// - URL with scheme `file:///` or no scheme will return the default LocalFS store @@ -230,6 +239,17 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { self.object_stores.insert(s, store) } + fn deregister_store(&self, url: &Url) -> Result> { + let s = get_url_key(url); + let (_, object_store) = self.object_stores + .remove(&s) + .ok_or_else(|| { + internal_datafusion_err!("Failed to deregister object store. No suitable object store found for {url}. See `RuntimeEnv::register_object_store`") + })?; + + Ok(object_store) + } + fn get_store(&self, url: &Url) -> Result> { let s = get_url_key(url); self.object_stores diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index db045a8b7e8a7..b0d0a966b7a27 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -114,8 +114,6 @@ impl RuntimeEnv { /// ``` /// /// # Example: Register remote URL object store like [Github](https://github.com) - /// - /// /// ``` /// # use std::sync::Arc; /// # use url::Url; @@ -141,6 +139,12 @@ impl RuntimeEnv { self.object_store_registry.register_store(url, object_store) } + /// Deregisters a custom `ObjectStore` previously registered for a specific url. + /// See [`ObjectStoreRegistry::deregister_store`] for more details. + pub fn deregister_object_store(&self, url: &Url) -> Result> { + self.object_store_registry.deregister_store(url) + } + /// Retrieves a `ObjectStore` instance for a url by consulting the /// registry. See [`ObjectStoreRegistry::get_store`] for more /// details.