diff --git a/.gitignore b/.gitignore index 50983b4310..1e47926b27 100644 --- a/.gitignore +++ b/.gitignore @@ -108,3 +108,4 @@ logs/ # Claude Code guidance file (local only) CLAUDE.md +.claude/ \ No newline at end of file diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index 9283a6d603..f444b52c2a 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -34,7 +34,7 @@ required-features = ["build-binary"] [features] build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"] -default = ["build-binary", "substrait"] +default = ["build-binary", "rest-api"] # job info can cache stage plans, in some cases where # task plans can be re-computed, cache behavior may need to be disabled. disable-stage-plan-cache = [] diff --git a/ballista/scheduler/src/api/handlers.rs b/ballista/scheduler/src/api/handlers.rs index de1546c590..afd3386222 100644 --- a/ballista/scheduler/src/api/handlers.rs +++ b/ballista/scheduler/src/api/handlers.rs @@ -56,6 +56,9 @@ pub struct JobResponse { pub num_stages: usize, pub completed_stages: usize, pub percent_complete: u8, + pub logical_plan: Option, + pub physical_plan: Option, + pub stage_plan: Option, } #[derive(Debug, serde::Serialize)] @@ -172,6 +175,9 @@ pub async fn get_jobs< num_stages: job.num_stages, completed_stages: job.completed_stages, percent_complete, + logical_plan: None, + physical_plan: None, + stage_plan: None, } }) .collect(); @@ -179,6 +185,75 @@ pub async fn get_jobs< Ok(Json(jobs)) } +pub async fn get_job< + T: AsLogicalPlan + Clone + Send + Sync + 'static, + U: AsExecutionPlan + Send + Sync + 'static, +>( + State(data_server): State>>, + Path(job_id): Path, +) -> Result { + let graph = data_server + .state + .task_manager + .get_job_execution_graph(&job_id) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::NOT_FOUND)?; + let stage_plan = format!("{:?}", graph); + let job = graph.as_ref(); + let (plain_status, job_status) = match &job.status().status { + Some(Status::Queued(_)) => ("Queued".to_string(), "Queued".to_string()), + Some(Status::Running(_)) => ("Running".to_string(), "Running".to_string()), + Some(Status::Failed(error)) => { + ("Failed".to_string(), format!("Failed: {}", error.error)) + } + Some(Status::Successful(completed)) => { + let num_rows = completed + .partition_location + .iter() + .map(|p| p.partition_stats.as_ref().map(|s| s.num_rows).unwrap_or(0)) + .sum::(); + let num_rows_term = if num_rows == 1 { "row" } else { "rows" }; + let num_partitions = completed.partition_location.len(); + let num_partitions_term = if num_partitions == 1 { + "partition" + } else { + "partitions" + }; + ( + "Completed".to_string(), + format!( + "Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.", + num_partitions, + num_partitions_term, + num_rows, + num_rows_term, + job.end_time() - job.start_time() + ), + ) + } + _ => ("Invalid".to_string(), "Invalid State".to_string()), + }; + + let num_stages = job.stage_count(); + let completed_stages = job.completed_stages(); + let percent_complete = + ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8; + + Ok(Json(JobResponse { + job_id: job.job_id().to_string(), + job_name: job.job_name().to_string(), + job_status, + status: plain_status, + num_stages, + completed_stages, + percent_complete, + logical_plan: job.logical_plan().map(str::to_owned), + physical_plan: job.physical_plan().map(str::to_owned), + stage_plan: Some(stage_plan), + })) +} + pub async fn cancel_job< T: AsLogicalPlan + Clone + Send + Sync + 'static, U: AsExecutionPlan + Send + Sync + 'static, diff --git a/ballista/scheduler/src/api/mod.rs b/ballista/scheduler/src/api/mod.rs index 733a60f8c9..5e29154b3b 100644 --- a/ballista/scheduler/src/api/mod.rs +++ b/ballista/scheduler/src/api/mod.rs @@ -31,6 +31,7 @@ pub fn get_routes< .route("/api/executors", get(handlers::get_executors::)) .route("/api/jobs", get(handlers::get_jobs::)) .route("/api/job/{job_id}", patch(handlers::cancel_job::)) + .route("/api/job/{job_id}", get(handlers::get_job::)) .route( "/api/job/{job_id}/stages", get(handlers::get_query_stages::), diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs index 718902bc33..32725097a3 100644 --- a/ballista/scheduler/src/state/aqe/mod.rs +++ b/ballista/scheduler/src/state/aqe/mod.rs @@ -109,6 +109,10 @@ pub(crate) struct AdaptiveExecutionGraph { failed_stage_attempts: HashMap>, /// Session config for this job session_config: Arc, + /// Logical plan as a human-readable string, captured at submission time. + logical_plan: Option, + /// Physical plan as a human-readable string, captured at submission time. + physical_plan: Option, } impl AdaptiveExecutionGraph { @@ -125,6 +129,8 @@ impl AdaptiveExecutionGraph { plan: Arc, queued_at: u64, session_config: Arc, + logical_plan: Option, + physical_plan: Option, ) -> ballista_core::error::Result { let mut planner = AdaptivePlanner::try_new(&session_config, plan, job_name.to_owned())?; @@ -178,6 +184,8 @@ impl AdaptiveExecutionGraph { task_id_gen: 0, failed_stage_attempts: HashMap::new(), session_config, + logical_plan, + physical_plan, }) } } @@ -503,6 +511,14 @@ impl ExecutionGraph for AdaptiveExecutionGraph { &self.status } + fn logical_plan(&self) -> Option<&str> { + self.logical_plan.as_deref() + } + + fn physical_plan(&self) -> Option<&str> { + self.physical_plan.as_deref() + } + fn start_time(&self) -> u64 { self.start_time } diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 27a3254794..c443918ef6 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -112,6 +112,12 @@ pub trait ExecutionGraph: Debug { /// Returns the current job status. fn status(&self) -> &JobStatus; + /// Returns the logical plan as a string, if captured at submission time. + fn logical_plan(&self) -> Option<&str>; + + /// Returns the physical plan as a string, if captured at submission time. + fn physical_plan(&self) -> Option<&str>; + /// Returns the timestamp when this job started execution. fn start_time(&self) -> u64; @@ -263,6 +269,10 @@ pub struct StaticExecutionGraph { failed_stage_attempts: HashMap>, /// Session config for this job session_config: Arc, + /// Logical plan as a human-readable string, captured at submission time. + logical_plan: Option, + /// Physical plan as a human-readable string, captured at submission time. + physical_plan: Option, } /// Information about a currently running task. @@ -298,6 +308,8 @@ impl StaticExecutionGraph { queued_at: u64, session_config: Arc, planner: &mut dyn DistributedPlanner, + logical_plan: Option, + physical_plan: Option, ) -> Result { let shuffle_stages = planner.plan_query_stages(job_id, plan, session_config.options())?; @@ -330,6 +342,8 @@ impl StaticExecutionGraph { task_id_gen: 0, failed_stage_attempts: HashMap::new(), session_config, + logical_plan, + physical_plan, }) } @@ -635,6 +649,14 @@ impl ExecutionGraph for StaticExecutionGraph { &self.status } + fn logical_plan(&self) -> Option<&str> { + self.logical_plan.as_deref() + } + + fn physical_plan(&self) -> Option<&str> { + self.physical_plan.as_deref() + } + fn start_time(&self) -> u64 { self.start_time } diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs index e08fd663f9..28f29711a4 100644 --- a/ballista/scheduler/src/state/execution_graph_dot.rs +++ b/ballista/scheduler/src/state/execution_graph_dot.rs @@ -612,6 +612,8 @@ filter_expr="] 0, Arc::new(SessionConfig::new_with_ballista()), &mut planner, + None, + None, ) } @@ -648,6 +650,8 @@ filter_expr="] 0, Arc::new(SessionConfig::new_with_ballista()), &mut planner, + None, + None, ) } } diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 82fa60c97b..6634943363 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -443,11 +443,16 @@ impl SchedulerState| { if node.output_partitioning().partition_count() == 0 { @@ -490,6 +495,8 @@ impl SchedulerState TaskManager queued_at: u64, session_config: Arc, subscriber: Option, + logical_plan: Option, + physical_plan: Option, ) -> Result<()> { let mut planner = DefaultDistributedPlanner::new(); @@ -294,6 +296,8 @@ impl TaskManager plan, queued_at, session_config, + logical_plan, + physical_plan, )?) as ExecutionGraphBox } else { debug!("Using static query planner for job planning"); @@ -306,6 +310,8 @@ impl TaskManager queued_at, session_config, &mut planner, + logical_plan, + physical_plan, )?) as ExecutionGraphBox }; diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs index 1d4f3633f9..80837a0fae 100644 --- a/ballista/scheduler/src/test_utils.rs +++ b/ballista/scheduler/src/test_utils.rs @@ -920,6 +920,8 @@ pub async fn test_aggregation_plan_with_job_id( 0, Arc::new(SessionConfig::new_with_ballista()), &mut planner, + None, + None, ) .unwrap() } @@ -968,6 +970,8 @@ pub async fn test_two_aggregations_plan(partition: usize) -> StaticExecutionGrap 0, Arc::new(SessionConfig::new_with_ballista()), &mut planner, + None, + None, ) .unwrap() } @@ -1008,6 +1012,8 @@ pub async fn test_coalesce_plan(partition: usize) -> StaticExecutionGraph { 0, Arc::new(SessionConfig::new_with_ballista()), &mut planner, + None, + None, ) .unwrap() } @@ -1068,6 +1074,8 @@ pub async fn test_join_plan(partition: usize) -> StaticExecutionGraph { 0, Arc::new(SessionConfig::new_with_ballista()), &mut planner, + None, + None, ) .unwrap(); @@ -1110,6 +1118,8 @@ pub async fn test_union_all_plan(partition: usize) -> StaticExecutionGraph { 0, Arc::new(SessionConfig::new_with_ballista()), &mut planner, + None, + None, ) .unwrap(); @@ -1152,6 +1162,8 @@ pub async fn test_union_plan(partition: usize) -> StaticExecutionGraph { 0, Arc::new(SessionConfig::new_with_ballista()), &mut planner, + None, + None, ) .unwrap();