Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3586,6 +3586,88 @@ def sql(self, sql: str) -> "SqlQueryBuilder":
"""
return SqlQueryBuilder(self._ds.sql(sql))

def delta(
    self,
    compared_against: Optional[int] = None,
    *,
    begin_version: Optional[int] = None,
    end_version: Optional[int] = None,
) -> "DatasetDelta":
    """
    Compare changes between two versions of this dataset.

    You must specify either ``compared_against`` (shorthand for comparing the
    current version against a specific older version) or both ``begin_version``
    and ``end_version`` for an explicit range.

    Parameters
    ----------
    compared_against : int, optional
        The version to compare the current dataset version against.
        This is a shorthand for setting ``begin_version=compared_against``
        and ``end_version=self.version``.
    begin_version : int, optional
        The start version (exclusive) for the comparison range.
        Must be used together with ``end_version``.
    end_version : int, optional
        The end version (inclusive) for the comparison range.
        Must be used together with ``begin_version``.

    Returns
    -------
    DatasetDelta
        An object that can list transactions or stream inserted/updated rows.

    Raises
    ------
    ValueError
        If both ``compared_against`` and version range are specified,
        or if neither is specified.

    Examples
    --------
    .. code-block:: python

        import lance
        import pyarrow as pa

        # Write initial data (v1)
        ds = lance.write_dataset(
            pa.table({"id": [1, 2], "val": ["a", "b"]}),
            "memory://delta_demo"
        )

        # Append some data to create v2
        ds_append = lance.write_dataset(
            pa.table({"id": [3], "val": ["c"]}),
            "memory://delta_demo",
            mode="append"
        )

        # Compute inserted rows from v1 -> v2 (shorthand)
        delta = ds_append.delta(compared_against=1)
        reader = delta.get_inserted_rows()
        for batch in reader:
            print(batch)

        # Or using explicit version range
        delta = ds_append.delta(begin_version=1, end_version=2)
    """
    has_compared_against = compared_against is not None
    has_range = begin_version is not None or end_version is not None

    # The shorthand and the explicit range are mutually exclusive.  Raise
    # eagerly (as documented) instead of silently ignoring the range.
    if has_compared_against and has_range:
        raise ValueError(
            "Cannot specify both compared_against and "
            "begin_version/end_version"
        )

    builder = _DatasetDeltaBuilder(self._ds.delta())

    if has_compared_against:
        builder = builder.compared_against_version(compared_against)
    else:
        # Identity checks (not truthiness) so that version 0 is honored.
        if begin_version is not None:
            builder = builder.with_begin_version(begin_version)

        if end_version is not None:
            builder = builder.with_end_version(end_version)

    # Remaining validation (neither form given, or an incomplete range) is
    # performed by the native builder's build().
    return builder.build()
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree DatasetDeltaBuilder could be _DatasetDeltaBuilder. Can we reuse the arg check in the builder.build() function?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? It should already use it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see what you mean, you mean we don't need to duplicate the checks, agree


@property
def optimize(self) -> "DatasetOptimizer":
    """Return a :class:`DatasetOptimizer` bound to this dataset."""
    optimizer = DatasetOptimizer(self)
    return optimizer
Expand Down Expand Up @@ -3750,6 +3832,61 @@ def build(self) -> SqlQuery:
return SqlQuery(self._builder.build())


class DatasetDelta:
    """
    A view of the differences between two versions of a dataset.

    Created by :meth:`LanceDataset.delta`; not intended to be constructed
    directly.  Provides methods to list the transactions in the version range
    and to stream the rows that were inserted or updated.
    """

    def __init__(self, delta):
        # Native delta object produced by the builder; all methods delegate.
        self._delta = delta

    def list_transactions(self) -> List[Transaction]:
        """
        List transactions in the range from begin_version + 1 to end_version.
        """
        transactions = self._delta.list_transactions()
        return transactions

    def get_inserted_rows(self) -> pa.RecordBatchReader:
        """
        Return a streaming RecordBatchReader for inserted rows.
        """
        reader = self._delta.get_inserted_rows()
        return reader

    def get_updated_rows(self) -> pa.RecordBatchReader:
        """
        Return a streaming RecordBatchReader for updated rows.
        """
        reader = self._delta.get_updated_rows()
        return reader


class _DatasetDeltaBuilder:
    """Internal builder for :class:`DatasetDelta`.

    This class is not part of the public API. Use :meth:`LanceDataset.delta`
    instead.  Each setter forwards to the native builder and returns ``self``
    so calls can be chained.
    """

    def __init__(self, builder):
        # Native (Rust) delta builder this wrapper configures.
        self._builder = builder

    def compared_against_version(self, version: int) -> "_DatasetDeltaBuilder":
        """Compare the current dataset version against ``version``."""
        updated = self._builder.compared_against_version(version)
        self._builder = updated
        return self

    def with_begin_version(self, version: int) -> "_DatasetDeltaBuilder":
        """Set the start (exclusive) of the comparison range."""
        updated = self._builder.with_begin_version(version)
        self._builder = updated
        return self

    def with_end_version(self, version: int) -> "_DatasetDeltaBuilder":
        """Set the end (inclusive) of the comparison range."""
        updated = self._builder.with_end_version(version)
        self._builder = updated
        return self

    def build(self) -> DatasetDelta:
        """Validate the configuration and produce a :class:`DatasetDelta`."""
        return DatasetDelta(self._builder.build())


class BulkCommitResult(TypedDict):
dataset: LanceDataset
merged: Transaction
Expand Down
136 changes: 136 additions & 0 deletions python/python/tests/test_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import pyarrow as pa
import pytest
from lance import write_dataset


def test_delta_get_inserted_rows():
    """Rows appended between v1 and v2 are streamed back as inserted rows."""
    # Create initial dataset (version 1)
    table1 = pa.table(
        {
            "id": pa.array([1, 2, 3], type=pa.int32()),
            "val": pa.array(["a", "b", "c"], type=pa.string()),
        }
    )
    ds = write_dataset(table1, "memory://delta_api_test", enable_stable_row_ids=True)

    # Append more rows to create version 2
    table2 = pa.table(
        {
            "id": pa.array([4, 5], type=pa.int32()),
            "val": pa.array(["d", "e"], type=pa.string()),
        }
    )
    ds.insert(table2)

    # Build delta compared to v1 and fetch inserted rows.
    # (Removed a leftover debug print of the transaction list.)
    delta = ds.delta(compared_against=1)
    reader = delta.get_inserted_rows()

    # Only the two appended rows should be reported as inserted.
    total_rows = sum(batch.num_rows for batch in reader)
    assert total_rows == 2


def test_delta_get_updated_rows():
    """An in-place update shows up as an updated row, not an inserted one."""
    # Create initial dataset (version 1)
    table1 = pa.table(
        {
            "id": pa.array([1, 2, 3], type=pa.int32()),
            "val": pa.array(["a", "b", "c"], type=pa.string()),
        }
    )
    ds = write_dataset(
        table1, "memory://delta_api_test_update", enable_stable_row_ids=True
    )

    # Update an existing row to create version 2
    update_stats = ds.update({"val": "'b_updated'"}, where="id = 2")
    assert update_stats["num_rows_updated"] == 1

    # Build delta compared to v1 and fetch updated rows
    delta = ds.delta(compared_against=1)

    # Ensure the transaction is an Update (not an Append/Delete)
    txs = delta.list_transactions()
    assert len(txs) == 1
    assert type(txs[0].operation).__name__ == "Update"

    # Exactly the one updated row should be streamed back.
    reader = delta.get_updated_rows()
    total_rows = sum(batch.num_rows for batch in reader)
    assert total_rows == 1

    # Ensure no inserted rows are present in this diff
    inserted_reader = delta.get_inserted_rows()
    total_inserted = sum(batch.num_rows for batch in inserted_reader)
    assert total_inserted == 0


def test_delta_with_explicit_version_range():
    """The explicit begin/end range form matches the compared_against shorthand."""
    # Create initial dataset (version 1)
    table1 = pa.table(
        {
            "id": pa.array([1, 2, 3], type=pa.int32()),
            "val": pa.array(["a", "b", "c"], type=pa.string()),
        }
    )
    ds = write_dataset(
        table1, "memory://delta_version_range_test", enable_stable_row_ids=True
    )

    # Append more rows to create version 2
    table2 = pa.table(
        {
            "id": pa.array([4, 5], type=pa.int32()),
            "val": pa.array(["d", "e"], type=pa.string()),
        }
    )
    ds.insert(table2)

    # Use explicit version range instead of compared_against
    delta = ds.delta(begin_version=1, end_version=2)
    reader = delta.get_inserted_rows()

    # Both appended rows are reported as inserted.
    total_rows = sum(batch.num_rows for batch in reader)
    assert total_rows == 2


def test_delta_validation_errors():
    """delta() rejects missing or incomplete version specifications."""
    table = pa.table({"id": pa.array([1, 2, 3], type=pa.int32())})
    ds = write_dataset(table, "memory://delta_validation_test")

    # Both partial-range errors share the same message.
    incomplete_range_msg = (
        "Invalid user input: Must specify both with_begin_version "
        "and with_end_version"
    )

    # Error: no parameters specified
    with pytest.raises(ValueError, match="Must specify either"):
        ds.delta()

    # Error: only begin_version specified
    with pytest.raises(ValueError, match=incomplete_range_msg):
        ds.delta(begin_version=1)

    # Error: only end_version specified
    with pytest.raises(ValueError, match=incomplete_range_msg):
        ds.delta(end_version=2)
85 changes: 85 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2625,6 +2625,14 @@ impl Dataset {
.map(|desc| PyIndexDescription::new(desc.as_ref(), self.ds.as_ref()))
.collect())
}

/// Create a delta builder to explore changes between dataset versions.
#[pyo3(signature=())]
fn delta(&self) -> PyResult<DatasetDeltaBuilder> {
    // Clone the dataset handle so the core builder works on its own copy.
    let builder = self.ds.as_ref().clone().delta();
    Ok(DatasetDeltaBuilder { builder })
}
}

#[pyclass(name = "SqlQuery", module = "_lib", subclass)]
Expand Down Expand Up @@ -2726,6 +2734,83 @@ impl SqlQueryBuilder {
}
}

// -------------------- Delta API Bindings --------------------

/// Python-facing wrapper around the core delta object.
///
/// Exposed to Python as `DatasetDelta`; instances are produced by
/// `DatasetDeltaBuilder::build`.
#[pyclass(name = "DatasetDelta", module = "_lib", subclass)]
pub struct DatasetDelta {
    // Core delta computed by the builder; every method delegates to it.
    inner: lance::dataset::delta::DatasetDelta,
}

#[pymethods]
impl DatasetDelta {
    /// List transactions between begin_version+1 and end_version.
    fn list_transactions(
        &self,
    ) -> PyResult<Vec<PyLance<lance::dataset::transaction::Transaction>>> {
        // Drive the async core call to completion on the shared runtime.
        let txs = rt()
            .block_on(None, self.inner.list_transactions())?
            .infer_error()?;
        // Wrap each core Transaction in PyLance for conversion to Python.
        Ok(txs.into_iter().map(PyLance).collect())
    }

    /// Get inserted rows between begin_version (exclusive) and end_version (inclusive) as a stream reader.
    fn get_inserted_rows(&self) -> PyResult<PyObject> {
        use arrow::pyarrow::IntoPyArrow;
        use arrow_array::RecordBatchReader;
        // Resolve the async stream of inserted rows.
        let stream = rt()
            .block_on(None, self.inner.get_inserted_rows())?
            .infer_error()?;
        // Box as a RecordBatchReader trait object so it can be handed to
        // pyarrow as a streaming reader.
        let reader: Box<dyn RecordBatchReader + Send> = Box::new(LanceReader::from_stream(stream));
        Python::with_gil(|py| reader.into_pyarrow(py))
    }

    /// Get updated rows between begin_version (exclusive) and end_version (inclusive) as a stream reader.
    fn get_updated_rows(&self) -> PyResult<PyObject> {
        use arrow::pyarrow::IntoPyArrow;
        use arrow_array::RecordBatchReader;
        // Resolve the async stream of updated rows.
        let stream = rt()
            .block_on(None, self.inner.get_updated_rows())?
            .infer_error()?;
        // Same export path as get_inserted_rows: box and convert via pyarrow.
        let reader: Box<dyn RecordBatchReader + Send> = Box::new(LanceReader::from_stream(stream));
        Python::with_gil(|py| reader.into_pyarrow(py))
    }
}

/// Python-facing wrapper around the core delta builder.
///
/// Exposed to Python as `DatasetDeltaBuilder`.
#[pyclass(name = "DatasetDeltaBuilder", module = "_lib", subclass)]
#[derive(Clone)]
pub struct DatasetDeltaBuilder {
    // Core builder; `Clone` lets each setter return a reconfigured copy.
    builder: lance::dataset::delta::DatasetDeltaBuilder,
}

#[pymethods]
impl DatasetDeltaBuilder {
    /// Compare the current dataset version against `version`.
    #[pyo3(signature = (version))]
    fn compared_against_version(&self, version: u64) -> Self {
        let builder = self.builder.clone().compared_against_version(version);
        Self { builder }
    }

    /// Set the start of the comparison range.
    #[pyo3(signature = (begin_version))]
    fn with_begin_version(&self, begin_version: u64) -> Self {
        let builder = self.builder.clone().with_begin_version(begin_version);
        Self { builder }
    }

    /// Set the end of the comparison range.
    #[pyo3(signature = (end_version))]
    fn with_end_version(&self, end_version: u64) -> Self {
        let builder = self.builder.clone().with_end_version(end_version);
        Self { builder }
    }

    /// Validate the configuration and build the delta, surfacing core
    /// errors as Python exceptions.
    fn build(&self) -> PyResult<DatasetDelta> {
        let inner = self.builder.clone().build().infer_error()?;
        Ok(DatasetDelta { inner })
    }
}

#[derive(FromPyObject)]
pub enum PyWriteDest {
Dataset(Dataset),
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use snafu::location;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct DatasetDeltaBuilder {
dataset: Dataset,
compared_against_version: Option<u64>,
Expand Down
Loading