18 changes: 16 additions & 2 deletions native/core/src/execution/jni_api.rs
@@ -37,6 +37,7 @@ use jni::{
sys::{jbyteArray, jint, jlong, jlongArray},
JNIEnv,
};
use std::time::Instant;
use std::{collections::HashMap, sync::Arc, task::Poll};

use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
@@ -81,6 +82,8 @@ struct ExecutionContext {
pub runtime: Runtime,
/// Native metrics
pub metrics: Arc<GlobalRef>,
/// The time it took to create the native plan and configure the context
pub plan_creation_time: usize,
/// DataFusion SessionContext
pub session_ctx: Arc<SessionContext>,
/// Whether to enable additional debugging checks & messages
@@ -109,6 +112,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
// Init JVM classes
JVMClasses::init(&mut env);

let start = Instant::now();

let array = unsafe { JPrimitiveArray::from_raw(serialized_query) };
let bytes = env.convert_byte_array(array)?;
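
Note on the timing approach: the measurement uses the standard std::time::Instant pattern, capturing a start point before plan deserialization and session setup and converting the elapsed Duration to whole milliseconds. A minimal standalone sketch of that pattern (expensive_setup is a made-up placeholder for the work being timed, not code from this PR):

```rust
use std::time::Instant;

// Stand-in for the work being timed (plan deserialization + session setup);
// purely illustrative, not part of the PR.
fn expensive_setup() {
    std::thread::sleep(std::time::Duration::from_millis(5));
}

fn main() {
    let start = Instant::now();
    expensive_setup();
    // Same conversion as the diff: elapsed Duration -> whole milliseconds.
    let plan_creation_time = start.elapsed().as_millis() as usize;
    println!("plan creation took {plan_creation_time} ms");
}
```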

@@ -167,6 +172,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
// dictionaries will be dropped as well.
let session = prepare_datafusion_session_context(&configs, task_memory_manager)?;

let plan_creation_time = start.elapsed().as_millis() as usize;

let exec_context = Box::new(ExecutionContext {
id,
spark_plan,
@@ -177,6 +184,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
conf: configs,
runtime,
metrics,
plan_creation_time,
session_ctx: Arc::new(session),
debug_native,
explain_native,
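
Taken together, the hunks above time the serde and session-setup work in createPlan and store the result on the execution context; the executePlan hunk below adds the deferred physical-planning time to the same counter. A rough, hypothetical sketch of that accumulation pattern (Ctx, create_plan and first_execute are stand-ins for ExecutionContext and the two JNI entry points, not the actual Comet code):

```rust
use std::time::Instant;

/// Trimmed-down, hypothetical stand-in for ExecutionContext; only the timing
/// field added in this PR is modelled.
struct Ctx {
    /// Milliseconds spent creating and configuring the plan so far.
    plan_creation_time: usize,
}

/// Mirrors createPlan: time the serde + session setup and store it.
fn create_plan() -> Ctx {
    let start = Instant::now();
    // ... deserialize the Spark plan, prepare the DataFusion session ...
    Ctx {
        plan_creation_time: start.elapsed().as_millis() as usize,
    }
}

/// Mirrors the first executePlan call: the physical plan is built lazily,
/// and its cost is added to the same counter.
fn first_execute(ctx: &mut Ctx) {
    let start = Instant::now();
    // ... create the DataFusion physical plan ...
    ctx.plan_creation_time += start.elapsed().as_millis() as usize;
}

fn main() {
    let mut ctx = create_plan();
    first_execute(&mut ctx);
    println!("total plan creation: {} ms", ctx.plan_creation_time);
}
```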
@@ -335,20 +343,26 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
// Because we don't know if input arrays are dictionary-encoded when we create
// query plan, we need to defer stream initialization to first time execution.
if exec_context.root_op.is_none() {
let start = Instant::now();
let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx))
.with_exec_id(exec_context_id);
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
)?;
let physical_plan_time = start.elapsed().as_millis() as usize;

Review comment (Contributor): "wondering should we info! the planner time?" (see the sketch after this hunk)

exec_context.plan_creation_time += physical_plan_time;
exec_context.root_op = Some(Arc::clone(&root_op));
exec_context.scans = scans;

if exec_context.explain_native {
let formatted_plan_str =
DisplayableExecutionPlan::new(root_op.as_ref()).indent(true);
info!("Comet native query plan:\n {formatted_plan_str:}");
info!(
"Comet native query plan (plan creation took {} ms):\n {formatted_plan_str:}",
exec_context.plan_creation_time
);
}

let task_ctx = exec_context.session_ctx.task_ctx();
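
Regarding the review question above about logging the planner time: the diff currently surfaces it only when explain_native is enabled. A hedged sketch of what an unconditional info! report could look like, assuming the info! macro used in this file comes from the log crate (the helper name and the env_logger setup are illustrative, not part of the PR):

```rust
// Assumes log = "0.4" plus some logging backend (e.g. env_logger) in Cargo.toml.
use log::info;

/// Hypothetical helper: emit the planner time via info! regardless of
/// whether explain_native is enabled.
fn report_planner_time(physical_plan_time_ms: usize, total_plan_creation_ms: usize) {
    info!(
        "Comet physical planning took {} ms ({} ms total plan creation)",
        physical_plan_time_ms, total_plan_creation_ms
    );
}

fn main() {
    env_logger::init(); // illustrative backend; any log implementation works
    report_planner_time(12, 30);
}
```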
@@ -388,7 +402,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
if let Some(plan) = &exec_context.root_op {
let formatted_plan_str =
DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent(true);
info!("Comet native query plan with metrics:\n{formatted_plan_str:}");
info!("Comet native query plan with metrics (plan creation took {} ms):\n{formatted_plan_str:}", exec_context.plan_creation_time);
}
}
