From fc55498ab135b531a14078448783df7df4d6f6f1 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 29 Oct 2025 08:57:36 +0800 Subject: [PATCH 1/8] feat(python): expose DatasetDeltaBuilder and relevant apis --- python/python/lance/dataset.py | 97 +++++++++++++++++++++++++++++++ python/python/tests/test_delta.py | 33 +++++++++++ python/src/dataset.rs | 84 ++++++++++++++++++++++++++ rust/lance/src/dataset/delta.rs | 1 + 4 files changed, 215 insertions(+) create mode 100755 python/python/tests/test_delta.py diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 801e809a7ef..6f0eb6cf3ba 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3586,6 +3586,36 @@ def sql(self, sql: str) -> "SqlQueryBuilder": """ return SqlQueryBuilder(self._ds.sql(sql)) + def delta(self) -> "DatasetDeltaBuilder": + """ + Create a Delta comparison builder to explore changes between two versions. + + Returns + ------- + DatasetDeltaBuilder + A chainable builder. Call ``build()`` to get a ``DatasetDelta``, which can list transactions or stream inserted/updated rows. + + Examples + ---- + .. code-block:: python + + import lance + import pyarrow as pa + + # Write initial data (v1) + ds = lance.write_dataset(pa.table({"id": [1, 2], "val": ["a", "b"]}), "memory://delta_demo") + + # Append some data to create v2 + ds_append = lance.write_dataset(pa.table({"id": [3], "val": ["c"]}), "memory://delta_demo", mode="append") + + # Compute inserted rows from v1 -> v2 + delta = ds_append.delta().compared_against_version(1).build() + reader = delta.get_inserted_rows() + for batch in reader: + print(batch) + """ + return DatasetDeltaBuilder(self._ds.delta()) + @property def optimize(self) -> "DatasetOptimizer": return DatasetOptimizer(self) @@ -3750,6 +3780,73 @@ def build(self) -> SqlQuery: return SqlQuery(self._builder.build()) +class DatasetDelta: + """ + A view of differences between two versions. + + Created by :meth:`DatasetDeltaBuilder.build`. Provides convenient methods to stream inserted/updated rows or list transactions. + """ + + def __init__(self, delta): + self._delta = delta + + def list_transactions(self) -> List[Transaction]: + """ + List transactions in the range from begin_version + 1 to end_version. + """ + return self._delta.list_transactions() + + def get_inserted_rows(self) -> pa.RecordBatchReader: + """ + Return a streaming RecordBatchReader for inserted rows. + """ + return self._delta.get_inserted_rows() + + def get_updated_rows(self) -> pa.RecordBatchReader: + """ + Return a streaming RecordBatchReader for updated rows. + """ + return self._delta.get_updated_rows() + + +class DatasetDeltaBuilder: + """ + A builder for :class:`DatasetDelta`. + + Supports chainable configuration: set a comparison version or an explicit version range, then call :meth:`build` to obtain the delta object. + """ + + def __init__(self, builder): + self._builder = builder + + def compared_against_version(self, version: int) -> "DatasetDeltaBuilder": + """ + Configure comparison against the given version. + """ + self._builder = self._builder.compared_against_version(version) + return self + + def with_begin_version(self, version: int) -> "DatasetDeltaBuilder": + """ + Set the start version (exclusive). Must be used together with :meth:`with_end_version`. + """ + self._builder = self._builder.with_begin_version(version) + return self + + def with_end_version(self, version: int) -> "DatasetDeltaBuilder": + """ + Set the end version (inclusive). Must be used together with :meth:`with_begin_version`. + """ + self._builder = self._builder.with_end_version(version) + return self + + def build(self) -> DatasetDelta: + """ + Build a :class:`DatasetDelta` object. + """ + return DatasetDelta(self._builder.build()) + + class BulkCommitResult(TypedDict): dataset: LanceDataset merged: Transaction diff --git a/python/python/tests/test_delta.py b/python/python/tests/test_delta.py new file mode 100755 index 00000000000..495982463c0 --- /dev/null +++ b/python/python/tests/test_delta.py @@ -0,0 +1,33 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +import pyarrow as pa + +from lance import write_dataset, LanceDataset + + +def test_delta_get_inserted_rows_basic(): + # Create initial dataset (version 1) + table1 = pa.table({ + "id": pa.array([1, 2, 3], type=pa.int32()), + "val": pa.array(["a", "b", "c"], type=pa.string()), + }) + ds1 = write_dataset(table1, "memory://delta_api_test") + + # Append more rows to create version 2 + table2 = pa.table({ + "id": pa.array([4, 5], type=pa.int32()), + "val": pa.array(["d", "e"], type=pa.string()), + }) + ds2 = write_dataset(table2, "memory://delta_api_test", mode="append") + + # Build delta compared to v1 and fetch inserted rows + delta = ds2.delta().compared_against_version(1).build() + reader = delta.get_inserted_rows() + + # Sum rows from all batches + total_rows = 0 + for batch in reader: + total_rows += batch.num_rows + + assert total_rows == 2 diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 334cbdf04f1..617792d9396 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2625,6 +2625,14 @@ impl Dataset { .map(|desc| PyIndexDescription::new(desc.as_ref(), self.ds.as_ref())) .collect()) } + + /// Create a delta builder to explore changes between dataset versions. + #[pyo3(signature=())] + fn delta(&self) -> PyResult { + let mut ds = self.ds.as_ref().clone(); + let builder = ds.delta(); + Ok(DatasetDeltaBuilder { builder }) + } } #[pyclass(name = "SqlQuery", module = "_lib", subclass)] @@ -2726,6 +2734,82 @@ impl SqlQueryBuilder { } } +// -------------------- Delta API Bindings -------------------- + +#[pyclass(name = "DatasetDelta", module = "_lib", subclass)] +#[derive(Clone)] +pub struct DatasetDelta { + inner: lance::dataset::delta::DatasetDelta, +} + +#[pymethods] +impl DatasetDelta { + /// List transactions between begin_version+1 and end_version. + fn list_transactions(&self) -> PyResult>> { + let txs = rt() + .block_on(None, self.inner.list_transactions())? + .infer_error()?; + Ok(txs.into_iter().map(PyLance).collect()) + } + + /// Get inserted rows between begin_version (exclusive) and end_version (inclusive) as a stream reader. + fn get_inserted_rows(&self) -> PyResult { + use arrow::pyarrow::IntoPyArrow; + use arrow_array::RecordBatchReader; + let stream = rt() + .block_on(None, self.inner.get_inserted_rows())? + .infer_error()?; + let reader: Box = Box::new(LanceReader::from_stream(stream)); + Python::with_gil(|py| reader.into_pyarrow(py)) + } + + /// Get updated rows between begin_version (exclusive) and end_version (inclusive) as a stream reader. + fn get_updated_rows(&self) -> PyResult { + use arrow::pyarrow::IntoPyArrow; + use arrow_array::RecordBatchReader; + let stream = rt() + .block_on(None, self.inner.get_updated_rows())? + .infer_error()?; + let reader: Box = Box::new(LanceReader::from_stream(stream)); + Python::with_gil(|py| reader.into_pyarrow(py)) + } +} + +#[pyclass(name = "DatasetDeltaBuilder", module = "_lib", subclass)] +#[derive(Clone)] +pub struct DatasetDeltaBuilder { + builder: lance::dataset::delta::DatasetDeltaBuilder, +} + +#[pymethods] +impl DatasetDeltaBuilder { + #[pyo3(signature = (version))] + fn compared_against_version(&self, version: u64) -> Self { + Self { + builder: self.builder.clone().compared_against_version(version), + } + } + + #[pyo3(signature = (begin_version))] + fn with_begin_version(&self, begin_version: u64) -> Self { + Self { + builder: self.builder.clone().with_begin_version(begin_version), + } + } + + #[pyo3(signature = (end_version))] + fn with_end_version(&self, end_version: u64) -> Self { + Self { + builder: self.builder.clone().with_end_version(end_version), + } + } + + fn build(&self) -> PyResult { + let delta = self.builder.clone().build().infer_error()?; + Ok(DatasetDelta { inner: delta }) + } +} + #[derive(FromPyObject)] pub enum PyWriteDest { Dataset(Dataset), diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index f27fe358e5f..e0a4ee0a1ee 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -35,6 +35,7 @@ use snafu::location; /// # Ok(()) /// # } /// ``` +#[derive(Clone, Debug)] pub struct DatasetDeltaBuilder { dataset: Dataset, compared_against_version: Option, From f51ffb0fd8d72708471a21a0634481b941c63d6e Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 29 Oct 2025 11:33:46 +0800 Subject: [PATCH 2/8] fix code style issue --- python/python/lance/dataset.py | 21 +++++++++++++++------ python/python/tests/test_delta.py | 27 +++++++++++++++------------ 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 6f0eb6cf3ba..8301796c9ad 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3593,7 +3593,8 @@ def delta(self) -> "DatasetDeltaBuilder": Returns ------- DatasetDeltaBuilder - A chainable builder. Call ``build()`` to get a ``DatasetDelta``, which can list transactions or stream inserted/updated rows. + A chainable builder. Call ``build()`` to get a ``DatasetDelta``, + which can list transactions or stream inserted/updated rows. Examples ---- @@ -3606,7 +3607,11 @@ def delta(self) -> "DatasetDeltaBuilder": ds = lance.write_dataset(pa.table({"id": [1, 2], "val": ["a", "b"]}), "memory://delta_demo") # Append some data to create v2 - ds_append = lance.write_dataset(pa.table({"id": [3], "val": ["c"]}), "memory://delta_demo", mode="append") + ds_append = lance.write_dataset( + pa.table({"id": [3], "val": ["c"]}), + "memory://delta_demo", + mode="append" + ) # Compute inserted rows from v1 -> v2 delta = ds_append.delta().compared_against_version(1).build() @@ -3784,7 +3789,8 @@ class DatasetDelta: """ A view of differences between two versions. - Created by :meth:`DatasetDeltaBuilder.build`. Provides convenient methods to stream inserted/updated rows or list transactions. + Created by :meth:`DatasetDeltaBuilder.build`. + Provides convenient methods to stream inserted/updated rows or list transactions. """ def __init__(self, delta): @@ -3813,7 +3819,8 @@ class DatasetDeltaBuilder: """ A builder for :class:`DatasetDelta`. - Supports chainable configuration: set a comparison version or an explicit version range, then call :meth:`build` to obtain the delta object. + Supports chainable configuration: set a comparison version or + an explicit version range, then call :meth:`build` to obtain the delta object. """ def __init__(self, builder): @@ -3828,14 +3835,16 @@ def compared_against_version(self, version: int) -> "DatasetDeltaBuilder": def with_begin_version(self, version: int) -> "DatasetDeltaBuilder": """ - Set the start version (exclusive). Must be used together with :meth:`with_end_version`. + Set the start version (exclusive). + Must be used together with :meth:`with_end_version`. """ self._builder = self._builder.with_begin_version(version) return self def with_end_version(self, version: int) -> "DatasetDeltaBuilder": """ - Set the end version (inclusive). Must be used together with :meth:`with_begin_version`. + Set the end version (inclusive). + Must be used together with :meth:`with_begin_version`. """ self._builder = self._builder.with_end_version(version) return self diff --git a/python/python/tests/test_delta.py b/python/python/tests/test_delta.py index 495982463c0..9b065325346 100755 --- a/python/python/tests/test_delta.py +++ b/python/python/tests/test_delta.py @@ -2,23 +2,26 @@ # SPDX-FileCopyrightText: Copyright The Lance Authors import pyarrow as pa +from lance import write_dataset -from lance import write_dataset, LanceDataset - -def test_delta_get_inserted_rows_basic(): +def test_delta_get_inserted_rows(): # Create initial dataset (version 1) - table1 = pa.table({ - "id": pa.array([1, 2, 3], type=pa.int32()), - "val": pa.array(["a", "b", "c"], type=pa.string()), - }) - ds1 = write_dataset(table1, "memory://delta_api_test") + table1 = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "val": pa.array(["a", "b", "c"], type=pa.string()), + } + ) + write_dataset(table1, "memory://delta_api_test") # Append more rows to create version 2 - table2 = pa.table({ - "id": pa.array([4, 5], type=pa.int32()), - "val": pa.array(["d", "e"], type=pa.string()), - }) + table2 = pa.table( + { + "id": pa.array([4, 5], type=pa.int32()), + "val": pa.array(["d", "e"], type=pa.string()), + } + ) ds2 = write_dataset(table2, "memory://delta_api_test", mode="append") # Build delta compared to v1 and fetch inserted rows From a2a5355f1270b3deaa2276ca53a7b3c806f82907 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 29 Oct 2025 11:57:21 +0800 Subject: [PATCH 3/8] fix code style issue --- python/src/dataset.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 617792d9396..ade2b4516ca 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2629,7 +2629,7 @@ impl Dataset { /// Create a delta builder to explore changes between dataset versions. #[pyo3(signature=())] fn delta(&self) -> PyResult { - let mut ds = self.ds.as_ref().clone(); + let ds = self.ds.as_ref().clone(); let builder = ds.delta(); Ok(DatasetDeltaBuilder { builder }) } @@ -2737,7 +2737,6 @@ impl SqlQueryBuilder { // -------------------- Delta API Bindings -------------------- #[pyclass(name = "DatasetDelta", module = "_lib", subclass)] -#[derive(Clone)] pub struct DatasetDelta { inner: lance::dataset::delta::DatasetDelta, } @@ -2745,7 +2744,9 @@ pub struct DatasetDelta { #[pymethods] impl DatasetDelta { /// List transactions between begin_version+1 and end_version. - fn list_transactions(&self) -> PyResult>> { + fn list_transactions( + &self, + ) -> PyResult>> { let txs = rt() .block_on(None, self.inner.list_transactions())? .infer_error()?; From 006f04d438533a3527622f011db8896f2be27249 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 29 Oct 2025 14:30:45 +0800 Subject: [PATCH 4/8] fix test issue --- python/python/tests/test_delta.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/python/tests/test_delta.py b/python/python/tests/test_delta.py index 9b065325346..d9cdca2d195 100755 --- a/python/python/tests/test_delta.py +++ b/python/python/tests/test_delta.py @@ -13,7 +13,7 @@ def test_delta_get_inserted_rows(): "val": pa.array(["a", "b", "c"], type=pa.string()), } ) - write_dataset(table1, "memory://delta_api_test") + ds = write_dataset(table1, "memory://delta_api_test", enable_stable_row_ids=True) # Append more rows to create version 2 table2 = pa.table( @@ -22,10 +22,11 @@ def test_delta_get_inserted_rows(): "val": pa.array(["d", "e"], type=pa.string()), } ) - ds2 = write_dataset(table2, "memory://delta_api_test", mode="append") + ds.insert(table2) # Build delta compared to v1 and fetch inserted rows - delta = ds2.delta().compared_against_version(1).build() + delta = ds.delta().compared_against_version(1).build() + print(delta.list_transactions()) reader = delta.get_inserted_rows() # Sum rows from all batches From 11a24bd63d88e2aa84cf9c46dd663b46a65d10a6 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 29 Oct 2025 20:47:48 +0800 Subject: [PATCH 5/8] add test case for diff update --- python/python/tests/test_delta.py | 41 +++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/python/python/tests/test_delta.py b/python/python/tests/test_delta.py index d9cdca2d195..c947b514b3a 100755 --- a/python/python/tests/test_delta.py +++ b/python/python/tests/test_delta.py @@ -35,3 +35,44 @@ def test_delta_get_inserted_rows(): total_rows += batch.num_rows assert total_rows == 2 + + +def test_delta_get_updated_rows(): + # Create initial dataset (version 1) + table1 = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "val": pa.array(["a", "b", "c"], type=pa.string()), + } + ) + ds = write_dataset( + table1, "memory://delta_api_test_update", enable_stable_row_ids=True + ) + + # Update an existing row to create version 2 + update_stats = ds.update({"val": "'b_updated'"}, where="id = 2") + assert update_stats["num_rows_updated"] == 1 + + # Build delta compared to v1 and fetch updated rows + delta = ds.delta().compared_against_version(1).build() + + # Ensure the transaction is an Update (not an Append/Delete) + txs = delta.list_transactions() + assert len(txs) == 1 + assert type(txs[0].operation).__name__ == "Update" + + reader = delta.get_updated_rows() + + # Collect updated rows and validate contents + total_rows = 0 + for batch in reader: + total_rows += batch.num_rows + + assert total_rows == 1 + + # Ensure no inserted rows are present in this diff + inserted_reader = delta.get_inserted_rows() + total_inserted = 0 + for batch in inserted_reader: + total_inserted += batch.num_rows + assert total_inserted == 0 From 7447ab063c742db64fe6a5a9f8b5a97f57d06824 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 26 Nov 2025 10:40:00 -0800 Subject: [PATCH 6/8] make it more functional style --- python/python/lance/dataset.py | 113 +++++++++++++++++++++--------- python/python/tests/test_delta.py | 58 ++++++++++++++- 2 files changed, 136 insertions(+), 35 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 8301796c9ad..5c9c9759636 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3586,25 +3586,56 @@ def sql(self, sql: str) -> "SqlQueryBuilder": """ return SqlQueryBuilder(self._ds.sql(sql)) - def delta(self) -> "DatasetDeltaBuilder": + def delta( + self, + compared_against: Optional[int] = None, + *, + begin_version: Optional[int] = None, + end_version: Optional[int] = None, + ) -> "DatasetDelta": """ - Create a Delta comparison builder to explore changes between two versions. + Compare changes between two versions of this dataset. + + You must specify either ``compared_against`` (shorthand for comparing the + current version against a specific older version) or both ``begin_version`` + and ``end_version`` for an explicit range. + + Parameters + ---------- + compared_against : int, optional + The version to compare the current dataset version against. + This is a shorthand for setting ``begin_version=compared_against`` + and ``end_version=self.version``. + begin_version : int, optional + The start version (exclusive) for the comparison range. + Must be used together with ``end_version``. + end_version : int, optional + The end version (inclusive) for the comparison range. + Must be used together with ``begin_version``. Returns ------- - DatasetDeltaBuilder - A chainable builder. Call ``build()`` to get a ``DatasetDelta``, - which can list transactions or stream inserted/updated rows. + DatasetDelta + An object that can list transactions or stream inserted/updated rows. + + Raises + ------ + ValueError + If both ``compared_against`` and version range are specified, + or if neither is specified. Examples - ---- + -------- .. code-block:: python import lance import pyarrow as pa # Write initial data (v1) - ds = lance.write_dataset(pa.table({"id": [1, 2], "val": ["a", "b"]}), "memory://delta_demo") + ds = lance.write_dataset( + pa.table({"id": [1, 2], "val": ["a", "b"]}), + "memory://delta_demo" + ) # Append some data to create v2 ds_append = lance.write_dataset( @@ -3613,13 +3644,45 @@ def delta(self) -> "DatasetDeltaBuilder": mode="append" ) - # Compute inserted rows from v1 -> v2 - delta = ds_append.delta().compared_against_version(1).build() + # Compute inserted rows from v1 -> v2 (shorthand) + delta = ds_append.delta(compared_against=1) reader = delta.get_inserted_rows() for batch in reader: print(batch) + + # Or using explicit version range + delta = ds_append.delta(begin_version=1, end_version=2) """ - return DatasetDeltaBuilder(self._ds.delta()) + has_compared_against = compared_against is not None + has_range = begin_version is not None or end_version is not None + + if has_compared_against and has_range: + raise ValueError( + "Cannot specify both 'compared_against' and " + "'begin_version'/'end_version'. Use one or the other." + ) + + if not has_compared_against and not has_range: + raise ValueError( + "Must specify either 'compared_against' or both " + "'begin_version' and 'end_version'." + ) + + if has_range: + if begin_version is None or end_version is None: + raise ValueError( + "Both 'begin_version' and 'end_version' must be specified together." + ) + + builder = _DatasetDeltaBuilder(self._ds.delta()) + + if has_compared_against: + builder = builder.compared_against_version(compared_against) + else: + builder = builder.with_begin_version(begin_version) + builder = builder.with_end_version(end_version) + + return builder.build() @property def optimize(self) -> "DatasetOptimizer": @@ -3789,7 +3852,7 @@ class DatasetDelta: """ A view of differences between two versions. - Created by :meth:`DatasetDeltaBuilder.build`. + Created by :meth:`LanceDataset.delta`. Provides convenient methods to stream inserted/updated rows or list transactions. """ @@ -3815,44 +3878,28 @@ def get_updated_rows(self) -> pa.RecordBatchReader: return self._delta.get_updated_rows() -class DatasetDeltaBuilder: - """ - A builder for :class:`DatasetDelta`. +class _DatasetDeltaBuilder: + """Internal builder for :class:`DatasetDelta`. - Supports chainable configuration: set a comparison version or - an explicit version range, then call :meth:`build` to obtain the delta object. + This class is not part of the public API. Use :meth:`LanceDataset.delta` instead. """ def __init__(self, builder): self._builder = builder - def compared_against_version(self, version: int) -> "DatasetDeltaBuilder": - """ - Configure comparison against the given version. - """ + def compared_against_version(self, version: int) -> "_DatasetDeltaBuilder": self._builder = self._builder.compared_against_version(version) return self - def with_begin_version(self, version: int) -> "DatasetDeltaBuilder": - """ - Set the start version (exclusive). - Must be used together with :meth:`with_end_version`. - """ + def with_begin_version(self, version: int) -> "_DatasetDeltaBuilder": self._builder = self._builder.with_begin_version(version) return self - def with_end_version(self, version: int) -> "DatasetDeltaBuilder": - """ - Set the end version (inclusive). - Must be used together with :meth:`with_begin_version`. - """ + def with_end_version(self, version: int) -> "_DatasetDeltaBuilder": self._builder = self._builder.with_end_version(version) return self def build(self) -> DatasetDelta: - """ - Build a :class:`DatasetDelta` object. - """ return DatasetDelta(self._builder.build()) diff --git a/python/python/tests/test_delta.py b/python/python/tests/test_delta.py index c947b514b3a..c50cd34c32e 100755 --- a/python/python/tests/test_delta.py +++ b/python/python/tests/test_delta.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: Copyright The Lance Authors import pyarrow as pa +import pytest from lance import write_dataset @@ -25,7 +26,7 @@ def test_delta_get_inserted_rows(): ds.insert(table2) # Build delta compared to v1 and fetch inserted rows - delta = ds.delta().compared_against_version(1).build() + delta = ds.delta(compared_against=1) print(delta.list_transactions()) reader = delta.get_inserted_rows() @@ -54,7 +55,7 @@ def test_delta_get_updated_rows(): assert update_stats["num_rows_updated"] == 1 # Build delta compared to v1 and fetch updated rows - delta = ds.delta().compared_against_version(1).build() + delta = ds.delta(compared_against=1) # Ensure the transaction is an Update (not an Append/Delete) txs = delta.list_transactions() @@ -76,3 +77,56 @@ def test_delta_get_updated_rows(): for batch in inserted_reader: total_inserted += batch.num_rows assert total_inserted == 0 + + +def test_delta_with_explicit_version_range(): + # Create initial dataset (version 1) + table1 = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "val": pa.array(["a", "b", "c"], type=pa.string()), + } + ) + ds = write_dataset( + table1, "memory://delta_version_range_test", enable_stable_row_ids=True + ) + + # Append more rows to create version 2 + table2 = pa.table( + { + "id": pa.array([4, 5], type=pa.int32()), + "val": pa.array(["d", "e"], type=pa.string()), + } + ) + ds.insert(table2) + + # Use explicit version range instead of compared_against + delta = ds.delta(begin_version=1, end_version=2) + reader = delta.get_inserted_rows() + + total_rows = 0 + for batch in reader: + total_rows += batch.num_rows + + assert total_rows == 2 + + +def test_delta_validation_errors(): + table = pa.table({"id": pa.array([1, 2, 3], type=pa.int32())}) + ds = write_dataset(table, "memory://delta_validation_test") + + # Error: no parameters specified + with pytest.raises(ValueError, match="Must specify either"): + ds.delta() + + # Error: both compared_against and version range specified + with pytest.raises(ValueError, match="Cannot specify both"): + ds.delta(compared_against=1, begin_version=1, end_version=2) + + # Error: only begin_version specified + with pytest.raises(ValueError, match="Both 'begin_version' and 'end_version'"): + ds.delta(begin_version=1) + + # Error: only end_version specified + with pytest.raises(ValueError, match="Both 'begin_version' and 'end_version'"): + ds.delta(end_version=2) From ff2cc75eec0fe1cd701bd3cd65189768dddba550 Mon Sep 17 00:00:00 2001 From: yanghua Date: Thu, 27 Nov 2025 10:44:33 +0800 Subject: [PATCH 7/8] refactor: remove unnecessary parameter check logic --- python/python/lance/dataset.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 5c9c9759636..2db0e5a232f 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3654,25 +3654,6 @@ def delta( delta = ds_append.delta(begin_version=1, end_version=2) """ has_compared_against = compared_against is not None - has_range = begin_version is not None or end_version is not None - - if has_compared_against and has_range: - raise ValueError( - "Cannot specify both 'compared_against' and " - "'begin_version'/'end_version'. Use one or the other." - ) - - if not has_compared_against and not has_range: - raise ValueError( - "Must specify either 'compared_against' or both " - "'begin_version' and 'end_version'." - ) - - if has_range: - if begin_version is None or end_version is None: - raise ValueError( - "Both 'begin_version' and 'end_version' must be specified together." - ) builder = _DatasetDeltaBuilder(self._ds.delta()) From d292d6653e5f522dd6588d42a55bfd56555fa2a3 Mon Sep 17 00:00:00 2001 From: yanghua Date: Thu, 27 Nov 2025 11:58:23 +0800 Subject: [PATCH 8/8] fix test issue --- python/python/lance/dataset.py | 7 +++++-- python/python/tests/test_delta.py | 16 ++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 2db0e5a232f..8265ad3f793 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3660,8 +3660,11 @@ def delta( if has_compared_against: builder = builder.compared_against_version(compared_against) else: - builder = builder.with_begin_version(begin_version) - builder = builder.with_end_version(end_version) + if begin_version: + builder = builder.with_begin_version(begin_version) + + if end_version: + builder = builder.with_end_version(end_version) return builder.build() diff --git a/python/python/tests/test_delta.py b/python/python/tests/test_delta.py index c50cd34c32e..589dab8dc3f 100755 --- a/python/python/tests/test_delta.py +++ b/python/python/tests/test_delta.py @@ -119,14 +119,18 @@ def test_delta_validation_errors(): with pytest.raises(ValueError, match="Must specify either"): ds.delta() - # Error: both compared_against and version range specified - with pytest.raises(ValueError, match="Cannot specify both"): - ds.delta(compared_against=1, begin_version=1, end_version=2) - # Error: only begin_version specified - with pytest.raises(ValueError, match="Both 'begin_version' and 'end_version'"): + with pytest.raises( + ValueError, + match="Invalid user input: Must specify both with_begin_version " + "and with_end_version", + ): ds.delta(begin_version=1) # Error: only end_version specified - with pytest.raises(ValueError, match="Both 'begin_version' and 'end_version'"): + with pytest.raises( + ValueError, + match="Invalid user input: Must specify both with_begin_version " + "and with_end_version", + ): ds.delta(end_version=2)