Skip to content

Commit

Permalink
fix(cluster): disable forward warehouse request in self-managed clust…
Browse files Browse the repository at this point in the history
…er (#17176)

* fix(cluster): disable forward warehouse request in self-managed cluster

* fix(cluster): disable forward warehouse request in self-managed cluster

* fix(cluster): disable forward warehouse request in self-managed cluster

* fix(cluster): disable forward warehouse request in self-managed cluster
  • Loading branch information
zhang2014 authored Jan 6, 2025
1 parent 9c56abd commit f6742b5
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ impl KubernetesResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for KubernetesResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, _: &mut NodeInfo) -> Result<()> {
Err(ErrorCode::Unimplemented(
"Unimplemented kubernetes resources management",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl SelfManagedResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for SelfManagedResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
node.cluster_id = config.query.cluster_id.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ impl SystemResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for SystemResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
true
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
assert!(config.query.cluster_id.is_empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use databend_common_meta_types::NodeType;

#[async_trait::async_trait]
pub trait ResourcesManagement: Sync + Send + 'static {
fn support_forward_warehouse_request(&self) -> bool;

async fn init_node(&self, node: &mut NodeInfo) -> Result<()>;

async fn create_warehouse(&self, name: String, nodes: Vec<SelectedNode>) -> Result<()>;
Expand Down Expand Up @@ -77,6 +79,10 @@ pub struct DummyResourcesManagement;

#[async_trait::async_trait]
impl ResourcesManagement for DummyResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
node.cluster_id = config.query.cluster_id.clone();
Expand Down
84 changes: 47 additions & 37 deletions src/query/service/src/servers/http/middleware/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL;
use databend_common_base::headers::HEADER_NODE_ID;
use databend_common_base::headers::HEADER_QUERY_ID;
Expand All @@ -30,6 +31,7 @@ use databend_common_exception::Result;
use databend_common_meta_app::principal::user_token::TokenType;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::NodeInfo;
use databend_enterprise_resources_management::ResourcesManagement;
use fastrace::func_name;
use headers::authorization::Basic;
use headers::authorization::Bearer;
Expand Down Expand Up @@ -537,46 +539,54 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
};
}
} else if let Some(warehouse) = headers.get(HEADER_WAREHOUSE) {
req.headers_mut().remove(HEADER_WAREHOUSE);

let warehouse = warehouse
.to_str()
.map_err(|e| {
HttpErrorCode::bad_request(ErrorCode::BadArguments(format!(
"Invalid Header ({HEADER_WAREHOUSE}: {warehouse:?}): {e}"
)))
})?
.to_string();

let cluster_discovery = ClusterDiscovery::instance();

let forward_node = cluster_discovery.find_node_by_warehouse(&warehouse).await;

match forward_node {
Err(error) => {
return Err(HttpErrorCode::server_error(error).into());
}
Ok(None) => {
let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse <new_warehouse>`", warehouse);
warn!("{}", msg);
return Err(Error::from(HttpErrorCode::bad_request(
ErrorCode::UnknownWarehouse(msg),
)));
}
Ok(Some(node)) => {
let local_id = GlobalConfig::instance().query.node_id.clone();
if node.id != local_id {
log::info!(
"forwarding /v1{} from {} to warehouse {}({})",
req.uri(),
local_id,
warehouse,
node.id
);
return forward_request(req, node).await;
let resources_management = GlobalInstance::get::<Arc<dyn ResourcesManagement>>();
if resources_management.support_forward_warehouse_request() {
req.headers_mut().remove(HEADER_WAREHOUSE);

let warehouse = warehouse
.to_str()
.map_err(|e| {
HttpErrorCode::bad_request(ErrorCode::BadArguments(format!(
"Invalid value for header ({HEADER_WAREHOUSE}: {warehouse:?}): {e}"
)))
})?
.to_string();

let cluster_discovery = ClusterDiscovery::instance();

let forward_node = cluster_discovery.find_node_by_warehouse(&warehouse).await;

match forward_node {
Err(error) => {
return Err(HttpErrorCode::server_error(
error.add_message_back("(while in warehouse request forward)"),
)
.into());
}
Ok(None) => {
let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse <new_warehouse>`", warehouse);
warn!("{}", msg);
return Err(Error::from(HttpErrorCode::bad_request(
ErrorCode::UnknownWarehouse(msg),
)));
}
Ok(Some(node)) => {
let local_id = GlobalConfig::instance().query.node_id.clone();
if node.id != local_id {
log::info!(
"forwarding /v1{} from {} to warehouse {}({})",
req.uri(),
local_id,
warehouse,
node.id
);
return forward_request(req, node).await;
}
}
}
}

log::warn!("Ignore header ({HEADER_WAREHOUSE}: {warehouse:?})");
}
}

Expand Down

0 comments on commit f6742b5

Please sign in to comment.