Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Dec 10, 2023
1 parent ec21fc9 commit 831c185
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 50 deletions.
24 changes: 8 additions & 16 deletions src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ use common_arrow::arrow_format::flight::data::FlightData;
use common_arrow::arrow_format::flight::data::Ticket;
use common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient;
use common_base::base::tokio;
use common_base::base::tokio::sync::Notify;
use common_base::base::tokio::time::Duration;
use common_exception::ErrorCode;
use common_exception::Result;
use futures::StreamExt;
use futures_util::future::Either;
use log::warn;
use minitrace::full_name;
use minitrace::future::FutureExt;
use minitrace::Span;
Expand All @@ -41,6 +39,7 @@ use tonic::Streaming;
use crate::api::rpc::flight_actions::FlightAction;
use crate::api::rpc::packets::DataPacket;
use crate::api::rpc::request_builder::RequestBuilder;
use crate::pipelines::executor::WatchNotify;

pub struct FlightClient {
inner: FlightServiceClient<Channel>,
Expand Down Expand Up @@ -109,15 +108,12 @@ impl FlightClient {
fn streaming_receiver(
query_id: &str,
mut streaming: Streaming<FlightData>,
) -> (Arc<Notify>, Receiver<Result<FlightData>>) {
) -> (Arc<WatchNotify>, Receiver<Result<FlightData>>) {
let (tx, rx) = async_channel::bounded(1);
let notify = Arc::new(tokio::sync::Notify::new());
let notify = Arc::new(WatchNotify::new());
let fut = {
let notify = notify.clone();
async move {
// since the notifier will use `notify_one` to wake up us,
// it is safe to instantiate the Notified future here, even if
// the `notify_one` might be called before the instantiation.
let mut notified = Box::pin(notify.notified());
let mut streaming_next = streaming.next();

Expand Down Expand Up @@ -152,9 +148,6 @@ impl FlightClient {
.in_span(Span::enter_with_local_parent(full_name!()));

tokio::spawn(async_backtrace::location!(String::from(query_id)).frame(fut));
// use common_base::runtime::GlobalIORuntime;
// use common_base::runtime::TrySpawn;
// GlobalIORuntime::instance().spawn(query_id, fut);

(notify, rx)
}
Expand Down Expand Up @@ -190,7 +183,7 @@ impl FlightClient {
}

pub struct FlightReceiver {
notify: Arc<Notify>,
notify: Arc<WatchNotify>,
rx: Receiver<Result<FlightData>>,
}

Expand All @@ -204,7 +197,7 @@ impl FlightReceiver {
pub fn create(rx: Receiver<Result<FlightData>>) -> FlightReceiver {
FlightReceiver {
rx,
notify: Arc::new(Notify::new()),
notify: Arc::new(WatchNotify::new()),
}
}

Expand All @@ -219,8 +212,7 @@ impl FlightReceiver {

pub fn close(&self) {
self.rx.close();
// there is only one receiver, which will use the notification in a single-shot manner
self.notify.notify_one();
self.notify.notify_waiters();
}
}

Expand Down Expand Up @@ -256,7 +248,7 @@ impl FlightSender {
pub enum FlightExchange {
Dummy,
Receiver {
notify: Arc<Notify>,
notify: Arc<WatchNotify>,
receiver: Receiver<Result<FlightData>>,
},
Sender(Sender<Result<FlightData, Status>>),
Expand All @@ -268,7 +260,7 @@ impl FlightExchange {
}

pub fn create_receiver(
notify: Arc<Notify>,
notify: Arc<WatchNotify>,
receiver: Receiver<Result<FlightData>>,
) -> FlightExchange {
FlightExchange::Receiver { notify, receiver }
Expand Down
33 changes: 1 addition & 32 deletions src/query/service/src/pipelines/executor/executor_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use common_base::base::tokio;
use common_exception::Result;
use parking_lot::Mutex;
use petgraph::prelude::NodeIndex;
use tokio::sync::watch;

use crate::pipelines::executor::ExecutorTask;
use crate::pipelines::executor::ExecutorWorkerContext;
use crate::pipelines::executor::WatchNotify;
use crate::pipelines::executor::WorkersCondvar;
use crate::pipelines::executor::WorkersWaitingStatus;
use crate::pipelines::processors::ProcessorPtr;
Expand All @@ -36,36 +35,6 @@ pub struct ExecutorTasksQueue {
workers_tasks: Mutex<ExecutorTasks>,
}

// A single value Notify based on tokio::sync::watch,
// which allows `notify_waiters` to be called before `notified` was called,
// without losing notification.
pub struct WatchNotify {
rx: watch::Receiver<bool>,
tx: watch::Sender<bool>,
}

impl Default for WatchNotify {
fn default() -> Self {
Self::new()
}
}

impl WatchNotify {
pub fn new() -> Self {
let (tx, rx) = watch::channel(false);
WatchNotify { rx, tx }
}

pub async fn notified(&self) {
let mut rx = self.rx.clone();
let _ = rx.changed().await;
}

pub fn notify_waiters(&self) {
let _ = self.tx.send_replace(true);
}
}

impl ExecutorTasksQueue {
pub fn create(workers_size: usize) -> Arc<ExecutorTasksQueue> {
Arc::new(ExecutorTasksQueue {
Expand Down
47 changes: 47 additions & 0 deletions src/query/service/src/pipelines/executor/executor_watch_notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use tokio::sync::watch;

/// A Notify based on tokio::sync::watch,
/// which allows `notify_waiters` to be called before `notified` was called,
/// without losing notification.
pub struct WatchNotify {
rx: watch::Receiver<bool>,
tx: watch::Sender<bool>,
}

impl Default for WatchNotify {
fn default() -> Self {
Self::new()
}
}

impl WatchNotify {
pub fn new() -> Self {
let (tx, rx) = watch::channel(false);
Self { rx, tx }
}

pub async fn notified(&self) {
let mut rx = self.rx.clone();
// we do care about the result,
// any change or error should wake up the waiting task
let _ = rx.changed().await;
}

pub fn notify_waiters(&self) {
let _ = self.tx.send_replace(true);
}
}
3 changes: 2 additions & 1 deletion src/query/service/src/pipelines/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod executor_condvar;
mod executor_graph;
mod executor_settings;
mod executor_tasks;
mod executor_watch_notify;
mod executor_worker_context;
mod pipeline_complete_executor;
mod pipeline_pulling_executor;
Expand All @@ -30,7 +31,7 @@ pub use executor_graph::RunningGraph;
pub use executor_settings::ExecutorSettings;
pub use executor_tasks::CompletedAsyncTask;
pub use executor_tasks::ExecutorTasksQueue;
pub use executor_tasks::WatchNotify;
pub use executor_watch_notify::WatchNotify;
pub use executor_worker_context::ExecutorTask;
pub use executor_worker_context::ExecutorWorkerContext;
pub use pipeline_complete_executor::PipelineCompleteExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ use parking_lot::Mutex;
use petgraph::matrix_graph::Zero;

use crate::pipelines::executor::executor_graph::ScheduleQueue;
use crate::pipelines::executor::executor_tasks::WatchNotify;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::ExecutorTasksQueue;
use crate::pipelines::executor::ExecutorWorkerContext;
use crate::pipelines::executor::RunningGraph;
use crate::pipelines::executor::WatchNotify;
use crate::pipelines::executor::WorkersCondvar;

pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;
Expand Down

0 comments on commit 831c185

Please sign in to comment.