Skip to content

Commit

Permalink
chore: new setting "create_rpc_client_with_current_rt"
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Dec 1, 2023
1 parent 6a5a9dc commit f8e205c
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 16 deletions.
48 changes: 32 additions & 16 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use async_channel::Receiver;
use common_arrow::arrow_format::flight::data::FlightData;
use common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient;
use common_base::base::GlobalInstance;
use common_base::runtime::GlobalIORuntime;
use common_base::runtime::Thread;
use common_base::runtime::TrySpawn;
use common_base::GLOBAL_TASK;
use common_config::GlobalConfig;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -122,10 +125,13 @@ impl DataExchangeManager {

let target = &packet.executor.id;

let create_rpc_client_with_current_rt = packet.create_rpc_clint_with_current_rt;

for connection_info in &packet.fragment_connections_info {
for fragment in &connection_info.fragments {
let address = &connection_info.source.flight_address;
let mut flight_client = Self::create_client(address).await?;
let mut flight_client =
Self::create_client(address, create_rpc_client_with_current_rt).await?;

targets_exchanges.insert(
(connection_info.source.id.clone(), *fragment),
Expand All @@ -138,7 +144,8 @@ impl DataExchangeManager {

for connection_info in &packet.statistics_connections_info {
let address = &connection_info.source.flight_address;
let mut flight_client = Self::create_client(address).await?;
let mut flight_client =
Self::create_client(address, create_rpc_client_with_current_rt).await?;
request_exchanges.insert(
connection_info.source.id.clone(),
flight_client
Expand All @@ -165,22 +172,31 @@ impl DataExchangeManager {
}

#[async_backtrace::framed]
pub async fn create_client(address: &str) -> Result<FlightClient> {
pub async fn create_client(address: &str, use_current_rt: bool) -> Result<FlightClient> {
let config = GlobalConfig::instance();
let address = address.to_string();

match config.tls_query_cli_enabled() {
true => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(
address.to_owned(),
None,
Some(config.query.to_rpc_client_tls_config()),
)
.await?,
))),
false => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
))),
let task = async move {
match config.tls_query_cli_enabled() {
true => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(
address.to_owned(),
None,
Some(config.query.to_rpc_client_tls_config()),
)
.await?,
))),
false => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
))),
}
};
if use_current_rt {
task.await
} else {
GlobalIORuntime::instance()
.spawn(GLOBAL_TASK, task)
.await
.expect("create client future must be joined successfully")
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/api/rpc/packets/packet_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct InitNodesChannelPacket {
pub executor: Arc<NodeInfo>,
pub fragment_connections_info: Vec<ConnectionInfo>,
pub statistics_connections_info: Vec<ConnectionInfo>,
pub create_rpc_clint_with_current_rt: bool,
}

impl InitNodesChannelPacket {
Expand All @@ -49,12 +50,14 @@ impl InitNodesChannelPacket {
executor: Arc<NodeInfo>,
fragment_connections_info: Vec<ConnectionInfo>,
statistics_connections_info: Vec<ConnectionInfo>,
create_rpc_clint_with_current_rt: bool,
) -> InitNodesChannelPacket {
InitNodesChannelPacket {
query_id,
executor,
fragment_connections_info,
statistics_connections_info,
create_rpc_clint_with_current_rt,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ impl QueryFragmentsActions {
true => statistics_connections.clone(),
false => vec![],
},
self.ctx
.get_settings()
.get_create_query_flight_client_with_current_rt()?,
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
+------------------------------------------------+----------------+----------------+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
| 'acquire_lock_timeout' | '15' | '15' | 'SESSION' | 'Sets the maximum timeout in seconds for acquire a lock.' | 'UInt64' |
| 'collation' | 'binary' | 'binary' | 'SESSION' | 'Sets the character collation. Available values include "binary" and "utf8".' | 'String' |
| 'create_query_flight_client_with_current_rt' | '1' | '1' | 'SESSION' | 'create query flight client with current runtime' | 'UInt64' |
| 'ddl_column_type_nullable' | '1' | '1' | 'SESSION' | 'If columns are default nullable when create or alter table' | 'UInt64' |
| 'disable_join_reorder' | '0' | '0' | 'SESSION' | 'Disable join reorder optimization.' | 'UInt64' |
| 'efficiently_memory_group_by' | '0' | '0' | 'SESSION' | 'Memory is used efficiently, but this may cause performance degradation.' | 'UInt64' |
Expand Down
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,12 @@ impl DefaultSettings {
possible_values: None,
mode: SettingMode::Both,
}),
("create_query_flight_client_with_current_rt", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "create query flight client with current runtime",
possible_values: None,
mode: SettingMode::Both,
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,8 @@ impl Settings {
pub fn get_external_server_request_timeout_secs(&self) -> Result<u64> {
self.try_get_u64("external_server_request_timeout_secs")
}

pub fn get_create_query_flight_client_with_current_rt(&self) -> Result<bool> {
Ok(self.try_get_u64("create_query_flight_client_with_current_rt")? != 0)
}
}

0 comments on commit f8e205c

Please sign in to comment.