4 changes: 3 additions & 1 deletion native/core/src/execution/jni_api.rs
@@ -485,12 +485,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
         }

         let task_ctx = exec_context.session_ctx.task_ctx();
+        // Each Comet native execution corresponds to a single Spark partition,
+        // so we should always execute partition 0.
         let stream = exec_context
             .root_op
             .as_ref()
             .unwrap()
             .native_plan
-            .execute(partition as usize, task_ctx)?;
+            .execute(0, task_ctx)?;
         exec_context.stream = Some(stream);
     } else {
         // Pull input batches
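The new comment states the invariant behind this change: each native plan is built for exactly one Spark partition, so the DataFusion partition index is always 0. A minimal sketch of that call pattern, assuming a recent DataFusion API; the `start_stream` helper is hypothetical, not Comet's actual code:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};

// Hypothetical helper, not Comet's actual code: start the output stream of a
// native plan that was built for a single Spark partition.
fn start_stream(
    root: Arc<dyn ExecutionPlan>,
    task_ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    // The plan exposes exactly one output partition, so index 0 is always
    // valid; passing the Spark partition number (as the old code did) would
    // request a partition the plan does not have.
    root.execute(0, task_ctx)
}
```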
16 changes: 5 additions & 11 deletions native/core/src/execution/planner.rs
@@ -1317,17 +1317,11 @@ impl PhysicalPlanner {
                 &object_store_options,
             )?;

-            // Generate file groups
-            let mut file_groups: Vec<Vec<PartitionedFile>> =
-                Vec::with_capacity(partition_count);
-            scan.file_partitions.iter().try_for_each(|partition| {
-                let files = self.get_partitioned_files(partition)?;
-                file_groups.push(files);
-                Ok::<(), ExecutionError>(())
-            })?;
-
-            // TODO: I think we can remove partition_count in the future, but leave for testing.
-            assert_eq!(file_groups.len(), partition_count);
+            // Comet serializes the PartitionedFiles for every Spark partition, but this
+            // plan only needs to read the current Spark partition's PartitionedFiles.
+            let files =
+                self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
+            let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
             let partition_fields: Vec<Field> = partition_schema
                 .fields()
                 .iter()
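For illustration, a hedged sketch of the selection above, assuming DataFusion's `PartitionedFile` type; the function and its names are hypothetical, not Comet's API:

```rust
use datafusion::datasource::listing::PartitionedFile;

// Hypothetical sketch, not Comet's API: the serialized scan carries the
// PartitionedFiles of every Spark partition, but the planner runs once per
// partition and only needs a single file group.
fn file_groups_for_partition(
    file_partitions: &[Vec<PartitionedFile>],
    partition: usize,
) -> Vec<Vec<PartitionedFile>> {
    // One file group gives the scan exactly one output partition, matching
    // the `execute(0, task_ctx)` call in jni_api.rs.
    vec![file_partitions[partition].clone()]
}
```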
@@ -200,10 +200,6 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
   }

   test("join") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     df.createOrReplaceTempView("t2")
@@ -74,10 +74,6 @@ class CometJoinSuite extends CometTestBase {
   }

   test("Broadcast HashJoin without join filter") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -105,10 +101,6 @@
   }

   test("Broadcast HashJoin with join filter") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",