Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,12 @@ impl BlockingDataset {
&mut self,
transaction: Transaction,
store_params: ObjectStoreParams,
detached: bool,
) -> Result<Self> {
let new_dataset = RT.block_on(
CommitBuilder::new(Arc::new(self.clone().inner))
.with_store_params(store_params)
.with_detached(detached)
.execute(transaction),
)?;
Ok(BlockingDataset { inner: new_dataset })
Expand Down
13 changes: 10 additions & 3 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use arrow::datatypes::Schema;
use arrow_schema::ffi::FFI_ArrowSchema;
use chrono::DateTime;
use jni::objects::{JByteArray, JLongArray, JMap, JObject, JString, JValue, JValueGen};
use jni::sys::jbyte;
use jni::sys::{jboolean, jbyte};
use jni::JNIEnv;
use lance::dataset::transaction::{
DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder,
Expand Down Expand Up @@ -727,17 +727,24 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCommitTransaction<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
java_transaction: JObject,
detached_jbool: jboolean,
) -> JObject<'local> {
ok_or_throw!(
env,
inner_commit_transaction(&mut env, java_dataset, java_transaction)
inner_commit_transaction(
&mut env,
java_dataset,
java_transaction,
detached_jbool != 0,
)
)
}

fn inner_commit_transaction<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
java_transaction: JObject,
detached: bool,
) -> Result<JObject<'local>> {
let write_param_jobj = env
.call_method(&java_transaction, "writeParams", "()Ljava/util/Map;", &[])?
Expand Down Expand Up @@ -771,7 +778,7 @@ fn inner_commit_transaction<'local>(
let new_blocking_ds = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?;
dataset_guard.commit_transaction(transaction, store_params)?
dataset_guard.commit_transaction(transaction, store_params, detached)?
};
new_blocking_ds.into_java(env)
}
Expand Down
16 changes: 14 additions & 2 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,21 @@ public Transaction.Builder newTransactionBuilder() {
* @return A new instance of {@link Dataset} linked to committed version.
*/
public Dataset commitTransaction(Transaction transaction) {
return commitTransaction(transaction, false);
}

/**
* Commit a single transaction and return a new Dataset with the new version. Original dataset
* version will not be refreshed.
*
* @param transaction The transaction to commit
* @param detached If true, the commit will not be part of the main dataset lineage.
* @return A new instance of {@link Dataset} linked to committed version.
*/
public Dataset commitTransaction(Transaction transaction, boolean detached) {
Preconditions.checkNotNull(transaction);
try {
Dataset dataset = nativeCommitTransaction(transaction);
Dataset dataset = nativeCommitTransaction(transaction, detached);
if (selfManagedAllocator) {
dataset.allocator = new RootAllocator(Long.MAX_VALUE);
} else {
Expand All @@ -464,7 +476,7 @@ public Dataset commitTransaction(Transaction transaction) {
}
}

private native Dataset nativeCommitTransaction(Transaction transaction);
private native Dataset nativeCommitTransaction(Transaction transaction, boolean detached);

/**
* Drop a Dataset.
Expand Down