Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cluster): disable forward warehouse request in self-managed cluster #17176

Merged
merged 6 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ impl KubernetesResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for KubernetesResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, _: &mut NodeInfo) -> Result<()> {
Err(ErrorCode::Unimplemented(
"Unimplemented kubernetes resources management",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl SelfManagedResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for SelfManagedResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
node.cluster_id = config.query.cluster_id.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ impl SystemResourcesManagement {

#[async_trait::async_trait]
impl ResourcesManagement for SystemResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
true
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
assert!(config.query.cluster_id.is_empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use databend_common_meta_types::NodeType;

#[async_trait::async_trait]
pub trait ResourcesManagement: Sync + Send + 'static {
fn support_forward_warehouse_request(&self) -> bool;

async fn init_node(&self, node: &mut NodeInfo) -> Result<()>;

async fn create_warehouse(&self, name: String, nodes: Vec<SelectedNode>) -> Result<()>;
Expand Down Expand Up @@ -77,6 +79,10 @@ pub struct DummyResourcesManagement;

#[async_trait::async_trait]
impl ResourcesManagement for DummyResourcesManagement {
fn support_forward_warehouse_request(&self) -> bool {
false
}

async fn init_node(&self, node: &mut NodeInfo) -> Result<()> {
let config = GlobalConfig::instance();
node.cluster_id = config.query.cluster_id.clone();
Expand Down
81 changes: 44 additions & 37 deletions src/query/service/src/servers/http/middleware/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL;
use databend_common_base::headers::HEADER_NODE_ID;
use databend_common_base::headers::HEADER_QUERY_ID;
Expand All @@ -30,6 +31,7 @@ use databend_common_exception::Result;
use databend_common_meta_app::principal::user_token::TokenType;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::NodeInfo;
use databend_enterprise_resources_management::ResourcesManagement;
use fastrace::func_name;
use headers::authorization::Basic;
use headers::authorization::Bearer;
Expand Down Expand Up @@ -537,46 +539,51 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
};
}
} else if let Some(warehouse) = headers.get(HEADER_WAREHOUSE) {
req.headers_mut().remove(HEADER_WAREHOUSE);

let warehouse = warehouse
.to_str()
.map_err(|e| {
HttpErrorCode::bad_request(ErrorCode::BadArguments(format!(
"Invalid Header ({HEADER_WAREHOUSE}: {warehouse:?}): {e}"
)))
})?
.to_string();

let cluster_discovery = ClusterDiscovery::instance();

let forward_node = cluster_discovery.find_node_by_warehouse(&warehouse).await;

match forward_node {
Err(error) => {
return Err(HttpErrorCode::server_error(error).into());
}
Ok(None) => {
let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse <new_warehouse>`", warehouse);
warn!("{}", msg);
return Err(Error::from(HttpErrorCode::bad_request(
ErrorCode::UnknownWarehouse(msg),
)));
}
Ok(Some(node)) => {
let local_id = GlobalConfig::instance().query.node_id.clone();
if node.id != local_id {
log::info!(
"forwarding /v1{} from {} to warehouse {}({})",
req.uri(),
local_id,
warehouse,
node.id
);
return forward_request(req, node).await;
let resources_management = GlobalInstance::get::<Arc<dyn ResourcesManagement>>();
if resources_management.support_forward_warehouse_request() {
req.headers_mut().remove(HEADER_WAREHOUSE);

let warehouse = warehouse
.to_str()
.map_err(|e| {
HttpErrorCode::bad_request(ErrorCode::BadArguments(format!(
"Invalid Header ({HEADER_WAREHOUSE}: {warehouse:?}): {e}"
zhang2014 marked this conversation as resolved.
Show resolved Hide resolved
)))
})?
.to_string();

let cluster_discovery = ClusterDiscovery::instance();

let forward_node = cluster_discovery.find_node_by_warehouse(&warehouse).await;

match forward_node {
Err(error) => {
return Err(HttpErrorCode::server_error(error).into());
zhang2014 marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(None) => {
let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse <new_warehouse>`", warehouse);
warn!("{}", msg);
return Err(Error::from(HttpErrorCode::bad_request(
ErrorCode::UnknownWarehouse(msg),
)));
}
Ok(Some(node)) => {
let local_id = GlobalConfig::instance().query.node_id.clone();
zhang2014 marked this conversation as resolved.
Show resolved Hide resolved
if node.id != local_id {
log::info!(
"forwarding /v1{} from {} to warehouse {}({})",
req.uri(),
local_id,
warehouse,
node.id
);
return forward_request(req, node).await;
}
}
}
}

log::warn!("Ignore header ({HEADER_WAREHOUSE}: {warehouse:?})");
zhang2014 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Loading