Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1640,12 +1640,11 @@ def take_blobs(
if ids is not None:
lance_blob_files = self._ds.take_blobs(ids, blob_column)
elif addresses is not None:
# ROW ids and Row address are the same until stable ROW ID is implemented.
lance_blob_files = self._ds.take_blobs(addresses, blob_column)
lance_blob_files = self._ds.take_blobs_by_addresses(addresses, blob_column)
elif indices is not None:
lance_blob_files = self._ds.take_blobs_by_indices(indices, blob_column)
else:
raise ValueError("Either ids or indices must be specified")
raise ValueError("Either ids, addresses, or indices must be specified")
return [BlobFile(lance_blob_file) for lance_blob_file in lance_blob_files]

def head(self, num_rows, **kwargs):
Expand Down
10 changes: 10 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ class _Dataset:
columns_with_transform: Optional[List[Tuple[str, str]]] = None,
) -> pa.RecordBatch: ...
# Stub for the native method that takes blob files by row id.
# `row_ids` selects the rows and `blob_column` names the blob column to read.
# NOTE(review): row ids and row addresses are looked up through separate
# methods; use `take_blobs_by_addresses` when you hold addresses.
def take_blobs(
self,
row_ids: List[int],
blob_column: str,
) -> List[LanceBlobFile]: ...
# Stub for the native method that takes blob files by row address.
# `row_addresses` selects the rows and `blob_column` names the blob column.
# Use this instead of `take_blobs` when the values come from `_rowaddr`:
# with stable row ids enabled a row address is not a valid row id.
def take_blobs_by_addresses(
self,
row_addresses: List[int],
blob_column: str,
) -> List[LanceBlobFile]: ...
def take_blobs_by_indices(
self,
row_indices: List[int],
blob_column: str,
Expand Down
43 changes: 43 additions & 0 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,49 @@ def test_blob_files_by_address(dataset_with_blobs):
assert f.read() == expected


def test_blob_files_by_address_with_stable_row_ids(tmp_path):
    """Blobs can be taken by row address on a dataset with stable row ids.

    Writes a single-row dataset with ``enable_stable_row_ids=True``, inserts a
    second row, then fetches that second row's blob through its ``_rowaddr``.
    """
    blob_field = pa.field(
        "blobs", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
    )
    schema = pa.schema([blob_field, pa.field("idx", pa.uint64())])

    first_batch = pa.table(
        {
            "blobs": pa.array([b"foo"], pa.large_binary()),
            "idx": pa.array([0], pa.uint64()),
        },
        schema=schema,
    )
    ds = lance.write_dataset(
        first_batch,
        tmp_path / "test_ds",
        enable_stable_row_ids=True,
    )

    second_batch = pa.table(
        {
            "blobs": pa.array([b"bar"], pa.large_binary()),
            "idx": pa.array([1], pa.uint64()),
        },
        schema=first_batch.schema,
    )
    ds.insert(second_batch)

    scanned = ds.to_table(columns=["idx"], with_row_address=True)
    # Target the inserted row (idx == 1), which lives in the second fragment.
    # With stable row ids enabled its row address is not a valid row id, so the
    # lookup has to go through the address-based path.
    position = scanned.column("idx").to_pylist().index(1)
    target_addr = scanned.column("_rowaddr").to_pylist()[position]

    blob_files = ds.take_blobs("blobs", addresses=[target_addr])
    assert len(blob_files) == 1
    with blob_files[0] as handle:
        assert handle.read() == b"bar"


def test_blob_by_indices(tmp_path, dataset_with_blobs):
indices = [0, 4]
blobs = dataset_with_blobs.take_blobs("blobs", indices=indices)
Expand Down
20 changes: 18 additions & 2 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1174,13 +1174,29 @@ impl Dataset {

/// Take blob files for the rows identified by `row_ids` from `blob_column`.
///
/// Runs `self_.ds.take_blobs` to completion through `rt().block_on`, then
/// wraps every returned blob in a [`LanceBlobFile`].
///
/// # Errors
///
/// Propagates a failure from `block_on` itself, or the dataset error after it
/// has been converted by `infer_error`.
fn take_blobs(
    self_: PyRef<'_, Self>,
    // Only `row_ids` is accepted: the superseded `row_indices` parameter was
    // unused and did not match the two-argument call made from Python.
    row_ids: Vec<u64>,
    blob_column: &str,
) -> PyResult<Vec<LanceBlobFile>> {
    let blobs = rt()
        .block_on(
            Some(self_.py()),
            self_.ds.take_blobs(&row_ids, blob_column),
        )?
        .infer_error()?;
    Ok(blobs.into_iter().map(LanceBlobFile::from).collect())
}

fn take_blobs_by_addresses(
self_: PyRef<'_, Self>,
row_addresses: Vec<u64>,
blob_column: &str,
) -> PyResult<Vec<LanceBlobFile>> {
let blobs = rt()
.block_on(
Some(self_.py()),
self_.ds.take_blobs(&row_indices, blob_column),
self_
.ds
.take_blobs_by_addresses(&row_addresses, blob_column),
)?
.infer_error()?;
Ok(blobs.into_iter().map(LanceBlobFile::from).collect())
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,7 @@ impl Dataset {
TakeBuilder::try_new_from_ids(self.clone(), row_ids.to_vec(), projection.into())
}

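/// Take [BlobFile] by row IDs.
///
/// When stable row IDs are enabled a row address is not a valid row ID;
/// use `take_blobs_by_addresses` to look rows up by address instead.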
pub async fn take_blobs(
self: &Arc<Self>,
row_ids: &[u64],
Expand Down
Loading