Skip to content
Merged
26 changes: 25 additions & 1 deletion rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,23 @@ impl Field {
.unwrap_or(false)
}

/// If the field is a blob, return a new field with the same name and id
Comment thread
Xuanwo marked this conversation as resolved.
Outdated
/// but with the data type set to a struct of the blob description fields.
///
/// If the field is not a blob, return the field itself.
pub fn unloaded_mut(&mut self) -> &mut Self {
if self.is_blob_v2() {
self.logical_type = BLOB_V2_DESC_LANCE_FIELD.logical_type.clone();
self.children = BLOB_V2_DESC_LANCE_FIELD.children.clone();
self.metadata = BLOB_V2_DESC_LANCE_FIELD.metadata.clone();
} else if self.is_blob() {
self.logical_type = BLOB_DESC_LANCE_FIELD.logical_type.clone();
self.children = BLOB_DESC_LANCE_FIELD.children.clone();
self.metadata = BLOB_DESC_LANCE_FIELD.metadata.clone();
}
self
}

/// If the field is a blob, return a new field with the same name and id
/// but with the data type set to a struct of the blob description fields.
///
Expand Down Expand Up @@ -744,6 +761,13 @@ impl Field {
(&self_type, &other_type),
(DataType::Struct(_), DataType::Struct(_)) | (DataType::List(_), DataType::List(_))
) {
// Blob v2 uses a struct logical type for descriptors, which differs from the logical
// input struct (data/uri). When intersecting schemas for projection we want to keep
// the projected blob layout instead of intersecting by child names.
if ignore_types && self.is_blob() && other.is_blob() {
Comment thread
Xuanwo marked this conversation as resolved.
Outdated
return Ok(self.clone());
}

let children = self
.children
.iter()
Expand Down Expand Up @@ -1015,7 +1039,7 @@ impl TryFrom<&ArrowField> for Field {
let logical_type = if is_arrow_json_field(field) || is_json_field(field) {
LogicalType::from("json")
} else if is_blob_v2 {
LogicalType::from(super::BLOB_LOGICAL_TYPE)
LogicalType::from("struct")
} else {
LogicalType::try_from(field.data_type())?
};
Expand Down
4 changes: 3 additions & 1 deletion rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,9 @@ pub enum BlobHandling {

impl BlobHandling {
fn should_unload(&self, field: &Field) -> bool {
if !field.data_type().is_binary_like() {
// Blob v2 columns are Structs, so we need to treat any blob-marked field as unloadable
// even if the physical data type is not binary-like.
if !(field.data_type().is_binary_like() || field.is_blob()) {
return false;
}
match self {
Expand Down
5 changes: 3 additions & 2 deletions rust/lance-file/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,11 @@ impl ReaderProjection {
field_id_to_column_index,
&mut column_indices,
)?;
Ok(Self {
let projection = Self {
schema: Arc::new(schema.clone()),
column_indices,
})
};
Ok(projection)
}

/// Creates a projection that reads the entire file
Expand Down
9 changes: 9 additions & 0 deletions rust/lance-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,15 @@ impl FileWriter {
async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided. Schema is unknown and file cannot be created", location!()))?;
schema.metadata = std::mem::take(&mut self.schema_metadata);
// Use descriptor layout for blob v2 in the footer to avoid exposing logical child fields.
//
// TODO(xuanwo): this doesn't work on nested struct, need better solution like fields_per_order_mut?
schema.fields.iter_mut().for_each(|f| {
if f.is_blob_v2() {
let _ = f.unloaded_mut();
}
Comment thread
Xuanwo marked this conversation as resolved.
});

let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
let file_descriptor_bytes = file_descriptor.encode_to_vec();
let file_descriptor_len = file_descriptor_bytes.len() as u64;
Expand Down
238 changes: 238 additions & 0 deletions rust/lance/src/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Convenience builders for Lance blob v2 input columns.
//!
//! Blob v2 expects a column shaped as `Struct<data: LargeBinary?, uri: Utf8?>` and
//! tagged with `ARROW:extension:name = "lance.blob.v2"`. This module offers a
//! type-safe builder to construct that struct without manually wiring metadata

use std::sync::Arc;

use arrow_array::{
builder::LargeBinaryBuilder, builder::StringBuilder, Array, ArrayRef, StructArray,
};
use arrow_buffer::NullBufferBuilder;
use arrow_schema::{DataType, Field};
use lance_arrow::{ARROW_EXT_NAME_KEY, BLOB_V2_EXT_NAME};

use crate::{Error, Result};

/// A typed wrapper around the blob v2 input struct column.
pub struct BlobArray {
inner: StructArray,
}

impl BlobArray {
/// Construct the Arrow field for a blob v2 column.
pub fn field(name: &str, nullable: bool) -> Field {
let metadata = [(ARROW_EXT_NAME_KEY.to_string(), BLOB_V2_EXT_NAME.to_string())]
.into_iter()
.collect();
Field::new(
name,
DataType::Struct(
vec![
Field::new("data", DataType::LargeBinary, true),
Field::new("uri", DataType::Utf8, true),
]
.into(),
),
nullable,
)
.with_metadata(metadata)
}

/// Borrow the underlying struct array.
pub fn as_struct(&self) -> &StructArray {
&self.inner
}

/// Number of rows.
pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

pub fn null_count(&self) -> usize {
self.inner.null_count()
}
Comment thread
Xuanwo marked this conversation as resolved.
Outdated
}

impl From<BlobArray> for ArrayRef {
fn from(value: BlobArray) -> Self {
Arc::new(value.inner)
}
}

/// Builder for [`BlobArray`].
///
/// The builder enforces that each row contains exactly one of `data` or `uri` (or is null).
pub struct BlobArrayBuilder {
data_builder: LargeBinaryBuilder,
uri_builder: StringBuilder,
validity: NullBufferBuilder,
expected_len: usize,
len: usize,
}

impl BlobArrayBuilder {
/// Create a new builder with the given row capacity.
pub fn new(capacity: usize) -> Self {
Self {
data_builder: LargeBinaryBuilder::with_capacity(capacity, 0),
uri_builder: StringBuilder::with_capacity(capacity, 0),
validity: NullBufferBuilder::new(capacity),
expected_len: capacity,
len: 0,
}
}

/// Append a blob backed by raw bytes.
pub fn push_bytes(&mut self, bytes: impl AsRef<[u8]>) -> Result<()> {
self.ensure_capacity()?;
self.validity.append_non_null();
self.data_builder.append_value(bytes);
self.uri_builder.append_null();
self.len += 1;
Ok(())
}

/// Append a blob referenced by URI.
pub fn push_uri(&mut self, uri: impl Into<String>) -> Result<()> {
self.ensure_capacity()?;
let uri = uri.into();
if uri.is_empty() {
return Err(Error::invalid_input(
"URI cannot be empty",
snafu::location!(),
));
}
self.validity.append_non_null();
self.data_builder.append_null();
self.uri_builder.append_value(uri);
self.len += 1;
Ok(())
}

/// Append an empty blob (inline, zero-length payload).
pub fn push_empty(&mut self) -> Result<()> {
self.ensure_capacity()?;
self.validity.append_non_null();
self.data_builder.append_value([]);
self.uri_builder.append_null();
self.len += 1;
Ok(())
}

/// Append a null row.
pub fn push_null(&mut self) -> Result<()> {
self.ensure_capacity()?;
self.validity.append_null();
self.data_builder.append_null();
self.uri_builder.append_null();
self.len += 1;
Ok(())
}

/// Finish building and return the [`BlobArray`].
pub fn finish(mut self) -> Result<BlobArray> {
if self.len != self.expected_len {
return Err(Error::invalid_input(
format!(
"Expected {} rows but received {}",
self.expected_len, self.len
),
snafu::location!(),
));
}

let data = Arc::new(self.data_builder.finish());
let uri = Arc::new(self.uri_builder.finish());
let validity = self.validity.finish();

let struct_array = StructArray::try_new(
vec![
Field::new("data", DataType::LargeBinary, true),
Field::new("uri", DataType::Utf8, true),
]
.into(),
vec![data as ArrayRef, uri as ArrayRef],
validity,
)?;

Ok(BlobArray {
inner: struct_array,
})
}

fn ensure_capacity(&self) -> Result<()> {
if self.len >= self.expected_len {
Err(Error::invalid_input(
"BlobArrayBuilder capacity exceeded",
snafu::location!(),
))
} else {
Ok(())
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow_array::cast::AsArray;
use arrow_array::Array;

#[test]
fn test_field_metadata() {
let field = BlobArray::field("blob", true);
assert!(field.metadata().get(ARROW_EXT_NAME_KEY).is_some());
assert_eq!(
field.metadata().get(ARROW_EXT_NAME_KEY).unwrap(),
BLOB_V2_EXT_NAME
);
}

#[test]
fn test_builder_basic() {
let mut b = BlobArrayBuilder::new(4);
b.push_bytes(b"hi").unwrap();
b.push_uri("s3://bucket/key").unwrap();
b.push_empty().unwrap();
b.push_null().unwrap();

let arr = b.finish().unwrap();
assert_eq!(arr.len(), 4);
assert_eq!(arr.null_count(), 1);

let struct_arr = arr.as_struct();
let data = struct_arr.column(0).as_binary::<i64>();
let uri = struct_arr.column(1).as_string::<i32>();

assert_eq!(data.value(0), b"hi");
assert!(uri.is_null(0));
assert!(data.is_null(1));
assert_eq!(uri.value(1), "s3://bucket/key");
assert_eq!(data.value(2).len(), 0);
assert!(uri.is_null(2));
}

#[test]
fn test_capacity_error() {
let mut b = BlobArrayBuilder::new(1);
b.push_bytes(b"a").unwrap();
let err = b.push_bytes(b"b").unwrap_err();
assert!(err.to_string().contains("capacity exceeded"));
}

#[test]
fn test_empty_uri_rejected() {
let mut b = BlobArrayBuilder::new(1);
let err = b.push_uri("").unwrap_err();
assert!(err.to_string().contains("URI cannot be empty"));
}
}
47 changes: 46 additions & 1 deletion rust/lance/src/dataset/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,8 @@ mod tests {

use arrow::{array::AsArray, datatypes::UInt64Type};
use arrow_array::RecordBatch;
use arrow_array::{RecordBatchIterator, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance_arrow::DataTypeExt;
use lance_io::stream::RecordBatchStream;
Expand All @@ -794,7 +796,12 @@ mod tests {
use lance_file::version::LanceFileVersion;

use super::data_file_key_from_path;
use crate::{utils::test::TestDatasetGenerator, Dataset};
use crate::{
blob::{BlobArray, BlobArrayBuilder},
dataset::WriteParams,
utils::test::TestDatasetGenerator,
Dataset,
};

struct BlobTestFixture {
_test_dir: TempStrDir,
Expand Down Expand Up @@ -1054,4 +1061,42 @@ mod tests {
assert_eq!(data_file_key_from_path("abc.lance"), "abc");
assert_eq!(data_file_key_from_path("nested/path/xyz"), "xyz");
}

#[tokio::test]
async fn test_write_and_take_blobs_with_blob_array_builder() {
let test_dir = TempStrDir::default();

// Build a blob column with the new BlobArrayBuilder
let mut blob_builder = BlobArrayBuilder::new(2);
blob_builder.push_bytes(b"hello").unwrap();
blob_builder.push_bytes(b"world").unwrap();
let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap().into();

let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(vec![0, 1]));
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt32, false),
BlobArray::field("blob", true),
]));

let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap();
let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

let params = WriteParams::with_storage_version(LanceFileVersion::V2_2);
let dataset = Arc::new(
Dataset::write(reader, &test_dir, Some(params))
.await
.unwrap(),
);

let blobs = dataset
.take_blobs_by_indices(&[0, 1], "blob")
.await
.unwrap();

assert_eq!(blobs.len(), 2);
let first = blobs[0].read().await.unwrap();
let second = blobs[1].read().await.unwrap();
assert_eq!(first.as_ref(), b"hello");
assert_eq!(second.as_ref(), b"world");
}
}
Loading
Loading