Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions rust/lance-namespace-impls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ rust-version.workspace = true
[features]
default = []
rest = ["dep:reqwest", "dep:serde_json", "dep:url"]
dir = ["dep:lance", "dep:opendal", "dep:arrow", "dep:arrow-ipc", "dep:arrow-schema", "dep:url"]
# Cloud storage features for directory implementation - align with lance-io
dir-gcp = ["lance-io/gcp"]
dir-aws = ["lance-io/aws"]
dir-azure = ["lance-io/azure"]
dir-oss = ["lance-io/oss"]

[dependencies]
lance-namespace.workspace = true
Expand All @@ -30,15 +34,16 @@ reqwest = { version = "0.12", optional = true, default-features = false, feature
"rustls-tls-native-roots"
] }
serde_json = { workspace = true, optional = true }

# Directory implementation dependencies (optional, enabled by "dir" feature)
lance = { workspace = true, optional = true }
opendal = { workspace = true, optional = true, features = ["services-fs", "services-s3", "services-gcs", "services-azblob"] }
arrow = { workspace = true, optional = true }
arrow-ipc = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
url = { workspace = true, optional = true }

# Directory implementation dependencies (always enabled)
lance = { workspace = true }
lance-io = { workspace = true }
object_store = { workspace = true }
arrow = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }

# Common dependencies
async-trait.workspace = true
bytes.workspace = true
Expand Down
13 changes: 7 additions & 6 deletions rust/lance-namespace-impls/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@ This crate provides concrete implementations of the Lance namespace trait:

- Unified connection interface for all implementations
- **REST Namespace** - REST API client for remote Lance namespace servers (feature: `rest`)
- **Directory Namespace** - File system-based namespace that stores tables as Lance datasets (feature: `dir`)
- **Directory Namespace** - File system-based namespace that stores tables as Lance datasets (always available)

## Features

### REST Namespace (feature: `rest`)

The REST namespace implementation provides a client for connecting to remote Lance namespace servers via REST API.

### Directory Namespace (feature: `dir`)
### Directory Namespace (always available)

The directory namespace implementation stores tables as Lance datasets in a directory structure on local or cloud storage.

Supported storage backends:
- Local filesystem
- AWS S3
- Google Cloud Storage (GCS)
- Azure Blob Storage
- Local filesystem (always available)
- AWS S3 (feature: `dir-aws`)
- Google Cloud Storage (feature: `dir-gcp`)
- Azure Blob Storage (feature: `dir-azure`)
- Alibaba Cloud OSS (feature: `dir-oss`)

## Usage

Expand Down
270 changes: 195 additions & 75 deletions rust/lance-namespace-impls/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,100 +6,220 @@
use std::collections::HashMap;
use std::sync::Arc;

use lance::session::Session;
use lance_core::{Error, Result};
use lance_namespace::LanceNamespace;

/// Connect to a Lance namespace implementation.
/// Builder for creating Lance namespace connections.
///
/// This function creates a connection to a Lance namespace backend based on
/// the specified implementation type and configuration properties.
///
/// # Arguments
///
/// * `impl_name` - Implementation identifier. Supported values:
/// - "rest": REST API implementation (requires "rest" feature)
/// - "dir": Directory-based implementation (requires "dir" feature)
///
/// * `properties` - Configuration properties specific to the implementation.
/// Common properties:
/// - For REST: "uri" (base URL), "delimiter", "header.*" (custom headers)
/// - For DIR: "root" (directory path), "storage.*" (storage options)
///
/// # Returns
///
/// Returns a boxed trait object implementing the `LanceNamespace` trait.
/// This builder provides a fluent API for configuring and establishing
/// connections to Lance namespace implementations.
///
/// # Examples
///
/// ```no_run
/// use lance_namespace_impls::connect;
/// use std::collections::HashMap;
///
/// # use lance_namespace_impls::ConnectBuilder;
/// # use std::collections::HashMap;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Connect to REST implementation
/// let mut props = HashMap::new();
/// props.insert("uri".to_string(), "http://localhost:8080".to_string());
/// let namespace = connect("rest", props).await?;
/// // Connect to directory implementation
/// let namespace = ConnectBuilder::new("dir")
/// .property("root", "/path/to/data")
/// .property("storage.region", "us-west-2")
/// .connect()
/// .await?;
/// # Ok(())
/// # }
/// ```
///
/// ```no_run
/// # #[cfg(feature = "dir")]
/// # use lance_namespace_impls::ConnectBuilder;
/// # use lance::session::Session;
/// # use std::sync::Arc;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use lance_namespace_impls::connect;
/// use std::collections::HashMap;
///
/// // Connect to directory implementation (requires "dir" feature)
/// let mut props = HashMap::new();
/// props.insert("root".to_string(), "/path/to/data".to_string());
/// let namespace = connect("dir", props).await?;
/// // Connect with a shared session
/// let session = Arc::new(Session::default());
/// let namespace = ConnectBuilder::new("dir")
/// .property("root", "/path/to/data")
/// .session(session)
/// .connect()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn connect(
impl_name: &str,
#[allow(unused)] properties: HashMap<String, String>,
) -> Result<Arc<dyn LanceNamespace>> {
match impl_name {
#[cfg(feature = "rest")]
"rest" => {
// Create REST implementation
Ok(Arc::new(crate::rest::RestNamespace::new(properties)))
}
#[cfg(not(feature = "rest"))]
"rest" => Err(Error::Namespace {
source: "REST namespace implementation requires 'rest' feature to be enabled".into(),
location: snafu::location!(),
}),
#[cfg(feature = "dir")]
"dir" => {
// Create directory implementation
crate::dir::connect_dir(properties).await
#[derive(Debug, Clone)]
pub struct ConnectBuilder {
impl_name: String,
properties: HashMap<String, String>,
session: Option<Arc<Session>>,
}

impl ConnectBuilder {
/// Create a new ConnectBuilder for the specified implementation.
///
/// # Arguments
///
/// * `impl_name` - Implementation identifier ("dir", "rest", etc.)
pub fn new(impl_name: impl Into<String>) -> Self {
Self {
impl_name: impl_name.into(),
properties: HashMap::new(),
session: None,
}
#[cfg(not(feature = "dir"))]
"dir" => Err(Error::Namespace {
source: "Directory namespace implementation requires 'dir' feature to be enabled"
}

/// Add a configuration property.
///
/// # Arguments
///
/// * `key` - Property key
/// * `value` - Property value
pub fn property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.properties.insert(key.into(), value.into());
self
}

/// Add multiple configuration properties.
///
/// # Arguments
///
/// * `properties` - HashMap of properties to add
pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
self.properties.extend(properties);
self
}

/// Set the Lance session to use for this connection.
///
/// When a session is provided, the namespace will reuse the session's
/// object store registry, allowing multiple namespaces and datasets
/// to share the same underlying storage connections.
///
/// # Arguments
///
/// * `session` - Arc-wrapped Lance session
pub fn session(mut self, session: Arc<Session>) -> Self {
self.session = Some(session);
self
}

/// Build and establish the connection to the namespace.
///
/// # Returns
///
/// Returns a trait object implementing `LanceNamespace`.
///
/// # Errors
///
/// Returns an error if:
/// - The implementation type is not supported
/// - Required configuration properties are missing
/// - Connection to the backend fails
pub async fn connect(self) -> Result<Arc<dyn LanceNamespace>> {
match self.impl_name.as_str() {
#[cfg(feature = "rest")]
"rest" => {
// Create REST implementation (REST doesn't use session)
crate::rest::RestNamespaceBuilder::from_properties(self.properties)
.map(|builder| Arc::new(builder.build()) as Arc<dyn LanceNamespace>)
}
#[cfg(not(feature = "rest"))]
"rest" => Err(Error::Namespace {
source: "REST namespace implementation requires 'rest' feature to be enabled"
.into(),
location: snafu::location!(),
}),
"dir" => {
// Create directory implementation (always available)
crate::dir::DirectoryNamespaceBuilder::from_properties(
self.properties,
self.session,
)?
.build()
.await
.map(|ns| Arc::new(ns) as Arc<dyn LanceNamespace>)
}
_ => Err(Error::Namespace {
source: format!(
"Implementation '{}' is not available. Supported: dir{}",
self.impl_name,
if cfg!(feature = "rest") { ", rest" } else { "" }
)
.into(),
location: snafu::location!(),
}),
_ => Err(Error::Namespace {
source: format!(
"Implementation '{}' is not available. Supported: {}{}",
impl_name,
if cfg!(feature = "rest") { "rest" } else { "" },
if cfg!(feature = "dir") {
if cfg!(feature = "rest") {
", dir"
} else {
"dir"
}
} else {
""
}
)
.into(),
location: snafu::location!(),
}),
location: snafu::location!(),
}),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use lance_core::utils::tempfile::TempStdDir;
use lance_namespace::models::ListTablesRequest;
use std::collections::HashMap;

#[tokio::test]
async fn test_connect_builder_basic() {
let temp_dir = TempStdDir::default();

let namespace = ConnectBuilder::new("dir")
.property("root", temp_dir.to_str().unwrap())
.connect()
.await
.unwrap();

// Verify we can use the namespace
let request = ListTablesRequest::new();
let response = namespace.list_tables(request).await.unwrap();
assert_eq!(response.tables.len(), 0);
}

#[tokio::test]
async fn test_connect_builder_with_properties() {
let temp_dir = TempStdDir::default();
let mut props = HashMap::new();
props.insert("storage.option1".to_string(), "value1".to_string());

let namespace = ConnectBuilder::new("dir")
.property("root", temp_dir.to_str().unwrap())
.properties(props)
.connect()
.await
.unwrap();

// Verify we can use the namespace
let request = ListTablesRequest::new();
let response = namespace.list_tables(request).await.unwrap();
assert_eq!(response.tables.len(), 0);
}

#[tokio::test]
async fn test_connect_builder_with_session() {
let temp_dir = TempStdDir::default();
let session = Arc::new(Session::default());

let namespace = ConnectBuilder::new("dir")
.property("root", temp_dir.to_str().unwrap())
.session(session.clone())
.connect()
.await
.unwrap();

// Verify we can use the namespace
let request = ListTablesRequest::new();
let response = namespace.list_tables(request).await.unwrap();
assert_eq!(response.tables.len(), 0);
}

#[tokio::test]
async fn test_connect_builder_invalid_impl() {
let result = ConnectBuilder::new("invalid")
.property("root", "/tmp")
.connect()
.await;

assert!(result.is_err());
let err = result.err().unwrap();
assert!(err.to_string().contains("not available"));
}
}
Loading
Loading