From e670fbee23922cb56872c2c1d7d2d8bdf886cbd2 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 8 Dec 2023 10:50:31 +0800 Subject: [PATCH] fix lint --- src/common/base/src/runtime/global_runtime.rs | 7 ++-- src/common/base/src/runtime/runtime.rs | 4 +-- src/common/base/tests/it/runtime.rs | 2 +- src/common/base/tests/it/runtime_tracker.rs | 4 +-- src/meta/client/src/grpc_client.rs | 2 +- .../src/api/rpc/exchange/exchange_manager.rs | 34 ++++++++++++------- .../api/rpc/exchange/statistics_receiver.rs | 2 +- .../service/src/api/rpc/flight_client.rs | 23 +++++++------ src/query/service/src/global_services.rs | 1 - .../src/pipelines/builders/builder_delete.rs | 2 +- .../src/servers/mysql/mysql_handler.rs | 2 +- .../src/servers/mysql/mysql_session.rs | 2 +- .../service/src/sessions/query_ctx_shared.rs | 2 +- .../tests/it/servers/mysql/mysql_handler.rs | 2 +- .../storages/fuse/src/operations/compact.rs | 7 ++-- .../storages/fuse/src/operations/read_data.rs | 4 +-- .../storages/fuse/src/pruning/fuse_pruner.rs | 3 +- 17 files changed, 59 insertions(+), 44 deletions(-) diff --git a/src/common/base/src/runtime/global_runtime.rs b/src/common/base/src/runtime/global_runtime.rs index dfd0ca246b61c..2e6d59c95d96e 100644 --- a/src/common/base/src/runtime/global_runtime.rs +++ b/src/common/base/src/runtime/global_runtime.rs @@ -35,7 +35,10 @@ impl GlobalIORuntime { let thread_num = std::cmp::max(num_cpus, num_cpus::get() / 2); let thread_num = std::cmp::max(2, thread_num); - GlobalInstance::set(Arc::new(Runtime::adhoc_instance(thread_num, "IO-worker")?)); + GlobalInstance::set(Arc::new(Runtime::with_worker_threads( + thread_num, + "IO-worker", + )?)); Ok(()) } @@ -49,7 +52,7 @@ impl GlobalQueryRuntime { let thread_num = std::cmp::max(num_cpus, num_cpus::get() / 2); let thread_num = std::cmp::max(2, thread_num); - let rt = Runtime::adhoc_instance(thread_num, "g-query-worker")?; + let rt = Runtime::with_worker_threads(thread_num, "g-query-worker")?; GlobalInstance::set(Arc::new(GlobalQueryRuntime(rt))); Ok(()) } diff --git a/src/common/base/src/runtime/runtime.rs b/src/common/base/src/runtime/runtime.rs index d6255a3f756d6..d912cf7fe1076 100644 --- a/src/common/base/src/runtime/runtime.rs +++ b/src/common/base/src/runtime/runtime.rs @@ -165,7 +165,7 @@ impl Runtime { Self::create(None, mem_stat, &mut runtime_builder) } - pub fn adhoc_instance(workers: usize, thread_name: &str) -> Result { + pub fn with_worker_threads(workers: usize, thread_name: &str) -> Result { let thread_name = { let count = ADHOC_RT_COUNTER.fetch_add(1, Ordering::SeqCst); format!("{}-{}", thread_name, count) @@ -378,7 +378,7 @@ where { // 1. build the runtime. let semaphore = Semaphore::new(permit_nums); - let runtime = Arc::new(Runtime::adhoc_instance(thread_nums, &thread_name)?); + let runtime = Arc::new(Runtime::with_worker_threads(thread_nums, &thread_name)?); // 2. spawn all the tasks to the runtime with semaphore. let join_handlers = runtime.try_spawn_batch(semaphore, futures).await?; diff --git a/src/common/base/tests/it/runtime.rs b/src/common/base/tests/it/runtime.rs index 049eac0f0489f..ba143f6934194 100644 --- a/src/common/base/tests/it/runtime.rs +++ b/src/common/base/tests/it/runtime.rs @@ -37,7 +37,7 @@ async fn test_runtime() -> Result<()> { let rt1 = Runtime::with_default_worker_threads().unwrap(); let rt1_counter = Arc::clone(&runtime_counter); let rt1_header = rt1.spawn(GLOBAL_TASK, async move { - let rt2 = Runtime::with_worker_threads(1, None).unwrap(); + let rt2 = Runtime::with_worker_threads(1, "rt2").unwrap(); let rt2_counter = Arc::clone(&rt1_counter); let rt2_header = rt2.spawn(GLOBAL_TASK, async move { let rt3 = Runtime::with_default_worker_threads().unwrap(); diff --git a/src/common/base/tests/it/runtime_tracker.rs b/src/common/base/tests/it/runtime_tracker.rs index 1493952c9926a..4d6e29f125b28 100644 --- a/src/common/base/tests/it/runtime_tracker.rs +++ b/src/common/base/tests/it/runtime_tracker.rs @@ -26,8 +26,8 @@ async fn test_async_thread_tracker() -> Result<()> { let (out_tx, out_rx) = async_channel::bounded(10); let (inner_tx, inner_rx) = async_channel::bounded(10); - let outer_runtime = Runtime::with_worker_threads(2, Some(String::from("Outer")))?; - let inner_runtime = Runtime::with_worker_threads(2, Some(String::from("Inner")))?; + let outer_runtime = Runtime::with_worker_threads(2, "Outer")?; + let inner_runtime = Runtime::with_worker_threads(2, "Inner")?; let memory_tracker = MemStat::create("test_async_thread_tracker".to_string()); let inner_join_handler = inner_runtime.spawn( diff --git a/src/meta/client/src/grpc_client.rs b/src/meta/client/src/grpc_client.rs index c9570b5edab00..2982a348a259e 100644 --- a/src/meta/client/src/grpc_client.rs +++ b/src/meta/client/src/grpc_client.rs @@ -340,7 +340,7 @@ impl MetaGrpcClient { let mgr = MetaChannelManager { timeout, conf }; - let rt = Runtime::adhoc_instance(1, "meta-client-rt").map_err(|e| { + let rt = Runtime::with_worker_threads(1, "meta-client-rt").map_err(|e| { MetaClientError::ClientRuntimeError( AnyError::new(&e).add_context(|| "when creating meta-client"), ) diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 2ded527b66594..39390abf14111 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -176,19 +176,27 @@ impl DataExchangeManager { let config = GlobalConfig::instance(); let address = address.to_string(); let task = async move { - match config.tls_query_cli_enabled() { - true => Ok(FlightClient::new(FlightServiceClient::new( - ConnectionFactory::create_rpc_channel( - address.to_owned(), - None, - Some(config.query.to_rpc_client_tls_config()), - ) - .await?, - ))), - false => Ok(FlightClient::new(FlightServiceClient::new( - ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?, - ))), - } + let tls_cfg = match config.tls_query_cli_enabled() { + true => Some(config.query.to_rpc_client_tls_config()), + false => None, + }; + Ok(FlightClient::new(FlightServiceClient::new( + ConnectionFactory::create_rpc_channel(address, None, tls_cfg).await?, + ))) + + // match config.tls_query_cli_enabled() { + // true => Ok(FlightClient::new(FlightServiceClient::new( + // ConnectionFactory::create_rpc_channel( + // address.to_owned(), + // None, + // Some(config.query.to_rpc_client_tls_config()), + // ) + // .await?, + // )), use_current_rt), + // false => Ok(FlightClient::new(FlightServiceClient::new( + // ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?, + // )), use_current_rt), + //} }; if use_current_rt { task.await diff --git a/src/query/service/src/api/rpc/exchange/statistics_receiver.rs b/src/query/service/src/api/rpc/exchange/statistics_receiver.rs index b5a53502cd3d1..3a2d451f3ceba 100644 --- a/src/query/service/src/api/rpc/exchange/statistics_receiver.rs +++ b/src/query/service/src/api/rpc/exchange/statistics_receiver.rs @@ -43,7 +43,7 @@ impl StatisticsReceiver { ) -> Result { let (shutdown_tx, _shutdown_rx) = channel(2); let mut exchange_handler = Vec::with_capacity(statistics_exchanges.len()); - let runtime = Runtime::adhoc_instance(2, "StatisticsReceiver")?; + let runtime = Runtime::with_worker_threads(2, "StatisticsReceiver")?; for (_source, exchange) in statistics_exchanges.into_iter() { let rx = exchange.convert_to_receiver(); diff --git a/src/query/service/src/api/rpc/flight_client.rs b/src/query/service/src/api/rpc/flight_client.rs index 06dedcbf5f4fc..df111baf55498 100644 --- a/src/query/service/src/api/rpc/flight_client.rs +++ b/src/query/service/src/api/rpc/flight_client.rs @@ -25,12 +25,11 @@ use common_arrow::arrow_format::flight::service::flight_service_client::FlightSe use common_base::base::tokio; use common_base::base::tokio::sync::Notify; use common_base::base::tokio::time::Duration; -use common_base::runtime::GlobalIORuntime; -use common_base::runtime::TrySpawn; use common_exception::ErrorCode; use common_exception::Result; use futures::StreamExt; use futures_util::future::Either; +use log::warn; use tonic::transport::channel::Channel; use tonic::Request; use tonic::Status; @@ -49,8 +48,8 @@ impl Drop for FlightClient { fn drop(&mut self) { let drop_thread_name = std::thread::current().name().map(ToOwned::to_owned); if drop_thread_name != self.create_thread_name { - eprintln!( - " (drop) NOT MATCH {:?} {:?}", + warn!( + "FlightClient dropped in different rt, drop-thread: {:?}, create-thread: {:?}", drop_thread_name, self.create_thread_name ); } @@ -62,7 +61,10 @@ impl FlightClient { fn check_rt(&self) { let current = std::thread::current().name().map(ToOwned::to_owned); if current != self.create_thread_name { - eprintln!(" NOT MATCH {:?} {:?}", current, self.create_thread_name); + warn!( + " FlightClient used in different rt, current thread : {:?}, create-thread: {:?}", + current, self.create_thread_name + ); } } pub fn new(mut inner: FlightServiceClient) -> FlightClient { @@ -115,7 +117,6 @@ impl FlightClient { target: &str, fragment: usize, ) -> Result { - self.check_rt(); let request = RequestBuilder::create(Ticket::default()) .with_metadata("x-type", "exchange_fragment")? .with_metadata("x-target", target)? @@ -131,13 +132,12 @@ impl FlightClient { } fn streaming_receiver( - query_id: &str, + _query_id: &str, mut streaming: Streaming, ) -> (Arc, Receiver>) { let (tx, rx) = async_channel::bounded(1); let notify = Arc::new(tokio::sync::Notify::new()); - // GlobalIORuntime::instance().spawn(query_id, { - tokio::spawn({ + let fut = { let notify = notify.clone(); async move { let mut notified = Box::pin(notify.notified()); @@ -170,8 +170,11 @@ impl FlightClient { drop(streaming); tx.close(); } - }); + }; + // TODO: shall we make this configurable? + // GlobalIORuntime::instance().spawn(query_id, fut); + tokio::spawn(fut); (notify, rx) } diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 7a5d940993b7f..7ea0d362169c3 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::BTreeMap; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; use common_base::base::GlobalInstance; diff --git a/src/query/service/src/pipelines/builders/builder_delete.rs b/src/query/service/src/pipelines/builders/builder_delete.rs index 8c350fa7642e2..7de06ef125efe 100644 --- a/src/query/service/src/pipelines/builders/builder_delete.rs +++ b/src/query/service/src/pipelines/builders/builder_delete.rs @@ -76,7 +76,7 @@ impl PipelineBuilder { self.main_pipeline.set_on_init(move || { let ctx_clone = ctx.clone(); let (partitions, info) = - Runtime::adhoc_instance(2, "mutation_block_pruning")?.block_on(async move { + Runtime::with_worker_threads(2, "mutation_block_pruning")?.block_on(async move { table_clone .do_mutation_block_pruning( ctx_clone, diff --git a/src/query/service/src/servers/mysql/mysql_handler.rs b/src/query/service/src/servers/mysql/mysql_handler.rs index 6516444ec1a9c..d97cbfc4a1ace 100644 --- a/src/query/service/src/servers/mysql/mysql_handler.rs +++ b/src/query/service/src/servers/mysql/mysql_handler.rs @@ -173,7 +173,7 @@ impl Server for MySQLHandler { match self.abort_registration.take() { None => Err(ErrorCode::Internal("MySQLHandler already running.")), Some(registration) => { - let rejected_rt = Arc::new(Runtime::adhoc_instance(1, "mysql-handler")?); + let rejected_rt = Arc::new(Runtime::with_worker_threads(1, "mysql-handler")?); let (stream, listener) = Self::listener_tcp(listening).await?; let stream = Abortable::new(stream, registration); self.join_handle = Some(tokio::spawn( diff --git a/src/query/service/src/servers/mysql/mysql_session.rs b/src/query/service/src/servers/mysql/mysql_session.rs index ef29582688cfc..b135f74641b78 100644 --- a/src/query/service/src/servers/mysql/mysql_session.rs +++ b/src/query/service/src/servers/mysql/mysql_session.rs @@ -50,7 +50,7 @@ impl MySQLConnection { MySQLConnection::attach_session(&session, &blocking_stream)?; let non_blocking_stream = TcpStream::from_std(blocking_stream)?; - let query_executor = Runtime::adhoc_instance(1, "mysql-query-executor")?; + let query_executor = Runtime::with_worker_threads(1, "mysql-query-executor")?; Thread::spawn(move || { let join_handle = query_executor.spawn(GLOBAL_TASK, async move { let client_addr = match non_blocking_stream.peer_addr() { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 07e3525a905ff..debbeb2bd281e 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -333,7 +333,7 @@ impl QueryContextShared { Some(query_runtime) => Ok(query_runtime.clone()), None => { // To avoid possible deadlock, we should keep at least two threads. - let runtime = Arc::new(Runtime::adhoc_instance(2, "query-ctx")?); + let runtime = Arc::new(Runtime::with_worker_threads(2, "query-ctx")?); *query_runtime = Some(runtime.clone()); Ok(runtime) } diff --git a/src/query/service/tests/it/servers/mysql/mysql_handler.rs b/src/query/service/tests/it/servers/mysql/mysql_handler.rs index 051485cf5c4fd..72c236bc218b0 100644 --- a/src/query/service/tests/it/servers/mysql/mysql_handler.rs +++ b/src/query/service/tests/it/servers/mysql/mysql_handler.rs @@ -163,7 +163,7 @@ async fn test_rejected_session_with_parallel() -> Result<()> { let start_barriers = Arc::new(Barrier::new(3)); let destroy_barriers = Arc::new(Barrier::new(3)); - let runtime = Runtime::adhoc_instance(2, "connect_server")?; + let runtime = Runtime::with_worker_threads(2, "connect_server")?; let mut join_handlers = Vec::with_capacity(3); for _ in 0..3 { let port = listening.port(); diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index 2cfc23430112c..3f9ccb47c3232 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -137,8 +137,8 @@ impl FuseTable { pipeline.set_on_init(move || { let ctx = query_ctx.clone(); let column_ids = column_ids.clone(); - let partitions = - Runtime::adhoc_instance(2, "build_compact_task")?.block_on(async move { + let partitions = Runtime::with_worker_threads(2, "build_compact_task")?.block_on( + async move { let partitions = BlockCompactMutator::build_compact_tasks( ctx.clone(), column_ids, @@ -149,7 +149,8 @@ impl FuseTable { .await?; Result::<_, ErrorCode>::Ok(partitions) - })?; + }, + )?; let partitions = Partitions::create_nolazy(PartitionsShuffleKind::Mod, partitions); query_ctx.set_partitions(partitions)?; diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 35e1e83354aa9..cef4282693059 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -178,8 +178,8 @@ impl FuseTable { let push_downs = push_downs.clone(); // let lazy_init_segments = lazy_init_segments.clone(); - let partitions = - Runtime::adhoc_instance(2, "prune_snapshot_blocks")?.block_on(async move { + let partitions = Runtime::with_worker_threads(2, "prune_snapshot_blocks")? + .block_on(async move { let (_statistics, partitions) = table .prune_snapshot_blocks( ctx, diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index bbb2086f6e463..dbc3e6b83b494 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -149,7 +149,8 @@ impl PruningContext { let max_threads = ctx.get_settings().get_max_threads()? as usize; // Pruning runtime. - let pruning_runtime = Arc::new(Runtime::adhoc_instance(max_threads, "pruning-worker")?); + let pruning_runtime = + Arc::new(Runtime::with_worker_threads(max_threads, "pruning-worker")?); let pruning_semaphore = Arc::new(Semaphore::new(max_concurrency)); let pruning_stats = Arc::new(FusePruningStatistics::default());