Skip to content
Merged
5 changes: 3 additions & 2 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";
/// Key used by lance to mark a field as a blob
/// TODO: Use Arrow extension mechanism instead?
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Key used by Lance to record the blob column format version.
pub const BLOB_VERSION_META_KEY: &str = "lance-encoding:blob-version";

/// Arrow extension type name for Lance blob v2 columns
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";

type Result<T> = std::result::Result<T, ArrowError>;

Expand Down
6 changes: 5 additions & 1 deletion rust/lance-arrow/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema};

use crate::BLOB_META_KEY;
use crate::{ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME};

pub enum Indentation {
OneLine,
Expand Down Expand Up @@ -103,6 +103,10 @@ impl FieldExt for Field {
fn is_blob(&self) -> bool {
let field_metadata = self.metadata();
field_metadata.get(BLOB_META_KEY).is_some()
|| field_metadata
.get(ARROW_EXT_NAME_KEY)
.map(|value| value == BLOB_V2_EXT_NAME)
.unwrap_or(false)
}
}

Expand Down
11 changes: 7 additions & 4 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ pub static BLOB_V2_DESC_TYPE: LazyLock<DataType> =
pub static BLOB_V2_DESC_FIELD: LazyLock<ArrowField> = LazyLock::new(|| {
ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), true).with_metadata(HashMap::from([
(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()),
(
lance_arrow::BLOB_VERSION_META_KEY.to_string(),
"2".to_string(),
),
]))
});

pub static BLOB_V2_DESC_LANCE_FIELD: LazyLock<Field> =
LazyLock::new(|| Field::try_from(&*BLOB_V2_DESC_FIELD).unwrap());

pub const BLOB_LOGICAL_TYPE: &str = "blob";

/// LogicalType is a string presentation of arrow type.
/// to be serialized into protobuf.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
Expand All @@ -95,6 +93,10 @@ impl LogicalType {
fn is_struct(&self) -> bool {
self.0 == "struct"
}

fn is_blob(&self) -> bool {
self.0 == BLOB_LOGICAL_TYPE
}
}

impl From<&str> for LogicalType {
Expand Down Expand Up @@ -228,6 +230,7 @@ impl TryFrom<&LogicalType> for DataType {
"binary" => Some(Binary),
"large_string" => Some(LargeUtf8),
"large_binary" => Some(LargeBinary),
BLOB_LOGICAL_TYPE => Some(LargeBinary),
"json" => Some(LargeBinary),
"date32:day" => Some(Date32),
"date64:ms" => Some(Date64),
Expand Down
143 changes: 84 additions & 59 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use deepsize::DeepSizeOf;
use lance_arrow::{
json::{is_arrow_json_field, is_json_field},
ARROW_EXT_NAME_KEY, *,
DataTypeExt, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME,
};
use snafu::location;

Expand All @@ -42,6 +42,15 @@
/// (3) The field must not be within a list type.
pub const LANCE_UNENFORCED_PRIMARY_KEY: &str = "lance-schema:unenforced-primary-key";

fn has_blob_v2_extension(field: &ArrowField) -> bool {
field.data_type() == &DataType::LargeBinary
&& field
.metadata()
.get(ARROW_EXT_NAME_KEY)
.map(|name| name == BLOB_V2_EXT_NAME)
.unwrap_or(false)
}

#[derive(Debug, Default)]
pub enum NullabilityComparison {
// If the nullabilities don't match then the fields don't match
Expand Down Expand Up @@ -86,17 +95,20 @@
}

impl BlobVersion {
pub fn from_metadata_value(value: Option<&str>) -> Self {
/// Convert a persisted string value (e.g. table config) into a blob version
pub fn from_config_value(value: &str) -> Option<Self> {
match value {
Some("2") => Self::V2,
_ => Self::V1,
"1" => Some(Self::V1),
"2" => Some(Self::V2),
_ => None,
}
}

pub fn metadata_value(self) -> Option<&'static str> {
/// Persistable string representation for table config.
pub fn config_value(self) -> &'static str {
match self {
Self::V1 => None,
Self::V2 => Some("2"),
Self::V1 => "1",
Self::V2 => "2",
}
}
}
Expand Down Expand Up @@ -261,7 +273,11 @@
} else {
let mut new_field = self.clone();
new_field.children = children;
Some(projection.blob_handling.unload_if_needed(new_field))
Some(
projection
.blob_handling
.unload_if_needed(new_field, projection.blob_version),
)
}
}

Expand Down Expand Up @@ -486,38 +502,20 @@
/// Blob fields will load descriptions by default
pub fn is_blob(&self) -> bool {
self.metadata.contains_key(BLOB_META_KEY)
}

pub fn blob_version(&self) -> BlobVersion {
if !self.is_blob() {
return BlobVersion::V1;
}
let value = self.metadata.get(BLOB_VERSION_META_KEY).map(|s| s.as_str());
BlobVersion::from_metadata_value(value)
}

pub fn set_blob_version(&mut self, version: BlobVersion) {
if !self.is_blob() {
return;
}
match version.metadata_value() {
Some(value) => {
self.metadata
.insert(BLOB_VERSION_META_KEY.to_string(), value.to_string());
}
None => {
self.metadata.remove(BLOB_VERSION_META_KEY);
}
}
|| self
.metadata
.get(ARROW_EXT_NAME_KEY)
.map(|name| name == BLOB_V2_EXT_NAME)
.unwrap_or(false)
}

/// If the field is a blob, return a new field with the same name and id
/// but with the data type set to a struct of the blob description fields.
///
/// If the field is not a blob, return the field itself.
pub fn into_unloaded(mut self) -> Self {
pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self {
if self.data_type().is_binary_like() && self.is_blob() {
match self.blob_version() {
match version {
BlobVersion::V2 => {
self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone();
self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone();
Expand Down Expand Up @@ -991,15 +989,24 @@
DataType::LargeList(item) => vec![Self::try_from(item.as_ref())?],
_ => vec![],
};
let metadata = field.metadata().clone();
let mut metadata = field.metadata().clone();
let unenforced_primary_key = metadata
.get(LANCE_UNENFORCED_PRIMARY_KEY)
.map(|s| matches!(s.to_lowercase().as_str(), "true" | "1" | "yes"))
.unwrap_or(false);
let is_blob_v2 = has_blob_v2_extension(field);

if is_blob_v2 {
metadata
.entry(BLOB_META_KEY.to_string())
.or_insert_with(|| "true".to_string());
}

// Check for JSON extension types (both Arrow and Lance)
let logical_type = if is_arrow_json_field(field) || is_json_field(field) {
LogicalType::from("json")
} else if is_blob_v2 {
LogicalType::from(super::BLOB_LOGICAL_TYPE)
} else {
LogicalType::try_from(field.data_type())?
};
Expand Down Expand Up @@ -1039,6 +1046,19 @@
let out = Self::new(&field.name, field.data_type(), field.nullable);
let mut metadata = field.metadata.clone();

if field.logical_type.is_blob() {
metadata.insert(
ARROW_EXT_NAME_KEY.to_string(),

Check warning on line 1051 in rust/lance-core/src/datatypes/field.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-core/src/datatypes/field.rs
lance_arrow::BLOB_V2_EXT_NAME.to_string(),
);
metadata
.entry(ARROW_EXT_META_KEY.to_string())
.or_default();
metadata
.entry(BLOB_META_KEY.to_string())
.or_insert_with(|| "true".to_string());
}

// Add JSON extension metadata if this is a JSON field
if field.logical_type.0 == "json" {
metadata.insert(
Expand All @@ -1057,6 +1077,8 @@

use arrow_array::{DictionaryArray, StringArray, UInt32Array};
use arrow_schema::{Fields, TimeUnit};
use lance_arrow::{ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BLOB_META_KEY, BLOB_V2_EXT_NAME};
use std::collections::HashMap;
#[test]
fn arrow_field_to_field() {
for (name, data_type) in [
Expand Down Expand Up @@ -1530,38 +1552,41 @@
}

#[test]
fn blob_version_detection_and_setting() {
let mut metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);
let field_v1: Field = ArrowField::new("blob", DataType::LargeBinary, true)
.with_metadata(metadata.clone())
.try_into()
.unwrap();
assert_eq!(field_v1.blob_version(), BlobVersion::V1);

metadata.insert(BLOB_VERSION_META_KEY.to_string(), "2".to_string());
let mut field_v2: Field = ArrowField::new("blob", DataType::LargeBinary, true)
fn blob_into_unloaded_selects_v2_layout() {
let metadata = HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]);
let field: Field = ArrowField::new("blob", DataType::LargeBinary, true)
.with_metadata(metadata)
.try_into()
.unwrap();
assert_eq!(field_v2.blob_version(), BlobVersion::V2);

field_v2.set_blob_version(BlobVersion::V1);
assert_eq!(field_v2.blob_version(), BlobVersion::V1);
assert!(!field_v2.metadata.contains_key(BLOB_VERSION_META_KEY));
let unloaded = field.into_unloaded_with_version(BlobVersion::V2);
assert_eq!(unloaded.children.len(), 5);
assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
}

#[test]
fn blob_into_unloaded_selects_v2_layout() {
fn blob_extension_roundtrip() {
let metadata = HashMap::from([
(BLOB_META_KEY.to_string(), "true".to_string()),
(BLOB_VERSION_META_KEY.to_string(), "2".to_string()),
(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string()),
(ARROW_EXT_META_KEY.to_string(), "".to_string()),
]);
let field: Field = ArrowField::new("blob", DataType::LargeBinary, true)
.with_metadata(metadata)
.try_into()
.unwrap();
let unloaded = field.into_unloaded();
assert_eq!(unloaded.children.len(), 5);
assert_eq!(unloaded.logical_type, BLOB_V2_DESC_LANCE_FIELD.logical_type);
let arrow_field =
ArrowField::new("blob", DataType::LargeBinary, true).with_metadata(metadata);
let field = Field::try_from(&arrow_field).unwrap();
assert_eq!(
field.logical_type,
LogicalType::from(crate::datatypes::BLOB_LOGICAL_TYPE)
);
assert!(field.is_blob());
assert_eq!(field.data_type(), DataType::LargeBinary);

let roundtrip: ArrowField = ArrowField::from(&field);
assert_eq!(
roundtrip.metadata().get(ARROW_EXT_NAME_KEY),
Some(&BLOB_V2_EXT_NAME.to_string())
);
assert_eq!(
roundtrip.metadata().get(BLOB_META_KEY),
Some(&"true".to_string())
);
}
}
Loading
Loading