Skip to content

Commit

Permalink
chore: new setting "create_rpc_client_with_current_rt"
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Dec 1, 2023
1 parent df3e22f commit f0a5f13
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 18 deletions.
52 changes: 35 additions & 17 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use async_channel::Receiver;
use common_arrow::arrow_format::flight::data::FlightData;
use common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient;
use common_base::base::GlobalInstance;
use common_base::runtime::GlobalIORuntime;
use common_base::runtime::Thread;
use common_base::runtime::TrySpawn;
use common_base::GLOBAL_TASK;
use common_config::GlobalConfig;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -116,7 +119,11 @@ impl DataExchangeManager {
// Create connections for cluster all nodes. We will push data through this connection.
#[async_backtrace::framed]
#[minitrace::trace]
pub async fn init_nodes_channel(&self, packet: &InitNodesChannelPacket) -> Result<()> {
pub async fn init_nodes_channel(
&self,
packet: &InitNodesChannelPacket,
create_rpc_client_with_current_rt: bool,
) -> Result<()> {
let mut request_exchanges = HashMap::new();
let mut targets_exchanges = HashMap::new();

Expand All @@ -125,7 +132,8 @@ impl DataExchangeManager {
for connection_info in &packet.fragment_connections_info {
for fragment in &connection_info.fragments {
let address = &connection_info.source.flight_address;
let mut flight_client = Self::create_client(address).await?;
let mut flight_client =
Self::create_client(address, create_rpc_client_with_current_rt).await?;

targets_exchanges.insert(
(connection_info.source.id.clone(), *fragment),
Expand All @@ -138,7 +146,8 @@ impl DataExchangeManager {

for connection_info in &packet.statistics_connections_info {
let address = &connection_info.source.flight_address;
let mut flight_client = Self::create_client(address).await?;
let mut flight_client =
Self::create_client(address, create_rpc_client_with_current_rt).await?;
request_exchanges.insert(
connection_info.source.id.clone(),
flight_client
Expand All @@ -165,22 +174,31 @@ impl DataExchangeManager {
}

#[async_backtrace::framed]
pub async fn create_client(address: &str) -> Result<FlightClient> {
pub async fn create_client(address: &str, use_current_rt: bool) -> Result<FlightClient> {
let config = GlobalConfig::instance();
let address = address.to_string();

match config.tls_query_cli_enabled() {
true => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(
address.to_owned(),
None,
Some(config.query.to_rpc_client_tls_config()),
)
.await?,
))),
false => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
))),
let task = async move {
match config.tls_query_cli_enabled() {
true => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(
address.to_owned(),
None,
Some(config.query.to_rpc_client_tls_config()),
)
.await?,
))),
false => Ok(FlightClient::new(FlightServiceClient::new(
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
))),
}
};
if use_current_rt {
task.await
} else {
GlobalIORuntime::instance()
.spawn(GLOBAL_TASK, task)
.await
.expect("create client future must be joined successfully")
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/query/service/src/api/rpc/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,14 @@ impl FlightService for DatabendQueryFlightService {
FlightResult { body: vec![] }
}
FlightAction::InitNodesChannel(init_nodes_channel) => {
let config = GlobalConfig::instance();
let settings = Settings::create(config.query.tenant_id.clone());
let publisher_packet = &init_nodes_channel.init_nodes_channel_packet;
if let Err(cause) = DataExchangeManager::instance()
.init_nodes_channel(publisher_packet)
.init_nodes_channel(
publisher_packet,
settings.get_create_query_flight_client_with_current_rt()?,
)
.await
{
let query_id = &init_nodes_channel.init_nodes_channel_packet.query_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
+------------------------------------------------+----------------+----------------+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
| 'acquire_lock_timeout' | '15' | '15' | 'SESSION' | 'Sets the maximum timeout in seconds for acquire a lock.' | 'UInt64' |
| 'collation' | 'binary' | 'binary' | 'SESSION' | 'Sets the character collation. Available values include "binary" and "utf8".' | 'String' |
| 'create_query_flight_client_with_current_rt' | '1' | '1' | 'SESSION' | 'create query flight client with current runtime' | 'UInt64' |
| 'ddl_column_type_nullable' | '1' | '1' | 'SESSION' | 'If columns are default nullable when create or alter table' | 'UInt64' |
| 'disable_join_reorder' | '0' | '0' | 'SESSION' | 'Disable join reorder optimization.' | 'UInt64' |
| 'efficiently_memory_group_by' | '0' | '0' | 'SESSION' | 'Memory is used efficiently, but this may cause performance degradation.' | 'UInt64' |
Expand Down
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,12 @@ impl DefaultSettings {
possible_values: None,
mode: SettingMode::Both,
}),
("create_query_flight_client_with_current_rt", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "create query flight client with current runtime",
possible_values: None,
mode: SettingMode::Both,
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,8 @@ impl Settings {
pub fn get_external_server_request_timeout_secs(&self) -> Result<u64> {
self.try_get_u64("external_server_request_timeout_secs")
}

pub fn get_create_query_flight_client_with_current_rt(&self) -> Result<bool> {
Ok(self.try_get_u64("create_query_flight_client_with_current_rt")? != 0)
}
}

0 comments on commit f0a5f13

Please sign in to comment.