Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2284,6 +2284,7 @@ def add_bases(
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
retain_versions: Optional[int] = None,
*,
delete_unverified: bool = False,
error_if_tagged_old_versions: bool = True,
Expand All @@ -2303,8 +2304,11 @@ def cleanup_old_versions(
----------

older_than: timedelta, optional
Only versions older than this will be removed. If not specified, this
will default to two weeks.
            Only versions older than this will be removed. If neither
            ``older_than`` nor ``retain_versions`` is specified, this will
            default to two weeks.

retain_versions: int, optional
Retain the last N versions of the dataset.

delete_unverified: bool, default False
Files leftover from a failed transaction may appear to be part of an
Expand All @@ -2324,10 +2328,14 @@ def cleanup_old_versions(
be ignored without any error and only untagged versions will be
cleaned up.
"""
if older_than is None:
if older_than is None and retain_versions is None:
older_than = timedelta(days=14)

return self._ds.cleanup_old_versions(
td_to_micros(older_than), delete_unverified, error_if_tagged_old_versions
td_to_micros(older_than) if older_than else None,
retain_versions,
delete_unverified,
error_if_tagged_old_versions,
)

def create_scalar_index(
Expand Down
38 changes: 38 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,44 @@ def test_cleanup_around_tagged_old_versions(tmp_path):
assert stats.old_versions == 1


def test_cleanup_with_retain_versions(tmp_path: Path):
    """cleanup_old_versions(retain_versions=N) keeps only the newest N versions."""
    base_dir = tmp_path / "cleanup_policy"
    table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
    lance.write_dataset(table, base_dir, mode="create")
    # Short sleeps keep the version timestamps strictly ordered so the
    # cleanup boundary between versions is unambiguous.
    time.sleep(0.05)
    lance.write_dataset(table, base_dir, mode="overwrite")
    time.sleep(0.05)
    lance.write_dataset(table, base_dir, mode="overwrite")
    time.sleep(0.05)
    ds = lance.write_dataset(table, base_dir, mode="append")

    assert len(ds.versions()) == 4
    stats = ds.cleanup_old_versions(retain_versions=3)
    # Exactly one version (the oldest) falls outside the retained window.
    assert stats.old_versions == 1
    assert len(ds.versions()) == 3
    # The latest version is the 100-row append on top of a 100-row overwrite,
    # so 200 rows total; the manifest count must also agree with a full scan.
    assert ds.count_rows() == 200
    assert ds.count_rows() == len(ds.to_table())

def test_cleanup_with_older_than_and_retain_versions(tmp_path: Path):
    """Combining ``older_than`` and ``retain_versions`` applies both constraints.

    A version is removed only if it is both older than the cutoff AND outside
    the newest ``retain_versions`` versions.
    """
    base_dir = tmp_path / "cleanup_policy"
    table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
    lance.write_dataset(table, base_dir, mode="create")
    time.sleep(0.05)
    lance.write_dataset(table, base_dir, mode="overwrite")
    time.sleep(0.05)
    lance.write_dataset(table, base_dir, mode="overwrite")
    # Capture the cutoff after version 3; the sleeps on either side keep the
    # version timestamps strictly on their respective sides of the cutoff,
    # which avoids flakiness in the older_than evaluation.
    moment = datetime.now()
    time.sleep(0.05)
    ds = lance.write_dataset(table, base_dir, mode="append")

    stats = ds.cleanup_old_versions(
        older_than=datetime.now() - moment, retain_versions=2
    )
    # Versions 1 and 2 are older than the cutoff and beyond the two retained
    # versions, so they are removed. Version 3 is older than the cutoff but is
    # protected by retain_versions=2; version 4 is newer than the cutoff.
    assert stats.old_versions == 2
    assert len(ds.versions()) == 2
    # The surviving latest version is the append: 200 rows, and the manifest
    # row count must agree with an actual scan.
    assert ds.count_rows() == 200
    assert ds.count_rows() == len(ds.to_table())


def test_auto_cleanup(tmp_path):
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
base_dir = tmp_path / "test"
Expand Down
35 changes: 23 additions & 12 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use arrow_data::ArrayData;
use arrow_schema::{DataType, Schema as ArrowSchema};
use async_trait::async_trait;
use blob::LanceBlobFile;
use chrono::{Duration, TimeDelta};
use chrono::{Duration, TimeDelta, Utc};
use futures::{StreamExt, TryFutureExt};
use lance_index::vector::bq::RQBuildParams;
use log::error;
Expand All @@ -33,6 +33,7 @@ use pyo3::{
use pyo3::{prelude::*, IntoPyObjectExt};
use snafu::location;

use lance::dataset::cleanup::CleanupPolicyBuilder;
use lance::dataset::index::LanceIndexStoreExt;
use lance::dataset::refs::{Ref, TagContents};
use lance::dataset::scanner::{
Expand Down Expand Up @@ -1496,23 +1497,33 @@ impl Dataset {
}

/// Cleanup old versions from the dataset
#[pyo3(signature = (older_than_micros, delete_unverified = None, error_if_tagged_old_versions = None))]
#[pyo3(signature = (older_than_micros = None, retain_versions = None, delete_unverified = None, error_if_tagged_old_versions = None))]
fn cleanup_old_versions(
&self,
older_than_micros: i64,
older_than_micros: Option<i64>,
retain_versions: Option<usize>,
delete_unverified: Option<bool>,
error_if_tagged_old_versions: Option<bool>,
) -> PyResult<CleanupStats> {
let older_than = Duration::microseconds(older_than_micros);
let cleanup_stats = rt()
.block_on(
None,
self.ds.cleanup_old_versions(
older_than,
delete_unverified,
error_if_tagged_old_versions,
),
)?
.block_on(None, async {
let mut builder = CleanupPolicyBuilder::default();
if let Some(v) = older_than_micros {
let older_than = Duration::microseconds(v);
builder = builder.before_timestamp(Utc::now() - older_than);
}
if let Some(v) = retain_versions {
builder = builder.retain_n_versions(self.ds.as_ref(), v).await?;
}
if let Some(v) = delete_unverified {
builder = builder.delete_unverified(v);
}
if let Some(v) = error_if_tagged_old_versions {
builder = builder.error_if_tagged_old_versions(v);
}

self.ds.cleanup_with_policy(builder.build()).await
})?
.map_err(|err: lance::Error| PyIOError::new_err(err.to_string()))?;
Ok(CleanupStats {
bytes_removed: cleanup_stats.bytes_removed,
Expand Down