Skip to content
2 changes: 0 additions & 2 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";
/// Key used by Lance to mark a field as a blob
/// TODO: Use Arrow extension mechanism instead?
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Key used by Lance to record the blob column format version.
pub const BLOB_VERSION_META_KEY: &str = "lance-encoding:blob-version";

type Result<T> = std::result::Result<T, ArrowError>;

Expand Down
4 changes: 0 additions & 4 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ pub static BLOB_V2_DESC_TYPE: LazyLock<DataType> =
pub static BLOB_V2_DESC_FIELD: LazyLock<ArrowField> = LazyLock::new(|| {
    // The description field is itself tagged as a blob and pinned to blob
    // format version "2" through its Arrow field metadata.
    let metadata = HashMap::from([
        (lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()),
        (
            lance_arrow::BLOB_VERSION_META_KEY.to_string(),
            "2".to_string(),
        ),
    ]);

    // Nullable field named "description" typed with the v2 descriptor layout.
    ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), true).with_metadata(metadata)
});

Expand Down
76 changes: 18 additions & 58 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,20 @@ pub enum BlobVersion {
}

impl BlobVersion {
pub fn from_metadata_value(value: Option<&str>) -> Self {
/// Convert a persisted string value (e.g. table config) into a blob version
pub fn from_config_value(value: &str) -> Option<Self> {
match value {
Some("2") => Self::V2,
_ => Self::V1,
"1" => Some(Self::V1),
"2" => Some(Self::V2),
_ => None,
}
}

pub fn metadata_value(self) -> Option<&'static str> {
/// Persistable string representation for table config.
pub fn config_value(self) -> &'static str {
match self {
Self::V1 => None,
Self::V2 => Some("2"),
Self::V1 => "1",
Self::V2 => "2",
}
}
}
Expand Down Expand Up @@ -261,7 +264,11 @@ impl Field {
} else {
let mut new_field = self.clone();
new_field.children = children;
Some(projection.blob_handling.unload_if_needed(new_field))
Some(
projection
.blob_handling
.unload_if_needed(new_field, projection.blob_version),
)
}
}

Expand Down Expand Up @@ -488,36 +495,13 @@ impl Field {
self.metadata.contains_key(BLOB_META_KEY)
}

/// Returns the blob version recorded in this field's metadata.
///
/// Fields that are not blobs always report [`BlobVersion::V1`]. For blob
/// fields, the value stored under `BLOB_VERSION_META_KEY` (if any) is
/// interpreted by [`BlobVersion::from_metadata_value`].
pub fn blob_version(&self) -> BlobVersion {
    if self.is_blob() {
        let recorded = self
            .metadata
            .get(BLOB_VERSION_META_KEY)
            .map(String::as_str);
        BlobVersion::from_metadata_value(recorded)
    } else {
        BlobVersion::V1
    }
}

/// Records `version` in this field's metadata.
///
/// Non-blob fields are left untouched. For blob fields, a version that has
/// a metadata representation is stored under `BLOB_VERSION_META_KEY`; a
/// version without one causes any existing entry to be removed instead.
pub fn set_blob_version(&mut self, version: BlobVersion) {
    if self.is_blob() {
        if let Some(encoded) = version.metadata_value() {
            self.metadata
                .insert(BLOB_VERSION_META_KEY.to_string(), encoded.to_string());
        } else {
            self.metadata.remove(BLOB_VERSION_META_KEY);
        }
    }
}

/// If the field is a blob, return a new field with the same name and id
/// but with the data type set to a struct of the blob description fields.
///
/// If the field is not a blob, return the field itself.
pub fn into_unloaded(mut self) -> Self {
pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self {
Comment thread
Xuanwo marked this conversation as resolved.
if self.data_type().is_binary_like() && self.is_blob() {
match self.blob_version() {
match version {
BlobVersion::V2 => {
self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone();
self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone();
Expand Down Expand Up @@ -1529,38 +1513,14 @@ mod tests {
assert!(f2.compare_with_options(&f1, &ignore_nullability));
}

#[test]
fn blob_version_detection_and_setting() {
    // Blob marker only: with no explicit version entry the field reads as V1.
    let marker_only = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);
    let arrow_v1 =
        ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(marker_only.clone());
    let field_v1: Field = arrow_v1.try_into().unwrap();
    assert_eq!(field_v1.blob_version(), BlobVersion::V1);

    // Adding the version entry "2" makes the field read as V2.
    let mut versioned = marker_only;
    versioned.insert(BLOB_VERSION_META_KEY.to_string(), "2".to_string());
    let arrow_v2 = ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(versioned);
    let mut field_v2: Field = arrow_v2.try_into().unwrap();
    assert_eq!(field_v2.blob_version(), BlobVersion::V2);

    // Resetting to V1 must also drop the version entry from the metadata.
    field_v2.set_blob_version(BlobVersion::V1);
    assert_eq!(field_v2.blob_version(), BlobVersion::V1);
    let still_tagged = field_v2.metadata.contains_key(BLOB_VERSION_META_KEY);
    assert!(!still_tagged);
}

#[test]
fn blob_into_unloaded_selects_v2_layout() {
let metadata = HashMap::from([
(BLOB_META_KEY.to_string(), "true".to_string()),
(BLOB_VERSION_META_KEY.to_string(), "2".to_string()),
]);
let metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);
let field: Field = ArrowField::new("blob", DataType::LargeBinary, true)
.with_metadata(metadata)
.try_into()
.unwrap();
let unloaded = field.into_unloaded();
let unloaded = field.into_unloaded_with_version(BlobVersion::V2);
assert_eq!(unloaded.children.len(), 5);
assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
}
Expand Down
64 changes: 10 additions & 54 deletions rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,6 @@ impl Schema {
}
}

/// Applies `version` to every field of the schema.
///
/// Each top-level field is passed to `apply_blob_version_to_field`, which
/// also walks that field's children. Processing stops at the first error.
///
/// # Errors
///
/// Propagates the error from `apply_blob_version_to_field`, which is raised
/// when a blob field's current version differs from `version` and
/// `allow_change` is `false`.
pub fn apply_blob_version(&mut self, version: BlobVersion, allow_change: bool) -> Result<()> {
    self.fields
        .iter_mut()
        .try_for_each(|top_level| apply_blob_version_to_field(top_level, version, allow_change))
}

/// Returns `true` if at least one top-level field reports dictionary types
/// via its own `has_dictionary_types` check.
pub fn has_dictionary_types(&self) -> bool {
    self.fields
        .iter()
        .any(|field| field.has_dictionary_types())
}
Expand Down Expand Up @@ -893,31 +886,6 @@ fn explain_metadata_difference(
}
}

/// Sets `version` on `field` (when it is a blob) and then on all of its
/// children, recursively.
///
/// Non-blob fields are not modified themselves, but their children are still
/// visited.
///
/// # Errors
///
/// Returns `Error::InvalidInput` when a blob field's current version differs
/// from `version` and `allow_change` is `false`. The walk stops at the first
/// such field.
fn apply_blob_version_to_field(
    field: &mut Field,
    version: BlobVersion,
    allow_change: bool,
) -> Result<()> {
    if field.is_blob() {
        let current = field.blob_version();
        // A mismatch is only tolerated when the caller explicitly allows it.
        if !allow_change && current != version {
            let message = format!(
                "Blob column '{}' uses version {:?}, expected {:?}",
                field.name, current, version
            );
            return Err(Error::InvalidInput {
                source: message.into(),
                location: location!(),
            });
        }
        field.set_blob_version(version);
    }

    field
        .children
        .iter_mut()
        .try_for_each(|child| apply_blob_version_to_field(child, version, allow_change))
}

/// What to do when a column is missing in the schema
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnMissing {
Expand Down Expand Up @@ -972,9 +940,9 @@ impl BlobHandling {
}
}

pub fn unload_if_needed(&self, field: Field) -> Field {
pub fn unload_if_needed(&self, field: Field, version: BlobVersion) -> Field {
if self.should_unload(&field) {
field.into_unloaded()
field.into_unloaded_with_version(version)
} else {
field
}
Expand All @@ -994,6 +962,7 @@ pub struct Projection {
pub with_row_last_updated_at_version: bool,
pub with_row_created_at_version: bool,
pub blob_handling: BlobHandling,
pub blob_version: BlobVersion,
}

impl Debug for Projection {
Expand All @@ -1011,6 +980,7 @@ impl Debug for Projection {
&self.with_row_created_at_version,
)
.field("blob_handling", &self.blob_handling)
.field("blob_version", &self.blob_version)
.finish()
}
}
Expand All @@ -1026,6 +996,7 @@ impl Projection {
with_row_last_updated_at_version: false,
with_row_created_at_version: false,
blob_handling: BlobHandling::default(),
blob_version: BlobVersion::V1,
}
}

Expand Down Expand Up @@ -1059,6 +1030,11 @@ impl Projection {
self
}

pub fn with_blob_version(mut self, blob_version: BlobVersion) -> Self {
self.blob_version = blob_version;
self
}

fn add_field_children(field_ids: &mut HashSet<i32>, field: &Field) {
for child in &field.children {
field_ids.insert(child.id);
Expand Down Expand Up @@ -1516,7 +1492,6 @@ mod tests {
use std::sync::Arc;

use super::*;
use crate::datatypes::BlobVersion;

#[test]
fn test_resolve_with_quoted_fields() {
Expand Down Expand Up @@ -2537,23 +2512,4 @@ mod tests {
.contains(error_message_contains[idx]));
}
}

#[test]
fn apply_blob_version_requires_consistent_metadata() {
    // A single blob column whose Arrow metadata already declares version "2".
    let blob_metadata = HashMap::from([
        (BLOB_META_KEY.to_string(), "true".to_string()),
        (BLOB_VERSION_META_KEY.to_string(), "2".to_string()),
    ]);
    let blob_column =
        ArrowField::new("blob", ArrowDataType::LargeBinary, true).with_metadata(blob_metadata);
    let arrow_schema = ArrowSchema::new(vec![blob_column]);
    let mut schema = Schema::try_from(&arrow_schema).expect("schema creation");

    // Without permission to change, a conflicting version is rejected.
    let strict_result = schema.apply_blob_version(BlobVersion::V1, false);
    assert!(strict_result.is_err());

    // With permission, the field is rewritten to the requested version.
    schema
        .apply_blob_version(BlobVersion::V1, true)
        .expect("allow metadata change when permitted");
    let first_field = &schema.fields[0];
    assert_eq!(first_field.blob_version(), BlobVersion::V1);
}
}
15 changes: 11 additions & 4 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ use futures::future::BoxFuture;
use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};
use futures::{FutureExt, Stream};

use crate::dataset::blob::blob_version_from_config;
use crate::dataset::metadata::UpdateFieldMetadataBuilder;
use crate::dataset::transaction::translate_schema_metadata_updates;
use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey};
use crate::session::index_caches::DSIndexCache;
use itertools::Itertools;
use lance_core::datatypes::{Field, OnMissing, OnTypeMismatch, Projectable, Projection};
use lance_core::datatypes::{
BlobVersion, Field, OnMissing, OnTypeMismatch, Projectable, Projection,
};
use lance_core::traits::DatasetTakeRows;
use lance_core::utils::address::RowAddress;
use lance_core::utils::tracing::{
Expand Down Expand Up @@ -56,7 +59,7 @@ use std::sync::Arc;
use take::row_offsets_to_row_addresses;
use tracing::{info, instrument};

mod blob;
pub(crate) mod blob;
mod branch_location;
pub mod builder;
pub mod cleanup;
Expand Down Expand Up @@ -1608,12 +1611,12 @@ impl Dataset {
/// Creates a new empty projection into the dataset schema
pub fn empty_projection(self: &Arc<Self>) -> Projection {
Projection::empty(self.clone())
Projection::empty(self.clone()).with_blob_version(self.blob_version())
}

/// Creates a projection that includes all columns in the dataset
pub fn full_projection(self: &Arc<Self>) -> Projection {
Projection::full(self.clone())
Projection::full(self.clone()).with_blob_version(self.blob_version())
}

/// Get fragments.
Expand Down Expand Up @@ -2332,6 +2335,10 @@ impl Dataset {
&self.manifest.config
}

/// Blob version for this dataset, as derived from the manifest's config by
/// `blob_version_from_config`.
pub(crate) fn blob_version(&self) -> BlobVersion {
    let config = &self.manifest.config;
    blob_version_from_config(config)
}

/// Delete keys from the config.
#[deprecated(
note = "Use the new update_config(values, replace) method - pass None values to delete keys"
Expand Down
Loading
Loading