diff --git a/Cargo.lock b/Cargo.lock index adfd3b9a1d917..d4dd623dde001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4114,6 +4114,7 @@ dependencies = [ "temp-env", "tempfile", "time", + "tokio", "tokio-stream", "toml 0.7.6", "tonic 0.10.2", diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index 272f2cc1e30d6..477f34882fb74 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -24,6 +24,7 @@ mod stoppable; mod string; mod take_mut; mod uniq_id; +mod watch_notify; pub use net::get_free_tcp_port; pub use net::get_free_udp_port; @@ -52,3 +53,4 @@ pub use tokio; pub use uniq_id::GlobalSequence; pub use uniq_id::GlobalUniqName; pub use uuid; +pub use watch_notify::WatchNotify; diff --git a/src/common/base/src/base/watch_notify.rs b/src/common/base/src/base/watch_notify.rs new file mode 100644 index 0000000000000..be05dfc9028c0 --- /dev/null +++ b/src/common/base/src/base/watch_notify.rs @@ -0,0 +1,71 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tokio::sync::watch; + +/// A Notify based on tokio::sync::watch, +/// which allows `notify_waiters` to be called before `notified` was called, +/// without losing notification. +pub struct WatchNotify { + rx: watch::Receiver, + tx: watch::Sender, +} + +impl Default for WatchNotify { + fn default() -> Self { + Self::new() + } +} + +impl WatchNotify { + pub fn new() -> Self { + let (tx, rx) = watch::channel(false); + Self { rx, tx } + } + + pub async fn notified(&self) { + let mut rx = self.rx.clone(); + // we do care about the result, + // any change or error should wake up the waiting task + let _ = rx.changed().await; + } + + pub fn notify_waiters(&self) { + let _ = self.tx.send_replace(true); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_notify() { + let notify = WatchNotify::new(); + let notified = notify.notified(); + notify.notify_waiters(); + notified.await; + } + + #[tokio::test] + async fn test_notify_waiters_ahead() { + let notify = WatchNotify::new(); + // notify_waiters ahead of notified being instantiated and awaited + notify.notify_waiters(); + + // this should not await indefinitely + let notified = notify.notified(); + notified.await; + } +} diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index d6f6d508163e2..622ef36313fb9 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -156,6 +156,7 @@ socket2 = "0.5.3" strength_reduce = "0.2.4" tempfile = "3.4.0" time = "0.3.14" +tokio = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } toml = { version = "0.7.3", default-features = false } tonic = { workspace = true } @@ -185,7 +186,6 @@ temp-env = "0.3.0" tempfile = "3.4.0" tower = "0.4.13" url = "2.3.1" -walkdir = { workspace = true } wiremock = "0.5.14" [build-dependencies] diff --git a/src/query/service/src/api/rpc/flight_client.rs b/src/query/service/src/api/rpc/flight_client.rs index eb4d09f9512b7..afa5a0b965bd7 100644 --- a/src/query/service/src/api/rpc/flight_client.rs +++ b/src/query/service/src/api/rpc/flight_client.rs @@ -23,14 +23,14 @@ use common_arrow::arrow_format::flight::data::FlightData; use common_arrow::arrow_format::flight::data::Ticket; use common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient; use common_base::base::tokio; -use common_base::base::tokio::sync::Notify; use common_base::base::tokio::time::Duration; -use common_base::runtime::GlobalIORuntime; -use common_base::runtime::TrySpawn; use common_exception::ErrorCode; use common_exception::Result; use futures::StreamExt; use futures_util::future::Either; +use minitrace::full_name; +use minitrace::future::FutureExt; +use minitrace::Span; use tonic::transport::channel::Channel; use tonic::Request; use tonic::Status; @@ -39,6 +39,7 @@ use tonic::Streaming; use crate::api::rpc::flight_actions::FlightAction; use crate::api::rpc::packets::DataPacket; use crate::api::rpc::request_builder::RequestBuilder; +use crate::pipelines::executor::WatchNotify; pub struct FlightClient { inner: FlightServiceClient, @@ -107,10 +108,10 @@ impl FlightClient { fn streaming_receiver( query_id: &str, mut streaming: Streaming, - ) -> (Arc, Receiver>) { + ) -> (Arc, Receiver>) { let (tx, rx) = async_channel::bounded(1); - let notify = Arc::new(tokio::sync::Notify::new()); - GlobalIORuntime::instance().spawn(query_id, { + let notify = Arc::new(WatchNotify::new()); + let fut = { let notify = notify.clone(); async move { let mut notified = Box::pin(notify.notified()); @@ -143,7 +144,10 @@ impl FlightClient { drop(streaming); tx.close(); } - }); + } + .in_span(Span::enter_with_local_parent(full_name!())); + + tokio::spawn(async_backtrace::location!(String::from(query_id)).frame(fut)); (notify, rx) } @@ -179,15 +183,21 @@ impl FlightClient { } pub struct FlightReceiver { - notify: Arc, + notify: Arc, rx: Receiver>, } +impl Drop for FlightReceiver { + fn drop(&mut self) { + self.close(); + } +} + impl FlightReceiver { pub fn create(rx: Receiver>) -> FlightReceiver { FlightReceiver { rx, - notify: Arc::new(Notify::new()), + notify: Arc::new(WatchNotify::new()), } } @@ -238,7 +248,7 @@ impl FlightSender { pub enum FlightExchange { Dummy, Receiver { - notify: Arc, + notify: Arc, receiver: Receiver>, }, Sender(Sender>), @@ -250,7 +260,7 @@ impl FlightExchange { } pub fn create_receiver( - notify: Arc, + notify: Arc, receiver: Receiver>, ) -> FlightExchange { FlightExchange::Receiver { notify, receiver } diff --git a/src/query/service/src/pipelines/executor/executor_tasks.rs b/src/query/service/src/pipelines/executor/executor_tasks.rs index 51ab6f15a5e98..340293c4bf8e9 100644 --- a/src/query/service/src/pipelines/executor/executor_tasks.rs +++ b/src/query/service/src/pipelines/executor/executor_tasks.rs @@ -18,20 +18,20 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use common_base::base::tokio::sync::Notify; use common_exception::Result; use parking_lot::Mutex; use petgraph::prelude::NodeIndex; use crate::pipelines::executor::ExecutorTask; use crate::pipelines::executor::ExecutorWorkerContext; +use crate::pipelines::executor::WatchNotify; use crate::pipelines::executor::WorkersCondvar; use crate::pipelines::executor::WorkersWaitingStatus; use crate::pipelines::processors::ProcessorPtr; pub struct ExecutorTasksQueue { finished: Arc, - finished_notify: Arc, + finished_notify: Arc, workers_tasks: Mutex, } @@ -39,7 +39,7 @@ impl ExecutorTasksQueue { pub fn create(workers_size: usize) -> Arc { Arc::new(ExecutorTasksQueue { finished: Arc::new(AtomicBool::new(false)), - finished_notify: Arc::new(Notify::new()), + finished_notify: Arc::new(WatchNotify::new()), workers_tasks: Mutex::new(ExecutorTasks::create(workers_size)), }) } @@ -183,7 +183,7 @@ impl ExecutorTasksQueue { } } - pub fn get_finished_notify(&self) -> Arc { + pub fn get_finished_notify(&self) -> Arc { self.finished_notify.clone() } diff --git a/src/query/service/src/pipelines/executor/mod.rs b/src/query/service/src/pipelines/executor/mod.rs index 91a6373dfff07..00b8b0a6227c8 100644 --- a/src/query/service/src/pipelines/executor/mod.rs +++ b/src/query/service/src/pipelines/executor/mod.rs @@ -24,6 +24,7 @@ mod pipeline_pulling_executor; mod pipeline_pushing_executor; mod processor_async_task; +pub use common_base::base::WatchNotify; pub use executor_condvar::WorkersCondvar; pub use executor_condvar::WorkersWaitingStatus; pub use executor_graph::RunningGraph; diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index afed238f42fbb..485744ff61a04 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::time::Instant; use common_base::base::tokio; -use common_base::base::tokio::sync::Notify; use common_base::runtime::catch_unwind; use common_base::runtime::GlobalIORuntime; use common_base::runtime::Runtime; @@ -46,6 +45,7 @@ use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::ExecutorTasksQueue; use crate::pipelines::executor::ExecutorWorkerContext; use crate::pipelines::executor::RunningGraph; +use crate::pipelines::executor::WatchNotify; use crate::pipelines::executor::WorkersCondvar; pub type InitCallback = Box Result<()> + Send + Sync + 'static>; @@ -62,7 +62,7 @@ pub struct PipelineExecutor { on_init_callback: Mutex>, on_finished_callback: Mutex>, settings: ExecutorSettings, - finished_notify: Arc, + finished_notify: Arc, finished_error: Mutex>, #[allow(unused)] lock_guards: Vec, @@ -195,7 +195,7 @@ impl PipelineExecutor { async_runtime: GlobalIORuntime::instance(), settings, finished_error: Mutex::new(None), - finished_notify: Arc::new(Notify::new()), + finished_notify: Arc::new(WatchNotify::new()), lock_guards, })) } diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs index 83e7aba710c9f..2cc7eb275cd4c 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs @@ -16,7 +16,6 @@ use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::Arc; -use common_base::base::tokio::sync::Notify; use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::types::DataType; @@ -35,6 +34,7 @@ use common_sql::executor::physical_plans::RangeJoinType; use parking_lot::Mutex; use parking_lot::RwLock; +use crate::pipelines::executor::WatchNotify; use crate::pipelines::processors::transforms::range_join::IEJoinState; use crate::sessions::QueryContext; @@ -51,7 +51,7 @@ pub struct RangeJoinState { pub(crate) other_conditions: Vec, // Pipeline event related pub(crate) partition_finished: Mutex, - pub(crate) finished_notify: Arc, + pub(crate) finished_notify: Arc, pub(crate) left_sinker_count: RwLock, pub(crate) right_sinker_count: RwLock, // Task that need to be executed, pair.0 is left table block, pair.1 is right table block @@ -81,7 +81,7 @@ impl RangeJoinState { // join_type: range_join.join_type.clone(), other_conditions: range_join.other_conditions.clone(), partition_finished: Mutex::new(false), - finished_notify: Arc::new(Notify::new()), + finished_notify: Arc::new(WatchNotify::new()), left_sinker_count: RwLock::new(0), right_sinker_count: RwLock::new(0), tasks: RwLock::new(vec![]),