Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions python/ray/data/_internal/datasource/lance_datasource.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union

import numpy as np

Expand Down Expand Up @@ -29,6 +29,7 @@ class LanceDatasource(Datasource):
def __init__(
self,
uri: str,
version: Optional[Union[int, str]] = None,
columns: Optional[List[str]] = None,
filter: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
Expand All @@ -45,7 +46,9 @@ def __init__(
if filter is not None:
self.scanner_options["filter"] = filter
self.storage_options = storage_options
self.lance_ds = lance.dataset(uri=uri, storage_options=storage_options)
self.lance_ds = lance.dataset(
uri=uri, version=version, storage_options=storage_options
)

match = []
match.extend(self.READ_FRAGMENTS_ERRORS_TO_RETRY)
Expand Down
5 changes: 5 additions & 0 deletions python/ray/data/read_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4083,6 +4083,7 @@ def read_iceberg(
def read_lance(
uri: str,
*,
version: Optional[Union[int, str]] = None,
columns: Optional[List[str]] = None,
filter: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
Expand All @@ -4109,6 +4110,9 @@ def read_lance(
Args:
uri: The URI of the Lance dataset to read from. Local file paths, S3, and GCS
are supported.
version: Load a specific version of the Lance dataset. This can be an
integer version number or a string tag. By default, the
latest version is loaded.
columns: The columns to read. By default, all columns are read.
filter: Read returns only the rows matching the filter. By default, no
filter is applied.
Expand Down Expand Up @@ -4140,6 +4144,7 @@ def read_lance(
""" # noqa: E501
datasource = LanceDatasource(
uri=uri,
version=version,
columns=columns,
filter=filter,
storage_options=storage_options,
Expand Down
50 changes: 50 additions & 0 deletions python/ray/data/tests/test_lance.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,56 @@ def test_lance_write_max_rows_per_file(data_path):
assert len(ds.get_fragments()) == 10


@pytest.mark.parametrize("data_path", [lazy_fixture("local_path")])
def test_lance_read_with_version(data_path):
# NOTE: Lance only works with PyArrow 12 or above.
pyarrow_version = get_pyarrow_version()
if pyarrow_version is not None and pyarrow_version < parse_version("12.0.0"):
return

# Write an initial dataset (version 1)
df1 = pa.table({"one": [2, 1, 3, 4, 6, 5], "two": ["b", "a", "c", "e", "g", "f"]})
setup_data_path = _unwrap_protocol(data_path)
path = os.path.join(setup_data_path, "test_version.lance")
lance.write_dataset(df1, path)

# Merge new data to create a later version (latest)
ds_lance = lance.dataset(path)
# Get the initial version
initial_version = ds_lance.version

df2 = pa.table(
{
"one": [1, 2, 3, 4, 5, 6],
"three": [4, 5, 8, 9, 12, 13],
"four": ["u", "v", "w", "x", "y", "z"],
}
)
ds_lance.merge(df2, "one")

# Default read should return the latest (merged) dataset.
ds_latest = ray.data.read_lance(path)

assert ds_latest.count() == 6
# Latest dataset should contain merged columns
assert "three" in ds_latest.schema().names

# Read the original version (0) and ensure it contains the original columns
ds_prev = ray.data.read_lance(path, version=initial_version)
assert ds_prev.count() == 6
assert ds_prev.schema().names == ["one", "two"]

values_prev = [[s["one"], s["two"]] for s in ds_prev.take_all()]
assert sorted(values_prev) == [
[1, "a"],
[2, "b"],
[3, "c"],
[4, "e"],
[5, "f"],
[6, "g"],
]


if __name__ == "__main__":
import sys

Expand Down