From dfcc805511fdf45d61ed0dd6611c891dc01ca6ad Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 29 Feb 2024 14:18:54 +0800 Subject: [PATCH] fix(executor): queries pipeline executor schedule incorrectly (#14787) * fix(executor): queries pipeline executor schedule incorrectly * should not affect current executor --- .../pipelines/executor/executor_settings.rs | 2 +- .../executor/executor_worker_context.rs | 60 ++++++++++++++++++- .../executor/queries_pipeline_executor.rs | 2 +- .../executor/query_pipeline_executor.rs | 2 +- src/query/settings/src/settings_default.rs | 2 +- .../settings/src/settings_getter_setter.rs | 4 +- 6 files changed, 64 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/pipelines/executor/executor_settings.rs b/src/query/service/src/pipelines/executor/executor_settings.rs index d78d391f2d790..9a8454d189873 100644 --- a/src/query/service/src/pipelines/executor/executor_settings.rs +++ b/src/query/service/src/pipelines/executor/executor_settings.rs @@ -29,7 +29,7 @@ impl ExecutorSettings { pub fn try_create(settings: &Settings, query_id: String) -> Result { let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?; Ok(ExecutorSettings { - enable_new_executor: settings.get_enable_experimental_new_executor()?, + enable_new_executor: settings.get_enable_experimental_queries_executor()?, query_id: Arc::new(query_id), max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds), }) diff --git a/src/query/service/src/pipelines/executor/executor_worker_context.rs b/src/query/service/src/pipelines/executor/executor_worker_context.rs index 9abe49e822776..09b9b6c9331f9 100644 --- a/src/query/service/src/pipelines/executor/executor_worker_context.rs +++ b/src/query/service/src/pipelines/executor/executor_worker_context.rs @@ -20,11 +20,18 @@ use std::time::Instant; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::TrySpawn; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use minitrace::future::FutureExt; +use minitrace::Span; use petgraph::prelude::NodeIndex; use crate::pipelines::executor::executor_graph::ProcessorWrapper; +use crate::pipelines::executor::processor_async_task::ExecutorTasksQueue; +use crate::pipelines::executor::ProcessorAsyncTask; +use crate::pipelines::executor::QueriesExecutorTasksQueue; +use crate::pipelines::executor::QueriesPipelineExecutor; use crate::pipelines::executor::RunningGraph; use crate::pipelines::executor::WorkersCondvar; @@ -96,15 +103,30 @@ impl ExecutorWorkerContext { } /// # Safety - pub unsafe fn execute_task(&mut self) -> Result)>> { + pub unsafe fn execute_task( + &mut self, + executor: Option<&Arc>, + ) -> Result)>> { match std::mem::replace(&mut self.task, ExecutorTask::None) { ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")), ExecutorTask::Sync(processor) => self.execute_sync_task(processor), + ExecutorTask::Async(processor) => { + if let Some(executor) = executor { + self.execute_async_task( + processor, + executor, + executor.global_tasks_queue.clone(), + ) + } else { + Err(ErrorCode::Internal( + "Async task should only be executed on queries executor", + )) + } + } ExecutorTask::AsyncCompleted(task) => match task.res { Ok(_) => Ok(Some((task.id, task.graph))), Err(cause) => Err(cause), }, - ExecutorTask::Async(_) => unreachable!("used for new executor"), } } @@ -125,6 +147,40 @@ impl ExecutorWorkerContext { Ok(Some((proc.processor.id(), proc.graph))) } + pub fn execute_async_task( + &mut self, + proc: ProcessorWrapper, + executor: &Arc, + global_queue: Arc, + ) -> Result)>> { + unsafe { + let workers_condvar = self.workers_condvar.clone(); + workers_condvar.inc_active_async_worker(); + let query_id = self.query_id.clone(); + let wakeup_worker_id = self.worker_id; + let process_future = proc.processor.async_process(); + let node_profile = executor.graph.get_node_profile(proc.processor.id()).clone(); + let graph = proc.graph; + executor.async_runtime.spawn( + query_id.as_ref().clone(), + ProcessorAsyncTask::create( + query_id, + wakeup_worker_id, + proc.processor.clone(), + Arc::new(ExecutorTasksQueue::QueriesExecutorTasksQueue(global_queue)), + workers_condvar, + node_profile, + graph, + process_future, + ) + .in_span(Span::enter_with_local_parent(std::any::type_name::< + ProcessorAsyncTask, + >())), + ); + } + Ok(None) + } + pub fn get_workers_condvar(&self) -> &Arc { &self.workers_condvar } diff --git a/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs b/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs index 8f6cdfa8bab56..7d0d05049b61a 100644 --- a/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs @@ -413,7 +413,7 @@ impl QueriesPipelineExecutor { } while !self.global_tasks_queue.is_finished() && context.has_task() { - if let Some((executed_pid, graph)) = context.execute_task()? { + if let Some((executed_pid, graph)) = context.execute_task(Some(self))? { // Not scheduled graph if pipeline is finished. if !self.global_tasks_queue.is_finished() { // We immediately schedule the processor again. diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index 2975698da2ca9..05053bef9efdb 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -401,7 +401,7 @@ impl QueryPipelineExecutor { } while !self.global_tasks_queue.is_finished() && context.has_task() { - if let Some((executed_pid, graph)) = context.execute_task()? { + if let Some((executed_pid, graph)) = context.execute_task(None)? { // Not scheduled graph if pipeline is finished. if !self.global_tasks_queue.is_finished() { // We immediately schedule the processor again. diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 34a65cb03cedf..1984361f18510 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -691,7 +691,7 @@ impl DefaultSettings { mode:SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)) }), - ("enable_experimental_new_executor", DefaultSettingValue { + ("enable_experimental_queries_executor", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Enables experimental new executor", mode: SettingMode::Both, diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 783a78486c7ac..b3a6a5fd63c64 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -615,7 +615,7 @@ impl Settings { self.try_set_u64("enable_geo_create_table", u64::from(val)) } - pub fn get_enable_experimental_new_executor(&self) -> Result { - Ok(self.try_get_u64("enable_experimental_new_executor")? == 1) + pub fn get_enable_experimental_queries_executor(&self) -> Result { + Ok(self.try_get_u64("enable_experimental_queries_executor")? == 1) } }