diff --git a/Cargo.lock b/Cargo.lock index 66631453445f1..63f6d7e0d9600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4739,6 +4739,7 @@ dependencies = [ "rustls-pemfile", "rustls-pki-types", "rustyline", + "semver", "serde", "serde_json", "serde_urlencoded", diff --git a/scripts/ci/deploy/config/databend-query-node-1.toml b/scripts/ci/deploy/config/databend-query-node-1.toml index 1ba43c61df10d..4cfc000c9a9db 100644 --- a/scripts/ci/deploy/config/databend-query-node-1.toml +++ b/scripts/ci/deploy/config/databend-query-node-1.toml @@ -54,11 +54,11 @@ auth_type = "no_password" # name = "admin" # auth_type = "no_password" -# [[query.users]] -# name = "databend" -# auth_type = "double_sha1_password" -# # echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum -# auth_string = "3081f32caef285c232d066033c89a78d88a6d8a5" + [[query.users]] + name = "databend" + auth_type = "double_sha1_password" + # echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum + auth_string = "3081f32caef285c232d066033c89a78d88a6d8a5" # [[query.users]] # name = "datafuselabs" diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 1bb11b4d951b1..4460e2fe24b87 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -161,6 +161,7 @@ rustls = "0.22" rustls-pemfile = "2" rustls-pki-types = "1" rustyline = "14" +semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_urlencoded = "0.7.1" diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index e888ab6898014..58783aa6dd70f 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -53,20 +53,20 @@ const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID"; const HEADER_QUERY_STATE: &str = "X-DATABEND-QUERY-STATE"; const HEADER_QUERY_PAGE_ROWS: &str = "X-DATABEND-QUERY-PAGE-ROWS"; -pub fn make_page_uri(query_id: &str, page_no: usize) -> String { - format!("/v1/query/{}/page/{}", query_id, page_no) +pub fn make_page_uri(query_id: &str, node_id: &str, page_no: usize) -> String { + format!("/v1/query/{}/{}/page/{}", query_id, node_id, page_no) } -pub fn make_state_uri(query_id: &str) -> String { - format!("/v1/query/{}", query_id) +pub fn make_state_uri(query_id: &str, node_id: &str) -> String { + format!("/v1/query/{}/{}", query_id, node_id) } -pub fn make_final_uri(query_id: &str) -> String { - format!("/v1/query/{}/final", query_id) +pub fn make_final_uri(query_id: &str, node_id: &str) -> String { + format!("/v1/query/{}/{}/final", query_id, node_id) } -pub fn make_kill_uri(query_id: &str) -> String { - format!("/v1/query/{}/kill", query_id) +pub fn make_kill_uri(query_id: &str, node_id: &str) -> String { + format!("/v1/query/{}/{}/kill", query_id, node_id) } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -146,28 +146,31 @@ impl QueryResponse { r: HttpQueryResponseInternal, is_final: bool, ) -> impl IntoResponse { + let node_id = r.node_id.clone(); let state = r.state.clone(); let (data, next_uri) = if is_final { (JsonBlock::empty(), None) } else { match state.state { ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data { - None => (JsonBlock::empty(), Some(make_state_uri(&id))), + None => (JsonBlock::empty(), Some(make_state_uri(&id, &r.node_id))), Some(d) => { let uri = match d.next_page_no { - Some(n) => Some(make_page_uri(&id, n)), - None => Some(make_state_uri(&id)), + Some(n) => Some(make_page_uri(&id, &r.node_id, n)), + None => Some(make_state_uri(&id, &r.node_id)), }; (d.page.data, uri) } }, - ExecuteStateKind::Failed => (JsonBlock::empty(), Some(make_final_uri(&id))), + ExecuteStateKind::Failed => { + (JsonBlock::empty(), Some(make_final_uri(&id, &r.node_id))) + } ExecuteStateKind::Succeeded => match r.data { - None => (JsonBlock::empty(), Some(make_final_uri(&id))), + None => (JsonBlock::empty(), Some(make_final_uri(&id, &r.node_id))), Some(d) => { let uri = match d.next_page_no { - Some(n) => Some(make_page_uri(&id, n)), - None => Some(make_final_uri(&id)), + Some(n) => Some(make_page_uri(&id, &r.node_id, n)), + None => Some(make_final_uri(&id, &r.node_id)), }; (d.page.data, uri) } @@ -198,9 +201,9 @@ impl QueryResponse { warnings: r.state.warnings, id: id.clone(), next_uri, - stats_uri: Some(make_state_uri(&id)), - final_uri: Some(make_final_uri(&id)), - kill_uri: Some(make_kill_uri(&id)), + stats_uri: Some(make_state_uri(&id, &node_id)), + final_uri: Some(make_final_uri(&id, &node_id)), + kill_uri: Some(make_kill_uri(&id, &node_id)), error: r.state.error.map(QueryError::from_error_code), has_result_set: r.state.has_result_set, }) @@ -223,15 +226,16 @@ impl QueryResponse { #[poem::handler] async fn query_final_handler( ctx: &HttpQueryContext, - Path(query_id): Path, + Path((query_id, node_id)): Path<(String, String)>, ) -> PoemResult { + ctx.check_node_id(&node_id, &query_id)?; let root = get_http_tracing_span(full_name!(), ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); - async { info!( - "{}: got /v1/query/{}/final request, this query is going to be finally completed.", - query_id, query_id + "{}: got {} request, this query is going to be finally completed.", + query_id, + make_final_uri(&query_id, &node_id) ); let http_query_manager = HttpQueryManager::instance(); match http_query_manager @@ -260,15 +264,16 @@ async fn query_final_handler( #[poem::handler] async fn query_cancel_handler( ctx: &HttpQueryContext, - Path(query_id): Path, + Path((query_id, node_id)): Path<(String, String)>, ) -> PoemResult { + ctx.check_node_id(&node_id, &query_id)?; let root = get_http_tracing_span(full_name!(), ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); - async { info!( - "{}: got /v1/query/{}/kill request, cancel the query", - query_id, query_id + "{}: got {} request, cancel the query", + query_id, + make_kill_uri(&query_id, &node_id) ); let http_query_manager = HttpQueryManager::instance(); match http_query_manager @@ -290,10 +295,10 @@ async fn query_cancel_handler( #[poem::handler] async fn query_state_handler( ctx: &HttpQueryContext, - Path(query_id): Path, + Path((query_id, node_id)): Path<(String, String)>, ) -> PoemResult { + ctx.check_node_id(&node_id, &query_id)?; let root = get_http_tracing_span(full_name!(), ctx, &query_id); - async { let http_query_manager = HttpQueryManager::instance(); match http_query_manager.get_query(&query_id) { @@ -315,11 +320,11 @@ async fn query_state_handler( #[poem::handler] async fn query_page_handler( ctx: &HttpQueryContext, - Path((query_id, page_no)): Path<(String, usize)>, + Path((query_id, node_id, page_no)): Path<(String, String, usize)>, ) -> PoemResult { + ctx.check_node_id(&node_id, &query_id)?; let root = get_http_tracing_span(full_name!(), ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); - async { let http_query_manager = HttpQueryManager::instance(); match http_query_manager.get_query(&query_id) { @@ -352,7 +357,8 @@ pub(crate) async fn query_handler( let _t = SlowRequestLogTracker::new(ctx); async { - info!("http query new request: {:}", mask_connection_info(&format!("{:?}", req))); + let agent = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string()); + info!("http query new request{}: {:}", agent, mask_connection_info(&format!("{:?}", req))); let http_query_manager = HttpQueryManager::instance(); let sql = req.sql.clone(); @@ -397,14 +403,14 @@ pub fn query_route() -> Route { // Note: endpoints except /v1/query may change without notice, use uris in response instead let rules = [ ("/", post(query_handler)), - ("/:id", get(query_state_handler)), - ("/:id/page/:page_no", get(query_page_handler)), + ("/:query_id/:node_id", get(query_state_handler)), + ("/:query_id/:node_id/page/:page_no", get(query_page_handler)), ( - "/:id/kill", + "/:query_id/:node_id/kill", get(query_cancel_handler).post(query_cancel_handler), ), ( - "/:id/final", + "/:query_id/:node_id/final", get(query_final_handler).post(query_final_handler), ), ]; @@ -436,7 +442,7 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId { } /// The HTTP query endpoints are expected to be responses within 60 seconds. -/// If it exceeds far of 60 seconds, there might be something wrong, we should +/// If it exceeds far from 60 seconds, there might be something wrong, we should /// log it. struct SlowRequestLogTracker { started_at: std::time::Instant, diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index f1a12748faabb..857750f06bef8 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -15,11 +15,14 @@ use std::collections::BTreeMap; use std::sync::Arc; +use http::StatusCode; +use log::warn; use poem::FromRequest; use poem::Request; use poem::RequestBody; -use poem::Result as PoemResult; +use time::Instant; +use crate::servers::http::v1::HttpQueryManager; use crate::sessions::Session; use crate::sessions::SessionManager; use crate::sessions::SessionType; @@ -101,11 +104,26 @@ impl HttpQueryContext { pub fn set_fail(&self) { self.session.txn_mgr().lock().set_fail(); } + + pub fn check_node_id(&self, node_id: &str, query_id: &str) -> poem::Result<()> { + if node_id != self.node_id { + let manager = HttpQueryManager::instance(); + let start_time = manager.server_info.start_time.clone(); + let uptime = (Instant::now() - manager.start_instant).as_seconds_f32(); + let msg = format!( + "route error: query {query_id} SHOULD be on server {node_id}, but current server is {}, which started at {start_time}({uptime} secs ago)", + self.node_id + ); + warn!("{msg}"); + return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND)); + } + Ok(()) + } } impl<'a> FromRequest<'a> for &'a HttpQueryContext { #[async_backtrace::framed] - async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult { + async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result { Ok(req.extensions().get::().expect( "To use the `HttpQueryContext` extractor, the `HTTPSessionMiddleware` is required", )) diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs index 3afc2cec31134..6ca3bf8c54848 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs @@ -30,6 +30,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_storages_common_txn::TxnManagerRef; use parking_lot::Mutex; +use time::Instant; use tokio::task; use super::expiring_map::ExpiringMap; @@ -77,6 +78,7 @@ impl LimitedQueue { } pub struct HttpQueryManager { + pub(crate) start_instant: Instant, pub(crate) server_info: ServerInfo, #[allow(clippy::type_complexity)] pub(crate) queries: Arc>>, @@ -90,6 +92,7 @@ impl HttpQueryManager { #[async_backtrace::framed] pub async fn init(cfg: &InnerConfig) -> Result<()> { GlobalInstance::set(Arc::new(HttpQueryManager { + start_instant: Instant::now(), server_info: ServerInfo { id: cfg.query.node_id.clone(), start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false), diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs index f9a54bb2a2d5d..e581ee9c95d20 100644 --- a/src/query/service/tests/it/servers/http/http_query_handlers.rs +++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs @@ -31,9 +31,7 @@ use databend_query::auth::AuthMgr; use databend_query::servers::http::middleware::get_client_ip; use databend_query::servers::http::middleware::HTTPSessionEndpoint; use databend_query::servers::http::middleware::HTTPSessionMiddleware; -use databend_query::servers::http::v1::make_final_uri; use databend_query::servers::http::v1::make_page_uri; -use databend_query::servers::http::v1::make_state_uri; use databend_query::servers::http::v1::query_route; use databend_query::servers::http::v1::ExecuteStateKind; use databend_query::servers::http::v1::HttpSessionConf; @@ -276,7 +274,7 @@ async fn test_simple_sql() -> Result<()> { assert!(result.error.is_none(), "{:?}", result.error); let query_id = &result.id; - let final_uri = make_final_uri(query_id); + let final_uri = result.final_uri.clone().unwrap(); assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result); assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result); @@ -284,7 +282,7 @@ async fn test_simple_sql() -> Result<()> { assert_eq!(result.schema.len(), 19, "{:?}", result); // get state - let uri = make_state_uri(query_id); + let uri = result.stats_uri.unwrap(); let (status, result) = get_uri_checked(&ep, &uri).await?; assert_eq!(status, StatusCode::OK, "{:?}", result); assert!(result.error.is_none(), "{:?}", result); @@ -293,8 +291,18 @@ async fn test_simple_sql() -> Result<()> { // assert!(result.schema.is_empty(), "{:?}", result); assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result); + let node_id = result + .session + .as_ref() + .unwrap() + .last_server_info + .as_ref() + .unwrap() + .id + .clone(); + // get page, support retry - let page_0_uri = make_page_uri(query_id, 0); + let page_0_uri = make_page_uri(query_id, &node_id, 0); for _ in 1..3 { let (status, result) = get_uri_checked(&ep, &page_0_uri).await?; assert_eq!(status, StatusCode::OK, "{:?}", result); @@ -306,7 +314,7 @@ async fn test_simple_sql() -> Result<()> { } // client retry - let page_1_uri = make_page_uri(query_id, 1); + let page_1_uri = make_page_uri(query_id, &node_id, 1); let (_, result) = get_uri_checked(&ep, &page_1_uri).await?; assert_eq!(status, StatusCode::OK, "{:?}", result); assert!(result.error.is_none(), "{:?}", result); @@ -314,7 +322,7 @@ async fn test_simple_sql() -> Result<()> { assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result); // get page not expected - let page_2_uri = make_page_uri(query_id, 2); + let page_2_uri = make_page_uri(query_id, &node_id, 2); let response = get_uri(&ep, &page_2_uri).await; assert_eq!(response.status(), StatusCode::NOT_FOUND, "{:?}", result); let body = response.into_body().into_string().await.unwrap(); @@ -497,13 +505,23 @@ async fn test_wait_time_secs() -> Result<()> { let (status, result) = post_json_to_endpoint(&ep, &json, HeaderMap::default()).await?; assert_eq!(result.state, ExecuteStateKind::Starting, "{:?}", result); assert_eq!(status, StatusCode::OK, "{:?}", result); + let node_id = result + .session + .as_ref() + .unwrap() + .last_server_info + .as_ref() + .unwrap() + .id + .clone(); + let query_id = &result.id; - let next_uri = make_page_uri(query_id, 0); + let next_uri = make_page_uri(query_id, &node_id, 0); assert!(result.error.is_none(), "{:?}", result); assert_eq!(result.data.len(), 0, "{:?}", result); assert_eq!(result.next_uri, Some(next_uri.clone()), "{:?}", result); - let mut uri = make_page_uri(query_id, 0); + let mut uri = make_page_uri(query_id, &node_id, 0); let mut num_row = 0; for _ in 1..300 { sleep(Duration::from_millis(10)).await; @@ -569,16 +587,27 @@ async fn test_pagination() -> Result<()> { let json = serde_json::json!({"sql": sql.to_string(), "pagination": {"wait_time_secs": 1, "max_rows_per_page": 2}, "session": { "settings": {}}}); let (status, result) = post_json_to_endpoint(&ep, &json, HeaderMap::default()).await?; + let node_id = result + .session + .as_ref() + .unwrap() + .last_server_info + .as_ref() + .unwrap() + .id + .clone(); + assert_eq!(status, StatusCode::OK, "{:?}", result); let query_id = &result.id; - let next_uri = make_page_uri(query_id, 1); + + let next_uri = make_page_uri(query_id, &node_id, 1); assert!(result.error.is_none(), "{:?}", result); assert_eq!(result.data.len(), 2, "{:?}", result); assert_eq!(result.next_uri, Some(next_uri), "{:?}", result); assert!(!result.schema.is_empty(), "{:?}", result); // get page not expected - let uri = make_page_uri(query_id, 6); + let uri = make_page_uri(query_id, &node_id, 6); let response = get_uri(&ep, &uri).await; assert_eq!(response.status(), StatusCode::NOT_FOUND, "{:?}", result); let body = response.into_body().into_string().await.unwrap(); @@ -597,7 +626,7 @@ async fn test_pagination() -> Result<()> { assert!(!result.schema.is_empty(), "{:?}", result); if page == 5 { // get state - let uri = make_state_uri(query_id); + let uri = result.stats_uri.clone().unwrap(); let (status, _state_result) = get_uri_checked(&ep, &uri).await?; assert_eq!(status, StatusCode::OK); diff --git a/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result new file mode 100644 index 0000000000000..693db0ae0895c --- /dev/null +++ b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result @@ -0,0 +1,6 @@ +## kill +{"error":{"code":"404","message":"route error: query QID is on node xxx, not NODE"}} +## page +{"error":{"code":"404","message":"route error: query QID is on node xxx, not NODE"}} +## final +{"error":{"code":"404","message":"route error: query QID is on node xxx, not NODE"}} diff --git a/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh new file mode 100755 index 0000000000000..2d680e371f870 --- /dev/null +++ b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +QID="my_query_for_route_${RANDOM}" +NODE=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select 1;"}' | jq -r ".session.last_server_info.id") +echo "## kill" +curl -s -u root: -XGET -w "\n" "http://localhost:8000/v1/query/${QID}/kill?node=xxx" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" +echo "## page" +curl -s -u root: -XGET -w "\n" "http://localhost:8000/v1/query/${QID}/page/0?node=xxx" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" +echo "## final" +curl -s -u root: -XGET -w "\n" "http://localhost:8000/v1/query/${QID}/final?node=xxx" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g"