Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions python/python/lance/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,44 @@ class Blob:
"""
A logical blob value for writing Lance blob columns.

A blob can be represented either by inlined bytes or by an external URI.
A blob can be represented as:
- inline bytes
- an external URI, optionally with a position and size selecting a byte
range; when position and size are not set, the entire object at the URI
is used.
"""

data: Optional[bytes] = None
uri: Optional[str] = None
position: Optional[int] = None
size: Optional[int] = None

def __post_init__(self) -> None:
    """Validate the field combination after dataclass initialization.

    Raises:
        ValueError: if the fields describe an inconsistent blob — both
            inline data and a uri, an empty uri, slice metadata without
            a uri, only one of position/size, or inline data combined
            with slice metadata.
    """
    has_data = self.data is not None
    has_uri = self.uri is not None
    has_position = self.position is not None
    has_size = self.size is not None

    if has_data and has_uri:
        raise ValueError("Blob cannot have both data and uri")
    if self.uri == "":
        raise ValueError("Blob uri cannot be empty")
    if (has_position or has_size) and not has_uri:
        raise ValueError("External packed blob must have a uri")
    if has_position != has_size:
        raise ValueError(
            "External blob must set both position and size, or neither"
        )
    if has_data and has_position:
        raise ValueError(
            "Blob cannot have both inline data and external slice metadata"
        )

@staticmethod
def from_bytes(data: Union[bytes, bytearray, memoryview]) -> "Blob":
    """Create an inline blob from a bytes-like value.

    The input is copied into an immutable ``bytes`` object so the
    resulting blob does not alias the caller's buffer.
    """
    payload = bytes(data)
    return Blob(data=payload)

@staticmethod
def from_uri(
    uri: str, position: Optional[int] = None, size: Optional[int] = None
) -> "Blob":
    """Create an external blob referencing ``uri``.

    Args:
        uri: non-empty URI of the external object.
        position: optional byte offset of the slice within the object;
            must be provided together with ``size``.
        size: optional byte length of the slice; must be provided
            together with ``position``.

    Raises:
        ValueError: if ``uri`` is empty, or a provided position/size is
            negative.
    """
    if uri == "":
        raise ValueError("Blob uri cannot be empty")
    # Only range-check values that were actually supplied. Comparing the
    # None defaults with 0 would raise TypeError and break the plain
    # `Blob.from_uri(uri)` form (full-object external blob).
    if (position is not None and position < 0) or (size is not None and size < 0):
        raise ValueError("External blob position and size must be non-negative")
    return Blob(uri=uri, position=position, size=size)

@staticmethod
def empty() -> "Blob":
Expand All @@ -55,6 +72,8 @@ def __init__(self) -> None:
[
pa.field("data", pa.large_binary(), nullable=True),
pa.field("uri", pa.utf8(), nullable=True),
pa.field("position", pa.uint64(), nullable=True),
pa.field("size", pa.uint64(), nullable=True),
]
)
pa.ExtensionType.__init__(self, storage_type, "lance.blob.v2")
Expand Down Expand Up @@ -98,18 +117,24 @@ class BlobArray(pa.ExtensionArray):
def from_pylist(cls, values: list[Any]) -> "BlobArray":
data_values: list[Optional[bytes]] = []
uri_values: list[Optional[str]] = []
position_values: list[Optional[int]] = []
size_values: list[Optional[int]] = []
null_mask: list[bool] = []

for v in values:
if v is None:
data_values.append(None)
uri_values.append(None)
position_values.append(None)
size_values.append(None)
null_mask.append(True)
continue

if isinstance(v, Blob):
data_values.append(v.data)
uri_values.append(v.uri)
position_values.append(v.position)
size_values.append(v.size)
null_mask.append(False)
continue

Expand All @@ -118,12 +143,16 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
raise ValueError("Blob uri cannot be empty")
data_values.append(None)
uri_values.append(v)
position_values.append(None)
size_values.append(None)
null_mask.append(False)
continue

if isinstance(v, (bytes, bytearray, memoryview)):
data_values.append(bytes(v))
uri_values.append(None)
position_values.append(None)
size_values.append(None)
null_mask.append(False)
continue

Expand All @@ -134,9 +163,13 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":

data_arr = pa.array(data_values, type=pa.large_binary())
uri_arr = pa.array(uri_values, type=pa.utf8())
position_arr = pa.array(position_values, type=pa.uint64())
size_arr = pa.array(size_values, type=pa.uint64())
mask_arr = pa.array(null_mask, type=pa.bool_())
storage = pa.StructArray.from_arrays(
[data_arr, uri_arr], names=["data", "uri"], mask=mask_arr
[data_arr, uri_arr, position_arr, size_arr],
names=["data", "uri", "position", "size"],
mask=mask_arr,
)
return pa.ExtensionArray.from_storage(BlobType(), storage) # type: ignore[return-value]

Expand Down
49 changes: 48 additions & 1 deletion python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import io
import tarfile

import lance
import pyarrow as pa
import pytest
from lance import BlobColumn
from lance import Blob, BlobColumn


def test_blob_read_from_binary():
Expand Down Expand Up @@ -332,3 +335,47 @@ def test_blob_extension_write_external(tmp_path):
assert blob.size() == 5
with blob as f:
assert f.read() == b"hello"


def test_blob_extension_write_external_slice(tmp_path):
    """Round-trip external blobs that reference byte ranges in one file.

    Packs several payloads into a single tar archive, records each
    member's data offset and length, writes blobs pointing at those
    slices, and checks that ``take_blobs`` yields the original bytes.
    """
    tar_path = tmp_path / "container.tar"
    entries = {"a.bin": b"alpha", "b.bin": b"bravo", "c.bin": b"charlie"}

    # Pack every payload into one tar so all blobs share a single URI.
    with tarfile.open(tar_path, "w") as tf:
        for name, payload in entries.items():
            info = tarfile.TarInfo(name)
            info.size = len(payload)
            tf.addfile(info, io.BytesIO(payload))

    # Re-open the archive to learn where each member's data lives.
    with tarfile.open(tar_path, "r") as tf:
        ranges = [
            (tf.getmember(name).offset_data, tf.getmember(name).size)
            for name in entries
        ]

    uri = tar_path.as_uri()
    blob_values = [Blob.from_uri(uri, offset, length) for offset, length in ranges]

    table = pa.table({"blob": lance.blob_array(blob_values)})
    ds = lance.write_dataset(
        table,
        tmp_path / "ds",
        data_storage_version="2.2",
    )

    blobs = ds.take_blobs("blob", indices=[0, 1, 2])
    payloads = list(entries.values())
    assert len(blobs) == len(payloads)

    for expected, blob_file in zip(payloads, blobs):
        assert blob_file.size() == len(expected)
        with blob_file as f:
            assert f.read() == expected
5 changes: 4 additions & 1 deletion rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,10 @@ pub enum BlobKind {
Packed = 1,
/// Stored in a dedicated raw blob file; `blob_id` identifies the file, `size` is the full file length.
Dedicated = 2,
/// Not stored by Lance; `blob_uri` holds an absolute external URI, offsets are zero.
/// Not stored by Lance; `blob_uri` holds an absolute external URI.
///
/// An external blob may carry a `position` and `size` selecting a byte range
/// within the object at the URI. If `position` is not set, it defaults to 0,
/// i.e. the beginning of the object.
External = 3,
}

Expand Down
83 changes: 76 additions & 7 deletions rust/lance-encoding/src/encodings/logical/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,20 @@ impl FieldEncoder for BlobV2StructuralEncoder {
blob_id_col.value(i),
"".to_string(),
),
BlobKind::External => (
BlobKind::External as u8,
0,
0,
0,
uri_col.value(i).to_string(),
),
BlobKind::External => {
let uri = uri_col.value(i).to_string();
let position = if packed_position_col.is_null(i) {
0
} else {
packed_position_col.value(i)
};
let size = if blob_size_col.is_null(i) {
0
} else {
blob_size_col.value(i)
};
(BlobKind::External as u8, position, size, 0, uri)
}
BlobKind::Packed => (
BlobKind::Packed as u8,
packed_position_col.value(i),
Expand Down Expand Up @@ -667,6 +674,68 @@ mod tests {
.await;
}

#[tokio::test]
async fn test_blob_v2_external_with_range_round_trip() {
let blob_metadata = HashMap::from([(
lance_arrow::ARROW_EXT_NAME_KEY.to_string(),
lance_arrow::BLOB_V2_EXT_NAME.to_string(),
)]);

let kind_field = Arc::new(ArrowField::new("kind", DataType::UInt8, true));
let data_field = Arc::new(ArrowField::new("data", DataType::LargeBinary, true));
let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, true));
let blob_id_field = Arc::new(ArrowField::new("blob_id", DataType::UInt32, true));
let blob_size_field = Arc::new(ArrowField::new("blob_size", DataType::UInt64, true));
let position_field = Arc::new(ArrowField::new("position", DataType::UInt64, true));

let kind_array = UInt8Array::from(vec![BlobKind::External as u8]);
let data_array = LargeBinaryArray::from(vec![None::<&[u8]>]);
let uri_array = StringArray::from(vec![Some("memory://container.pack")]);
let blob_id_array = UInt32Array::from(vec![0]);
let blob_size_array = UInt64Array::from(vec![42]);
let position_array = UInt64Array::from(vec![7]);

let struct_array = StructArray::from(vec![
(kind_field, Arc::new(kind_array) as ArrayRef),
(data_field, Arc::new(data_array) as ArrayRef),
(uri_field, Arc::new(uri_array) as ArrayRef),
(blob_id_field, Arc::new(blob_id_array) as ArrayRef),
(blob_size_field, Arc::new(blob_size_array) as ArrayRef),
(position_field, Arc::new(position_array) as ArrayRef),
]);

let expected_descriptor = StructArray::from(vec![
(
Arc::new(ArrowField::new("kind", DataType::UInt8, false)),
Arc::new(UInt8Array::from(vec![BlobKind::External as u8])) as ArrayRef,
),
(
Arc::new(ArrowField::new("position", DataType::UInt64, false)),
Arc::new(UInt64Array::from(vec![7])) as ArrayRef,
),
(
Arc::new(ArrowField::new("size", DataType::UInt64, false)),
Arc::new(UInt64Array::from(vec![42])) as ArrayRef,
),
(
Arc::new(ArrowField::new("blob_id", DataType::UInt32, false)),
Arc::new(UInt32Array::from(vec![0])) as ArrayRef,
),
(
Arc::new(ArrowField::new("blob_uri", DataType::Utf8, false)),
Arc::new(StringArray::from(vec!["memory://container.pack"])) as ArrayRef,
),
]);

check_round_trip_encoding_of_data_with_expected(
vec![Arc::new(struct_array)],
Some(Arc::new(expected_descriptor)),
&TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
blob_metadata,
)
.await;
}

#[tokio::test]
async fn test_blob_v2_packed_round_trip() {
let blob_metadata = HashMap::from([(
Expand Down
34 changes: 30 additions & 4 deletions rust/lance/src/dataset/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ impl BlobPreprocessor {
Error::invalid_input("Blob struct missing `uri` field", location!())
})?
.as_string::<i32>();
let position_col = struct_arr
.column_by_name("position")
.map(|col| col.as_primitive::<UInt64Type>());
let size_col = struct_arr
.column_by_name("size")
.map(|col| col.as_primitive::<UInt64Type>());

let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0);
let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0);
Expand All @@ -262,6 +268,14 @@ impl BlobPreprocessor {

let has_data = !data_col.is_null(i);
let has_uri = !uri_col.is_null(i);
let has_position = position_col
.as_ref()
.map(|col| !col.is_null(i))
.unwrap_or(false);
let has_size = size_col
.as_ref()
.map(|col| !col.is_null(i))
.unwrap_or(false);
let data_len = if has_data { data_col.value(i).len() } else { 0 };

let dedicated_threshold = self.dedicated_thresholds[idx];
Expand Down Expand Up @@ -296,8 +310,18 @@ impl BlobPreprocessor {
data_builder.append_null();
uri_builder.append_value(uri_val);
blob_id_builder.append_null();
blob_size_builder.append_null();
position_builder.append_null();
if has_position && has_size {
let position = position_col
.as_ref()
.expect("position column must exist")
.value(i);
let size = size_col.as_ref().expect("size column must exist").value(i);
blob_size_builder.append_value(size);
position_builder.append_value(position);
} else {
blob_size_builder.append_null();
position_builder.append_null();
}
continue;
}

Expand Down Expand Up @@ -463,6 +487,7 @@ impl BlobFile {
}
pub async fn new_external(
uri: String,
position: u64,
size: u64,
registry: Arc<ObjectStoreRegistry>,
params: Arc<ObjectStoreParams>,
Expand All @@ -477,7 +502,7 @@ impl BlobFile {
Ok(Self {
object_store,
path,
position: 0,
position,
size,
kind: BlobKind::External,
uri: Some(uri),
Expand Down Expand Up @@ -820,14 +845,15 @@ async fn collect_blob_files_v2(
}
BlobKind::External => {
let uri = blob_uris.value(idx).to_string();
let position = positions.value(idx);
let size = sizes.value(idx);
let registry = dataset.session.store_registry();
let params = dataset
.store_params
.as_ref()
.map(|p| Arc::new((**p).clone()))
.unwrap_or_else(|| Arc::new(ObjectStoreParams::default()));
files.push(BlobFile::new_external(uri, size, registry, params).await?);
files.push(BlobFile::new_external(uri, position, size, registry, params).await?);
}
}
}
Expand Down