diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 801e809a7ef..8265ad3f793 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3586,6 +3586,88 @@ def sql(self, sql: str) -> "SqlQueryBuilder": """ return SqlQueryBuilder(self._ds.sql(sql)) + def delta( + self, + compared_against: Optional[int] = None, + *, + begin_version: Optional[int] = None, + end_version: Optional[int] = None, + ) -> "DatasetDelta": + """ + Compare changes between two versions of this dataset. + + You must specify either ``compared_against`` (shorthand for comparing the + current version against a specific older version) or both ``begin_version`` + and ``end_version`` for an explicit range. + + Parameters + ---------- + compared_against : int, optional + The version to compare the current dataset version against. + This is a shorthand for setting ``begin_version=compared_against`` + and ``end_version=self.version``. + begin_version : int, optional + The start version (exclusive) for the comparison range. + Must be used together with ``end_version``. + end_version : int, optional + The end version (inclusive) for the comparison range. + Must be used together with ``begin_version``. + + Returns + ------- + DatasetDelta + An object that can list transactions or stream inserted/updated rows. + + Raises + ------ + ValueError + If both ``compared_against`` and version range are specified, + or if neither is specified. + + Examples + -------- + .. code-block:: python + + import lance + import pyarrow as pa + + # Write initial data (v1) + ds = lance.write_dataset( + pa.table({"id": [1, 2], "val": ["a", "b"]}), + "memory://delta_demo" + ) + + # Append some data to create v2 + ds_append = lance.write_dataset( + pa.table({"id": [3], "val": ["c"]}), + "memory://delta_demo", + mode="append" + ) + + # Compute inserted rows from v1 -> v2 (shorthand) + delta = ds_append.delta(compared_against=1) + reader = delta.get_inserted_rows() + for batch in reader: + print(batch) + + # Or using explicit version range + delta = ds_append.delta(begin_version=1, end_version=2) + """ + has_compared_against = compared_against is not None + + builder = _DatasetDeltaBuilder(self._ds.delta()) + + if has_compared_against: + builder = builder.compared_against_version(compared_against) + else: + if begin_version: + builder = builder.with_begin_version(begin_version) + + if end_version: + builder = builder.with_end_version(end_version) + + return builder.build() + @property def optimize(self) -> "DatasetOptimizer": return DatasetOptimizer(self) @@ -3750,6 +3832,61 @@ def build(self) -> SqlQuery: return SqlQuery(self._builder.build()) +class DatasetDelta: + """ + A view of differences between two versions. + + Created by :meth:`LanceDataset.delta`. + Provides convenient methods to stream inserted/updated rows or list transactions. + """ + + def __init__(self, delta): + self._delta = delta + + def list_transactions(self) -> List[Transaction]: + """ + List transactions in the range from begin_version + 1 to end_version. + """ + return self._delta.list_transactions() + + def get_inserted_rows(self) -> pa.RecordBatchReader: + """ + Return a streaming RecordBatchReader for inserted rows. + """ + return self._delta.get_inserted_rows() + + def get_updated_rows(self) -> pa.RecordBatchReader: + """ + Return a streaming RecordBatchReader for updated rows. + """ + return self._delta.get_updated_rows() + + +class _DatasetDeltaBuilder: + """Internal builder for :class:`DatasetDelta`. + + This class is not part of the public API. Use :meth:`LanceDataset.delta` instead. + """ + + def __init__(self, builder): + self._builder = builder + + def compared_against_version(self, version: int) -> "_DatasetDeltaBuilder": + self._builder = self._builder.compared_against_version(version) + return self + + def with_begin_version(self, version: int) -> "_DatasetDeltaBuilder": + self._builder = self._builder.with_begin_version(version) + return self + + def with_end_version(self, version: int) -> "_DatasetDeltaBuilder": + self._builder = self._builder.with_end_version(version) + return self + + def build(self) -> DatasetDelta: + return DatasetDelta(self._builder.build()) + + class BulkCommitResult(TypedDict): dataset: LanceDataset merged: Transaction diff --git a/python/python/tests/test_delta.py b/python/python/tests/test_delta.py new file mode 100755 index 00000000000..589dab8dc3f --- /dev/null +++ b/python/python/tests/test_delta.py @@ -0,0 +1,136 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +import pyarrow as pa +import pytest +from lance import write_dataset + + +def test_delta_get_inserted_rows(): + # Create initial dataset (version 1) + table1 = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "val": pa.array(["a", "b", "c"], type=pa.string()), + } + ) + ds = write_dataset(table1, "memory://delta_api_test", enable_stable_row_ids=True) + + # Append more rows to create version 2 + table2 = pa.table( + { + "id": pa.array([4, 5], type=pa.int32()), + "val": pa.array(["d", "e"], type=pa.string()), + } + ) + ds.insert(table2) + + # Build delta compared to v1 and fetch inserted rows + delta = ds.delta(compared_against=1) + print(delta.list_transactions()) + reader = delta.get_inserted_rows() + + # Sum rows from all batches + total_rows = 0 + for batch in reader: + total_rows += batch.num_rows + + assert total_rows == 2 + + +def test_delta_get_updated_rows(): + # Create initial dataset (version 1) + table1 = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "val": pa.array(["a", "b", "c"], type=pa.string()), + } + ) + ds = write_dataset( + table1, "memory://delta_api_test_update", enable_stable_row_ids=True + ) + + # Update an existing row to create version 2 + update_stats = ds.update({"val": "'b_updated'"}, where="id = 2") + assert update_stats["num_rows_updated"] == 1 + + # Build delta compared to v1 and fetch updated rows + delta = ds.delta(compared_against=1) + + # Ensure the transaction is an Update (not an Append/Delete) + txs = delta.list_transactions() + assert len(txs) == 1 + assert type(txs[0].operation).__name__ == "Update" + + reader = delta.get_updated_rows() + + # Collect updated rows and validate contents + total_rows = 0 + for batch in reader: + total_rows += batch.num_rows + + assert total_rows == 1 + + # Ensure no inserted rows are present in this diff + inserted_reader = delta.get_inserted_rows() + total_inserted = 0 + for batch in inserted_reader: + total_inserted += batch.num_rows + assert total_inserted == 0 + + +def test_delta_with_explicit_version_range(): + # Create initial dataset (version 1) + table1 = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "val": pa.array(["a", "b", "c"], type=pa.string()), + } + ) + ds = write_dataset( + table1, "memory://delta_version_range_test", enable_stable_row_ids=True + ) + + # Append more rows to create version 2 + table2 = pa.table( + { + "id": pa.array([4, 5], type=pa.int32()), + "val": pa.array(["d", "e"], type=pa.string()), + } + ) + ds.insert(table2) + + # Use explicit version range instead of compared_against + delta = ds.delta(begin_version=1, end_version=2) + reader = delta.get_inserted_rows() + + total_rows = 0 + for batch in reader: + total_rows += batch.num_rows + + assert total_rows == 2 + + +def test_delta_validation_errors(): + table = pa.table({"id": pa.array([1, 2, 3], type=pa.int32())}) + ds = write_dataset(table, "memory://delta_validation_test") + + # Error: no parameters specified + with pytest.raises(ValueError, match="Must specify either"): + ds.delta() + + # Error: only begin_version specified + with pytest.raises( + ValueError, + match="Invalid user input: Must specify both with_begin_version " + "and with_end_version", + ): + ds.delta(begin_version=1) + + # Error: only end_version specified + with pytest.raises( + ValueError, + match="Invalid user input: Must specify both with_begin_version " + "and with_end_version", + ): + ds.delta(end_version=2) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 334cbdf04f1..ade2b4516ca 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2625,6 +2625,14 @@ impl Dataset { .map(|desc| PyIndexDescription::new(desc.as_ref(), self.ds.as_ref())) .collect()) } + + /// Create a delta builder to explore changes between dataset versions. + #[pyo3(signature=())] + fn delta(&self) -> PyResult { + let ds = self.ds.as_ref().clone(); + let builder = ds.delta(); + Ok(DatasetDeltaBuilder { builder }) + } } #[pyclass(name = "SqlQuery", module = "_lib", subclass)] @@ -2726,6 +2734,83 @@ impl SqlQueryBuilder { } } +// -------------------- Delta API Bindings -------------------- + +#[pyclass(name = "DatasetDelta", module = "_lib", subclass)] +pub struct DatasetDelta { + inner: lance::dataset::delta::DatasetDelta, +} + +#[pymethods] +impl DatasetDelta { + /// List transactions between begin_version+1 and end_version. + fn list_transactions( + &self, + ) -> PyResult>> { + let txs = rt() + .block_on(None, self.inner.list_transactions())? + .infer_error()?; + Ok(txs.into_iter().map(PyLance).collect()) + } + + /// Get inserted rows between begin_version (exclusive) and end_version (inclusive) as a stream reader. + fn get_inserted_rows(&self) -> PyResult { + use arrow::pyarrow::IntoPyArrow; + use arrow_array::RecordBatchReader; + let stream = rt() + .block_on(None, self.inner.get_inserted_rows())? + .infer_error()?; + let reader: Box = Box::new(LanceReader::from_stream(stream)); + Python::with_gil(|py| reader.into_pyarrow(py)) + } + + /// Get updated rows between begin_version (exclusive) and end_version (inclusive) as a stream reader. + fn get_updated_rows(&self) -> PyResult { + use arrow::pyarrow::IntoPyArrow; + use arrow_array::RecordBatchReader; + let stream = rt() + .block_on(None, self.inner.get_updated_rows())? + .infer_error()?; + let reader: Box = Box::new(LanceReader::from_stream(stream)); + Python::with_gil(|py| reader.into_pyarrow(py)) + } +} + +#[pyclass(name = "DatasetDeltaBuilder", module = "_lib", subclass)] +#[derive(Clone)] +pub struct DatasetDeltaBuilder { + builder: lance::dataset::delta::DatasetDeltaBuilder, +} + +#[pymethods] +impl DatasetDeltaBuilder { + #[pyo3(signature = (version))] + fn compared_against_version(&self, version: u64) -> Self { + Self { + builder: self.builder.clone().compared_against_version(version), + } + } + + #[pyo3(signature = (begin_version))] + fn with_begin_version(&self, begin_version: u64) -> Self { + Self { + builder: self.builder.clone().with_begin_version(begin_version), + } + } + + #[pyo3(signature = (end_version))] + fn with_end_version(&self, end_version: u64) -> Self { + Self { + builder: self.builder.clone().with_end_version(end_version), + } + } + + fn build(&self) -> PyResult { + let delta = self.builder.clone().build().infer_error()?; + Ok(DatasetDelta { inner: delta }) + } +} + #[derive(FromPyObject)] pub enum PyWriteDest { Dataset(Dataset), diff --git a/rust/lance/src/dataset/delta.rs b/rust/lance/src/dataset/delta.rs index f27fe358e5f..e0a4ee0a1ee 100644 --- a/rust/lance/src/dataset/delta.rs +++ b/rust/lance/src/dataset/delta.rs @@ -35,6 +35,7 @@ use snafu::location; /// # Ok(()) /// # } /// ``` +#[derive(Clone, Debug)] pub struct DatasetDeltaBuilder { dataset: Dataset, compared_against_version: Option,