Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions rust/lance-namespace-impls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ rust-version.workspace = true
[features]
default = []
rest = ["dep:reqwest", "dep:serde_json", "dep:url"]
dir = ["dep:lance", "dep:opendal", "dep:arrow", "dep:arrow-ipc", "dep:arrow-schema", "dep:url"]
# Cloud storage features for directory implementation - align with lance-io
dir-gcp = ["lance-io/gcp"]
dir-aws = ["lance-io/aws"]
dir-azure = ["lance-io/azure"]
dir-oss = ["lance-io/oss"]

[dependencies]
lance-namespace.workspace = true
Expand All @@ -30,15 +34,16 @@ reqwest = { version = "0.12", optional = true, default-features = false, feature
"rustls-tls-native-roots"
] }
serde_json = { workspace = true, optional = true }

# Directory implementation dependencies (optional, enabled by "dir" feature)
lance = { workspace = true, optional = true }
opendal = { workspace = true, optional = true, features = ["services-fs", "services-s3", "services-gcs", "services-azblob"] }
arrow = { workspace = true, optional = true }
arrow-ipc = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
url = { workspace = true, optional = true }

# Directory implementation dependencies (always enabled)
lance = { workspace = true }
lance-io = { workspace = true }
object_store = { workspace = true }
arrow = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }

# Common dependencies
async-trait.workspace = true
bytes.workspace = true
Expand Down
13 changes: 7 additions & 6 deletions rust/lance-namespace-impls/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@ This crate provides concrete implementations of the Lance namespace trait:

- Unified connection interface for all implementations
- **REST Namespace** - REST API client for remote Lance namespace servers (feature: `rest`)
- **Directory Namespace** - File system-based namespace that stores tables as Lance datasets (feature: `dir`)
- **Directory Namespace** - File system-based namespace that stores tables as Lance datasets (always available)

## Features

### REST Namespace (feature: `rest`)

The REST namespace implementation provides a client for connecting to remote Lance namespace servers via REST API.

### Directory Namespace (feature: `dir`)
### Directory Namespace (always available)

The directory namespace implementation stores tables as Lance datasets in a directory structure on local or cloud storage.

Supported storage backends:
- Local filesystem
- AWS S3
- Google Cloud Storage (GCS)
- Azure Blob Storage
- Local filesystem (always available)
- AWS S3 (feature: `dir-aws`)
- Google Cloud Storage (feature: `dir-gcp`)
- Azure Blob Storage (feature: `dir-azure`)
- Alibaba Cloud OSS (feature: `dir-oss`)

## Usage

Expand Down
270 changes: 195 additions & 75 deletions rust/lance-namespace-impls/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,100 +6,220 @@
use std::collections::HashMap;
use std::sync::Arc;

use lance::session::Session;
use lance_core::{Error, Result};
use lance_namespace::LanceNamespace;

/// Connect to a Lance namespace implementation.
/// Builder for creating Lance namespace connections.
///
/// This function creates a connection to a Lance namespace backend based on
/// the specified implementation type and configuration properties.
///
/// # Arguments
///
/// * `impl_name` - Implementation identifier. Supported values:
/// - "rest": REST API implementation (requires "rest" feature)
/// - "dir": Directory-based implementation (requires "dir" feature)
///
/// * `properties` - Configuration properties specific to the implementation.
/// Common properties:
/// - For REST: "uri" (base URL), "delimiter", "header.*" (custom headers)
/// - For DIR: "root" (directory path), "storage.*" (storage options)
///
/// # Returns
///
/// Returns a boxed trait object implementing the `LanceNamespace` trait.
/// This builder provides a fluent API for configuring and establishing
/// connections to Lance namespace implementations.
///
/// # Examples
///
/// ```no_run
/// use lance_namespace_impls::connect;
/// use std::collections::HashMap;
///
/// # use lance_namespace_impls::ConnectBuilder;
/// # use std::collections::HashMap;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Connect to REST implementation
/// let mut props = HashMap::new();
/// props.insert("uri".to_string(), "http://localhost:8080".to_string());
/// let namespace = connect("rest", props).await?;
/// // Connect to directory implementation
/// let namespace = ConnectBuilder::new("dir")
/// .property("root", "/path/to/data")
/// .property("storage.region", "us-west-2")
/// .connect()
/// .await?;
/// # Ok(())
/// # }
/// ```
///
/// ```no_run
/// # #[cfg(feature = "dir")]
/// # use lance_namespace_impls::ConnectBuilder;
/// # use lance::session::Session;
/// # use std::sync::Arc;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use lance_namespace_impls::connect;
/// use std::collections::HashMap;
///
/// // Connect to directory implementation (requires "dir" feature)
/// let mut props = HashMap::new();
/// props.insert("root".to_string(), "/path/to/data".to_string());
/// let namespace = connect("dir", props).await?;
/// // Connect with a shared session
/// let session = Arc::new(Session::default());
/// let namespace = ConnectBuilder::new("dir")
/// .property("root", "/path/to/data")
/// .session(session)
/// .connect()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn connect(
impl_name: &str,
#[allow(unused)] properties: HashMap<String, String>,
) -> Result<Arc<dyn LanceNamespace>> {
match impl_name {
#[cfg(feature = "rest")]
"rest" => {
// Create REST implementation
Ok(Arc::new(crate::rest::RestNamespace::new(properties)))
}
#[cfg(not(feature = "rest"))]
"rest" => Err(Error::Namespace {
source: "REST namespace implementation requires 'rest' feature to be enabled".into(),
location: snafu::location!(),
}),
#[cfg(feature = "dir")]
"dir" => {
// Create directory implementation
crate::dir::connect_dir(properties).await
#[derive(Debug, Clone)]
pub struct ConnectBuilder {
impl_name: String,
properties: HashMap<String, String>,
session: Option<Arc<Session>>,
}

impl ConnectBuilder {
/// Create a new ConnectBuilder for the specified implementation.
///
/// # Arguments
///
/// * `impl_name` - Implementation identifier ("dir", "rest", etc.)
pub fn new(impl_name: impl Into<String>) -> Self {
Self {
impl_name: impl_name.into(),
properties: HashMap::new(),
session: None,
}
#[cfg(not(feature = "dir"))]
"dir" => Err(Error::Namespace {
source: "Directory namespace implementation requires 'dir' feature to be enabled"
}

/// Add a configuration property.
///
/// # Arguments
///
/// * `key` - Property key
/// * `value` - Property value
pub fn property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.properties.insert(key.into(), value.into());
self
}

/// Add multiple configuration properties.
///
/// # Arguments
///
/// * `properties` - HashMap of properties to add
pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
self.properties.extend(properties);
self
}

/// Set the Lance session to use for this connection.
///
/// When a session is provided, the namespace will reuse the session's
/// object store registry, allowing multiple namespaces and datasets
/// to share the same underlying storage connections.
///
/// # Arguments
///
/// * `session` - Arc-wrapped Lance session
pub fn session(mut self, session: Arc<Session>) -> Self {
self.session = Some(session);
self
}

/// Build and establish the connection to the namespace.
///
/// # Returns
///
/// Returns a trait object implementing `LanceNamespace`.
///
/// # Errors
///
/// Returns an error if:
/// - The implementation type is not supported
/// - Required configuration properties are missing
/// - Connection to the backend fails
pub async fn connect(self) -> Result<Arc<dyn LanceNamespace>> {
match self.impl_name.as_str() {
#[cfg(feature = "rest")]
"rest" => {
// Create REST implementation (REST doesn't use session)
crate::rest::RestNamespaceBuilder::from_properties(self.properties)
.map(|builder| Arc::new(builder.build()) as Arc<dyn LanceNamespace>)
}
#[cfg(not(feature = "rest"))]
"rest" => Err(Error::Namespace {
source: "REST namespace implementation requires 'rest' feature to be enabled"
.into(),
location: snafu::location!(),
}),
"dir" => {
// Create directory implementation (always available)
crate::dir::DirectoryNamespaceBuilder::from_properties(
self.properties,
self.session,
)?
.build()
.await
.map(|ns| Arc::new(ns) as Arc<dyn LanceNamespace>)
}
_ => Err(Error::Namespace {
source: format!(
"Implementation '{}' is not available. Supported: dir{}",
self.impl_name,
if cfg!(feature = "rest") { ", rest" } else { "" }
)
.into(),
location: snafu::location!(),
}),
_ => Err(Error::Namespace {
source: format!(
"Implementation '{}' is not available. Supported: {}{}",
impl_name,
if cfg!(feature = "rest") { "rest" } else { "" },
if cfg!(feature = "dir") {
if cfg!(feature = "rest") {
", dir"
} else {
"dir"
}
} else {
""
}
)
.into(),
location: snafu::location!(),
}),
location: snafu::location!(),
}),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use lance_core::utils::tempfile::TempStdDir;
use lance_namespace::models::ListTablesRequest;
use std::collections::HashMap;

#[tokio::test]
async fn test_connect_builder_basic() {
let temp_dir = TempStdDir::default();

let namespace = ConnectBuilder::new("dir")
.property("root", temp_dir.to_str().unwrap())
.connect()
.await
.unwrap();

// Verify we can use the namespace
let request = ListTablesRequest::new();
let response = namespace.list_tables(request).await.unwrap();
assert_eq!(response.tables.len(), 0);
}

#[tokio::test]
async fn test_connect_builder_with_properties() {
let temp_dir = TempStdDir::default();
let mut props = HashMap::new();
props.insert("storage.option1".to_string(), "value1".to_string());

let namespace = ConnectBuilder::new("dir")
.property("root", temp_dir.to_str().unwrap())
.properties(props)
.connect()
.await
.unwrap();

// Verify we can use the namespace
let request = ListTablesRequest::new();
let response = namespace.list_tables(request).await.unwrap();
assert_eq!(response.tables.len(), 0);
}

#[tokio::test]
async fn test_connect_builder_with_session() {
let temp_dir = TempStdDir::default();
let session = Arc::new(Session::default());

let namespace = ConnectBuilder::new("dir")
.property("root", temp_dir.to_str().unwrap())
.session(session.clone())
.connect()
.await
.unwrap();

// Verify we can use the namespace
let request = ListTablesRequest::new();
let response = namespace.list_tables(request).await.unwrap();
assert_eq!(response.tables.len(), 0);
}

#[tokio::test]
async fn test_connect_builder_invalid_impl() {
let result = ConnectBuilder::new("invalid")
.property("root", "/tmp")
.connect()
.await;

assert!(result.is_err());
let err = result.err().unwrap();
assert!(err.to_string().contains("not available"));
}
}
Loading
Loading