diff --git a/Cargo.lock b/Cargo.lock index eafb451a29b..108de5bbede 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4813,8 +4813,9 @@ dependencies = [ "bytes", "lance", "lance-core", + "lance-io", "lance-namespace", - "opendal", + "object_store", "reqwest", "serde_json", "snafu", diff --git a/rust/lance-namespace-impls/Cargo.toml b/rust/lance-namespace-impls/Cargo.toml index dab3ebeac30..18c3d9ac526 100644 --- a/rust/lance-namespace-impls/Cargo.toml +++ b/rust/lance-namespace-impls/Cargo.toml @@ -14,7 +14,11 @@ rust-version.workspace = true [features] default = [] rest = ["dep:reqwest", "dep:serde_json", "dep:url"] -dir = ["dep:lance", "dep:opendal", "dep:arrow", "dep:arrow-ipc", "dep:arrow-schema", "dep:url"] +# Cloud storage features for directory implementation - align with lance-io +dir-gcp = ["lance-io/gcp"] +dir-aws = ["lance-io/aws"] +dir-azure = ["lance-io/azure"] +dir-oss = ["lance-io/oss"] [dependencies] lance-namespace.workspace = true @@ -30,15 +34,16 @@ reqwest = { version = "0.12", optional = true, default-features = false, feature "rustls-tls-native-roots" ] } serde_json = { workspace = true, optional = true } - -# Directory implementation dependencies (optional, enabled by "dir" feature) -lance = { workspace = true, optional = true } -opendal = { workspace = true, optional = true, features = ["services-fs", "services-s3", "services-gcs", "services-azblob"] } -arrow = { workspace = true, optional = true } -arrow-ipc = { workspace = true, optional = true } -arrow-schema = { workspace = true, optional = true } url = { workspace = true, optional = true } +# Directory implementation dependencies (always enabled) +lance = { workspace = true } +lance-io = { workspace = true } +object_store = { workspace = true } +arrow = { workspace = true } +arrow-ipc = { workspace = true } +arrow-schema = { workspace = true } + # Common dependencies async-trait.workspace = true bytes.workspace = true diff --git a/rust/lance-namespace-impls/README.md b/rust/lance-namespace-impls/README.md index 5c6e4de29f0..b61d495a791 100644 --- a/rust/lance-namespace-impls/README.md +++ b/rust/lance-namespace-impls/README.md @@ -8,7 +8,7 @@ This crate provides concrete implementations of the Lance namespace trait: - Unified connection interface for all implementations - **REST Namespace** - REST API client for remote Lance namespace servers (feature: `rest`) -- **Directory Namespace** - File system-based namespace that stores tables as Lance datasets (feature: `dir`) +- **Directory Namespace** - File system-based namespace that stores tables as Lance datasets (always available) ## Features @@ -16,15 +16,16 @@ This crate provides concrete implementations of the Lance namespace trait: The REST namespace implementation provides a client for connecting to remote Lance namespace servers via REST API. -### Directory Namespace (feature: `dir`) +### Directory Namespace (always available) The directory namespace implementation stores tables as Lance datasets in a directory structure on local or cloud storage. Supported storage backends: -- Local filesystem -- AWS S3 -- Google Cloud Storage (GCS) -- Azure Blob Storage +- Local filesystem (always available) +- AWS S3 (feature: `dir-aws`) +- Google Cloud Storage (feature: `dir-gcp`) +- Azure Blob Storage (feature: `dir-azure`) +- Alibaba Cloud OSS (feature: `dir-oss`) ## Usage diff --git a/rust/lance-namespace-impls/src/connect.rs b/rust/lance-namespace-impls/src/connect.rs index ca32822acc7..e3560b14289 100644 --- a/rust/lance-namespace-impls/src/connect.rs +++ b/rust/lance-namespace-impls/src/connect.rs @@ -6,100 +6,220 @@ use std::collections::HashMap; use std::sync::Arc; +use lance::session::Session; use lance_core::{Error, Result}; use lance_namespace::LanceNamespace; -/// Connect to a Lance namespace implementation. +/// Builder for creating Lance namespace connections. /// -/// This function creates a connection to a Lance namespace backend based on -/// the specified implementation type and configuration properties. -/// -/// # Arguments -/// -/// * `impl_name` - Implementation identifier. Supported values: -/// - "rest": REST API implementation (requires "rest" feature) -/// - "dir": Directory-based implementation (requires "dir" feature) -/// -/// * `properties` - Configuration properties specific to the implementation. -/// Common properties: -/// - For REST: "uri" (base URL), "delimiter", "header.*" (custom headers) -/// - For DIR: "root" (directory path), "storage.*" (storage options) -/// -/// # Returns -/// -/// Returns a boxed trait object implementing the `LanceNamespace` trait. +/// This builder provides a fluent API for configuring and establishing +/// connections to Lance namespace implementations. /// /// # Examples /// /// ```no_run -/// use lance_namespace_impls::connect; -/// use std::collections::HashMap; -/// +/// # use lance_namespace_impls::ConnectBuilder; +/// # use std::collections::HashMap; /// # async fn example() -> Result<(), Box> { -/// // Connect to REST implementation -/// let mut props = HashMap::new(); -/// props.insert("uri".to_string(), "http://localhost:8080".to_string()); -/// let namespace = connect("rest", props).await?; +/// // Connect to directory implementation +/// let namespace = ConnectBuilder::new("dir") +/// .property("root", "/path/to/data") +/// .property("storage.region", "us-west-2") +/// .connect() +/// .await?; /// # Ok(()) /// # } /// ``` /// /// ```no_run -/// # #[cfg(feature = "dir")] +/// # use lance_namespace_impls::ConnectBuilder; +/// # use lance::session::Session; +/// # use std::sync::Arc; /// # async fn example() -> Result<(), Box> { -/// use lance_namespace_impls::connect; -/// use std::collections::HashMap; -/// -/// // Connect to directory implementation (requires "dir" feature) -/// let mut props = HashMap::new(); -/// props.insert("root".to_string(), "/path/to/data".to_string()); -/// let namespace = connect("dir", props).await?; +/// // Connect with a shared session +/// let session = Arc::new(Session::default()); +/// let namespace = ConnectBuilder::new("dir") +/// .property("root", "/path/to/data") +/// .session(session) +/// .connect() +/// .await?; /// # Ok(()) /// # } /// ``` -pub async fn connect( - impl_name: &str, - #[allow(unused)] properties: HashMap, -) -> Result> { - match impl_name { - #[cfg(feature = "rest")] - "rest" => { - // Create REST implementation - Ok(Arc::new(crate::rest::RestNamespace::new(properties))) - } - #[cfg(not(feature = "rest"))] - "rest" => Err(Error::Namespace { - source: "REST namespace implementation requires 'rest' feature to be enabled".into(), - location: snafu::location!(), - }), - #[cfg(feature = "dir")] - "dir" => { - // Create directory implementation - crate::dir::connect_dir(properties).await +#[derive(Debug, Clone)] +pub struct ConnectBuilder { + impl_name: String, + properties: HashMap, + session: Option>, +} + +impl ConnectBuilder { + /// Create a new ConnectBuilder for the specified implementation. + /// + /// # Arguments + /// + /// * `impl_name` - Implementation identifier ("dir", "rest", etc.) + pub fn new(impl_name: impl Into) -> Self { + Self { + impl_name: impl_name.into(), + properties: HashMap::new(), + session: None, } - #[cfg(not(feature = "dir"))] - "dir" => Err(Error::Namespace { - source: "Directory namespace implementation requires 'dir' feature to be enabled" + } + + /// Add a configuration property. + /// + /// # Arguments + /// + /// * `key` - Property key + /// * `value` - Property value + pub fn property(mut self, key: impl Into, value: impl Into) -> Self { + self.properties.insert(key.into(), value.into()); + self + } + + /// Add multiple configuration properties. + /// + /// # Arguments + /// + /// * `properties` - HashMap of properties to add + pub fn properties(mut self, properties: HashMap) -> Self { + self.properties.extend(properties); + self + } + + /// Set the Lance session to use for this connection. + /// + /// When a session is provided, the namespace will reuse the session's + /// object store registry, allowing multiple namespaces and datasets + /// to share the same underlying storage connections. + /// + /// # Arguments + /// + /// * `session` - Arc-wrapped Lance session + pub fn session(mut self, session: Arc) -> Self { + self.session = Some(session); + self + } + + /// Build and establish the connection to the namespace. + /// + /// # Returns + /// + /// Returns a trait object implementing `LanceNamespace`. + /// + /// # Errors + /// + /// Returns an error if: + /// - The implementation type is not supported + /// - Required configuration properties are missing + /// - Connection to the backend fails + pub async fn connect(self) -> Result> { + match self.impl_name.as_str() { + #[cfg(feature = "rest")] + "rest" => { + // Create REST implementation (REST doesn't use session) + crate::rest::RestNamespaceBuilder::from_properties(self.properties) + .map(|builder| Arc::new(builder.build()) as Arc) + } + #[cfg(not(feature = "rest"))] + "rest" => Err(Error::Namespace { + source: "REST namespace implementation requires 'rest' feature to be enabled" + .into(), + location: snafu::location!(), + }), + "dir" => { + // Create directory implementation (always available) + crate::dir::DirectoryNamespaceBuilder::from_properties( + self.properties, + self.session, + )? + .build() + .await + .map(|ns| Arc::new(ns) as Arc) + } + _ => Err(Error::Namespace { + source: format!( + "Implementation '{}' is not available. Supported: dir{}", + self.impl_name, + if cfg!(feature = "rest") { ", rest" } else { "" } + ) .into(), - location: snafu::location!(), - }), - _ => Err(Error::Namespace { - source: format!( - "Implementation '{}' is not available. Supported: {}{}", - impl_name, - if cfg!(feature = "rest") { "rest" } else { "" }, - if cfg!(feature = "dir") { - if cfg!(feature = "rest") { - ", dir" - } else { - "dir" - } - } else { - "" - } - ) - .into(), - location: snafu::location!(), - }), + location: snafu::location!(), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use lance_core::utils::tempfile::TempStdDir; + use lance_namespace::models::ListTablesRequest; + use std::collections::HashMap; + + #[tokio::test] + async fn test_connect_builder_basic() { + let temp_dir = TempStdDir::default(); + + let namespace = ConnectBuilder::new("dir") + .property("root", temp_dir.to_str().unwrap()) + .connect() + .await + .unwrap(); + + // Verify we can use the namespace + let request = ListTablesRequest::new(); + let response = namespace.list_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 0); + } + + #[tokio::test] + async fn test_connect_builder_with_properties() { + let temp_dir = TempStdDir::default(); + let mut props = HashMap::new(); + props.insert("storage.option1".to_string(), "value1".to_string()); + + let namespace = ConnectBuilder::new("dir") + .property("root", temp_dir.to_str().unwrap()) + .properties(props) + .connect() + .await + .unwrap(); + + // Verify we can use the namespace + let request = ListTablesRequest::new(); + let response = namespace.list_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 0); + } + + #[tokio::test] + async fn test_connect_builder_with_session() { + let temp_dir = TempStdDir::default(); + let session = Arc::new(Session::default()); + + let namespace = ConnectBuilder::new("dir") + .property("root", temp_dir.to_str().unwrap()) + .session(session.clone()) + .connect() + .await + .unwrap(); + + // Verify we can use the namespace + let request = ListTablesRequest::new(); + let response = namespace.list_tables(request).await.unwrap(); + assert_eq!(response.tables.len(), 0); + } + + #[tokio::test] + async fn test_connect_builder_invalid_impl() { + let result = ConnectBuilder::new("invalid") + .property("root", "/tmp") + .connect() + .await; + + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.to_string().contains("not available")); } } diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 96d1d6753a9..19ced63f482 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -7,13 +7,14 @@ //! that stores tables as Lance datasets in a filesystem directory structure. use std::collections::HashMap; -use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; use lance::dataset::{Dataset, WriteParams}; -use opendal::Operator; +use lance::session::Session; +use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; +use object_store::path::Path; use lance_namespace::models::{ CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest, @@ -27,186 +28,241 @@ use lance_namespace::models::{ use lance_core::{box_error, Error, Result}; use lance_namespace::LanceNamespace; -/// Connect to a directory-based namespace implementation. +/// Builder for creating a DirectoryNamespace. /// -/// This is a convenience wrapper around DirectoryNamespace::new that returns -/// the same type as connect for API consistency. -pub async fn connect_dir(properties: HashMap) -> Result> { - DirectoryNamespace::new(properties).map(|ns| Arc::new(ns) as Arc) -} - -/// Configuration for DirectoryNamespace. +/// This builder provides a fluent API for configuring and establishing +/// connections to directory-based Lance namespaces. +/// +/// # Examples +/// +/// ```no_run +/// # use lance_namespace_impls::DirectoryNamespaceBuilder; +/// # async fn example() -> Result<(), Box> { +/// // Create a local directory namespace +/// let namespace = DirectoryNamespaceBuilder::new("/path/to/data") +/// .build() +/// .await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// ```no_run +/// # use lance_namespace_impls::DirectoryNamespaceBuilder; +/// # use lance::session::Session; +/// # use std::sync::Arc; +/// # async fn example() -> Result<(), Box> { +/// // Create with custom storage options and session +/// let session = Arc::new(Session::default()); +/// let namespace = DirectoryNamespaceBuilder::new("s3://bucket/path") +/// .storage_option("region", "us-west-2") +/// .storage_option("access_key_id", "key") +/// .session(session) +/// .build() +/// .await?; +/// # Ok(()) +/// # } +/// ``` #[derive(Debug, Clone)] -pub struct DirectoryNamespaceConfig { - /// Root directory for the namespace +pub struct DirectoryNamespaceBuilder { root: String, - /// Storage options for the backend - storage_options: HashMap, + storage_options: Option>, + session: Option>, } -impl DirectoryNamespaceConfig { - /// Property key for the root directory - pub const ROOT: &'static str = "root"; - /// Prefix for storage options - pub const STORAGE_OPTIONS_PREFIX: &'static str = "storage."; +impl DirectoryNamespaceBuilder { + /// Create a new DirectoryNamespaceBuilder with the specified root path. + /// + /// # Arguments + /// + /// * `root` - Root directory path (local path or cloud URI like s3://bucket/path) + pub fn new(root: impl Into) -> Self { + Self { + root: root.into().trim_end_matches('/').to_string(), + storage_options: None, + session: None, + } + } - /// Create a new configuration from properties - pub fn new(properties: HashMap) -> Self { + /// Create a DirectoryNamespaceBuilder from properties HashMap. + /// + /// This method parses a properties map into builder configuration. + /// It expects: + /// - `root`: The root directory path (required) + /// - `storage.*`: Storage options (optional, prefix will be stripped) + /// + /// # Arguments + /// + /// * `properties` - Configuration properties + /// * `session` - Optional Lance session to reuse object store registry + /// + /// # Returns + /// + /// Returns a `DirectoryNamespaceBuilder` instance. + /// + /// # Errors + /// + /// Returns an error if the `root` property is missing. + /// + /// # Examples + /// + /// ```no_run + /// # use lance_namespace_impls::DirectoryNamespaceBuilder; + /// # use std::collections::HashMap; + /// # async fn example() -> Result<(), Box> { + /// let mut properties = HashMap::new(); + /// properties.insert("root".to_string(), "/path/to/data".to_string()); + /// properties.insert("storage.region".to_string(), "us-west-2".to_string()); + /// + /// let namespace = DirectoryNamespaceBuilder::from_properties(properties, None)? + /// .build() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn from_properties( + properties: HashMap, + session: Option>, + ) -> Result { + // Extract root from properties (required) let root = properties - .get(Self::ROOT) + .get("root") .cloned() - .unwrap_or_else(|| { - std::env::current_dir() - .unwrap() - .to_string_lossy() - .to_string() - }) - .trim_end_matches('/') - .to_string(); + .ok_or_else(|| Error::Namespace { + source: "Missing required property 'root' for directory namespace".into(), + location: snafu::location!(), + })?; + // Extract storage options (properties prefixed with "storage.") let storage_options: HashMap = properties .iter() .filter_map(|(k, v)| { - k.strip_prefix(Self::STORAGE_OPTIONS_PREFIX) + k.strip_prefix("storage.") .map(|key| (key.to_string(), v.clone())) }) .collect(); - Self { - root, + let storage_options = if storage_options.is_empty() { + None + } else { + Some(storage_options) + }; + + Ok(Self { + root: root.trim_end_matches('/').to_string(), storage_options, - } + session, + }) } - /// Get the root directory - pub fn root(&self) -> &str { - &self.root + /// Add a storage option. + /// + /// # Arguments + /// + /// * `key` - Storage option key (e.g., "region", "access_key_id") + /// * `value` - Storage option value + pub fn storage_option(mut self, key: impl Into, value: impl Into) -> Self { + self.storage_options + .get_or_insert_with(HashMap::new) + .insert(key.into(), value.into()); + self } - /// Get the storage options - pub fn storage_options(&self) -> &HashMap { - &self.storage_options + /// Add multiple storage options. + /// + /// # Arguments + /// + /// * `options` - HashMap of storage options to add + pub fn storage_options(mut self, options: HashMap) -> Self { + self.storage_options + .get_or_insert_with(HashMap::new) + .extend(options); + self } -} -/// Directory-based implementation of Lance Namespace. -/// -/// This implementation stores tables as Lance datasets in a directory structure. -/// It supports local filesystems and cloud storage backends through OpenDAL. -pub struct DirectoryNamespace { - config: DirectoryNamespaceConfig, - operator: Operator, -} - -impl DirectoryNamespace { - /// Create a new DirectoryNamespace instance - pub fn new(properties: HashMap) -> Result { - let config = DirectoryNamespaceConfig::new(properties); - let operator = Self::initialize_operator(&config)?; - - Ok(Self { config, operator }) + /// Set the Lance session to use for this namespace. + /// + /// When a session is provided, the namespace will reuse the session's + /// object store registry, allowing multiple namespaces and datasets + /// to share the same underlying storage connections. + /// + /// # Arguments + /// + /// * `session` - Arc-wrapped Lance session + pub fn session(mut self, session: Arc) -> Self { + self.session = Some(session); + self } - /// Initialize the OpenDAL operator based on the configuration - fn initialize_operator(config: &DirectoryNamespaceConfig) -> Result { - let root = config.root(); - let storage_options = &config.storage_options; - - // Parse the root path to determine scheme and configuration - let (scheme, opendal_config) = Self::parse_storage_path(root, storage_options)?; - - // Create the operator with the determined scheme and configuration - let operator = - Operator::via_iter(scheme, opendal_config).map_err(|e| Error::Namespace { - source: format!("Failed to create operator: {}", e).into(), - location: snafu::location!(), - })?; - - Ok(operator) + /// Build the DirectoryNamespace. + /// + /// # Returns + /// + /// Returns a `DirectoryNamespace` instance. + /// + /// # Errors + /// + /// Returns an error if: + /// - The root path is invalid + /// - Connection to the storage backend fails + /// - Storage options are invalid + pub async fn build(self) -> Result { + let (object_store, base_path) = + Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?; + + Ok(DirectoryNamespace { + root: self.root, + storage_options: self.storage_options, + session: self.session, + object_store, + base_path, + }) } - /// Parse storage path and return scheme and configuration - fn parse_storage_path( + /// Initialize the Lance ObjectStore based on the configuration + async fn initialize_object_store( root: &str, - storage_options: &HashMap, - ) -> Result<(opendal::Scheme, HashMap)> { - use url::Url; - - let mut config = HashMap::new(); - - // Try to parse as URL, if it fails, treat as local filesystem path - let (scheme, authority, path) = if let Ok(url) = Url::parse(root) { - let scheme = Self::normalize_scheme(url.scheme()); - let authority = url.host_str().unwrap_or(""); - // For file:// and fs:// URLs, preserve the full path including leading slash - // For cloud storage URLs, remove the leading slash - let path = if scheme == "fs" || scheme == "file" { - url.path().to_string() - } else { - url.path().trim_start_matches('/').to_string() - }; - (scheme, authority.to_string(), path) - } else { - // Not a URL, treat as local filesystem - prepend fs:// - ("fs".to_string(), String::new(), root.to_string()) + storage_options: &Option>, + session: &Option>, + ) -> Result<(Arc, Path)> { + // Build ObjectStoreParams from storage options + let params = ObjectStoreParams { + storage_options: storage_options.clone(), + ..Default::default() }; - // Configure based on scheme - let opendal_scheme = match scheme.as_str() { - "fs" | "file" => { - // For filesystem, use the full path (authority is empty for local paths) - if authority.is_empty() { - config.insert("root".to_string(), path); - } else { - // Handle file:///absolute/path or fs://hostname/path - config.insert("root".to_string(), format!("{}/{}", authority, path)); - } - opendal::Scheme::Fs - } - "s3" => { - config.insert("root".to_string(), path); - config.insert("bucket".to_string(), authority); - opendal::Scheme::S3 - } - "gcs" => { - config.insert("root".to_string(), path); - config.insert("bucket".to_string(), authority); - opendal::Scheme::Gcs - } - "azblob" => { - config.insert("root".to_string(), path); - config.insert("container".to_string(), authority); - opendal::Scheme::Azblob - } - _ => { - // For unknown schemes, try to parse as OpenDAL scheme - config.insert("root".to_string(), path); - if !authority.is_empty() { - config.insert("bucket".to_string(), authority); - } - opendal::Scheme::from_str(&scheme).map_err(|_| Error::Namespace { - source: format!("Unsupported storage scheme: {}", scheme).into(), - location: snafu::location!(), - })? - } + // Use object store registry from session if provided, otherwise create a new one + let registry = if let Some(session) = session { + session.store_registry() + } else { + Arc::new(ObjectStoreRegistry::default()) }; - // Add storage options for all schemes - config.extend(storage_options.clone()); + // Use Lance's object store factory to create from URI + let (object_store, base_path) = ObjectStore::from_uri_and_params(registry, root, ¶ms) + .await + .map_err(|e| Error::Namespace { + source: format!("Failed to create object store: {}", e).into(), + location: snafu::location!(), + })?; - Ok((opendal_scheme, config)) + Ok((object_store, base_path)) } +} - /// Normalize scheme names with common aliases - fn normalize_scheme(scheme: &str) -> String { - match scheme.to_lowercase().as_str() { - "s3a" | "s3n" => "s3".to_string(), - "abfs" => "azblob".to_string(), - "file" => "fs".to_string(), - s => s.to_string(), - } - } +/// Directory-based implementation of Lance Namespace. +/// +/// This implementation stores tables as Lance datasets in a directory structure. +/// It supports local filesystems and cloud storage backends through Lance's object store. +pub struct DirectoryNamespace { + root: String, + storage_options: Option>, + #[allow(dead_code)] + session: Option>, + object_store: Arc, + base_path: Path, +} +impl DirectoryNamespace { /// Validate that the namespace ID represents the root namespace fn validate_root_namespace_id(id: &Option>) -> Result<()> { if let Some(id) = id { @@ -244,14 +300,31 @@ impl DirectoryNamespace { Ok(id[0].clone()) } - /// Get the full path for a table - fn table_full_path(&self, table_name: &str) -> String { - format!("{}/{}.lance", self.config.root(), table_name) + /// Get the full URI path for a table (for returning in responses) + fn table_full_uri(&self, table_name: &str) -> String { + format!("{}/{}.lance", &self.root, table_name) + } + + /// Get the object store path for a table (relative to base_path) + fn table_path(&self, table_name: &str) -> Path { + self.base_path + .child(format!("{}.lance", table_name).as_str()) + } + + /// Get the versions directory path for a table + fn table_versions_path(&self, table_name: &str) -> Path { + // Need to chain child calls to avoid URL encoding the slash + self.base_path + .child(format!("{}.lance", table_name).as_str()) + .child("_versions") } - /// Get the versions path for a table - fn table_versions_path(&self, table_name: &str) -> String { - format!("{}.lance/_versions/", table_name) + /// Get the reserved file path for a table + fn table_reserved_file_path(&self, table_name: &str) -> Path { + // Need to chain child calls to avoid URL encoding the slash + self.base_path + .child(format!("{}.lance", table_name).as_str()) + .child(".lance-reserved") } } @@ -334,17 +407,21 @@ impl LanceNamespace for DirectoryNamespace { let mut tables = Vec::new(); - // Use non-recursive listing to avoid issues with object stores that don't have directory concept - let entries = self.operator.list("").await.map_err(|e| Error::IO { - source: box_error(std::io::Error::other(format!( - "Failed to list directory: {}", - e - ))), - location: snafu::location!(), - })?; + // List all entries in the base directory + let entries = self + .object_store + .read_dir(self.base_path.clone()) + .await + .map_err(|e| Error::IO { + source: box_error(std::io::Error::other(format!( + "Failed to list directory: {}", + e + ))), + location: snafu::location!(), + })?; for entry in entries { - let path = entry.path().trim_end_matches('/'); + let path = entry.trim_end_matches('/'); // Only process directory-like paths that end with .lance if !path.ends_with(".lance") { @@ -358,9 +435,9 @@ impl LanceNamespace for DirectoryNamespace { let mut is_table = false; // First check for .lance-reserved file - let reserved_file_path = format!("{}.lance/.lance-reserved", table_name); + let reserved_file_path = self.table_reserved_file_path(table_name); if self - .operator + .object_store .exists(&reserved_file_path) .await .unwrap_or(false) @@ -371,7 +448,7 @@ impl LanceNamespace for DirectoryNamespace { // If not found, check for _versions directory if !is_table { let versions_path = self.table_versions_path(table_name); - if let Ok(version_entries) = self.operator.list(&versions_path).await { + if let Ok(version_entries) = self.object_store.read_dir(versions_path).await { // If there's at least one version file, it's a valid Lance dataset if !version_entries.is_empty() { is_table = true; @@ -390,15 +467,15 @@ impl LanceNamespace for DirectoryNamespace { async fn describe_table(&self, request: DescribeTableRequest) -> Result { let table_name = Self::table_name_from_id(&request.id)?; - let table_path = self.table_full_path(&table_name); + let table_uri = self.table_full_uri(&table_name); // Check if table exists - either as Lance dataset or with .lance-reserved file let mut table_exists = false; // First check for .lance-reserved file - let reserved_file_path = format!("{}.lance/.lance-reserved", table_name); + let reserved_file_path = self.table_reserved_file_path(&table_name); if self - .operator + .object_store .exists(&reserved_file_path) .await .unwrap_or(false) @@ -409,7 +486,7 @@ impl LanceNamespace for DirectoryNamespace { // If not found, check if it's a Lance dataset by looking for _versions directory if !table_exists { let versions_path = self.table_versions_path(&table_name); - if let Ok(entries) = self.operator.list(&versions_path).await { + if let Ok(entries) = self.object_store.read_dir(versions_path).await { if !entries.is_empty() { table_exists = true; } @@ -425,10 +502,10 @@ impl LanceNamespace for DirectoryNamespace { Ok(DescribeTableResponse { version: None, - location: Some(table_path), + location: Some(table_uri), schema: None, properties: None, - storage_options: Some(self.config.storage_options.clone()), + storage_options: self.storage_options.clone(), }) } @@ -439,9 +516,9 @@ impl LanceNamespace for DirectoryNamespace { let mut table_exists = false; // First check for .lance-reserved file - let reserved_file_path = format!("{}.lance/.lance-reserved", table_name); + let reserved_file_path = self.table_reserved_file_path(&table_name); if self - .operator + .object_store .exists(&reserved_file_path) .await .unwrap_or(false) @@ -452,7 +529,7 @@ impl LanceNamespace for DirectoryNamespace { // If not found, check if it's a Lance dataset by looking for _versions directory if !table_exists { let versions_path = self.table_versions_path(&table_name); - if let Ok(entries) = self.operator.list(&versions_path).await { + if let Ok(entries) = self.object_store.read_dir(versions_path).await { if !entries.is_empty() { table_exists = true; } @@ -469,13 +546,36 @@ impl LanceNamespace for DirectoryNamespace { Ok(()) } + async fn drop_table(&self, request: DropTableRequest) -> Result { + let table_name = Self::table_name_from_id(&request.id)?; + let table_uri = self.table_full_uri(&table_name); + + // Remove the entire table directory + let table_path = self.table_path(&table_name); + + self.object_store + .remove_dir_all(table_path) + .await + .map_err(|e| Error::Namespace { + source: format!("Failed to drop table {}: {}", table_name, e).into(), + location: snafu::location!(), + })?; + + Ok(DropTableResponse { + id: request.id, + location: Some(table_uri), + properties: None, + transaction_id: None, + }) + } + async fn create_table( &self, request: CreateTableRequest, request_data: Bytes, ) -> Result { let table_name = Self::table_name_from_id(&request.id)?; - let table_path = self.table_full_path(&table_name); + let table_uri = self.table_full_uri(&table_name); // Validate that request_data is provided and is a valid Arrow IPC stream if request_data.is_empty() { @@ -488,11 +588,11 @@ impl LanceNamespace for DirectoryNamespace { // Validate location if provided if let Some(location) = &request.location { let location = location.trim_end_matches('/'); - if location != table_path { + if location != table_uri { return Err(Error::Namespace { source: format!( "Cannot create table {} at location {}, must be at location {}", - table_name, location, table_path + table_name, location, table_uri ) .into(), location: snafu::location!(), @@ -535,13 +635,20 @@ impl LanceNamespace for DirectoryNamespace { }; // Set up write parameters for creating a new dataset + // Populate store_params with storage options to ensure they're forwarded to Dataset::write + let store_params = self.storage_options.as_ref().map(|opts| ObjectStoreParams { + storage_options: Some(opts.clone()), + ..Default::default() + }); + let write_params = WriteParams { mode: lance::dataset::WriteMode::Create, + store_params, ..Default::default() }; // Create the Lance dataset using the actual Lance API - Dataset::write(reader, &table_path, Some(write_params)) + Dataset::write(reader, &table_uri, Some(write_params)) .await .map_err(|e| Error::Namespace { source: format!("Failed to create Lance dataset: {}", e).into(), @@ -550,9 +657,9 @@ impl LanceNamespace for DirectoryNamespace { Ok(CreateTableResponse { version: Some(1), - location: Some(table_path), + location: Some(table_uri), properties: None, - storage_options: Some(self.config.storage_options.clone()), + storage_options: self.storage_options.clone(), }) } @@ -561,16 +668,16 @@ impl LanceNamespace for DirectoryNamespace { request: CreateEmptyTableRequest, ) -> Result { let table_name = Self::table_name_from_id(&request.id)?; - let table_path = self.table_full_path(&table_name); + let table_uri = self.table_full_uri(&table_name); // Validate location if provided if let Some(location) = &request.location { let location = location.trim_end_matches('/'); - if location != table_path { + if location != table_uri { return Err(Error::Namespace { source: format!( "Cannot create table {} at location {}, must be at location {}", - table_name, location, table_path + table_name, location, table_uri ) .into(), location: snafu::location!(), @@ -579,9 +686,10 @@ impl LanceNamespace for DirectoryNamespace { } // Create the .lance-reserved file to mark the table as existing - let reserved_file_path = format!("{}.lance/.lance-reserved", table_name); - self.operator - .write(&reserved_file_path, Vec::::new()) + let reserved_file_path = self.table_reserved_file_path(&table_name); + + self.object_store + .create(&reserved_file_path) .await .map_err(|e| Error::Namespace { source: format!( @@ -590,34 +698,22 @@ impl LanceNamespace for DirectoryNamespace { ) .into(), location: snafu::location!(), - })?; - - Ok(CreateEmptyTableResponse { - location: Some(table_path), - properties: None, - storage_options: Some(self.config.storage_options.clone()), - }) - } - - async fn drop_table(&self, request: DropTableRequest) -> Result { - let table_name = Self::table_name_from_id(&request.id)?; - let table_path = self.table_full_path(&table_name); - - // Remove the entire table directory - let table_dir = format!("{}.lance/", table_name); - self.operator - .remove_all(&table_dir) + })? + .shutdown() .await .map_err(|e| Error::Namespace { - source: format!("Failed to drop table {}: {}", table_name, e).into(), + source: format!( + "Failed to finalize .lance-reserved file for table {}: {}", + table_name, e + ) + .into(), location: snafu::location!(), })?; - Ok(DropTableResponse { - id: request.id, - location: Some(table_path), + Ok(CreateEmptyTableResponse { + location: Some(table_uri), properties: None, - transaction_id: None, + storage_options: self.storage_options.clone(), }) } } @@ -625,22 +721,19 @@ impl LanceNamespace for DirectoryNamespace { #[cfg(test)] mod tests { use super::*; + use lance_core::utils::tempfile::TempStdDir; use lance_namespace::models::{JsonArrowDataType, JsonArrowField, JsonArrowSchema}; use lance_namespace::schema::convert_json_arrow_schema; - use std::collections::HashMap; use std::sync::Arc; - use tempfile::TempDir; /// Helper to create a test DirectoryNamespace with a temporary directory - async fn create_test_namespace() -> (DirectoryNamespace, TempDir) { - let temp_dir = TempDir::new().unwrap(); - let mut properties = HashMap::new(); - properties.insert( - "root".to_string(), - temp_dir.path().to_string_lossy().to_string(), - ); - - let namespace = DirectoryNamespace::new(properties).unwrap(); + async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) { + let temp_dir = TempStdDir::default(); + + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .build() + .await + .unwrap(); (namespace, temp_dir) } @@ -1007,17 +1100,14 @@ mod tests { #[tokio::test] async fn test_config_custom_root() { - let temp_dir = TempDir::new().unwrap(); - let custom_path = temp_dir.path().join("custom"); + let temp_dir = TempStdDir::default(); + let custom_path = temp_dir.join("custom"); std::fs::create_dir(&custom_path).unwrap(); - let mut properties = HashMap::new(); - properties.insert( - "root".to_string(), - custom_path.to_string_lossy().to_string(), - ); - - let namespace = DirectoryNamespace::new(properties).unwrap(); + let namespace = DirectoryNamespaceBuilder::new(custom_path.to_string_lossy().to_string()) + .build() + .await + .unwrap(); // Create test IPC data let schema = create_test_schema(); @@ -1037,16 +1127,14 @@ mod tests { #[tokio::test] async fn test_config_storage_options() { - let temp_dir = TempDir::new().unwrap(); - let mut properties = HashMap::new(); - properties.insert( - "root".to_string(), - temp_dir.path().to_string_lossy().to_string(), - ); - properties.insert("storage.option1".to_string(), "value1".to_string()); - properties.insert("storage.option2".to_string(), "value2".to_string()); + let temp_dir = TempStdDir::default(); - let namespace = DirectoryNamespace::new(properties).unwrap(); + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .storage_option("option1", "value1") + .storage_option("option2", "value2") + .build() + .await + .unwrap(); // Create test IPC data let schema = create_test_schema(); @@ -1119,160 +1207,19 @@ mod tests { #[tokio::test] async fn test_connect_dir() { - let temp_dir = TempDir::new().unwrap(); - let mut properties = HashMap::new(); - properties.insert( - "root".to_string(), - temp_dir.path().to_string_lossy().to_string(), - ); + let temp_dir = TempStdDir::default(); - let namespace = connect_dir(properties).await.unwrap(); + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .build() + .await + .unwrap(); - // Test basic operation through the trait object + // Test basic operation through the concrete type let request = ListTablesRequest::new(); let response = namespace.list_tables(request).await.unwrap(); assert_eq!(response.tables.len(), 0); } - #[test] - fn test_parse_storage_path_local() { - let storage_options = HashMap::new(); - - // Test local filesystem paths - let (scheme, config) = - DirectoryNamespace::parse_storage_path("/path/to/data", &storage_options).unwrap(); - assert!(matches!(scheme, opendal::Scheme::Fs)); - assert_eq!(config.get("root").unwrap(), "/path/to/data"); - - // Test relative path - let (scheme, config) = - DirectoryNamespace::parse_storage_path("./data", &storage_options).unwrap(); - assert!(matches!(scheme, opendal::Scheme::Fs)); - assert_eq!(config.get("root").unwrap(), "./data"); - } - - #[test] - fn test_parse_storage_path_s3() { - let storage_options = HashMap::new(); - - // Test S3 URL - let (scheme, config) = - DirectoryNamespace::parse_storage_path("s3://my-bucket/path/to/data", &storage_options) - .unwrap(); - assert!(matches!(scheme, opendal::Scheme::S3)); - assert_eq!(config.get("bucket").unwrap(), "my-bucket"); - assert_eq!(config.get("root").unwrap(), "path/to/data"); - - // Test S3 with just bucket - let (scheme, config) = - DirectoryNamespace::parse_storage_path("s3://my-bucket", &storage_options).unwrap(); - assert!(matches!(scheme, opendal::Scheme::S3)); - assert_eq!(config.get("bucket").unwrap(), "my-bucket"); - assert_eq!(config.get("root").unwrap(), ""); - } - - #[test] - fn test_parse_storage_path_gcs() { - let storage_options = HashMap::new(); - - // Test GCS URL - let (scheme, config) = DirectoryNamespace::parse_storage_path( - "gcs://my-bucket/path/to/data", - &storage_options, - ) - .unwrap(); - assert!(matches!(scheme, opendal::Scheme::Gcs)); - assert_eq!(config.get("bucket").unwrap(), "my-bucket"); - assert_eq!(config.get("root").unwrap(), "path/to/data"); - } - - #[test] - fn test_parse_storage_path_azblob() { - let storage_options = HashMap::new(); - - // Test Azure Blob URL - let (scheme, config) = DirectoryNamespace::parse_storage_path( - "azblob://my-container/path/to/data", - &storage_options, - ) - .unwrap(); - assert!(matches!(scheme, opendal::Scheme::Azblob)); - assert_eq!(config.get("container").unwrap(), "my-container"); - assert_eq!(config.get("root").unwrap(), "path/to/data"); - - // Test with abfs alias - let (scheme, config) = - DirectoryNamespace::parse_storage_path("abfs://my-container/path", &storage_options) - .unwrap(); - assert!(matches!(scheme, opendal::Scheme::Azblob)); - assert_eq!(config.get("container").unwrap(), "my-container"); - assert_eq!(config.get("root").unwrap(), "path"); - } - - #[test] - fn test_normalize_scheme() { - // Test scheme normalization - assert_eq!(DirectoryNamespace::normalize_scheme("s3a"), "s3"); - assert_eq!(DirectoryNamespace::normalize_scheme("s3n"), "s3"); - assert_eq!(DirectoryNamespace::normalize_scheme("S3A"), "s3"); - assert_eq!(DirectoryNamespace::normalize_scheme("abfs"), "azblob"); - assert_eq!(DirectoryNamespace::normalize_scheme("ABFS"), "azblob"); - assert_eq!(DirectoryNamespace::normalize_scheme("file"), "fs"); - assert_eq!(DirectoryNamespace::normalize_scheme("FILE"), "fs"); - assert_eq!(DirectoryNamespace::normalize_scheme("gcs"), "gcs"); - assert_eq!(DirectoryNamespace::normalize_scheme("random"), "random"); - } - - #[test] - fn test_fs_scheme_url() { - let storage_options = HashMap::new(); - - // Test file:// URLs - let (scheme, config) = - DirectoryNamespace::parse_storage_path("file:///absolute/path", &storage_options) - .unwrap(); - assert!(matches!(scheme, opendal::Scheme::Fs)); - assert_eq!(config.get("root").unwrap(), "/absolute/path"); - - // Test fs:// URLs - let (scheme, config) = - DirectoryNamespace::parse_storage_path("fs:///absolute/path", &storage_options) - .unwrap(); - assert!(matches!(scheme, opendal::Scheme::Fs)); - assert_eq!(config.get("root").unwrap(), "/absolute/path"); - } - - #[test] - fn test_storage_options_passed_through() { - // Test that storage options are properly passed through parse_storage_path - let mut storage_options = HashMap::new(); - storage_options.insert("aws_access_key_id".to_string(), "test_key".to_string()); - storage_options.insert( - "aws_secret_access_key".to_string(), - "test_secret".to_string(), - ); - storage_options.insert("region".to_string(), "us-west-2".to_string()); - - // Test with S3 - storage options should be included - let (scheme, config) = - DirectoryNamespace::parse_storage_path("s3://my-bucket/path", &storage_options) - .unwrap(); - assert!(matches!(scheme, opendal::Scheme::S3)); - assert_eq!(config.get("bucket").unwrap(), "my-bucket"); - assert_eq!(config.get("root").unwrap(), "path"); - assert_eq!(config.get("aws_access_key_id").unwrap(), "test_key"); - assert_eq!(config.get("aws_secret_access_key").unwrap(), "test_secret"); - assert_eq!(config.get("region").unwrap(), "us-west-2"); - - // Test with local filesystem - storage options should still be included - let (scheme, config) = - DirectoryNamespace::parse_storage_path("/local/path", &storage_options).unwrap(); - assert!(matches!(scheme, opendal::Scheme::Fs)); - assert_eq!(config.get("root").unwrap(), "/local/path"); - // Even for local fs, storage options should be passed through - assert_eq!(config.get("aws_access_key_id").unwrap(), "test_key"); - } - #[tokio::test] async fn test_create_table_with_ipc_data() { use arrow::array::{Int32Array, StringArray}; @@ -1338,7 +1285,7 @@ mod tests { assert!(response.location.unwrap().ends_with("empty_table.lance")); // Verify the .lance-reserved file was created in the correct location - let table_dir = temp_dir.path().join("empty_table.lance"); + let table_dir = temp_dir.join("empty_table.lance"); assert!(table_dir.exists()); assert!(table_dir.is_dir()); @@ -1396,7 +1343,7 @@ mod tests { assert!(create_response.location.is_some()); // Verify it exists - let table_dir = temp_dir.path().join("empty_table_to_drop.lance"); + let table_dir = temp_dir.join("empty_table_to_drop.lance"); assert!(table_dir.exists()); let reserved_file = table_dir.join(".lance-reserved"); assert!(reserved_file.exists()); diff --git a/rust/lance-namespace-impls/src/lib.rs b/rust/lance-namespace-impls/src/lib.rs index 27aaeeb5658..29cd4e0a372 100644 --- a/rust/lance-namespace-impls/src/lib.rs +++ b/rust/lance-namespace-impls/src/lib.rs @@ -7,21 +7,38 @@ //! //! ## Features //! -//! - `dir`: Directory-based namespace implementation that stores tables as Lance datasets +//! - `rest`: REST API-based namespace implementation +//! - `dir-aws`, `dir-azure`, `dir-gcp`, `dir-oss`: Cloud storage backend support for directory namespace (via lance-io) +//! +//! ## Implementations +//! +//! - `DirectoryNamespace`: Directory-based implementation (always available) +//! - `RestNamespace`: REST API-based implementation (requires `rest` feature) +//! +//! ## Usage +//! +//! The recommended way to connect to a namespace is using [`ConnectBuilder`]: +//! +//! ```no_run +//! # use lance_namespace_impls::ConnectBuilder; +//! # async fn example() -> Result<(), Box> { +//! let namespace = ConnectBuilder::new("dir") +//! .property("root", "/path/to/data") +//! .connect() +//! .await?; +//! # Ok(()) +//! # } +//! ``` pub mod connect; +pub mod dir; #[cfg(feature = "rest")] pub mod rest; -#[cfg(feature = "dir")] -pub mod dir; - -// Re-export connect function -pub use connect::connect; +// Re-export connect builder +pub use connect::ConnectBuilder; +pub use dir::{DirectoryNamespace, DirectoryNamespaceBuilder}; #[cfg(feature = "rest")] -pub use rest::RestNamespace; - -#[cfg(feature = "dir")] -pub use dir::{connect_dir, DirectoryNamespace, DirectoryNamespaceConfig}; +pub use rest::{RestNamespace, RestNamespaceBuilder}; diff --git a/rust/lance-namespace-impls/src/rest.rs b/rust/lance-namespace-impls/src/rest.rs index fd57fe2b43c..202fca7b68b 100644 --- a/rust/lance-namespace-impls/src/rest.rs +++ b/rust/lance-namespace-impls/src/rest.rs @@ -31,48 +31,121 @@ use lance_core::{box_error, Error, Result}; use lance_namespace::LanceNamespace; -/// Configuration for REST namespace +/// Builder for creating a RestNamespace. +/// +/// This builder provides a fluent API for configuring and establishing +/// connections to REST-based Lance namespaces. +/// +/// # Examples +/// +/// ```no_run +/// # use lance_namespace_impls::RestNamespaceBuilder; +/// # fn example() -> Result<(), Box> { +/// // Create a REST namespace +/// let namespace = RestNamespaceBuilder::new("http://localhost:8080") +/// .delimiter(".") +/// .header("Authorization", "Bearer token") +/// .build(); +/// # Ok(()) +/// # } +/// ``` #[derive(Debug, Clone)] -pub struct RestNamespaceConfig { - /// The delimiter used for object identifiers +pub struct RestNamespaceBuilder { + uri: String, delimiter: String, - /// Additional headers to send with requests - additional_headers: HashMap, - /// The base URI for the REST API - uri: Option, - /// Path to the client certificate file (PEM format) for mTLS + headers: HashMap, cert_file: Option, - /// Path to the client private key file (PEM format) for mTLS key_file: Option, - /// Path to the CA certificate file for server verification (PEM format) ssl_ca_cert: Option, - /// Whether to verify the hostname in the server's certificate assert_hostname: bool, } -impl RestNamespaceConfig { - /// Header prefix for additional headers - const HEADER_PREFIX: &'static str = "header."; - - /// Default delimiter +impl RestNamespaceBuilder { + /// Default delimiter for object identifiers const DEFAULT_DELIMITER: &'static str = "."; - /// Create a new configuration from a map of properties - pub fn new(properties: HashMap) -> Self { + /// Create a new RestNamespaceBuilder with the specified URI. + /// + /// # Arguments + /// + /// * `uri` - Base URI for the REST API + pub fn new(uri: impl Into) -> Self { + Self { + uri: uri.into(), + delimiter: Self::DEFAULT_DELIMITER.to_string(), + headers: HashMap::new(), + cert_file: None, + key_file: None, + ssl_ca_cert: None, + assert_hostname: true, + } + } + + /// Create a RestNamespaceBuilder from properties HashMap. + /// + /// This method parses a properties map into builder configuration. + /// It expects: + /// - `uri`: The base URI for the REST API (required) + /// - `delimiter`: Delimiter for object identifiers (optional, defaults to ".") + /// - `header.*`: Additional headers (optional, prefix will be stripped) + /// - `tls.cert_file`: Path to client certificate file (optional) + /// - `tls.key_file`: Path to client private key file (optional) + /// - `tls.ssl_ca_cert`: Path to CA certificate file (optional) + /// - `tls.assert_hostname`: Whether to verify hostname (optional, defaults to true) + /// + /// # Arguments + /// + /// * `properties` - Configuration properties + /// + /// # Returns + /// + /// Returns a `RestNamespaceBuilder` instance. + /// + /// # Errors + /// + /// Returns an error if the `uri` property is missing. + /// + /// # Examples + /// + /// ```no_run + /// # use lance_namespace_impls::RestNamespaceBuilder; + /// # use std::collections::HashMap; + /// # fn example() -> Result<(), Box> { + /// let mut properties = HashMap::new(); + /// properties.insert("uri".to_string(), "http://localhost:8080".to_string()); + /// properties.insert("delimiter".to_string(), "/".to_string()); + /// properties.insert("header.Authorization".to_string(), "Bearer token".to_string()); + /// + /// let namespace = RestNamespaceBuilder::from_properties(properties)? + /// .build(); + /// # Ok(()) + /// # } + /// ``` + pub fn from_properties(properties: HashMap) -> Result { + // Extract URI (required) + let uri = properties + .get("uri") + .cloned() + .ok_or_else(|| Error::Namespace { + source: "Missing required property 'uri' for REST namespace".into(), + location: snafu::location!(), + })?; + + // Extract delimiter (optional) let delimiter = properties .get("delimiter") .cloned() .unwrap_or_else(|| Self::DEFAULT_DELIMITER.to_string()); - let uri = properties.get("uri").cloned(); - - let mut additional_headers = HashMap::new(); + // Extract headers (properties prefixed with "header.") + let mut headers = HashMap::new(); for (key, value) in &properties { - if let Some(header_name) = key.strip_prefix(Self::HEADER_PREFIX) { - additional_headers.insert(header_name.to_string(), value.clone()); + if let Some(header_name) = key.strip_prefix("header.") { + headers.insert(header_name.to_string(), value.clone()); } } + // Extract TLS options let cert_file = properties.get("tls.cert_file").cloned(); let key_file = properties.get("tls.key_file").cloned(); let ssl_ca_cert = properties.get("tls.ssl_ca_cert").cloned(); @@ -81,30 +154,95 @@ impl RestNamespaceConfig { .and_then(|v| v.parse::().ok()) .unwrap_or(true); - Self { - delimiter, - additional_headers, + Ok(Self { uri, + delimiter, + headers, cert_file, key_file, ssl_ca_cert, assert_hostname, - } + }) + } + + /// Set the delimiter for object identifiers. + /// + /// # Arguments + /// + /// * `delimiter` - Delimiter string (e.g., ".", "/") + pub fn delimiter(mut self, delimiter: impl Into) -> Self { + self.delimiter = delimiter.into(); + self + } + + /// Add a custom header to the HTTP requests. + /// + /// # Arguments + /// + /// * `name` - Header name + /// * `value` - Header value + pub fn header(mut self, name: impl Into, value: impl Into) -> Self { + self.headers.insert(name.into(), value.into()); + self + } + + /// Add multiple custom headers to the HTTP requests. + /// + /// # Arguments + /// + /// * `headers` - HashMap of headers to add + pub fn headers(mut self, headers: HashMap) -> Self { + self.headers.extend(headers); + self } - /// Get the delimiter - pub fn delimiter(&self) -> &str { - &self.delimiter + /// Set the client certificate file for mTLS. + /// + /// # Arguments + /// + /// * `cert_file` - Path to the certificate file (PEM format) + pub fn cert_file(mut self, cert_file: impl Into) -> Self { + self.cert_file = Some(cert_file.into()); + self } - /// Get additional headers - pub fn additional_headers(&self) -> &HashMap { - &self.additional_headers + /// Set the client private key file for mTLS. + /// + /// # Arguments + /// + /// * `key_file` - Path to the private key file (PEM format) + pub fn key_file(mut self, key_file: impl Into) -> Self { + self.key_file = Some(key_file.into()); + self } - /// Get the URI - pub fn uri(&self) -> Option<&str> { - self.uri.as_deref() + /// Set the CA certificate file for server verification. + /// + /// # Arguments + /// + /// * `ssl_ca_cert` - Path to the CA certificate file (PEM format) + pub fn ssl_ca_cert(mut self, ssl_ca_cert: impl Into) -> Self { + self.ssl_ca_cert = Some(ssl_ca_cert.into()); + self + } + + /// Set whether to verify the hostname in the server's certificate. + /// + /// # Arguments + /// + /// * `assert_hostname` - Whether to verify hostname + pub fn assert_hostname(mut self, assert_hostname: bool) -> Self { + self.assert_hostname = assert_hostname; + self + } + + /// Build the RestNamespace. + /// + /// # Returns + /// + /// Returns a `RestNamespace` instance. + pub fn build(self) -> RestNamespace { + RestNamespace::from_builder(self) } } @@ -144,23 +282,33 @@ fn convert_api_error(err: lance_namespace::apis::Error) - } /// REST implementation of Lance Namespace +/// +/// # Examples +/// +/// ```no_run +/// # use lance_namespace_impls::RestNamespaceBuilder; +/// # fn example() -> Result<(), Box> { +/// // Use the builder to create a namespace +/// let namespace = RestNamespaceBuilder::new("http://localhost:8080") +/// .build(); +/// # Ok(()) +/// # } +/// ``` pub struct RestNamespace { - config: RestNamespaceConfig, + delimiter: String, reqwest_config: Configuration, } impl RestNamespace { - /// Create a new REST namespace with the given configuration - pub fn new(properties: HashMap) -> Self { - let config = RestNamespaceConfig::new(properties); - + /// Create a new REST namespace from builder + pub(crate) fn from_builder(builder: RestNamespaceBuilder) -> Self { // Build reqwest client with custom headers if provided let mut client_builder = reqwest::Client::builder(); // Add custom headers to the client - if !config.additional_headers().is_empty() { + if !builder.headers.is_empty() { let mut headers = reqwest::header::HeaderMap::new(); - for (key, value) in config.additional_headers() { + for (key, value) in &builder.headers { if let (Ok(header_name), Ok(header_value)) = ( reqwest::header::HeaderName::from_bytes(key.as_bytes()), reqwest::header::HeaderValue::from_str(value), @@ -172,7 +320,7 @@ impl RestNamespace { } // Configure mTLS if certificate and key files are provided - if let (Some(cert_file), Some(key_file)) = (&config.cert_file, &config.key_file) { + if let (Some(cert_file), Some(key_file)) = (&builder.cert_file, &builder.key_file) { if let (Ok(cert), Ok(key)) = (std::fs::read(cert_file), std::fs::read(key_file)) { if let Ok(identity) = reqwest::Identity::from_pem(&[&cert[..], &key[..]].concat()) { client_builder = client_builder.identity(identity); @@ -181,7 +329,7 @@ impl RestNamespace { } // Load CA certificate for server verification - if let Some(ca_cert_file) = &config.ssl_ca_cert { + if let Some(ca_cert_file) = &builder.ssl_ca_cert { if let Ok(ca_cert) = std::fs::read(ca_cert_file) { if let Ok(ca_cert) = reqwest::Certificate::from_pem(&ca_cert) { client_builder = client_builder.add_root_certificate(ca_cert); @@ -190,7 +338,7 @@ impl RestNamespace { } // Configure hostname verification - client_builder = client_builder.danger_accept_invalid_hostnames(!config.assert_hostname); + client_builder = client_builder.danger_accept_invalid_hostnames(!builder.assert_hostname); let client = client_builder .build() @@ -198,26 +346,19 @@ impl RestNamespace { let mut reqwest_config = Configuration::new(); reqwest_config.client = client; - if let Some(uri) = config.uri() { - reqwest_config.base_path = uri.to_string(); - } + reqwest_config.base_path = builder.uri; Self { - config, + delimiter: builder.delimiter, reqwest_config, } } /// Create a new REST namespace with custom configuration (for testing) #[cfg(test)] - pub fn with_configuration( - properties: HashMap, - reqwest_config: Configuration, - ) -> Self { - let config = RestNamespaceConfig::new(properties); - + pub fn with_configuration(delimiter: String, reqwest_config: Configuration) -> Self { Self { - config, + delimiter, reqwest_config, } } @@ -229,12 +370,12 @@ impl LanceNamespace for RestNamespace { &self, request: ListNamespacesRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; namespace_api::list_namespaces( &self.reqwest_config, &id, - Some(self.config.delimiter()), + Some(&self.delimiter), request.page_token.as_deref(), request.limit, ) @@ -246,67 +387,47 @@ impl LanceNamespace for RestNamespace { &self, request: DescribeNamespaceRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - namespace_api::describe_namespace( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + namespace_api::describe_namespace(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn create_namespace( &self, request: CreateNamespaceRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - namespace_api::create_namespace( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + namespace_api::create_namespace(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - namespace_api::drop_namespace( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + namespace_api::drop_namespace(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - namespace_api::namespace_exists( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + namespace_api::namespace_exists(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn list_tables(&self, request: ListTablesRequest) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; table_api::list_tables( &self.reqwest_config, &id, - Some(self.config.delimiter()), + Some(&self.delimiter), request.page_token.as_deref(), request.limit, ) @@ -315,84 +436,54 @@ impl LanceNamespace for RestNamespace { } async fn describe_table(&self, request: DescribeTableRequest) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::describe_table( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::describe_table(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn register_table(&self, request: RegisterTableRequest) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::register_table( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::register_table(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn table_exists(&self, request: TableExistsRequest) -> Result<()> { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::table_exists( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::table_exists(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn drop_table(&self, request: DropTableRequest) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::drop_table( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::drop_table(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn deregister_table( &self, request: DeregisterTableRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::deregister_table( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::deregister_table(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn count_table_rows(&self, request: CountTableRowsRequest) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::count_table_rows( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::count_table_rows(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn create_table( @@ -400,7 +491,7 @@ impl LanceNamespace for RestNamespace { request: CreateTableRequest, request_data: Bytes, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; let properties_json = request .properties @@ -418,7 +509,7 @@ impl LanceNamespace for RestNamespace { &self.reqwest_config, &id, request_data.to_vec(), - Some(self.config.delimiter()), + Some(&self.delimiter), mode, request.location.as_deref(), properties_json.as_deref(), @@ -431,16 +522,11 @@ impl LanceNamespace for RestNamespace { &self, request: CreateEmptyTableRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::create_empty_table( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::create_empty_table(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn insert_into_table( @@ -448,7 +534,7 @@ impl LanceNamespace for RestNamespace { request: InsertIntoTableRequest, request_data: Bytes, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; use lance_namespace::models::insert_into_table_request::Mode; let mode = request.mode.as_ref().map(|m| match m { @@ -460,7 +546,7 @@ impl LanceNamespace for RestNamespace { &self.reqwest_config, &id, request_data.to_vec(), - Some(self.config.delimiter()), + Some(&self.delimiter), mode, ) .await @@ -472,7 +558,7 @@ impl LanceNamespace for RestNamespace { request: MergeInsertIntoTableRequest, request_data: Bytes, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; let on = request.on.as_deref().ok_or_else(|| Error::Namespace { source: "'on' field is required for merge insert".into(), @@ -484,7 +570,7 @@ impl LanceNamespace for RestNamespace { &id, on, request_data.to_vec(), - Some(self.config.delimiter()), + Some(&self.delimiter), request.when_matched_update_all, request.when_matched_update_all_filt.as_deref(), request.when_not_matched_insert_all, @@ -496,45 +582,31 @@ impl LanceNamespace for RestNamespace { } async fn update_table(&self, request: UpdateTableRequest) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::update_table( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::update_table(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn delete_from_table( &self, request: DeleteFromTableRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::delete_from_table( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::delete_from_table(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn query_table(&self, request: QueryTableRequest) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - let response = table_api::query_table( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error)?; + let response = + table_api::query_table(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error)?; // Convert response to bytes let bytes = response.bytes().await.map_err(|e| Error::IO { @@ -549,39 +621,29 @@ impl LanceNamespace for RestNamespace { &self, request: CreateTableIndexRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::create_table_index( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::create_table_index(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn list_table_indices( &self, request: ListTableIndicesRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; - table_api::list_table_indices( - &self.reqwest_config, - &id, - request, - Some(self.config.delimiter()), - ) - .await - .map_err(convert_api_error) + table_api::list_table_indices(&self.reqwest_config, &id, request, Some(&self.delimiter)) + .await + .map_err(convert_api_error) } async fn describe_table_index_stats( &self, request: DescribeTableIndexStatsRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; // Note: The index_name parameter seems to be missing from the request structure // This might need to be adjusted based on the actual API @@ -592,7 +654,7 @@ impl LanceNamespace for RestNamespace { &id, index_name, request, - Some(self.config.delimiter()), + Some(&self.delimiter), ) .await .map_err(convert_api_error) @@ -602,13 +664,13 @@ impl LanceNamespace for RestNamespace { &self, request: DescribeTransactionRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; transaction_api::describe_transaction( &self.reqwest_config, &id, request, - Some(self.config.delimiter()), + Some(&self.delimiter), ) .await .map_err(convert_api_error) @@ -618,13 +680,13 @@ impl LanceNamespace for RestNamespace { &self, request: AlterTransactionRequest, ) -> Result { - let id = object_id_str(&request.id, self.config.delimiter())?; + let id = object_id_str(&request.id, &self.delimiter)?; transaction_api::alter_transaction( &self.reqwest_config, &id, request, - Some(self.config.delimiter()), + Some(&self.delimiter), ) .await .map_err(convert_api_error) @@ -641,10 +703,9 @@ mod tests { /// Create a test REST namespace instance fn create_test_namespace() -> RestNamespace { - let mut properties = HashMap::new(); - properties.insert("uri".to_string(), "http://localhost:8080".to_string()); - properties.insert("delimiter".to_string(), ".".to_string()); - RestNamespace::new(properties) + RestNamespaceBuilder::new("http://localhost:8080") + .delimiter(".") + .build() } #[test] @@ -658,7 +719,9 @@ mod tests { ); properties.insert("header.X-Custom".to_string(), "value".to_string()); - let _namespace = RestNamespace::new(properties); + let _namespace = RestNamespaceBuilder::from_properties(properties) + .expect("Failed to create namespace builder") + .build(); // Successfully created the namespace - test passes if no panic } @@ -697,7 +760,9 @@ mod tests { "custom-value".to_string(), ); - let namespace = RestNamespace::new(properties); + let namespace = RestNamespaceBuilder::from_properties(properties) + .expect("Failed to create namespace builder") + .build(); let request = ListNamespacesRequest { id: Some(vec!["test".to_string()]), @@ -713,8 +778,11 @@ mod tests { #[test] fn test_default_configuration() { - let properties = HashMap::new(); - let _namespace = RestNamespace::new(properties); + let mut properties = HashMap::new(); + properties.insert("uri".to_string(), "http://localhost:8080".to_string()); + let _namespace = RestNamespaceBuilder::from_properties(properties) + .expect("Failed to create namespace builder") + .build(); // The default delimiter should be "." - test passes if no panic } @@ -724,7 +792,9 @@ mod tests { let mut properties = HashMap::new(); properties.insert("uri".to_string(), "https://api.example.com/v1".to_string()); - let _namespace = RestNamespace::new(properties); + let _namespace = RestNamespaceBuilder::from_properties(properties) + .expect("Failed to create namespace builder") + .build(); // Test passes if no panic } @@ -737,33 +807,38 @@ mod tests { properties.insert("tls.ssl_ca_cert".to_string(), "/path/to/ca.pem".to_string()); properties.insert("tls.assert_hostname".to_string(), "true".to_string()); - let config = RestNamespaceConfig::new(properties); - assert_eq!(config.cert_file, Some("/path/to/cert.pem".to_string())); - assert_eq!(config.key_file, Some("/path/to/key.pem".to_string())); - assert_eq!(config.ssl_ca_cert, Some("/path/to/ca.pem".to_string())); - assert!(config.assert_hostname); + let builder = RestNamespaceBuilder::from_properties(properties) + .expect("Failed to create namespace builder"); + assert_eq!(builder.cert_file, Some("/path/to/cert.pem".to_string())); + assert_eq!(builder.key_file, Some("/path/to/key.pem".to_string())); + assert_eq!(builder.ssl_ca_cert, Some("/path/to/ca.pem".to_string())); + assert!(builder.assert_hostname); } #[test] fn test_tls_config_default_assert_hostname() { let mut properties = HashMap::new(); + properties.insert("uri".to_string(), "https://api.example.com".to_string()); properties.insert("tls.cert_file".to_string(), "/path/to/cert.pem".to_string()); properties.insert("tls.key_file".to_string(), "/path/to/key.pem".to_string()); - let config = RestNamespaceConfig::new(properties); + let builder = RestNamespaceBuilder::from_properties(properties) + .expect("Failed to create namespace builder"); // Default should be true - assert!(config.assert_hostname); + assert!(builder.assert_hostname); } #[test] fn test_tls_config_disable_hostname_verification() { let mut properties = HashMap::new(); + properties.insert("uri".to_string(), "https://api.example.com".to_string()); properties.insert("tls.cert_file".to_string(), "/path/to/cert.pem".to_string()); properties.insert("tls.key_file".to_string(), "/path/to/key.pem".to_string()); properties.insert("tls.assert_hostname".to_string(), "false".to_string()); - let config = RestNamespaceConfig::new(properties); - assert!(!config.assert_hostname); + let builder = RestNamespaceBuilder::from_properties(properties) + .expect("Failed to create namespace builder"); + assert!(!builder.assert_hostname); } #[test] @@ -784,7 +859,9 @@ mod tests { ); // Should not panic even with nonexistent files (they're just ignored) - let _namespace = RestNamespace::new(properties); + let _namespace = RestNamespaceBuilder::from_properties(properties) + .expect("Failed to create namespace builder") + .build(); } #[tokio::test] @@ -805,14 +882,10 @@ mod tests { .await; // Create namespace with mock server URL - let mut properties = HashMap::new(); - properties.insert("uri".to_string(), mock_server.uri()); - properties.insert("delimiter".to_string(), ".".to_string()); - let mut reqwest_config = Configuration::new(); reqwest_config.base_path = mock_server.uri(); - let namespace = RestNamespace::with_configuration(properties, reqwest_config); + let namespace = RestNamespace::with_configuration(".".to_string(), reqwest_config); let request = ListNamespacesRequest { id: Some(vec!["test".to_string()]), @@ -848,13 +921,10 @@ mod tests { .await; // Create namespace with mock server URL - let mut properties = HashMap::new(); - properties.insert("uri".to_string(), mock_server.uri()); - let mut reqwest_config = Configuration::new(); reqwest_config.base_path = mock_server.uri(); - let namespace = RestNamespace::with_configuration(properties, reqwest_config); + let namespace = RestNamespace::with_configuration(".".to_string(), reqwest_config); let request = ListNamespacesRequest { id: Some(vec!["test".to_string()]), @@ -903,13 +973,10 @@ mod tests { .await; // Create namespace with mock server URL - let mut properties = HashMap::new(); - properties.insert("uri".to_string(), mock_server.uri()); - let mut reqwest_config = Configuration::new(); reqwest_config.base_path = mock_server.uri(); - let namespace = RestNamespace::with_configuration(properties, reqwest_config); + let namespace = RestNamespace::with_configuration(".".to_string(), reqwest_config); let request = CreateNamespaceRequest { id: Some(vec!["test".to_string(), "newnamespace".to_string()]), @@ -942,13 +1009,10 @@ mod tests { .await; // Create namespace with mock server URL - let mut properties = HashMap::new(); - properties.insert("uri".to_string(), mock_server.uri()); - let mut reqwest_config = Configuration::new(); reqwest_config.base_path = mock_server.uri(); - let namespace = RestNamespace::with_configuration(properties, reqwest_config); + let namespace = RestNamespace::with_configuration(".".to_string(), reqwest_config); let request = CreateTableRequest { id: Some(vec![ @@ -983,13 +1047,10 @@ mod tests { .await; // Create namespace with mock server URL - let mut properties = HashMap::new(); - properties.insert("uri".to_string(), mock_server.uri()); - let mut reqwest_config = Configuration::new(); reqwest_config.base_path = mock_server.uri(); - let namespace = RestNamespace::with_configuration(properties, reqwest_config); + let namespace = RestNamespace::with_configuration(".".to_string(), reqwest_config); let request = InsertIntoTableRequest { id: Some(vec![