diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index b7165088328..5dc4f124300 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -283,10 +283,12 @@ impl BlockingDataset { &mut self, transaction: Transaction, store_params: ObjectStoreParams, + detached: bool, ) -> Result { let new_dataset = RT.block_on( CommitBuilder::new(Arc::new(self.clone().inner)) .with_store_params(store_params) + .with_detached(detached) .execute(transaction), )?; Ok(BlockingDataset { inner: new_dataset }) @@ -322,13 +324,14 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>( _obj: JObject, arrow_schema_addr: jlong, path: JString, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional + storage_options_obj: JObject, // Map s3_credentials_refresh_offset_seconds_obj: JObject, // Optional initial_bases: JObject, target_bases: JObject, @@ -345,6 +348,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>( mode, enable_stable_row_ids, data_storage_version, + enable_v2_manifest_paths, storage_options_obj, s3_credentials_refresh_offset_seconds_obj, initial_bases, @@ -358,13 +362,14 @@ fn inner_create_with_ffi_schema<'local>( env: &mut JNIEnv<'local>, arrow_schema_addr: jlong, path: JString, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional + storage_options_obj: JObject, // Map s3_credentials_refresh_offset_seconds_obj: JObject, // Optional initial_bases: JObject, target_bases: JObject, @@ -383,6 +388,7 @@ fn inner_create_with_ffi_schema<'local>( mode, enable_stable_row_ids, data_storage_version, + enable_v2_manifest_paths, storage_options_obj, JObject::null(), // No provider for schema-only creation s3_credentials_refresh_offset_seconds_obj, @@ -412,13 +418,14 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( _obj: JObject, arrow_array_stream_addr: jlong, path: JString, - max_rows_per_file: JObject, // Optional - max_rows_per_group: JObject, // Optional - max_bytes_per_file: JObject, // Optional - mode: JObject, // Optional - enable_stable_row_ids: JObject, // Optional - data_storage_version: JObject, // Optional - storage_options_obj: JObject, // Map + max_rows_per_file: JObject, // Optional + max_rows_per_group: JObject, // Optional + max_bytes_per_file: JObject, // Optional + mode: JObject, // Optional + enable_stable_row_ids: JObject, // Optional + data_storage_version: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional + storage_options_obj: JObject, // Map s3_credentials_refresh_offset_seconds_obj: JObject, // Optional initial_bases: JObject, target_bases: JObject, @@ -434,6 +441,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( max_bytes_per_file, mode, enable_stable_row_ids, + enable_v2_manifest_paths, data_storage_version, storage_options_obj, JObject::null(), @@ -456,6 +464,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo mode: JObject, // Optional enable_stable_row_ids: JObject, // Optional data_storage_version: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional storage_options_obj: JObject, // Map storage_options_provider_obj: JObject, // Optional s3_credentials_refresh_offset_seconds_obj: JObject, // Optional @@ -474,6 +483,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo mode, enable_stable_row_ids, data_storage_version, + enable_v2_manifest_paths, storage_options_obj, storage_options_provider_obj, s3_credentials_refresh_offset_seconds_obj, @@ -494,6 +504,7 @@ fn inner_create_with_ffi_stream<'local>( mode: JObject, // Optional enable_stable_row_ids: JObject, // Optional data_storage_version: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional storage_options_obj: JObject, // Map storage_options_provider_obj: JObject, // Optional s3_credentials_refresh_offset_seconds_obj: JObject, // Optional @@ -511,6 +522,7 @@ fn inner_create_with_ffi_stream<'local>( mode, enable_stable_row_ids, data_storage_version, + enable_v2_manifest_paths, storage_options_obj, storage_options_provider_obj, s3_credentials_refresh_offset_seconds_obj, @@ -530,6 +542,7 @@ fn create_dataset<'local>( mode: JObject, enable_stable_row_ids: JObject, data_storage_version: JObject, + enable_v2_manifest_paths: JObject, storage_options_obj: JObject, storage_options_provider_obj: JObject, // Optional s3_credentials_refresh_offset_seconds_obj: JObject, @@ -547,6 +560,7 @@ fn create_dataset<'local>( &mode, &enable_stable_row_ids, &data_storage_version, + Some(&enable_v2_manifest_paths), &storage_options_obj, &storage_options_provider_obj, &s3_credentials_refresh_offset_seconds_obj, diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index c3caec67d82..2ca3fe4824f 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -254,6 +254,7 @@ fn create_fragment<'a>( &mode, &enable_stable_row_ids, &data_storage_version, + None, &storage_options_obj, &storage_options_provider_obj, &s3_credentials_refresh_offset_seconds_obj, diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index 9b077e7498c..b94c877a15e 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -11,7 +11,7 @@ use arrow::datatypes::Schema; use arrow_schema::ffi::FFI_ArrowSchema; use chrono::DateTime; use jni::objects::{JByteArray, JLongArray, JMap, JObject, JString, JValue, JValueGen}; -use jni::sys::jbyte; +use jni::sys::{jboolean, jbyte}; use jni::JNIEnv; use lance::dataset::transaction::{ DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder, @@ -727,10 +727,16 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCommitTransaction<'local>( mut env: JNIEnv<'local>, java_dataset: JObject, java_transaction: JObject, + detached_jbool: jboolean, ) -> JObject<'local> { ok_or_throw!( env, - inner_commit_transaction(&mut env, java_dataset, java_transaction) + inner_commit_transaction( + &mut env, + java_dataset, + java_transaction, + detached_jbool != 0, + ) ) } @@ -738,6 +744,7 @@ fn inner_commit_transaction<'local>( env: &mut JNIEnv<'local>, java_dataset: JObject, java_transaction: JObject, + detached: bool, ) -> Result> { let write_param_jobj = env .call_method(&java_transaction, "writeParams", "()Ljava/util/Map;", &[])? @@ -771,7 +778,7 @@ fn inner_commit_transaction<'local>( let new_blocking_ds = { let mut dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?; - dataset_guard.commit_transaction(transaction, store_params)? + dataset_guard.commit_transaction(transaction, store_params, detached)? }; new_blocking_ds.into_java(env) } diff --git a/java/lance-jni/src/utils.rs b/java/lance-jni/src/utils.rs index 5cb55c200e1..f46f994e84d 100644 --- a/java/lance-jni/src/utils.rs +++ b/java/lance-jni/src/utils.rs @@ -47,6 +47,7 @@ pub fn extract_write_params( mode: &JObject, enable_stable_row_ids: &JObject, data_storage_version: &JObject, + enable_v2_manifest_paths: Option<&JObject>, storage_options_obj: &JObject, storage_options_provider_obj: &JObject, // Optional s3_credentials_refresh_offset_seconds_obj: &JObject, // Optional @@ -75,6 +76,16 @@ pub fn extract_write_params( data_storage_version_val.as_str(), )?); } + + // Enable v2 manifest paths by default. + write_params.enable_v2_manifest_paths = + if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths { + env.get_boolean_opt(enable_v2_manifest_paths)? + .unwrap_or(true) + } else { + true + }; + let storage_options: HashMap = extract_storage_options(env, storage_options_obj)?; diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index eb66702ddf5..55e8a8b0983 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -141,6 +141,7 @@ public static Dataset create( params.getMode(), params.getEnableStableRowIds(), params.getDataStorageVersion(), + params.getEnableV2ManifestPaths(), params.getStorageOptions(), params.getS3CredentialsRefreshOffsetSeconds(), params.getInitialBases(), @@ -201,6 +202,7 @@ static Dataset create( params.getMode(), params.getEnableStableRowIds(), params.getDataStorageVersion(), + params.getEnableV2ManifestPaths(), params.getStorageOptions(), Optional.ofNullable(storageOptionsProvider), params.getS3CredentialsRefreshOffsetSeconds(), @@ -219,6 +221,7 @@ private static native Dataset createWithFfiSchema( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, + Optional enableV2ManifestPaths, Map storageOptions, Optional s3CredentialsRefreshOffsetSeconds, Optional> initialBases, @@ -233,6 +236,7 @@ private static native Dataset createWithFfiStream( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, + Optional enableV2ManifestPaths, Map storageOptions, Optional s3CredentialsRefreshOffsetSeconds, Optional> initialBases, @@ -247,6 +251,7 @@ private static native Dataset createWithFfiStreamAndProvider( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, + Optional enableV2ManifestPaths, Map storageOptions, Optional storageOptionsProvider, Optional s3CredentialsRefreshOffsetSeconds, @@ -450,9 +455,21 @@ public Transaction.Builder newTransactionBuilder() { * @return A new instance of {@link Dataset} linked to committed version. */ public Dataset commitTransaction(Transaction transaction) { + return commitTransaction(transaction, false); + } + + /** + * Commit a single transaction and return a new Dataset with the new version. Original dataset + * version will not be refreshed. + * + * @param transaction The transaction to commit + * @param detached If true, the commit will not be part of the main dataset lineage. + * @return A new instance of {@link Dataset} linked to committed version. + */ + public Dataset commitTransaction(Transaction transaction, boolean detached) { Preconditions.checkNotNull(transaction); try { - Dataset dataset = nativeCommitTransaction(transaction); + Dataset dataset = nativeCommitTransaction(transaction, detached); if (selfManagedAllocator) { dataset.allocator = new RootAllocator(Long.MAX_VALUE); } else { @@ -464,7 +481,7 @@ public Dataset commitTransaction(Transaction transaction) { } } - private native Dataset nativeCommitTransaction(Transaction transaction); + private native Dataset nativeCommitTransaction(Transaction transaction, boolean detached); /** * Drop a Dataset. diff --git a/java/src/main/java/org/lance/WriteParams.java b/java/src/main/java/org/lance/WriteParams.java index f8b6304743d..2a4853383dd 100644 --- a/java/src/main/java/org/lance/WriteParams.java +++ b/java/src/main/java/org/lance/WriteParams.java @@ -56,6 +56,7 @@ public String getVersionString() { private final Optional mode; private final Optional enableStableRowIds; private final Optional dataStorageVersion; + private final Optional enableV2ManifestPaths; private Map storageOptions = new HashMap<>(); private final Optional s3CredentialsRefreshOffsetSeconds; private final Optional> initialBases; @@ -68,6 +69,7 @@ private WriteParams( Optional mode, Optional enableStableRowIds, Optional dataStorageVersion, + Optional enableV2ManifestPaths, Map storageOptions, Optional s3CredentialsRefreshOffsetSeconds, Optional> initialBases, @@ -78,6 +80,7 @@ private WriteParams( this.mode = mode; this.enableStableRowIds = enableStableRowIds; this.dataStorageVersion = dataStorageVersion; + this.enableV2ManifestPaths = enableV2ManifestPaths; this.storageOptions = storageOptions; this.s3CredentialsRefreshOffsetSeconds = s3CredentialsRefreshOffsetSeconds; this.initialBases = initialBases; @@ -113,6 +116,10 @@ public Optional getDataStorageVersion() { return dataStorageVersion.map(LanceFileVersion::getVersionString); } + public Optional getEnableV2ManifestPaths() { + return enableV2ManifestPaths; + } + public Map getStorageOptions() { return storageOptions; } @@ -148,6 +155,7 @@ public static class Builder { private Optional mode = Optional.empty(); private Optional enableStableRowIds = Optional.empty(); private Optional dataStorageVersion = Optional.empty(); + private Optional enableV2ManifestPaths; private Map storageOptions = new HashMap<>(); private Optional s3CredentialsRefreshOffsetSeconds = Optional.empty(); private Optional> initialBases = Optional.empty(); @@ -188,6 +196,11 @@ public Builder withEnableStableRowIds(boolean enableStableRowIds) { return this; } + public Builder withEnableV2ManifestPaths(boolean enableV2ManifestPaths) { + this.enableV2ManifestPaths = Optional.of(enableV2ManifestPaths); + return this; + } + public Builder withS3CredentialsRefreshOffsetSeconds(long s3CredentialsRefreshOffsetSeconds) { this.s3CredentialsRefreshOffsetSeconds = Optional.of(s3CredentialsRefreshOffsetSeconds); return this; @@ -211,6 +224,7 @@ public WriteParams build() { mode, enableStableRowIds, dataStorageVersion, + enableV2ManifestPaths, storageOptions, s3CredentialsRefreshOffsetSeconds, initialBases, diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index b29b3dcf39f..7c1e33518ee 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -67,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1121,6 +1122,70 @@ void testReadTransaction(@TempDir Path tempDir) { } } + @Test + void testCommitTransactionDetachedTrue(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("testCommitTransactionDetachedTrue").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset suite = new TestUtils.SimpleTestDataset(allocator, datasetPath); + try (Dataset base = suite.createEmptyDataset(true)) { + assertEquals(1, base.version()); + assertEquals(1, base.latestVersion()); + assertEquals(0, base.countRows()); + long baseVersion = base.version(); + long baseLatestVersion = base.latestVersion(); + long baseRowCount = base.countRows(); + FragmentMetadata fragment = suite.createNewFragment(5); + Append append = Append.builder().fragments(Collections.singletonList(fragment)).build(); + Transaction transaction = base.newTransactionBuilder().operation(append).build(); + try (Dataset committed = base.commitTransaction(transaction, true)) { + // Original dataset is not refreshed to the new version. + assertEquals(baseVersion, base.version()); + assertEquals(baseRowCount, base.countRows()); + + // Latest version should not change. + assertEquals(base.latestVersion(), baseLatestVersion); + + // Committed dataset has a detached version. + assertNotEquals(baseVersion + 1, committed.version()); + assertNotEquals(committed.version(), committed.latestVersion()); + assertEquals(baseRowCount + 5, committed.countRows()); + } + } + } + } + + @Test + void testCommitTransactionDetachedTrueOnV1ManifestThrowsUnsupported(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("commitTransactionDetachedTrueOnV1").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset suite = new TestUtils.SimpleTestDataset(allocator, datasetPath); + try (Dataset dataset = suite.createEmptyDataset()) { + List versionsBefore = dataset.listVersions(); + long versionIdBefore = versionsBefore.get(0).getId(); + + FragmentMetadata fragment = suite.createNewFragment(3); + Append append = Append.builder().fragments(Collections.singletonList(fragment)).build(); + Transaction transaction = dataset.newTransactionBuilder().operation(append).build(); + UnsupportedOperationException ex = + assertThrows( + UnsupportedOperationException.class, + () -> dataset.commitTransaction(transaction, true)); + + // Error should indicate detached commits are not supported on v1 manifests. + assertNotNull(ex.getMessage()); + assertTrue(ex.getMessage().toLowerCase().contains("detached")); + + // Dataset state should remain unchanged after the failed detached commit. + assertEquals(1, dataset.version()); + assertEquals(1, dataset.latestVersion()); + assertEquals(0, dataset.countRows()); + List versionsAfter = dataset.listVersions(); + assertEquals(1, versionsAfter.size()); + assertEquals(versionIdBefore, versionsAfter.get(0).getId()); + } + } + } + @Test void testEnableStableRowIds(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("enable_stable_row_ids").toString(); diff --git a/java/src/test/java/org/lance/TestUtils.java b/java/src/test/java/org/lance/TestUtils.java index b17848ef6fb..467d806ff79 100644 --- a/java/src/test/java/org/lance/TestUtils.java +++ b/java/src/test/java/org/lance/TestUtils.java @@ -78,8 +78,16 @@ public TestDataset(BufferAllocator allocator, String datasetPath) { public abstract Schema getSchema(); public Dataset createEmptyDataset() { + return createEmptyDataset(false); + } + + public Dataset createEmptyDataset(boolean enableV2Manifest) { Dataset dataset = - Dataset.create(allocator, datasetPath, getSchema(), new WriteParams.Builder().build()); + Dataset.create( + allocator, + datasetPath, + getSchema(), + new WriteParams.Builder().withEnableV2ManifestPaths(enableV2Manifest).build()); assertEquals(0, dataset.countRows()); assertEquals(getSchema(), dataset.getSchema()); List fragments = dataset.getFragments();