Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ impl BlockingDataset {
Ok(version)
}

/// Returns the `u64` version id reported by the wrapped `inner` dataset.
///
/// This is a plain synchronous delegation to `self.inner.version_id()`; unlike
/// `list_versions`, it does not go through `RT.block_on`.
pub fn version_id(&self) -> u64 {
self.inner.version_id()
}

pub fn list_versions(&self) -> Result<Vec<Version>> {
let versions = RT.block_on(self.inner.versions())?;
Ok(versions)
Expand Down Expand Up @@ -1625,6 +1629,20 @@ fn inner_get_version<'local>(
version.into_java(env)
}

/// JNI entry point named for the Java method `org.lance.Dataset.nativeGetVersionId()`.
///
/// Resolves the version id through `inner_get_version_id` and hands it back to
/// the JVM as a `jlong`.
///
/// NOTE(review): `ok_or_throw_with_return!` is defined outside this view;
/// presumably it raises a Java exception on `Err` and makes this function
/// return the supplied `-1` — confirm against the macro definition.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeGetVersionId(
    mut env: JNIEnv,
    java_dataset: JObject,
) -> jlong {
    let version_id =
        ok_or_throw_with_return!(env, inner_get_version_id(&mut env, java_dataset), -1);
    // The id is a `u64` on the Rust side; the cast reinterprets it as the
    // signed 64-bit `jlong` that JNI expects.
    version_id as jlong
}

/// Reads the version id from the `BlockingDataset` stored on `java_dataset`.
///
/// # Errors
///
/// Propagates the error from `get_rust_field` (converted by `?`) when the
/// native field cannot be retrieved.
fn inner_get_version_id(env: &mut JNIEnv, java_dataset: JObject) -> Result<u64> {
    // SAFETY: NOTE(review) — presumably the `NATIVE_DATASET` field of this Java
    // object was populated with a `BlockingDataset` by the dataset-opening
    // code path; confirm against the code that sets the field.
    let lookup =
        unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) };
    let dataset = lookup?;
    Ok(dataset.version_id())
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeGetLatestVersionId(
mut env: JNIEnv,
Expand Down
7 changes: 6 additions & 1 deletion java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -790,9 +790,14 @@ public String uri() {
* @return the version id of the dataset
*/
public long version() {
// NOTE(review): this text is a diff rendering whose +/- markers were stripped; the
// next line looks like the removed pre-change body and the try block below it the
// replacement — confirm against the actual Dataset.java.
return getVersion().getId();
// Hold the read lock while the native handle is checked and the native call runs.
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
// A zero handle is rejected with the message "Dataset is closed".
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
return nativeGetVersionId();
}
}

/**
 * Native call that returns the version id of this dataset.
 *
 * <p>NOTE(review): appears to be bound to the Rust JNI function {@code
 * Java_org_lance_Dataset_nativeGetVersionId}, which supplies {@code -1} as its fallback return
 * value on failure — confirm how errors surface to Java.
 */
private native long nativeGetVersionId();

/**
* Gets the currently checked out version of the dataset.
*
Expand Down
10 changes: 10 additions & 0 deletions java/src/test/java/org/lance/DatasetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ void testDatasetVersion(@TempDir Path tempDir) {
try (Dataset dataset = testDataset.createEmptyDataset()) {
ZonedDateTime time1 = dataset.getVersion().getDataTime();
assertEquals(1, dataset.version());
assertEquals(dataset.getVersion().getId(), dataset.version());
assertTrue(time1.isEqual(before) || time1.isAfter(before));
assertTrue(time1.isEqual(ZonedDateTime.now()) || time1.isBefore(ZonedDateTime.now()));
assertEquals(time1.getZone(), Clock.systemUTC().getZone());
Expand All @@ -187,8 +188,10 @@ void testDatasetVersion(@TempDir Path tempDir) {
try (Dataset dataset2 = testDataset.write(1, 5)) {
ZonedDateTime time2 = dataset2.getVersion().getDataTime();
assertEquals(1, dataset.version());
assertEquals(dataset.getVersion().getId(), dataset.version());
assertEquals(2, dataset.latestVersion());
assertEquals(2, dataset2.version());
assertEquals(dataset2.getVersion().getId(), dataset2.version());
assertEquals(2, dataset2.latestVersion());
assertTrue(time2.isEqual(before) || time2.isAfter(before));
assertTrue(time2.isEqual(time1) || time2.isAfter(time1));
Expand All @@ -198,6 +201,7 @@ void testDatasetVersion(@TempDir Path tempDir) {
ReadOptions options1 = new ReadOptions.Builder().setVersion(1).build();
try (Dataset datasetV1 = Dataset.open(allocator, datasetPath, options1)) {
assertEquals(1, datasetV1.version());
assertEquals(datasetV1.getVersion().getId(), datasetV1.version());
assertTrue(time1.isEqual(dataset.getVersion().getDataTime()));
assertEquals(2, datasetV1.latestVersion());
}
Expand All @@ -206,26 +210,31 @@ void testDatasetVersion(@TempDir Path tempDir) {
try (Dataset dataset3 = testDataset.write(2, 3)) {
ZonedDateTime time3 = dataset3.getVersion().getDataTime();
assertEquals(1, dataset.version());
assertEquals(dataset.getVersion().getId(), dataset.version());
assertTrue(time1.isEqual(dataset.getVersion().getDataTime()));
assertEquals(3, dataset.latestVersion());
assertEquals(2, dataset2.version());
assertEquals(dataset2.getVersion().getId(), dataset2.version());
assertTrue(time2.isEqual(dataset2.getVersion().getDataTime()));
assertEquals(3, dataset2.latestVersion());
assertTrue(time3.isEqual(before) || time3.isAfter(before));
assertEquals(3, dataset3.version());
assertEquals(dataset3.getVersion().getId(), dataset3.version());
assertEquals(3, dataset3.latestVersion());

// Open dataset with version 2
ReadOptions options2 = new ReadOptions.Builder().setVersion(2).build();
try (Dataset datasetV2 = Dataset.open(allocator, datasetPath, options2)) {
assertEquals(2, datasetV2.version());
assertEquals(datasetV2.getVersion().getId(), datasetV2.version());
assertTrue(time2.isEqual(datasetV2.getVersion().getDataTime()));
assertEquals(3, datasetV2.latestVersion());
}

// Open dataset with latest version (3)
try (Dataset datasetLatest = Dataset.open(datasetPath, allocator)) {
assertEquals(3, datasetLatest.version());
assertEquals(datasetLatest.getVersion().getId(), datasetLatest.version());
assertTrue(time3.isEqual(datasetLatest.getVersion().getDataTime()));
assertEquals(3, datasetLatest.latestVersion());
}
Expand All @@ -242,6 +251,7 @@ void testDatasetVersion(@TempDir Path tempDir) {
assertArrayEquals(versions.toArray(), dataset3.listVersions().toArray());
dataset.checkoutLatest();
assertEquals(3, dataset.version());
assertEquals(dataset.getVersion().getId(), dataset.version());
assertTrue(time3.isEqual(dataset.getVersion().getDataTime()));
assertEquals(3, dataset.latestVersion());

Expand Down
5 changes: 5 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ def test_version_id(tmp_path: Path):
assert updated_ds.version == 2
assert updated_ds.latest_version == 2

historical_ds = updated_ds.checkout_version(1)
assert historical_ds.version == 1
assert historical_ds.latest_version == 2
assert historical_ds.checkout_version(historical_ds.latest_version).version == 2


def test_checkout(tmp_path: Path):
tab = pa.table({"a": range(3)})
Expand Down
4 changes: 2 additions & 2 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,7 @@ impl Dataset {

/// Fetches the currently checked out version of the dataset.
///
/// Returns the numeric version id as a `u64`; every visible path yields `Ok`.
fn version(&self) -> PyResult<u64> {
// NOTE(review): this text is a diff rendering whose +/- markers were stripped; the
// first `Ok(...)` line looks like the removed implementation and the second its
// replacement — confirm against the actual python/src/dataset.rs.
Ok(self.ds.version().version)
Ok(self.ds.version_id())
}

fn latest_version(self_: PyRef<'_, Self>) -> PyResult<u64> {
Expand Down Expand Up @@ -3010,7 +3010,7 @@ impl Dataset {
} else {
Ok(Ref::Version(
self.ds.manifest.branch.clone(),
Some(self.ds.version().version),
Some(self.ds.version_id()),
))
}
}
Expand Down
12 changes: 12 additions & 0 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1737,6 +1737,18 @@ impl Dataset {
self.session.clone()
}

/// Get the currently checked-out version id.
///
/// This is a cheap accessor that reads the id directly from the loaded
/// manifest without constructing the full [`Version`] summary. Use
/// [`Self::version`] when the full version details are needed.
pub fn version_id(&self) -> u64 {
self.manifest.version
}

/// Get the currently checked-out version details.
///
/// This constructs a full [`Version`], including summary metadata derived
/// from the loaded manifest fragments. Callers that only need the numeric id
/// can use [`Self::version_id`] instead.
pub fn version(&self) -> Version {
// Converted from the loaded manifest on each call.
Version::from(self.manifest.as_ref())
}
Expand Down
47 changes: 47 additions & 0 deletions rust/lance/src/dataset/tests/dataset_versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,53 @@ async fn test_strict_overwrite() {
.expect("Unstrict overwrite should succeed when committing to a stale version");
}

/// Checks that `version_id()` matches both the expected id and
/// `version().version` after an initial write, after an append, and after
/// checking out the first version again, while `latest_version_id()` tracks
/// the newest version.
#[tokio::test]
async fn test_version_id_fast_path() {
    let test_uri = TempStrDir::default();
    let field = ArrowField::new("i", DataType::UInt32, false);
    let schema = Arc::new(ArrowSchema::new(vec![field]));

    // Builds a single-batch reader whose "i" column holds `values`.
    let reader_for = |values: std::ops::Range<u32>| {
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(UInt32Array::from_iter_values(values))],
        )
        .unwrap();
        RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone())
    };

    // Initial write creates version 1.
    let original = Dataset::write(reader_for(0..5), &test_uri, None)
        .await
        .unwrap();
    assert_eq!(original.version_id(), 1);
    assert_eq!(original.version_id(), original.version().version);
    assert_eq!(original.latest_version_id().await.unwrap(), 1);

    // Appending to the same URI produces version 2.
    let append_params = WriteParams {
        mode: WriteMode::Append,
        ..Default::default()
    };
    let updated = Dataset::write(reader_for(5..10), &test_uri, Some(append_params))
        .await
        .unwrap();
    assert_eq!(updated.version_id(), 2);
    assert_eq!(updated.version_id(), updated.version().version);
    assert_eq!(updated.latest_version_id().await.unwrap(), 2);

    // Checking out version 1 reports id 1 while the latest id stays at 2.
    let historical = updated.checkout_version(1).await.unwrap();
    assert_eq!(historical.version_id(), 1);
    assert_eq!(historical.version_id(), historical.version().version);
    assert_eq!(historical.latest_version_id().await.unwrap(), 2);
}

#[rstest]
#[tokio::test]
async fn test_restore(
Expand Down
Loading