diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs
index 29314b166e1..08f88c3c049 100644
--- a/rust/lance-arrow/src/lib.rs
+++ b/rust/lance-arrow/src/lib.rs
@@ -43,6 +43,8 @@ pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";
 
 /// Key used by lance to mark a field as a blob
 /// TODO: Use Arrow extension mechanism instead?
 pub const BLOB_META_KEY: &str = "lance-encoding:blob";
+/// Key used by Lance to record the blob column format version.
+pub const BLOB_VERSION_META_KEY: &str = "lance-encoding:blob-version";
 
 type Result<T> = std::result::Result<T, ArrowError>;
diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs
index 96861ca54d7..e7f576d4a18 100644
--- a/rust/lance-core/src/datatypes.rs
+++ b/rust/lance-core/src/datatypes.rs
@@ -18,7 +18,9 @@ mod field;
 mod schema;
 use crate::{Error, Result};
-pub use field::{Encoding, Field, NullabilityComparison, OnTypeMismatch, SchemaCompareOptions};
+pub use field::{
+    BlobVersion, Encoding, Field, NullabilityComparison, OnTypeMismatch, SchemaCompareOptions,
+};
 pub use schema::{
     escape_field_path_for_project, format_field_path, parse_field_path, FieldRef, OnMissing,
     Projectable, Projection, Schema,
 };
@@ -44,6 +46,32 @@ pub static BLOB_DESC_FIELD: LazyLock<ArrowField> = LazyLock::new(|| {
 pub static BLOB_DESC_LANCE_FIELD: LazyLock<Field> =
     LazyLock::new(|| Field::try_from(&*BLOB_DESC_FIELD).unwrap());
 
+pub static BLOB_V2_DESC_FIELDS: LazyLock<Fields> = LazyLock::new(|| {
+    Fields::from(vec![
+        ArrowField::new("kind", DataType::UInt8, false),
+        ArrowField::new("position", DataType::UInt64, true),
+        ArrowField::new("size", DataType::UInt64, true),
+        ArrowField::new("blob_id", DataType::UInt32, true),
+        ArrowField::new("blob_uri", DataType::Utf8, true),
+    ])
+});
+
+pub static BLOB_V2_DESC_TYPE: LazyLock<DataType> =
+    LazyLock::new(|| DataType::Struct(BLOB_V2_DESC_FIELDS.clone()));
+
+pub static BLOB_V2_DESC_FIELD: LazyLock<ArrowField> = LazyLock::new(|| {
+    ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), true).with_metadata(HashMap::from([
+        (lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()),
+        (
+            lance_arrow::BLOB_VERSION_META_KEY.to_string(),
+            "2".to_string(),
+        ),
+    ]))
+});
+
+pub static BLOB_V2_DESC_LANCE_FIELD: LazyLock<Field> =
+    LazyLock::new(|| Field::try_from(&*BLOB_V2_DESC_FIELD).unwrap());
+
 /// LogicalType is a string presentation of arrow type.
 /// to be serialized into protobuf.
 #[derive(Debug, Clone, PartialEq, DeepSizeOf)]
diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs
index 3817011d1cd..4b9af5fd24a 100644
--- a/rust/lance-core/src/datatypes/field.rs
+++ b/rust/lance-core/src/datatypes/field.rs
@@ -29,7 +29,10 @@ use super::{
     schema::{compare_fields, explain_fields_difference},
     Dictionary, LogicalType, Projection,
 };
-use crate::{datatypes::BLOB_DESC_LANCE_FIELD, Error, Result};
+use crate::{
+    datatypes::{BLOB_DESC_LANCE_FIELD, BLOB_V2_DESC_LANCE_FIELD},
+    Error, Result,
+};
 
 /// Use this config key in Arrow field metadata to indicate a column is a part of the primary key.
 /// The value can be any true values like `true`, `1`, `yes` (case-insensitive).
@@ -69,6 +72,32 @@ pub struct SchemaCompareOptions {
     /// Allow out of order fields (default false)
     pub ignore_field_order: bool,
 }
+
+/// Blob column format version.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum BlobVersion {
+    /// Legacy blob format (position / size only).
+    #[default]
+    V1,
+    /// Blob v2 struct format.
+    V2,
+}
+
+impl BlobVersion {
+    pub fn from_metadata_value(value: Option<&str>) -> Self {
+        match value {
+            Some("2") => Self::V2,
+            _ => Self::V1,
+        }
+    }
+
+    pub fn metadata_value(self) -> Option<&'static str> {
+        match self {
+            Self::V1 => None,
+            Self::V2 => Some("2"),
+        }
+    }
+}
 /// Encoding enum.
 #[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
 pub enum Encoding {
@@ -457,14 +486,45 @@ impl Field {
         self.metadata.contains_key(BLOB_META_KEY)
     }
 
+    pub fn blob_version(&self) -> BlobVersion {
+        if !self.is_blob() {
+            return BlobVersion::V1;
+        }
+        let value = self.metadata.get(BLOB_VERSION_META_KEY).map(|s| s.as_str());
+        BlobVersion::from_metadata_value(value)
+    }
+
+    pub fn set_blob_version(&mut self, version: BlobVersion) {
+        if !self.is_blob() {
+            return;
+        }
+        match version.metadata_value() {
+            Some(value) => {
+                self.metadata
+                    .insert(BLOB_VERSION_META_KEY.to_string(), value.to_string());
+            }
+            None => {
+                self.metadata.remove(BLOB_VERSION_META_KEY);
+            }
+        }
+    }
+
     /// If the field is a blob, return a new field with the same name and id
     /// but with the data type set to a struct of the blob description fields.
     ///
     /// If the field is not a blob, return the field itself.
     pub fn into_unloaded(mut self) -> Self {
         if self.data_type().is_binary_like() && self.is_blob() {
-            self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone();
-            self.children = BLOB_DESC_LANCE_FIELD.children.clone();
+            match self.blob_version() {
+                BlobVersion::V2 => {
+                    self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone();
+                    self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone();
+                }
+                BlobVersion::V1 => {
+                    self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone();
+                    self.children = BLOB_DESC_LANCE_FIELD.children.clone();
+                }
+            }
         }
         self
     }
@@ -1466,4 +1526,40 @@ mod tests {
         assert!(f1.compare_with_options(&f2, &ignore_nullability));
         assert!(f2.compare_with_options(&f1, &ignore_nullability));
     }
+
+    #[test]
+    fn blob_version_detection_and_setting() {
+        let mut metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);
+        let field_v1: Field = ArrowField::new("blob", DataType::LargeBinary, true)
+            .with_metadata(metadata.clone())
+            .try_into()
+            .unwrap();
+        assert_eq!(field_v1.blob_version(), BlobVersion::V1);
+
+        metadata.insert(BLOB_VERSION_META_KEY.to_string(), "2".to_string());
+        let mut field_v2: Field = ArrowField::new("blob", DataType::LargeBinary, true)
+            .with_metadata(metadata)
+            .try_into()
+            .unwrap();
+        assert_eq!(field_v2.blob_version(), BlobVersion::V2);
+
+        field_v2.set_blob_version(BlobVersion::V1);
+        assert_eq!(field_v2.blob_version(), BlobVersion::V1);
+        assert!(!field_v2.metadata.contains_key(BLOB_VERSION_META_KEY));
+    }
+
+    #[test]
+    fn blob_into_unloaded_selects_v2_layout() {
+        let metadata = HashMap::from([
+            (BLOB_META_KEY.to_string(), "true".to_string()),
+            (BLOB_VERSION_META_KEY.to_string(), "2".to_string()),
+        ]);
+        let field: Field = ArrowField::new("blob", DataType::LargeBinary, true)
+            .with_metadata(metadata)
+            .try_into()
+            .unwrap();
+        let unloaded = field.into_unloaded();
+        assert_eq!(unloaded.children.len(), 5);
+        assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
+    }
 }
diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs
index d3bd7f4b158..71bf388773a 100644
--- a/rust/lance-core/src/datatypes/schema.rs
+++ b/rust/lance-core/src/datatypes/schema.rs
@@ -15,7 +15,7 @@ use deepsize::DeepSizeOf;
 use lance_arrow::*;
 use snafu::location;
 
-use super::field::{Field, OnTypeMismatch, SchemaCompareOptions};
+use super::field::{BlobVersion, Field, OnTypeMismatch, SchemaCompareOptions};
 use crate::{Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD, WILDCARD};
 
 /// Lance Schema.
@@ -146,6 +146,13 @@ impl Schema {
         }
     }
 
+    pub fn apply_blob_version(&mut self, version: BlobVersion, allow_change: bool) -> Result<()> {
+        for field in self.fields.iter_mut() {
+            apply_blob_version_to_field(field, version, allow_change)?;
+        }
+        Ok(())
+    }
+
     pub fn has_dictionary_types(&self) -> bool {
         self.fields.iter().any(|f| f.has_dictionary_types())
     }
@@ -880,6 +887,31 @@ fn explain_metadata_difference(
     }
 }
 
+fn apply_blob_version_to_field(
+    field: &mut Field,
+    version: BlobVersion,
+    allow_change: bool,
+) -> Result<()> {
+    if field.is_blob() {
+        let current = field.blob_version();
+        if current != version && !allow_change {
+            return Err(Error::InvalidInput {
+                source: format!(
+                    "Blob column '{}' uses version {:?}, expected {:?}",
+                    field.name, current, version
+                )
+                .into(),
+                location: location!(),
+            });
+        }
+        field.set_blob_version(version);
+    }
+    for child in field.children.iter_mut() {
+        apply_blob_version_to_field(child, version, allow_change)?;
+    }
+    Ok(())
+}
+
 /// What to do when a column is missing in the schema
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum OnMissing {
@@ -1478,6 +1510,7 @@ mod tests {
     use std::sync::Arc;
 
     use super::*;
+    use crate::datatypes::BlobVersion;
 
     #[test]
     fn test_resolve_with_quoted_fields() {
@@ -2498,4 +2531,23 @@ mod tests {
             .contains(error_message_contains[idx]));
         }
     }
+
+    #[test]
+    fn apply_blob_version_requires_consistent_metadata() {
+        let arrow_field = ArrowField::new("blob", ArrowDataType::LargeBinary, true).with_metadata(
+            HashMap::from([
+                (BLOB_META_KEY.to_string(), "true".to_string()),
+                (BLOB_VERSION_META_KEY.to_string(), "2".to_string()),
+            ]),
+        );
+        let mut schema =
+            Schema::try_from(&ArrowSchema::new(vec![arrow_field])).expect("schema creation");
+
+        assert!(schema.apply_blob_version(BlobVersion::V1, false).is_err());
+
+        schema
+            .apply_blob_version(BlobVersion::V1, true)
+            .expect("allow metadata change when permitted");
+        assert_eq!(schema.fields[0].blob_version(), BlobVersion::V1);
+    }
 }
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index 74d028a93dc..1bdf05e640a 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -7,7 +7,7 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::{Stream, StreamExt, TryStreamExt};
 use lance_core::datatypes::{
-    NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions,
+    BlobVersion, NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions,
 };
 use lance_core::error::LanceOptionExt;
 use lance_core::utils::tempfile::TempDir;
@@ -41,6 +41,14 @@ use super::transaction::Transaction;
 use super::utils::SchemaAdapter;
 use super::DATA_DIR;
 
+fn blob_version_for(storage_version: LanceFileVersion) -> BlobVersion {
+    if storage_version >= LanceFileVersion::V2_2 {
+        BlobVersion::V2
+    } else {
+        BlobVersion::V1
+    }
+}
+
 mod commit;
 pub mod delete;
 mod insert;
@@ -580,7 +588,9 @@ pub async fn write_fragments_internal(
     // Make sure the max rows per group is not larger than the max rows per file
     params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file);
 
-    let (schema, storage_version) = if let Some(dataset) = dataset {
+    let allow_blob_version_change =
+        dataset.is_none() || matches!(params.mode, WriteMode::Overwrite);
+    let (mut schema, storage_version) = if let Some(dataset) = dataset {
         match params.mode {
             WriteMode::Append | WriteMode::Create => {
                 // Append mode, so we need to check compatibility
@@ -627,6 +637,9 @@ pub async fn write_fragments_internal(
         (converted_schema, params.storage_version_or_default())
     };
 
+    let target_blob_version = blob_version_for(storage_version);
+    schema.apply_blob_version(target_blob_version, allow_blob_version_change)?;
+
     let fragments = do_write_fragments(
         object_store,
         base_dir,
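
The hunks above establish the on-disk convention: a blob column is marked with `lance-encoding:blob`, and its format version rides alongside it under `lance-encoding:blob-version`, where the key's absence means v1 and the literal string "2" means v2. The standalone sketch below (std-only; the helper names and the plain `HashMap` stand-in for field metadata are illustrative, not the lance API) exercises that round-trip the same way `blob_version` / `set_blob_version` do in the patch.

// Standalone sketch of the blob-version metadata round-trip introduced in this patch.
use std::collections::HashMap;

const BLOB_META_KEY: &str = "lance-encoding:blob";
const BLOB_VERSION_META_KEY: &str = "lance-encoding:blob-version";

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum BlobVersion {
    #[default]
    V1,
    V2,
}

impl BlobVersion {
    // Mirrors from_metadata_value: anything other than "2" falls back to v1.
    fn from_metadata_value(value: Option<&str>) -> Self {
        match value {
            Some("2") => Self::V2,
            _ => Self::V1,
        }
    }

    // Mirrors metadata_value: v1 is encoded by omitting the key entirely.
    fn metadata_value(self) -> Option<&'static str> {
        match self {
            Self::V1 => None,
            Self::V2 => Some("2"),
        }
    }
}

// Hypothetical stand-in for a field's metadata map (lance keeps this on the Arrow field).
fn apply_blob_version(metadata: &mut HashMap<String, String>, version: BlobVersion) {
    if !metadata.contains_key(BLOB_META_KEY) {
        return; // only blob columns carry a blob version
    }
    match version.metadata_value() {
        Some(v) => {
            metadata.insert(BLOB_VERSION_META_KEY.to_string(), v.to_string());
        }
        None => {
            metadata.remove(BLOB_VERSION_META_KEY);
        }
    }
}

fn main() {
    let mut metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);

    // A freshly marked blob column has no version key, so it reads back as v1.
    let read = BlobVersion::from_metadata_value(
        metadata.get(BLOB_VERSION_META_KEY).map(|s| s.as_str()),
    );
    assert_eq!(read, BlobVersion::V1);

    // Stamping v2 writes "lance-encoding:blob-version" = "2".
    apply_blob_version(&mut metadata, BlobVersion::V2);
    assert_eq!(
        metadata.get(BLOB_VERSION_META_KEY).map(|s| s.as_str()),
        Some("2")
    );

    // Stamping v1 again removes the key, matching set_blob_version in the patch.
    apply_blob_version(&mut metadata, BlobVersion::V1);
    assert!(!metadata.contains_key(BLOB_VERSION_META_KEY));
    println!("blob-version metadata round-trip ok");
}

Encoding v1 as the absence of the key keeps existing datasets readable without a metadata rewrite, which is also why `apply_blob_version_to_field` only errors when the stored version actually differs and `allow_change` is false.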