Skip to content

Commit

Permalink
fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Dec 8, 2023
1 parent aa010a1 commit e670fbe
Show file tree
Hide file tree
Showing 17 changed files with 59 additions and 44 deletions.
7 changes: 5 additions & 2 deletions src/common/base/src/runtime/global_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ impl GlobalIORuntime {
let thread_num = std::cmp::max(num_cpus, num_cpus::get() / 2);
let thread_num = std::cmp::max(2, thread_num);

GlobalInstance::set(Arc::new(Runtime::adhoc_instance(thread_num, "IO-worker")?));
GlobalInstance::set(Arc::new(Runtime::with_worker_threads(
thread_num,
"IO-worker",
)?));
Ok(())
}

Expand All @@ -49,7 +52,7 @@ impl GlobalQueryRuntime {
let thread_num = std::cmp::max(num_cpus, num_cpus::get() / 2);
let thread_num = std::cmp::max(2, thread_num);

let rt = Runtime::adhoc_instance(thread_num, "g-query-worker")?;
let rt = Runtime::with_worker_threads(thread_num, "g-query-worker")?;
GlobalInstance::set(Arc::new(GlobalQueryRuntime(rt)));
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/base/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl Runtime {
Self::create(None, mem_stat, &mut runtime_builder)
}

pub fn adhoc_instance(workers: usize, thread_name: &str) -> Result<Self> {
pub fn with_worker_threads(workers: usize, thread_name: &str) -> Result<Self> {
let thread_name = {
let count = ADHOC_RT_COUNTER.fetch_add(1, Ordering::SeqCst);
format!("{}-{}", thread_name, count)
Expand Down Expand Up @@ -378,7 +378,7 @@ where
{
// 1. build the runtime.
let semaphore = Semaphore::new(permit_nums);
let runtime = Arc::new(Runtime::adhoc_instance(thread_nums, &thread_name)?);
let runtime = Arc::new(Runtime::with_worker_threads(thread_nums, &thread_name)?);

// 2. spawn all the tasks to the runtime with semaphore.
let join_handlers = runtime.try_spawn_batch(semaphore, futures).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/common/base/tests/it/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn test_runtime() -> Result<()> {
let rt1 = Runtime::with_default_worker_threads().unwrap();
let rt1_counter = Arc::clone(&runtime_counter);
let rt1_header = rt1.spawn(GLOBAL_TASK, async move {
let rt2 = Runtime::with_worker_threads(1, None).unwrap();
let rt2 = Runtime::with_worker_threads(1, "rt2").unwrap();
let rt2_counter = Arc::clone(&rt1_counter);
let rt2_header = rt2.spawn(GLOBAL_TASK, async move {
let rt3 = Runtime::with_default_worker_threads().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/common/base/tests/it/runtime_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ async fn test_async_thread_tracker() -> Result<()> {
let (out_tx, out_rx) = async_channel::bounded(10);
let (inner_tx, inner_rx) = async_channel::bounded(10);

let outer_runtime = Runtime::with_worker_threads(2, Some(String::from("Outer")))?;
let inner_runtime = Runtime::with_worker_threads(2, Some(String::from("Inner")))?;
let outer_runtime = Runtime::with_worker_threads(2, "Outer")?;
let inner_runtime = Runtime::with_worker_threads(2, "Inner")?;

let memory_tracker = MemStat::create("test_async_thread_tracker".to_string());
let inner_join_handler = inner_runtime.spawn(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/client/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl MetaGrpcClient {

let mgr = MetaChannelManager { timeout, conf };

let rt = Runtime::adhoc_instance(1, "meta-client-rt").map_err(|e| {
let rt = Runtime::with_worker_threads(1, "meta-client-rt").map_err(|e| {
MetaClientError::ClientRuntimeError(
AnyError::new(&e).add_context(|| "when creating meta-client"),
)
Expand Down
34 changes: 21 additions & 13 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,27 @@ impl DataExchangeManager {
let config = GlobalConfig::instance();
let address = address.to_string();
let task = async move {
match config.tls_query_cli_enabled() {
true => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(
address.to_owned(),
None,
Some(config.query.to_rpc_client_tls_config()),
)
.await?,
))),
false => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
))),
}
let tls_cfg = match config.tls_query_cli_enabled() {
true => Some(config.query.to_rpc_client_tls_config()),
false => None,
};
Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address, None, tls_cfg).await?,
)))

// match config.tls_query_cli_enabled() {
// true => Ok(FlightClient::new(FlightServiceClient::new(
// ConnectionFactory::create_rpc_channel(
// address.to_owned(),
// None,
// Some(config.query.to_rpc_client_tls_config()),
// )
// .await?,
// )), use_current_rt),
// false => Ok(FlightClient::new(FlightServiceClient::new(
// ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
// )), use_current_rt),
//}
};
if use_current_rt {
task.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl StatisticsReceiver {
) -> Result<StatisticsReceiver> {
let (shutdown_tx, _shutdown_rx) = channel(2);
let mut exchange_handler = Vec::with_capacity(statistics_exchanges.len());
let runtime = Runtime::adhoc_instance(2, "StatisticsReceiver")?;
let runtime = Runtime::with_worker_threads(2, "StatisticsReceiver")?;

for (_source, exchange) in statistics_exchanges.into_iter() {
let rx = exchange.convert_to_receiver();
Expand Down
23 changes: 13 additions & 10 deletions src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ use common_arrow::arrow_format::flight::service::flight_service_client::FlightSe
use common_base::base::tokio;
use common_base::base::tokio::sync::Notify;
use common_base::base::tokio::time::Duration;
use common_base::runtime::GlobalIORuntime;
use common_base::runtime::TrySpawn;
use common_exception::ErrorCode;
use common_exception::Result;
use futures::StreamExt;
use futures_util::future::Either;
use log::warn;
use tonic::transport::channel::Channel;
use tonic::Request;
use tonic::Status;
Expand All @@ -49,8 +48,8 @@ impl Drop for FlightClient {
fn drop(&mut self) {
let drop_thread_name = std::thread::current().name().map(ToOwned::to_owned);
if drop_thread_name != self.create_thread_name {
eprintln!(
" (drop) NOT MATCH {:?} {:?}",
warn!(
"FlightClient dropped in different rt, drop-thread: {:?}, create-thread: {:?}",
drop_thread_name, self.create_thread_name
);
}
Expand All @@ -62,7 +61,10 @@ impl FlightClient {
fn check_rt(&self) {
let current = std::thread::current().name().map(ToOwned::to_owned);
if current != self.create_thread_name {
eprintln!(" NOT MATCH {:?} {:?}", current, self.create_thread_name);
warn!(
" FlightClient used in different rt, current thread : {:?}, create-thread: {:?}",
current, self.create_thread_name
);
}
}
pub fn new(mut inner: FlightServiceClient<Channel>) -> FlightClient {
Expand Down Expand Up @@ -115,7 +117,6 @@ impl FlightClient {
target: &str,
fragment: usize,
) -> Result<FlightExchange> {
self.check_rt();
let request = RequestBuilder::create(Ticket::default())
.with_metadata("x-type", "exchange_fragment")?
.with_metadata("x-target", target)?
Expand All @@ -131,13 +132,12 @@ impl FlightClient {
}

fn streaming_receiver(
query_id: &str,
_query_id: &str,
mut streaming: Streaming<FlightData>,
) -> (Arc<Notify>, Receiver<Result<FlightData>>) {
let (tx, rx) = async_channel::bounded(1);
let notify = Arc::new(tokio::sync::Notify::new());
// GlobalIORuntime::instance().spawn(query_id, {
tokio::spawn({
let fut = {
let notify = notify.clone();
async move {
let mut notified = Box::pin(notify.notified());
Expand Down Expand Up @@ -170,8 +170,11 @@ impl FlightClient {
drop(streaming);
tx.close();
}
});
};

// TODO: shall we make this configurable?
// GlobalIORuntime::instance().spawn(query_id, fut);
tokio::spawn(fut);
(notify, rx)
}

Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use common_base::base::GlobalInstance;
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/pipelines/builders/builder_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl PipelineBuilder {
self.main_pipeline.set_on_init(move || {
let ctx_clone = ctx.clone();
let (partitions, info) =
Runtime::adhoc_instance(2, "mutation_block_pruning")?.block_on(async move {
Runtime::with_worker_threads(2, "mutation_block_pruning")?.block_on(async move {
table_clone
.do_mutation_block_pruning(
ctx_clone,
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl Server for MySQLHandler {
match self.abort_registration.take() {
None => Err(ErrorCode::Internal("MySQLHandler already running.")),
Some(registration) => {
let rejected_rt = Arc::new(Runtime::adhoc_instance(1, "mysql-handler")?);
let rejected_rt = Arc::new(Runtime::with_worker_threads(1, "mysql-handler")?);
let (stream, listener) = Self::listener_tcp(listening).await?;
let stream = Abortable::new(stream, registration);
self.join_handle = Some(tokio::spawn(
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/servers/mysql/mysql_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl MySQLConnection {
MySQLConnection::attach_session(&session, &blocking_stream)?;

let non_blocking_stream = TcpStream::from_std(blocking_stream)?;
let query_executor = Runtime::adhoc_instance(1, "mysql-query-executor")?;
let query_executor = Runtime::with_worker_threads(1, "mysql-query-executor")?;
Thread::spawn(move || {
let join_handle = query_executor.spawn(GLOBAL_TASK, async move {
let client_addr = match non_blocking_stream.peer_addr() {
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl QueryContextShared {
Some(query_runtime) => Ok(query_runtime.clone()),
None => {
// To avoid possible deadlock, we should keep at least two threads.
let runtime = Arc::new(Runtime::adhoc_instance(2, "query-ctx")?);
let runtime = Arc::new(Runtime::with_worker_threads(2, "query-ctx")?);
*query_runtime = Some(runtime.clone());
Ok(runtime)
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async fn test_rejected_session_with_parallel() -> Result<()> {
let start_barriers = Arc::new(Barrier::new(3));
let destroy_barriers = Arc::new(Barrier::new(3));

let runtime = Runtime::adhoc_instance(2, "connect_server")?;
let runtime = Runtime::with_worker_threads(2, "connect_server")?;
let mut join_handlers = Vec::with_capacity(3);
for _ in 0..3 {
let port = listening.port();
Expand Down
7 changes: 4 additions & 3 deletions src/query/storages/fuse/src/operations/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ impl FuseTable {
pipeline.set_on_init(move || {
let ctx = query_ctx.clone();
let column_ids = column_ids.clone();
let partitions =
Runtime::adhoc_instance(2, "build_compact_task")?.block_on(async move {
let partitions = Runtime::with_worker_threads(2, "build_compact_task")?.block_on(
async move {
let partitions = BlockCompactMutator::build_compact_tasks(
ctx.clone(),
column_ids,
Expand All @@ -149,7 +149,8 @@ impl FuseTable {
.await?;

Result::<_, ErrorCode>::Ok(partitions)
})?;
},
)?;

let partitions = Partitions::create_nolazy(PartitionsShuffleKind::Mod, partitions);
query_ctx.set_partitions(partitions)?;
Expand Down
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/operations/read_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ impl FuseTable {
let push_downs = push_downs.clone();
// let lazy_init_segments = lazy_init_segments.clone();

let partitions =
Runtime::adhoc_instance(2, "prune_snapshot_blocks")?.block_on(async move {
let partitions = Runtime::with_worker_threads(2, "prune_snapshot_blocks")?
.block_on(async move {
let (_statistics, partitions) = table
.prune_snapshot_blocks(
ctx,
Expand Down
3 changes: 2 additions & 1 deletion src/query/storages/fuse/src/pruning/fuse_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ impl PruningContext {
let max_threads = ctx.get_settings().get_max_threads()? as usize;

// Pruning runtime.
let pruning_runtime = Arc::new(Runtime::adhoc_instance(max_threads, "pruning-worker")?);
let pruning_runtime =
Arc::new(Runtime::with_worker_threads(max_threads, "pruning-worker")?);
let pruning_semaphore = Arc::new(Semaphore::new(max_concurrency));
let pruning_stats = Arc::new(FusePruningStatistics::default());

Expand Down

0 comments on commit e670fbe

Please sign in to comment.