Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,20 @@ pub static BLOB_DESC_LANCE_FIELD: LazyLock<Field> =
pub static BLOB_V2_DESC_FIELDS: LazyLock<Fields> = LazyLock::new(|| {
Fields::from(vec![
ArrowField::new("kind", DataType::UInt8, false),
ArrowField::new("position", DataType::UInt64, true),
ArrowField::new("size", DataType::UInt64, true),
ArrowField::new("blob_id", DataType::UInt32, true),
ArrowField::new("blob_uri", DataType::Utf8, true),
ArrowField::new("position", DataType::UInt64, false),
ArrowField::new("size", DataType::UInt64, false),
ArrowField::new("blob_id", DataType::UInt32, false),
ArrowField::new("blob_uri", DataType::Utf8, false),
])
});

pub static BLOB_V2_DESC_TYPE: LazyLock<DataType> =
LazyLock::new(|| DataType::Struct(BLOB_V2_DESC_FIELDS.clone()));

pub static BLOB_V2_DESC_FIELD: LazyLock<ArrowField> = LazyLock::new(|| {
ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), true).with_metadata(HashMap::from([
ArrowField::new("description", BLOB_V2_DESC_TYPE.clone(), false).with_metadata(HashMap::from([
(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string()),
("lance-encoding:packed".to_string(), "true".to_string()),
]))
});

Expand Down
6 changes: 4 additions & 2 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,15 +513,17 @@ impl Field {
///
/// If the field is not a blob, return the field itself.
pub fn into_unloaded_with_version(mut self, version: BlobVersion) -> Self {
if self.data_type().is_binary_like() && self.is_blob() {
if self.is_blob() {
match version {
BlobVersion::V2 => {
self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone();
self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone();
self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone();
}
BlobVersion::V1 => {
self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone();
self.children = BLOB_DESC_LANCE_FIELD.children.clone();
self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone();
}
}
}
Expand Down Expand Up @@ -1004,7 +1006,7 @@ impl TryFrom<&ArrowField> for Field {
// Check for JSON extension types (both Arrow and Lance)
let logical_type = if is_arrow_json_field(field) || is_json_field(field) {
LogicalType::from("json")
} else if is_blob_v2 {
} else if is_blob_v2 && !matches!(field.data_type(), DataType::Struct(_)) {
Comment thread
Xuanwo marked this conversation as resolved.
Outdated
LogicalType::from(super::BLOB_LOGICAL_TYPE)
} else {
LogicalType::try_from(field.data_type())?
Expand Down
135 changes: 102 additions & 33 deletions rust/lance-encoding/src/encodings/logical/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl BlobStructuralEncoder {

// Use the original field's name for the descriptor
let descriptor_field = Field::try_from(
ArrowField::new(&field.name, descriptor_data_type, field.nullable)
ArrowField::new(&field.name, descriptor_data_type, false)
Comment thread
Xuanwo marked this conversation as resolved.
Outdated
.with_metadata(descriptor_metadata),
)?;

Expand Down Expand Up @@ -267,7 +267,7 @@ impl FieldEncoder for BlobV2StructuralEncoder {
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
_repdef: RepDefBuilder,
mut repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>> {
Expand All @@ -280,6 +280,11 @@ impl FieldEncoder for BlobV2StructuralEncoder {
};

let struct_arr = array.as_struct();
if let Some(validity) = struct_arr.nulls() {
repdef.add_validity_bitmap(validity.clone());
} else {
repdef.add_no_null(struct_arr.len());
}
let mut data_idx = None;
let mut uri_idx = None;
for (idx, field) in fields.iter().enumerate() {
Expand Down Expand Up @@ -310,12 +315,6 @@ impl FieldEncoder for BlobV2StructuralEncoder {
location: location!(),
});
}
if uri_is_set {
return Err(Error::NotSupported {
source: "External blob (uri) is not supported yet".into(),
location: location!(),
});
}
}

let binary_array = data_col;
Expand All @@ -327,34 +326,41 @@ impl FieldEncoder for BlobV2StructuralEncoder {
let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(binary_array.len());
let mut uri_builder = StringBuilder::with_capacity(binary_array.len(), 0);

for i in 0..binary_array.len() {
let is_null_row = match array.data_type() {
DataType::Struct(_) => array.is_null(i),
_ => binary_array.is_null(i),
};
if is_null_row {
kind_builder.append_null();
position_builder.append_null();
size_builder.append_null();
blob_id_builder.append_null();
uri_builder.append_null();
Comment on lines -330 to -340

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes forwards / backwards compatible?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe so. No blob v2 files have been written so far. It's part of file format 2.2

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I had forgotten we added a dedicated encoded / decoder for v2.

for i in 0..struct_arr.len() {
if struct_arr.is_null(i) {
// Packed struct does not support nullable fields; use empty/default values and rely on rep/def.
Comment thread
Xuanwo marked this conversation as resolved.
Outdated
kind_builder.append_value(0);
position_builder.append_value(0);
size_builder.append_value(0);
blob_id_builder.append_value(0);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like blob_id is always set to zero. Can you remind me what this is for again?

@Xuanwo Xuanwo Dec 3, 2025

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blob_id == 0 means it's empty (the valid value starts from 1).

And blob_id reprents the id used by packed & dedicated blobs which are both not implemented yet.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the !data_is_set branch then? I thought an inline blob was "data column is not null and uri is null" and a dedicated / packed blob is "data column is null and uri is not null"?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see the enum now. So the !data_is_set approach is external.

uri_builder.append_value("");
continue;
}

let value = binary_array.value(i);
kind_builder.append_value(0);

if value.is_empty() {
let data_is_set = !data_col.is_null(i);
if data_is_set {
let value = binary_array.value(i);
kind_builder.append_value(0);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use constants for kinds? or an enum that converts to/from an integer?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me, will update.

if value.is_empty() {
position_builder.append_value(0);
size_builder.append_value(0);
} else {
let position =
external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
position_builder.append_value(position);
size_builder.append_value(value.len() as u64);
}
blob_id_builder.append_value(0);
uri_builder.append_value("");
} else {
// external uri
let uri = uri_col.value(i);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check if the URI is empty?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, we can check before writing.

kind_builder.append_value(3);
position_builder.append_value(0);
size_builder.append_value(0);
} else {
let position = external_buffers.add_buffer(LanceBuffer::from(Buffer::from(value)));
position_builder.append_value(position);
size_builder.append_value(value.len() as u64);
blob_id_builder.append_value(0);
uri_builder.append_value(uri);
}

blob_id_builder.append_null();
uri_builder.append_null();
}

let children: Vec<ArrayRef> = vec![
Expand All @@ -374,7 +380,7 @@ impl FieldEncoder for BlobV2StructuralEncoder {
self.descriptor_encoder.maybe_encode(
descriptor_array,
external_buffers,
RepDefBuilder::default(),
repdef,
row_number,
num_rows,
)
Expand Down Expand Up @@ -402,9 +408,16 @@ mod tests {
use crate::{
compression::DefaultCompressionStrategy,
encoder::{ColumnIndexSequence, EncodingOptions},
testing::{check_round_trip_encoding_of_data, TestCases},
testing::{
check_round_trip_encoding_of_data, check_round_trip_encoding_of_data_with_expected,
TestCases,
},
version::LanceFileVersion,
};
use arrow_array::{
ArrayRef, LargeBinaryArray, StringArray, StructArray, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_array::LargeBinaryArray;
use arrow_schema::{DataType, Field as ArrowField};

#[test]
fn test_blob_encoder_creation() {
Expand Down Expand Up @@ -487,4 +500,60 @@ mod tests {
// Use the standard test harness
check_round_trip_encoding_of_data(vec![array], &TestCases::default(), blob_metadata).await;
}

#[tokio::test]
async fn test_blob_v2_external_round_trip() {
let blob_metadata =
HashMap::from([(lance_arrow::BLOB_META_KEY.to_string(), "true".to_string())]);

let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true));
let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));

let data_array = LargeBinaryArray::from(vec![Some(b"inline".as_ref()), None, None]);
let uri_array = StringArray::from(vec![
None,
Some("file:///tmp/external.bin"),
Some("s3://bucket/blob"),
]);

let struct_array = StructArray::from(vec![
(data_field, Arc::new(data_array) as ArrayRef),
(uri_field, Arc::new(uri_array) as ArrayRef),
]);

let expected_descriptor = StructArray::from(vec![
(
Arc::new(ArrowField::new("kind", DataType::UInt8, false)),
Arc::new(UInt8Array::from(vec![0, 3, 3])) as ArrayRef,
),
(
Arc::new(ArrowField::new("position", DataType::UInt64, false)),
Arc::new(UInt64Array::from(vec![0, 0, 0])) as ArrayRef,
),
(
Arc::new(ArrowField::new("size", DataType::UInt64, false)),
Arc::new(UInt64Array::from(vec![6, 0, 0])) as ArrayRef,
),
(
Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)),
Arc::new(UInt32Array::from(vec![0, 0, 0])) as ArrayRef,
),
(
Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)),
Arc::new(StringArray::from(vec![
"",
"file:///tmp/external.bin",
"s3://bucket/blob",
])) as ArrayRef,
),
]);

check_round_trip_encoding_of_data_with_expected(
vec![Arc::new(struct_array)],
Some(Arc::new(expected_descriptor)),
&TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
blob_metadata,
)
.await;
}
}
59 changes: 51 additions & 8 deletions rust/lance-encoding/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
use arrow_array::{make_array, Array, StructArray, UInt64Array};
use arrow_data::transform::{Capacities, MutableArrayData};
use arrow_ord::ord::make_comparator;
use arrow_schema::{DataType, Field, FieldRef, Schema, SortOptions};
use arrow_schema::{DataType, Field, Field as ArrowField, FieldRef, Schema, SortOptions};
use arrow_select::concat::concat;
use bytes::{Bytes, BytesMut};
use futures::{future::BoxFuture, FutureExt, StreamExt};
Expand Down Expand Up @@ -83,6 +83,12 @@ fn column_indices_from_schema_helper(
// In the old style, every field except FSL gets its own column. In the new style only primitive
// leaf fields get their own column.
for field in fields {
if is_structural_encoding && field.metadata().contains_key("lance-encoding:packed") {
column_indices.push(*column_counter);
*column_counter += 1;
continue;
}

match field.data_type() {
DataType::Struct(fields) => {
if !is_structural_encoding {
Expand Down Expand Up @@ -698,6 +704,15 @@ pub async fn check_round_trip_encoding_of_data(
data: Vec<Arc<dyn Array>>,
test_cases: &TestCases,
metadata: HashMap<String, String>,
) {
check_round_trip_encoding_of_data_with_expected(data, None, test_cases, metadata).await
}

pub async fn check_round_trip_encoding_of_data_with_expected(
data: Vec<Arc<dyn Array>>,
expected_override: Option<Arc<dyn Array>>,
test_cases: &TestCases,
metadata: HashMap<String, String>,
) {
let example_data = data.first().expect("Data must have at least one array");
let mut field = Field::new("", example_data.data_type().clone(), true);
Expand Down Expand Up @@ -725,8 +740,15 @@ pub async fn check_round_trip_encoding_of_data(
"Testing round trip encoding of data with file version {} and page size {}",
file_version, page_size
);
check_round_trip_encoding_inner(encoder, &field, data.clone(), test_cases, file_version)
.await
check_round_trip_encoding_inner(
encoder,
&field,
data.clone(),
expected_override.clone(),
test_cases,
file_version,
)
.await
}
}
}
Expand Down Expand Up @@ -795,6 +817,7 @@ async fn check_round_trip_encoding_inner(
mut encoder: Box<dyn FieldEncoder>,
field: &Field,
data: Vec<Arc<dyn Array>>,
expected_override: Option<Arc<dyn Array>>,
test_cases: &TestCases,
file_version: LanceFileVersion,
) {
Expand Down Expand Up @@ -902,8 +925,6 @@ async fn check_round_trip_encoding_inner(

let scheduler = Arc::new(SimulatedScheduler::new(encoded_data)) as Arc<dyn EncodingsIo>;

let schema = Schema::new(vec![field.clone()]);

let num_rows = data.iter().map(|arr| arr.len() as u64).sum::<u64>();
let concat_data = if test_cases.skip_validation {
None
Expand All @@ -924,16 +945,37 @@ async fn check_round_trip_encoding_inner(
Some(concat(&data.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>()).unwrap())
};

let expected_data = expected_override.clone().or_else(|| concat_data.clone());

let is_structural_encoding = file_version >= LanceFileVersion::V2_1;

let decode_field = if is_structural_encoding {
let mut lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
if lance_field.is_blob() && matches!(lance_field.data_type(), DataType::Struct(_)) {
lance_field =
lance_field.into_unloaded_with_version(lance_core::datatypes::BlobVersion::V2);
let mut arrow_field = ArrowField::from(&lance_field);
let mut metadata = arrow_field.metadata().clone();
metadata.insert("lance-encoding:packed".to_string(), "true".to_string());
arrow_field = arrow_field.with_metadata(metadata);
arrow_field
} else {
field.clone()
}
} else {
field.clone()
};

let schema = Schema::new(vec![decode_field]);

debug!("Testing full decode");
let scheduler_copy = scheduler.clone();
test_decode(
num_rows,
test_cases.batch_size,
&schema,
&column_infos,
concat_data.clone(),
expected_data.clone(),
scheduler_copy.clone(),
is_structural_encoding,
|mut decode_scheduler, tx| {
Expand All @@ -954,9 +996,9 @@ async fn check_round_trip_encoding_inner(
for range in &test_cases.ranges {
debug!("Testing decode of range {:?}", range);
let num_rows = range.end - range.start;
let expected = concat_data
let expected = expected_data
.as_ref()
.map(|concat_data| concat_data.slice(range.start as usize, num_rows as usize));
.map(|arr| arr.slice(range.start as usize, num_rows as usize));
let scheduler = scheduler.clone();
let range = range.clone();
test_decode(
Expand Down Expand Up @@ -1129,6 +1171,7 @@ async fn check_round_trip_random(
encoder_factory(file_version),
&field,
data,
None,
test_cases,
file_version,
)
Expand Down
Loading