Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lance-namespace-impls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ reqwest = { version = "0.12", optional = true, default-features = false, feature
# Directory implementation dependencies (always enabled)
url = { workspace = true }
lance = { workspace = true }
lance-index = { workspace = true }
lance-io = { workspace = true }
object_store = { workspace = true }
arrow = { workspace = true }
Expand Down
21 changes: 21 additions & 0 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub struct DirectoryNamespaceBuilder {
session: Option<Arc<Session>>,
manifest_enabled: bool,
dir_listing_enabled: bool,
inline_optimization_enabled: bool,
}

impl DirectoryNamespaceBuilder {
Expand All @@ -89,6 +90,7 @@ impl DirectoryNamespaceBuilder {
session: None,
manifest_enabled: true,
dir_listing_enabled: true, // Default to enabled for backwards compatibility
inline_optimization_enabled: true, // Default to enabled
}
}

Expand All @@ -110,13 +112,24 @@ impl DirectoryNamespaceBuilder {
self
}

/// Enable or disable inline optimization of the __manifest table.
///
/// When enabled (default), performs compaction and indexing on the __manifest table
/// after every write operation to maintain optimal performance.
/// When disabled, manual optimization must be performed separately.
pub fn inline_optimization_enabled(mut self, enabled: bool) -> Self {
self.inline_optimization_enabled = enabled;
self
}

/// Create a DirectoryNamespaceBuilder from properties HashMap.
///
/// This method parses a properties map into builder configuration.
/// It expects:
/// - `root`: The root directory path (required)
/// - `manifest_enabled`: Enable manifest-based table tracking (optional, default: true)
/// - `dir_listing_enabled`: Enable directory listing for table discovery (optional, default: true)
/// - `inline_optimization_enabled`: Enable inline optimization of __manifest table (optional, default: true)
/// - `storage.*`: Storage options (optional, prefix will be stripped)
///
/// # Arguments
Expand Down Expand Up @@ -190,12 +203,19 @@ impl DirectoryNamespaceBuilder {
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(true);

// Extract inline_optimization_enabled (default: true)
let inline_optimization_enabled = properties
.get("inline_optimization_enabled")
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(true);

Ok(Self {
root: root.trim_end_matches('/').to_string(),
storage_options,
session,
manifest_enabled,
dir_listing_enabled,
inline_optimization_enabled,
})
}

Expand Down Expand Up @@ -262,6 +282,7 @@ impl DirectoryNamespaceBuilder {
object_store.clone(),
base_path.clone(),
self.dir_listing_enabled,
self.inline_optimization_enabled,
)
.await
{
Expand Down
163 changes: 158 additions & 5 deletions rust/lance-namespace-impls/src/dir/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::StreamExt;
use lance::dataset::optimize::{compact_files, CompactionOptions};
use lance::dataset::WriteParams;
use lance::session::Session;
use lance::{dataset::scanner::Scanner, Dataset};
use lance_core::{box_error, Error, Result};
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
use lance_index::traits::DatasetIndexExt;
use lance_index::IndexType;
use lance_io::object_store::ObjectStore;
use lance_namespace::models::{
CreateEmptyTableRequest, CreateEmptyTableResponse, CreateNamespaceRequest,
Expand All @@ -42,6 +47,14 @@
const MANIFEST_TABLE_NAME: &str = "__manifest";
const DELIMITER: &str = "$";

// Index names for the __manifest table
/// BTREE index on the object_id column for fast lookups
const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
/// Bitmap index on the object_type column for filtering by type
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
/// LabelList index on the base_objects column for materialized view dependencies
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";

/// Object types that can be stored in the manifest
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectType {
Expand Down Expand Up @@ -226,6 +239,9 @@
/// If true, root namespace tables use {table_name}.lance naming
/// If false, they use namespace-prefixed names
dir_listing_enabled: bool,
/// Whether to perform inline optimization (compaction and indexing) on the __manifest table
/// after every write. Defaults to true.
inline_optimization_enabled: bool,
}

impl ManifestNamespace {
Expand All @@ -237,6 +253,7 @@
object_store: Arc<ObjectStore>,
base_path: Path,
dir_listing_enabled: bool,
inline_optimization_enabled: bool,
) -> Result<Self> {
let manifest_dataset =
Self::create_or_get_manifest(&root, object_store.clone(), session.clone()).await?;
Expand All @@ -249,6 +266,7 @@
base_path,
manifest_dataset,
dir_listing_enabled,
inline_optimization_enabled,
})
}

Expand Down Expand Up @@ -333,6 +351,121 @@
Ok(full_url.to_string())
}

/// Perform inline optimization on the __manifest table.
///
/// This method:
/// 1. Creates three indexes on the manifest table:
/// - BTREE index on object_id for fast lookups
/// - Bitmap index on object_type for filtering by type
/// - LabelList index on base_objects for materialized view dependencies
/// 2. Runs file compaction to merge small files
/// 3. Optimizes existing indices
///
/// This is called automatically after writes when inline_optimization_enabled is true.
async fn run_inline_optimization(&self) -> Result<()> {
if !self.inline_optimization_enabled {
return Ok(());
}

// Get a mutable reference to the dataset to perform optimization
let mut dataset_guard = self.manifest_dataset.get_mut().await?;
let dataset: &mut Dataset = &mut *dataset_guard;

Check warning on line 373 in rust/lance-namespace-impls/src/dir/manifest.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-namespace-impls/src/dir/manifest.rs
// Step 1: Create indexes if they don't already exist
let indices = dataset.load_indices().await.map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!("Failed to load indices: {}", e))),
location: location!(),
})?;

// Check which indexes already exist

Check warning on line 380 in rust/lance-namespace-impls/src/dir/manifest.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-namespace-impls/src/dir/manifest.rs
let has_object_id_index = indices.iter().any(|idx| idx.name == OBJECT_ID_INDEX_NAME);
let has_object_type_index = indices.iter().any(|idx| idx.name == OBJECT_TYPE_INDEX_NAME);
let has_base_objects_index = indices.iter().any(|idx| idx.name == BASE_OBJECTS_INDEX_NAME);

// Create BTREE index on object_id
if !has_object_id_index {
log::info!("Creating BTREE index '{}' on object_id for __manifest table", OBJECT_ID_INDEX_NAME);

Check warning on line 387 in rust/lance-namespace-impls/src/dir/manifest.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-namespace-impls/src/dir/manifest.rs
let params = ScalarIndexParams::for_builtin(BuiltinIndexType::BTree);
dataset
.create_index(&["object_id"], IndexType::BTree, Some(OBJECT_ID_INDEX_NAME.to_string()), &params, true)
.await
.map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!(
"Failed to create BTREE index on object_id: {}",
e
))),
location: location!(),
})?;
}

Check warning on line 400 in rust/lance-namespace-impls/src/dir/manifest.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-namespace-impls/src/dir/manifest.rs
// Create Bitmap index on object_type
if !has_object_type_index {
log::info!("Creating Bitmap index '{}' on object_type for __manifest table", OBJECT_TYPE_INDEX_NAME);
let params = ScalarIndexParams::default();
dataset
.create_index(&["object_type"], IndexType::Bitmap, Some(OBJECT_TYPE_INDEX_NAME.to_string()), &params, true)
.await
.map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!(
"Failed to create Bitmap index on object_type: {}",
e
))),
location: location!(),
})?;
}

Check warning on line 416 in rust/lance-namespace-impls/src/dir/manifest.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-namespace-impls/src/dir/manifest.rs
// Create LabelList index on base_objects
if !has_base_objects_index {
log::info!("Creating LabelList index '{}' on base_objects for __manifest table", BASE_OBJECTS_INDEX_NAME);
let params = ScalarIndexParams::default();
dataset
.create_index(&["base_objects"], IndexType::LabelList, Some(BASE_OBJECTS_INDEX_NAME.to_string()), &params, true)
.await
.map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!(
"Failed to create LabelList index on base_objects: {}",
e
))),
location: location!(),
})?;
}

// Step 2: Run file compaction
log::debug!("Running file compaction on __manifest table");
let compaction_metrics = compact_files(dataset, CompactionOptions::default(), None)
.await
.map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!(
"Failed to compact files: {}",
e
))),
location: location!(),
})?;

if compaction_metrics.fragments_removed > 0 {
log::info!(
"Compacted __manifest table: removed {} fragments, added {} fragments",
compaction_metrics.fragments_removed,
compaction_metrics.fragments_added
);
}

// Step 3: Optimize indices
log::debug!("Optimizing indices on __manifest table");
dataset
.optimize_indices(&OptimizeOptions::default())
.await
.map_err(|e| Error::IO {
source: box_error(std::io::Error::other(format!(
"Failed to optimize indices: {}",
e
))),
location: location!(),
})?;

Ok(())
}

/// Get the manifest schema
fn manifest_schema() -> Arc<ArrowSchema> {
Arc::new(ArrowSchema::new(vec![
Expand Down Expand Up @@ -511,30 +644,43 @@
object_type: ObjectType,
location: Option<String>,
) -> Result<()> {
self.insert_into_manifest_with_metadata(object_id, object_type, location, None)
self.insert_into_manifest_with_metadata(object_id, object_type, location, None, None)
.await
}

/// Insert an entry into the manifest table with metadata
/// Insert an entry into the manifest table with metadata and base_objects
async fn insert_into_manifest_with_metadata(
&self,
object_id: String,
object_type: ObjectType,
location: Option<String>,
metadata: Option<String>,
base_objects: Option<Vec<String>>,
) -> Result<()> {
use arrow::array::builder::{ListBuilder, StringBuilder};

let schema = Self::manifest_schema();

// Create empty base_objects array
// Create base_objects array from the provided list
let string_builder = StringBuilder::new();
let mut list_builder = ListBuilder::new(string_builder).with_field(Arc::new(Field::new(
"object_id",
DataType::Utf8,
true,
)));
list_builder.append_null();

match base_objects {
Some(objects) => {
for obj in objects {
list_builder.values().append_value(obj);
}
list_builder.append(true);
}
None => {
list_builder.append_null();
}
}

let base_objects_array = list_builder.finish();

// Create arrays with optional values
Expand Down Expand Up @@ -621,6 +767,9 @@
let new_dataset = Arc::try_unwrap(new_dataset_arc).unwrap_or_else(|arc| (*arc).clone());
self.manifest_dataset.set_latest(new_dataset).await;

// Run inline optimization after write
self.run_inline_optimization().await?;

Ok(())
}

Expand All @@ -639,6 +788,10 @@
} // Drop the guard here

self.manifest_dataset.reload().await?;

// Run inline optimization after delete
self.run_inline_optimization().await?;

Comment on lines 639 to +831
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Inline optimization failures break manifest operations

After inserting into the __manifest table we immediately call self.run_inline_optimization().await?; (lines 767‑772), and the same happens after deletions (lines 790‑794). The manifest row has already been merged or deleted before this call, yet any error from creating indexes, compacting files, or optimizing indices bubbles up to the caller. This makes user operations non‑atomic: a transient failure during the optional optimization causes create_table/register_table to report an error even though the entry was already added, and drop_table/deregister_table returns an error after the manifest row is gone, preventing the physical data cleanup from running and leaving orphaned files. Inline optimization should be best‑effort or rolled back; as written it can leave the namespace in an inconsistent state whenever compaction or index maintenance fails.

Useful? React with 👍 / 👎.

Ok(())
}

Expand Down Expand Up @@ -1191,10 +1344,10 @@
None
} else {
Some(serde_json::to_string(props).ok()?)
}

Check warning on line 1347 in rust/lance-namespace-impls/src/dir/manifest.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-namespace-impls/src/dir/manifest.rs
});

self.insert_into_manifest_with_metadata(object_id, ObjectType::Namespace, None, metadata)
self.insert_into_manifest_with_metadata(object_id, ObjectType::Namespace, None, metadata, None)
.await?;

Ok(CreateNamespaceResponse {
Expand Down
Loading