From f8e205c5019c6f478ecddcd7f2f777d013603ace Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 1 Dec 2023 16:56:13 +0800 Subject: [PATCH] chore: new setting "create_rpc_client_with_current_rt" --- .../src/api/rpc/exchange/exchange_manager.rs | 48 ++++++++++++------- .../src/api/rpc/packets/packet_publisher.rs | 3 ++ .../fragments/query_fragment_actions.rs | 3 ++ .../it/storages/testdata/settings_table.txt | 1 + src/query/settings/src/settings_default.rs | 6 +++ .../settings/src/settings_getter_setter.rs | 4 ++ 6 files changed, 49 insertions(+), 16 deletions(-) diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 2d7eae543ac54..5257f6a85a435 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -22,7 +22,10 @@ use async_channel::Receiver; use common_arrow::arrow_format::flight::data::FlightData; use common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient; use common_base::base::GlobalInstance; +use common_base::runtime::GlobalIORuntime; use common_base::runtime::Thread; +use common_base::runtime::TrySpawn; +use common_base::GLOBAL_TASK; use common_config::GlobalConfig; use common_exception::ErrorCode; use common_exception::Result; @@ -122,10 +125,13 @@ impl DataExchangeManager { let target = &packet.executor.id; + let create_rpc_client_with_current_rt = packet.create_rpc_clint_with_current_rt; + for connection_info in &packet.fragment_connections_info { for fragment in &connection_info.fragments { let address = &connection_info.source.flight_address; - let mut flight_client = Self::create_client(address).await?; + let mut flight_client = + Self::create_client(address, create_rpc_client_with_current_rt).await?; targets_exchanges.insert( (connection_info.source.id.clone(), *fragment), @@ -138,7 +144,8 @@ impl DataExchangeManager { for connection_info in &packet.statistics_connections_info { let address = &connection_info.source.flight_address; - let mut flight_client = Self::create_client(address).await?; + let mut flight_client = + Self::create_client(address, create_rpc_client_with_current_rt).await?; request_exchanges.insert( connection_info.source.id.clone(), flight_client @@ -165,22 +172,31 @@ impl DataExchangeManager { } #[async_backtrace::framed] - pub async fn create_client(address: &str) -> Result { + pub async fn create_client(address: &str, use_current_rt: bool) -> Result { let config = GlobalConfig::instance(); let address = address.to_string(); - - match config.tls_query_cli_enabled() { - true => Ok(FlightClient::new(FlightServiceClient::new( - ConnectionFactory::create_rpc_channel( - address.to_owned(), - None, - Some(config.query.to_rpc_client_tls_config()), - ) - .await?, - ))), - false => Ok(FlightClient::new(FlightServiceClient::new( - ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?, - ))), + let task = async move { + match config.tls_query_cli_enabled() { + true => Ok(FlightClient::new(FlightServiceClient::new( + ConnectionFactory::create_rpc_channel( + address.to_owned(), + None, + Some(config.query.to_rpc_client_tls_config()), + ) + .await?, + ))), + false => Ok(FlightClient::new(FlightServiceClient::new( + ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?, + ))), + } + }; + if use_current_rt { + task.await + } else { + GlobalIORuntime::instance() + .spawn(GLOBAL_TASK, task) + .await + .expect("create client future must be joined successfully") } } diff --git a/src/query/service/src/api/rpc/packets/packet_publisher.rs b/src/query/service/src/api/rpc/packets/packet_publisher.rs index e6ad857a394db..134da838d56da 100644 --- a/src/query/service/src/api/rpc/packets/packet_publisher.rs +++ b/src/query/service/src/api/rpc/packets/packet_publisher.rs @@ -41,6 +41,7 @@ pub struct InitNodesChannelPacket { pub executor: Arc, pub fragment_connections_info: Vec, pub statistics_connections_info: Vec, + pub create_rpc_clint_with_current_rt: bool, } impl InitNodesChannelPacket { @@ -49,12 +50,14 @@ impl InitNodesChannelPacket { executor: Arc, fragment_connections_info: Vec, statistics_connections_info: Vec, + create_rpc_clint_with_current_rt: bool, ) -> InitNodesChannelPacket { InitNodesChannelPacket { query_id, executor, fragment_connections_info, statistics_connections_info, + create_rpc_clint_with_current_rt, } } } diff --git a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs index 6ee139a6d2fdf..11fe12ac88c3f 100644 --- a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs +++ b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs @@ -250,6 +250,9 @@ impl QueryFragmentsActions { true => statistics_connections.clone(), false => vec![], }, + self.ctx + .get_settings() + .get_create_query_flight_client_with_current_rt()?, )); } diff --git a/src/query/service/tests/it/storages/testdata/settings_table.txt b/src/query/service/tests/it/storages/testdata/settings_table.txt index 362d12855a62d..40764fd34937b 100644 --- a/src/query/service/tests/it/storages/testdata/settings_table.txt +++ b/src/query/service/tests/it/storages/testdata/settings_table.txt @@ -6,6 +6,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System +------------------------------------------------+----------------+----------------+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+ | 'acquire_lock_timeout' | '15' | '15' | 'SESSION' | 'Sets the maximum timeout in seconds for acquire a lock.' | 'UInt64' | | 'collation' | 'binary' | 'binary' | 'SESSION' | 'Sets the character collation. Available values include "binary" and "utf8".' | 'String' | +| 'create_query_flight_client_with_current_rt' | '1' | '1' | 'SESSION' | 'create query flight client with current runtime' | 'UInt64' | | 'ddl_column_type_nullable' | '1' | '1' | 'SESSION' | 'If columns are default nullable when create or alter table' | 'UInt64' | | 'disable_join_reorder' | '0' | '0' | 'SESSION' | 'Disable join reorder optimization.' | 'UInt64' | | 'efficiently_memory_group_by' | '0' | '0' | 'SESSION' | 'Memory is used efficiently, but this may cause performance degradation.' | 'UInt64' | diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index a9c0c49c599fd..03fc6ddc181af 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -525,6 +525,12 @@ impl DefaultSettings { possible_values: None, mode: SettingMode::Both, }), + ("create_query_flight_client_with_current_rt", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "create query flight client with current runtime", + possible_values: None, + mode: SettingMode::Both, + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index b8a29cb9e9eef..8ca95354081aa 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -491,4 +491,8 @@ impl Settings { pub fn get_external_server_request_timeout_secs(&self) -> Result { self.try_get_u64("external_server_request_timeout_secs") } + + pub fn get_create_query_flight_client_with_current_rt(&self) -> Result { + Ok(self.try_get_u64("create_query_flight_client_with_current_rt")? != 0) + } }