Skip to content

Commit

Permalink
Revert "fix: create FlightReceiver using current rt"
Browse files Browse the repository at this point in the history
This reverts commit 6a0dbe6.
  • Loading branch information
dantengsky committed Dec 10, 2023
1 parent 6d71141 commit ec21fc9
Show file tree
Hide file tree
Showing 17 changed files with 72 additions and 97 deletions.
4 changes: 2 additions & 2 deletions src/common/base/src/runtime/global_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl GlobalIORuntime {

GlobalInstance::set(Arc::new(Runtime::with_worker_threads(
thread_num,
"IO-worker",
Some("IO-worker".to_owned()),
)?));
Ok(())
}
Expand All @@ -52,7 +52,7 @@ impl GlobalQueryRuntime {
let thread_num = std::cmp::max(num_cpus, num_cpus::get() / 2);
let thread_num = std::cmp::max(2, thread_num);

let rt = Runtime::with_worker_threads(thread_num, "g-query-worker")?;
let rt = Runtime::with_worker_threads(thread_num, Some("g-query-worker".to_owned()))?;
GlobalInstance::set(Arc::new(GlobalQueryRuntime(rt)));
Ok(())
}
Expand Down
4 changes: 0 additions & 4 deletions src/common/base/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ mod runtime_tracker;
mod thread;
mod thread_pool;

use std::sync::atomic::AtomicUsize;

pub use backtrace::dump_backtrace;
pub use backtrace::get_all_tasks;
pub use backtrace::AsyncTaskItem;
Expand All @@ -47,5 +45,3 @@ pub use thread::Thread;
pub use thread::ThreadJoinHandle;
pub use thread_pool::TaskJoinHandler;
pub use thread_pool::ThreadPool;

static ADHOC_RT_COUNTER: AtomicUsize = AtomicUsize::new(0);
18 changes: 5 additions & 13 deletions src/common/base/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::backtrace::Backtrace;
use std::future::Future;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
Expand All @@ -33,7 +32,6 @@ use tokio::task::JoinHandle;

use crate::runtime::catch_unwind::CatchUnwindFuture;
use crate::runtime::MemStat;
use crate::runtime::ADHOC_RT_COUNTER;

/// Methods to spawn tasks.
pub trait TrySpawn {
Expand Down Expand Up @@ -165,17 +163,8 @@ impl Runtime {
Self::create(None, mem_stat, &mut runtime_builder)
}

pub fn with_worker_threads(workers: usize, thread_name: &str) -> Result<Self> {
let thread_name = {
let count = ADHOC_RT_COUNTER.fetch_add(1, Ordering::SeqCst);
format!("{}-{}", thread_name, count)
};

Self::create_with_worker_threads(workers, Some(thread_name))
}

#[allow(unused_mut)]
fn create_with_worker_threads(workers: usize, mut thread_name: Option<String>) -> Result<Self> {
pub fn with_worker_threads(workers: usize, mut thread_name: Option<String>) -> Result<Self> {
let mut mem_stat_name = String::from("UnnamedRuntime");

if let Some(thread_name) = thread_name.as_ref() {
Expand Down Expand Up @@ -378,7 +367,10 @@ where
{
// 1. build the runtime.
let semaphore = Semaphore::new(permit_nums);
let runtime = Arc::new(Runtime::with_worker_threads(thread_nums, &thread_name)?);
let runtime = Arc::new(Runtime::with_worker_threads(
thread_nums,
Some(thread_name),
)?);

// 2. spawn all the tasks to the runtime with semaphore.
let join_handlers = runtime.try_spawn_batch(semaphore, futures).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/common/base/tests/it/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn test_runtime() -> Result<()> {
let rt1 = Runtime::with_default_worker_threads().unwrap();
let rt1_counter = Arc::clone(&runtime_counter);
let rt1_header = rt1.spawn(GLOBAL_TASK, async move {
let rt2 = Runtime::with_worker_threads(1, "rt2").unwrap();
let rt2 = Runtime::with_worker_threads(1, None).unwrap();
let rt2_counter = Arc::clone(&rt1_counter);
let rt2_header = rt2.spawn(GLOBAL_TASK, async move {
let rt3 = Runtime::with_default_worker_threads().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/common/base/tests/it/runtime_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ async fn test_async_thread_tracker() -> Result<()> {
let (out_tx, out_rx) = async_channel::bounded(10);
let (inner_tx, inner_rx) = async_channel::bounded(10);

let outer_runtime = Runtime::with_worker_threads(2, "Outer")?;
let inner_runtime = Runtime::with_worker_threads(2, "Inner")?;
let outer_runtime = Runtime::with_worker_threads(2, Some(String::from("Outer")))?;
let inner_runtime = Runtime::with_worker_threads(2, Some(String::from("Inner")))?;

let memory_tracker = MemStat::create("test_async_thread_tracker".to_string());
let inner_join_handler = inner_runtime.spawn(
Expand Down
11 changes: 6 additions & 5 deletions src/meta/client/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,12 @@ impl MetaGrpcClient {

let mgr = MetaChannelManager { timeout, conf };

let rt = Runtime::with_worker_threads(1, "meta-client-rt").map_err(|e| {
MetaClientError::ClientRuntimeError(
AnyError::new(&e).add_context(|| "when creating meta-client"),
)
})?;
let rt =
Runtime::with_worker_threads(1, Some("meta-client-rt".to_string())).map_err(|e| {
MetaClientError::ClientRuntimeError(
AnyError::new(&e).add_context(|| "when creating meta-client"),
)
})?;
let rt = Arc::new(rt);

// Build the handle-worker pair
Expand Down
21 changes: 13 additions & 8 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,19 @@ impl DataExchangeManager {
let config = GlobalConfig::instance();
let address = address.to_string();
let task = async move {
let tls_cfg = if config.tls_query_cli_enabled() {
Some(config.query.to_rpc_client_tls_config())
} else {
None
};
Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address, None, tls_cfg).await?,
)))
match config.tls_query_cli_enabled() {
true => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(
address.to_owned(),
None,
Some(config.query.to_rpc_client_tls_config()),
)
.await?,
))),
false => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
))),
}
};
if use_current_rt {
task.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl StatisticsReceiver {
) -> Result<StatisticsReceiver> {
let (shutdown_tx, _shutdown_rx) = channel(2);
let mut exchange_handler = Vec::with_capacity(statistics_exchanges.len());
let runtime = Runtime::with_worker_threads(2, "StatisticsReceiver")?;
let runtime = Runtime::with_worker_threads(2, Some(String::from("StatisticsReceiver")))?;

for (_source, exchange) in statistics_exchanges.into_iter() {
let rx = exchange.convert_to_receiver();
Expand Down
27 changes: 1 addition & 26 deletions src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,41 +44,19 @@ use crate::api::rpc::request_builder::RequestBuilder;

pub struct FlightClient {
inner: FlightServiceClient<Channel>,
create_thread_name: Option<String>,
}

impl Drop for FlightClient {
fn drop(&mut self) {
self.check_rt("drop")
}
}

// TODO: Integration testing required
impl FlightClient {
fn check_rt(&self, operation: &str) {
let current_thread = std::thread::current();
let name = current_thread.name();
if name != self.create_thread_name.as_deref() {
warn!(
" FlightClient used in different rt, operation {}, current thread : {:?}, create thread: {:?}",
operation, name, self.create_thread_name
);
}
}
pub fn new(mut inner: FlightServiceClient<Channel>) -> FlightClient {
inner = inner.max_decoding_message_size(usize::MAX);
inner = inner.max_encoding_message_size(usize::MAX);

let create_thread_name = std::thread::current().name().map(ToOwned::to_owned);
FlightClient {
inner,
create_thread_name,
}
FlightClient { inner }
}

#[async_backtrace::framed]
pub async fn execute_action(&mut self, action: FlightAction, timeout: u64) -> Result<()> {
self.check_rt("execute_action");
if let Err(cause) = self.do_action(action, timeout).await {
return Err(cause.add_message_back("(while in query flight)"));
}
Expand All @@ -92,7 +70,6 @@ impl FlightClient {
query_id: &str,
target: &str,
) -> Result<FlightExchange> {
self.check_rt("request_server_exchange");
let streaming = self
.get_streaming(
RequestBuilder::create(Ticket::default())
Expand All @@ -115,7 +92,6 @@ impl FlightClient {
target: &str,
fragment: usize,
) -> Result<FlightExchange> {
self.check_rt("do_get");
let request = RequestBuilder::create(Ticket::default())
.with_metadata("x-type", "exchange_fragment")?
.with_metadata("x-target", target)?
Expand All @@ -136,7 +112,6 @@ impl FlightClient {
) -> (Arc<Notify>, Receiver<Result<FlightData>>) {
let (tx, rx) = async_channel::bounded(1);
let notify = Arc::new(tokio::sync::Notify::new());
// let mut fused = streaming.fuse();
let fut = {
let notify = notify.clone();
async move {
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/pipelines/builders/builder_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl PipelineBuilder {
self.main_pipeline.set_on_init(move || {
let ctx_clone = ctx.clone();
let (partitions, info) =
Runtime::with_worker_threads(2, "mutation_block_pruning")?.block_on(async move {
Runtime::with_worker_threads(2, None)?.block_on(async move {
table_clone
.do_mutation_block_pruning(
ctx_clone,
Expand Down
5 changes: 4 additions & 1 deletion src/query/service/src/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ impl Server for MySQLHandler {
match self.abort_registration.take() {
None => Err(ErrorCode::Internal("MySQLHandler already running.")),
Some(registration) => {
let rejected_rt = Arc::new(Runtime::with_worker_threads(1, "mysql-handler")?);
let rejected_rt = Arc::new(Runtime::with_worker_threads(
1,
Some("mysql-handler".to_string()),
)?);
let (stream, listener) = Self::listener_tcp(listening).await?;
let stream = Abortable::new(stream, registration);
self.join_handle = Some(tokio::spawn(
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/servers/mysql/mysql_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl MySQLConnection {
MySQLConnection::attach_session(&session, &blocking_stream)?;

let non_blocking_stream = TcpStream::from_std(blocking_stream)?;
let query_executor = Runtime::with_worker_threads(1, "mysql-query-executor")?;
let query_executor =
Runtime::with_worker_threads(1, Some("mysql-query-executor".to_string()))?;
Thread::spawn(move || {
let join_handle = query_executor.spawn(GLOBAL_TASK, async move {
let client_addr = match non_blocking_stream.peer_addr() {
Expand Down
5 changes: 4 additions & 1 deletion src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,10 @@ impl QueryContextShared {
Some(query_runtime) => Ok(query_runtime.clone()),
None => {
// To avoid possible deadlock, we should keep at least two threads.
let runtime = Arc::new(Runtime::with_worker_threads(2, "query-ctx")?);
let runtime = Arc::new(Runtime::with_worker_threads(
2,
Some("query-ctx".to_string()),
)?);
*query_runtime = Some(runtime.clone());
Ok(runtime)
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async fn test_rejected_session_with_parallel() -> Result<()> {
let start_barriers = Arc::new(Barrier::new(3));
let destroy_barriers = Arc::new(Barrier::new(3));

let runtime = Runtime::with_worker_threads(2, "connect_server")?;
let runtime = Runtime::with_worker_threads(2, None)?;
let mut join_handlers = Vec::with_capacity(3);
for _ in 0..3 {
let port = listening.port();
Expand Down
26 changes: 12 additions & 14 deletions src/query/storages/fuse/src/operations/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,18 @@ impl FuseTable {
pipeline.set_on_init(move || {
let ctx = query_ctx.clone();
let column_ids = column_ids.clone();
let partitions = Runtime::with_worker_threads(2, "build_compact_task")?.block_on(
async move {
let partitions = BlockCompactMutator::build_compact_tasks(
ctx.clone(),
column_ids,
cluster_key_id,
thresholds,
lazy_parts,
)
.await?;

Result::<_, ErrorCode>::Ok(partitions)
},
)?;
let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move {
let partitions = BlockCompactMutator::build_compact_tasks(
ctx.clone(),
column_ids,
cluster_key_id,
thresholds,
lazy_parts,
)
.await?;

Result::<_, ErrorCode>::Ok(partitions)
})?;

let partitions = Partitions::create_nolazy(PartitionsShuffleKind::Mod, partitions);
query_ctx.set_partitions(partitions)?;
Expand Down
27 changes: 13 additions & 14 deletions src/query/storages/fuse/src/operations/read_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,20 @@ impl FuseTable {
let push_downs = push_downs.clone();
// let lazy_init_segments = lazy_init_segments.clone();

let partitions = Runtime::with_worker_threads(2, "prune_snapshot_blocks")?
.block_on(async move {
let (_statistics, partitions) = table
.prune_snapshot_blocks(
ctx,
dal,
push_downs,
table_schema,
lazy_init_segments,
0,
)
.await?;
let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move {
let (_statistics, partitions) = table
.prune_snapshot_blocks(
ctx,
dal,
push_downs,
table_schema,
lazy_init_segments,
0,
)
.await?;

Result::<_, ErrorCode>::Ok(partitions)
})?;
Result::<_, ErrorCode>::Ok(partitions)
})?;

query_ctx.set_partitions(partitions)?;
Ok(())
Expand Down
6 changes: 4 additions & 2 deletions src/query/storages/fuse/src/pruning/fuse_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ impl PruningContext {
let max_threads = ctx.get_settings().get_max_threads()? as usize;

// Pruning runtime.
let pruning_runtime =
Arc::new(Runtime::with_worker_threads(max_threads, "pruning-worker")?);
let pruning_runtime = Arc::new(Runtime::with_worker_threads(
max_threads,
Some("pruning-worker".to_owned()),
)?);
let pruning_semaphore = Arc::new(Semaphore::new(max_concurrency));
let pruning_stats = Arc::new(FusePruningStatistics::default());

Expand Down

0 comments on commit ec21fc9

Please sign in to comment.