diff --git a/src/query/service/src/api/rpc/flight_client.rs b/src/query/service/src/api/rpc/flight_client.rs index 92c84d262a013..afa5a0b965bd7 100644 --- a/src/query/service/src/api/rpc/flight_client.rs +++ b/src/query/service/src/api/rpc/flight_client.rs @@ -23,13 +23,11 @@ use common_arrow::arrow_format::flight::data::FlightData; use common_arrow::arrow_format::flight::data::Ticket; use common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient; use common_base::base::tokio; -use common_base::base::tokio::sync::Notify; use common_base::base::tokio::time::Duration; use common_exception::ErrorCode; use common_exception::Result; use futures::StreamExt; use futures_util::future::Either; -use log::warn; use minitrace::full_name; use minitrace::future::FutureExt; use minitrace::Span; @@ -41,6 +39,7 @@ use tonic::Streaming; use crate::api::rpc::flight_actions::FlightAction; use crate::api::rpc::packets::DataPacket; use crate::api::rpc::request_builder::RequestBuilder; +use crate::pipelines::executor::WatchNotify; pub struct FlightClient { inner: FlightServiceClient, @@ -109,15 +108,12 @@ impl FlightClient { fn streaming_receiver( query_id: &str, mut streaming: Streaming, - ) -> (Arc, Receiver>) { + ) -> (Arc, Receiver>) { let (tx, rx) = async_channel::bounded(1); - let notify = Arc::new(tokio::sync::Notify::new()); + let notify = Arc::new(WatchNotify::new()); let fut = { let notify = notify.clone(); async move { - // since the notifier will use `notify_one` to wake up us, - // it is safe to instantiate the Notified future here, even if - // the `notify_one` might be called before the instantiation. let mut notified = Box::pin(notify.notified()); let mut streaming_next = streaming.next(); @@ -152,9 +148,6 @@ impl FlightClient { .in_span(Span::enter_with_local_parent(full_name!())); tokio::spawn(async_backtrace::location!(String::from(query_id)).frame(fut)); - // use common_base::runtime::GlobalIORuntime; - // use common_base::runtime::TrySpawn; - // GlobalIORuntime::instance().spawn(query_id, fut); (notify, rx) } @@ -190,7 +183,7 @@ impl FlightClient { } pub struct FlightReceiver { - notify: Arc, + notify: Arc, rx: Receiver>, } @@ -204,7 +197,7 @@ impl FlightReceiver { pub fn create(rx: Receiver>) -> FlightReceiver { FlightReceiver { rx, - notify: Arc::new(Notify::new()), + notify: Arc::new(WatchNotify::new()), } } @@ -219,8 +212,7 @@ impl FlightReceiver { pub fn close(&self) { self.rx.close(); - // there is only one receiver, which will use the notification in a single-shot manner - self.notify.notify_one(); + self.notify.notify_waiters(); } } @@ -256,7 +248,7 @@ impl FlightSender { pub enum FlightExchange { Dummy, Receiver { - notify: Arc, + notify: Arc, receiver: Receiver>, }, Sender(Sender>), @@ -268,7 +260,7 @@ impl FlightExchange { } pub fn create_receiver( - notify: Arc, + notify: Arc, receiver: Receiver>, ) -> FlightExchange { FlightExchange::Receiver { notify, receiver } diff --git a/src/query/service/src/pipelines/executor/executor_tasks.rs b/src/query/service/src/pipelines/executor/executor_tasks.rs index 29f1566e2c007..340293c4bf8e9 100644 --- a/src/query/service/src/pipelines/executor/executor_tasks.rs +++ b/src/query/service/src/pipelines/executor/executor_tasks.rs @@ -18,14 +18,13 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use common_base::base::tokio; use common_exception::Result; use parking_lot::Mutex; use petgraph::prelude::NodeIndex; -use tokio::sync::watch; use crate::pipelines::executor::ExecutorTask; use crate::pipelines::executor::ExecutorWorkerContext; +use crate::pipelines::executor::WatchNotify; use crate::pipelines::executor::WorkersCondvar; use crate::pipelines::executor::WorkersWaitingStatus; use crate::pipelines::processors::ProcessorPtr; @@ -36,36 +35,6 @@ pub struct ExecutorTasksQueue { workers_tasks: Mutex, } -// A single value Notify based on tokio::sync::watch, -// which allows `notify_waiters` to be called before `notified` was called, -// without losing notification. -pub struct WatchNotify { - rx: watch::Receiver, - tx: watch::Sender, -} - -impl Default for WatchNotify { - fn default() -> Self { - Self::new() - } -} - -impl WatchNotify { - pub fn new() -> Self { - let (tx, rx) = watch::channel(false); - WatchNotify { rx, tx } - } - - pub async fn notified(&self) { - let mut rx = self.rx.clone(); - let _ = rx.changed().await; - } - - pub fn notify_waiters(&self) { - let _ = self.tx.send_replace(true); - } -} - impl ExecutorTasksQueue { pub fn create(workers_size: usize) -> Arc { Arc::new(ExecutorTasksQueue { diff --git a/src/query/service/src/pipelines/executor/executor_watch_notify.rs b/src/query/service/src/pipelines/executor/executor_watch_notify.rs new file mode 100644 index 0000000000000..a6826b4bf62c1 --- /dev/null +++ b/src/query/service/src/pipelines/executor/executor_watch_notify.rs @@ -0,0 +1,47 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tokio::sync::watch; + +/// A Notify based on tokio::sync::watch, +/// which allows `notify_waiters` to be called before `notified` was called, +/// without losing notification. +pub struct WatchNotify { + rx: watch::Receiver, + tx: watch::Sender, +} + +impl Default for WatchNotify { + fn default() -> Self { + Self::new() + } +} + +impl WatchNotify { + pub fn new() -> Self { + let (tx, rx) = watch::channel(false); + Self { rx, tx } + } + + pub async fn notified(&self) { + let mut rx = self.rx.clone(); + // we do care about the result, + // any change or error should wake up the waiting task + let _ = rx.changed().await; + } + + pub fn notify_waiters(&self) { + let _ = self.tx.send_replace(true); + } +} diff --git a/src/query/service/src/pipelines/executor/mod.rs b/src/query/service/src/pipelines/executor/mod.rs index 787ef7ab40604..6ceba32a3712f 100644 --- a/src/query/service/src/pipelines/executor/mod.rs +++ b/src/query/service/src/pipelines/executor/mod.rs @@ -18,6 +18,7 @@ mod executor_condvar; mod executor_graph; mod executor_settings; mod executor_tasks; +mod executor_watch_notify; mod executor_worker_context; mod pipeline_complete_executor; mod pipeline_pulling_executor; @@ -30,7 +31,7 @@ pub use executor_graph::RunningGraph; pub use executor_settings::ExecutorSettings; pub use executor_tasks::CompletedAsyncTask; pub use executor_tasks::ExecutorTasksQueue; -pub use executor_tasks::WatchNotify; +pub use executor_watch_notify::WatchNotify; pub use executor_worker_context::ExecutorTask; pub use executor_worker_context::ExecutorWorkerContext; pub use pipeline_complete_executor::PipelineCompleteExecutor; diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index a9ef8296a398a..485744ff61a04 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -41,11 +41,11 @@ use parking_lot::Mutex; use petgraph::matrix_graph::Zero; use crate::pipelines::executor::executor_graph::ScheduleQueue; -use crate::pipelines::executor::executor_tasks::WatchNotify; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::ExecutorTasksQueue; use crate::pipelines::executor::ExecutorWorkerContext; use crate::pipelines::executor::RunningGraph; +use crate::pipelines::executor::WatchNotify; use crate::pipelines::executor::WorkersCondvar; pub type InitCallback = Box Result<()> + Send + Sync + 'static>;