Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 10 additions & 31 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ use crate::{
RT,
};

#[derive(Debug, Clone)]
pub(crate) struct FragmentMergeResult {
fragment: Fragment,
schema: Schema,
}

#[derive(Debug, Clone)]
pub(crate) struct FragmentUpdateResult {
updated_fragment: Fragment,
Expand Down Expand Up @@ -332,6 +326,7 @@ pub extern "system" fn Java_org_lance_Fragment_nativeMergeColumns<'a>(
arrow_array_stream_addr: jlong, // memoryAddress of ArrowStream
left_on: JString, // left column name to join on
right_on: JString, // right column name to join on
arrow_schema_addr: jlong, // memoryAddress of arrow Schema
) -> JObject<'a> {
ok_or_throw_with_return!(
env,
Expand All @@ -341,7 +336,8 @@ pub extern "system" fn Java_org_lance_Fragment_nativeMergeColumns<'a>(
fragment_id,
arrow_array_stream_addr,
left_on,
right_on
right_on,
arrow_schema_addr
),
JObject::null()
)
Expand All @@ -355,6 +351,7 @@ fn inner_merge_column<'local>(
arrow_array_stream_addr: jlong,
left_on: JString,
right_on: JString,
arrow_schema_addr: jlong,
) -> Result<JObject<'local>> {
let (fragment_opt, max_field_id) = {
let dataset =
Expand All @@ -378,13 +375,13 @@ fn inner_merge_column<'local>(
let left_on_str: String = left_on.extract(env)?;
let right_on_str: String = right_on.extract(env)?;

let (new_frag, new_schema) =
let (new_frag, new_schema): (Fragment, Schema) =
RT.block_on(fragment.merge_columns(reader, &left_on_str, &right_on_str, max_field_id))?;
let result = FragmentMergeResult {
fragment: new_frag,
schema: new_schema,
};
result.into_java(env)

let ffi_schema = FFI_ArrowSchema::try_from(&arrow_schema::Schema::from(&new_schema))?;
unsafe { std::ptr::write_unaligned(arrow_schema_addr as *mut FFI_ArrowSchema, ffi_schema) }

new_frag.into_java(env)
}

#[no_mangle]
Expand Down Expand Up @@ -456,27 +453,9 @@ const FRAGMENT_METADATA_CLASS: &str = "org/lance/FragmentMetadata";
const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str ="(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;)V";
const ROW_ID_META_CLASS: &str = "org/lance/fragment/RowIdMeta";
const ROW_ID_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V";
const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResult";
const FRAGMENT_MERGE_RESULT_CONSTRUCTOR_SIG: &str =
"(Lorg/lance/FragmentMetadata;Lorg/lance/schema/LanceSchema;)V";
const FRAGMENT_UPDATE_RESULT_CLASS: &str = "org/lance/fragment/FragmentUpdateResult";
const FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;[J)V";

impl IntoJava for &FragmentMergeResult {
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
let java_fragment_meta_data = self.fragment.into_java(env)?;
let java_lance_schema = self.schema.clone().into_java(env)?;
Ok(env.new_object(
FRAGMENT_MERGE_RESULT_CLASS,
FRAGMENT_MERGE_RESULT_CONSTRUCTOR_SIG,
&[
JValueGen::Object(&java_fragment_meta_data),
JValueGen::Object(&java_lance_schema),
],
)?)
}
}

impl IntoJava for &FragmentUpdateResult {
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
let java_updated_fragment = self.updated_fragment.into_java(env)?;
Expand Down
21 changes: 17 additions & 4 deletions java/src/main/java/org/lance/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -150,16 +151,28 @@ public int countRows() {
* @return the fragment metadata and new schema.
*/
public FragmentMergeResult mergeColumns(ArrowArrayStream stream, String leftOn, String rightOn) {
return nativeMergeColumns(
dataset, fragmentMetadata.getId(), stream.memoryAddress(), leftOn, rightOn);
try (ArrowSchema ffiArrowSchema = ArrowSchema.allocateNew(dataset.allocator())) {
FragmentMetadata metadata =
nativeMergeColumns(
dataset,
fragmentMetadata.getId(),
stream.memoryAddress(),
leftOn,
rightOn,
ffiArrowSchema.memoryAddress());

Schema schema = Data.importSchema(dataset.allocator(), ffiArrowSchema, null);
return new FragmentMergeResult(metadata, schema);
}
}

private native FragmentMergeResult nativeMergeColumns(
private native FragmentMetadata nativeMergeColumns(
Dataset dataset,
long fragmentId,
long arrowStreamMemoryAddress,
String leftOn,
String rightOn);
String rightOn,
long schemaMemoryAddress);

/**
* Update existed columns into this Fragment, will return the new fragment with the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,25 @@
package org.lance.fragment;

import org.lance.FragmentMetadata;
import org.lance.schema.LanceSchema;

import com.google.common.base.MoreObjects;
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.vector.types.pojo.Schema;

/**
* Result of {@link org.lance.Fragment#mergeColumns(ArrowArrayStream, String, String)
* Fragment.mergeColumns()}.
*/
public class FragmentMergeResult {
private final FragmentMetadata fragmentMetadata;
private final LanceSchema schema;
private final Schema schema;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this.

I wonder if we should fix the LanceSchema conversion issue, since that is where the problem occurs. Is there anything blocking us from using LanceSchema here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the comments.

I agree that LanceSchema should be fixed, but that is a separate issue; we could submit another PR to address it.

For Fragment.mergeColumns, it is sensible to return an Arrow schema, for three reasons:

  1. The conversion from Lance schema to Arrow schema is already correctly implemented in Rust, so we can simply reuse it, just as Dataset.getSchema does.
  2. The Merge transaction itself expects an Arrow schema.
  3. Dataset's public methods use Arrow for data operations (read/write), so returning an Arrow schema keeps this method consistent with the others.


public FragmentMergeResult(FragmentMetadata fragmentMetadata, LanceSchema schema) {
public FragmentMergeResult(FragmentMetadata fragmentMetadata, Schema schema) {
this.fragmentMetadata = fragmentMetadata;
this.schema = schema;
}

public LanceSchema getSchema() {
public Schema getSchema() {
return schema;
}

Expand Down
2 changes: 1 addition & 1 deletion java/src/test/java/org/lance/FragmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ void testMergeColumns(@TempDir Path tempDir) throws Exception {
.operation(
Merge.builder()
.fragments(Collections.singletonList(mergeResult.getFragmentMetadata()))
.schema(mergeResult.getSchema().asArrowSchema())
.schema(mergeResult.getSchema())
.build())
.readVersion(dataset.version())
.build();
Expand Down