From fb286d08ef26b83e64e1520215ef8f1e030d9cb5 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng <yangxiufeng.c@gmail.com>
Date: Wed, 29 May 2024 20:06:36 +0800
Subject: [PATCH] refactor: more explicit error confirmation for http query
 route error.

---
 .../service/src/servers/http/middleware.rs    | 14 ++++++++--
 .../servers/http/v1/http_query_handlers.rs    | 21 ++++++++------
 .../http/v1/query/http_query_context.rs       | 28 +++++++++++++++++--
 .../http/v1/query/http_query_manager.rs       |  5 +++-
 .../it/servers/http/http_query_handlers.rs    |  8 ++----
 .../09_0006_route_error.result                | 15 ++++++++++
 .../09_http_handler/09_0006_route_error.sh    | 24 ++++++++++++++++
 7 files changed, 96 insertions(+), 19 deletions(-)
 create mode 100644 tests/suites/1_stateful/09_http_handler/09_0006_route_error.result
 create mode 100755 tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh

diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware.rs
index da2b6018a6696..d0cada272c252 100644
--- a/src/query/service/src/servers/http/middleware.rs
+++ b/src/query/service/src/servers/http/middleware.rs
@@ -61,6 +61,7 @@ use crate::sessions::SessionType;
 const DEDUPLICATE_LABEL: &str = "X-DATABEND-DEDUPLICATE-LABEL";
 const USER_AGENT: &str = "User-Agent";
 const QUERY_ID: &str = "X-DATABEND-QUERY-ID";
+const NODE_ID: &str = "X-DATABEND-NODE-ID";
 
 const TRACE_PARENT: &str = "traceparent";
 
@@ -253,9 +254,6 @@ impl<E> HTTPSessionEndpoint<E> {
 
         let session = session_manager.register_session(session)?;
 
-        let ctx = session.create_query_context().await?;
-        let node_id = ctx.get_cluster().local_id.clone();
-
         let deduplicate_label = req
             .headers()
             .get(DEDUPLICATE_LABEL)
@@ -266,16 +264,26 @@ impl<E> HTTPSessionEndpoint<E> {
             .get(USER_AGENT)
             .map(|id| id.to_str().unwrap().to_string());
 
+        let expected_node_id = req
+            .headers()
+            .get(NODE_ID)
+            .map(|id| id.to_str().unwrap().to_string());
+
         let trace_parent = req
             .headers()
             .get(TRACE_PARENT)
             .map(|id| id.to_str().unwrap().to_string());
         let baggage = extract_baggage_from_headers(req.headers());
         let client_host = get_client_ip(req);
+
+        let ctx = session.create_query_context().await?;
+        let node_id = ctx.get_cluster().local_id.clone();
+
         Ok(HttpQueryContext::new(
             session,
             query_id,
             node_id,
+            expected_node_id,
             deduplicate_label,
             user_agent,
             trace_parent,
diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs
index e888ab6898014..490808aa29e4d 100644
--- a/src/query/service/src/servers/http/v1/http_query_handlers.rs
+++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs
@@ -225,13 +225,14 @@ async fn query_final_handler(
     ctx: &HttpQueryContext,
     Path(query_id): Path<String>,
 ) -> PoemResult<impl IntoResponse> {
+    ctx.check_node_id(&query_id)?;
     let root = get_http_tracing_span(full_name!(), ctx, &query_id);
     let _t = SlowRequestLogTracker::new(ctx);
-
     async {
         info!(
-            "{}: got /v1/query/{}/final request, this query is going to be finally completed.",
-            query_id, query_id
+            "{}: got {} request, this query is going to be finally completed.",
+            query_id,
+            make_final_uri(&query_id)
         );
         let http_query_manager = HttpQueryManager::instance();
         match http_query_manager
@@ -262,13 +263,14 @@ async fn query_cancel_handler(
     ctx: &HttpQueryContext,
     Path(query_id): Path<String>,
 ) -> PoemResult<impl IntoResponse> {
+    ctx.check_node_id(&query_id)?;
     let root = get_http_tracing_span(full_name!(), ctx, &query_id);
     let _t = SlowRequestLogTracker::new(ctx);
-
     async {
         info!(
-            "{}: got /v1/query/{}/kill request, cancel the query",
-            query_id, query_id
+            "{}: got {} request, cancel the query",
+            query_id,
+            make_kill_uri(&query_id)
         );
         let http_query_manager = HttpQueryManager::instance();
         match http_query_manager
@@ -292,6 +294,7 @@ async fn query_state_handler(
     ctx: &HttpQueryContext,
     Path(query_id): Path<String>,
 ) -> PoemResult<impl IntoResponse> {
+    ctx.check_node_id(&query_id)?;
     let root = get_http_tracing_span(full_name!(), ctx, &query_id);
 
     async {
@@ -317,6 +320,7 @@ async fn query_page_handler(
     ctx: &HttpQueryContext,
     Path((query_id, page_no)): Path<(String, usize)>,
 ) -> PoemResult<impl IntoResponse> {
+    ctx.check_node_id(&query_id)?;
     let root = get_http_tracing_span(full_name!(), ctx, &query_id);
     let _t = SlowRequestLogTracker::new(ctx);
 
@@ -352,7 +356,8 @@ pub(crate) async fn query_handler(
     let _t = SlowRequestLogTracker::new(ctx);
 
     async {
-        info!("http query new request: {:}", mask_connection_info(&format!("{:?}", req)));
+        let agent = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string());
+        info!("http query new request{}: {:}", agent, mask_connection_info(&format!("{:?}", req)));
         let http_query_manager = HttpQueryManager::instance();
         let sql = req.sql.clone();
 
@@ -436,7 +441,7 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId {
 }
 
 /// The HTTP query endpoints are expected to be responses within 60 seconds.
-/// If it exceeds far of 60 seconds, there might be something wrong, we should
+/// If it exceeds far from 60 seconds, there might be something wrong, we should
 /// log it.
 struct SlowRequestLogTracker {
     started_at: std::time::Instant,
diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs
index f1a12748faabb..05f632debfec9 100644
--- a/src/query/service/src/servers/http/v1/query/http_query_context.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs
@@ -15,11 +15,14 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 
+use http::StatusCode;
+use log::warn;
 use poem::FromRequest;
 use poem::Request;
 use poem::RequestBody;
-use poem::Result as PoemResult;
+use time::Instant;
 
+use crate::servers::http::v1::HttpQueryManager;
 use crate::sessions::Session;
 use crate::sessions::SessionManager;
 use crate::sessions::SessionType;
@@ -29,6 +32,7 @@ pub struct HttpQueryContext {
     pub session: Arc<Session>,
     pub query_id: String,
     pub node_id: String,
+    pub expected_node_id: Option<String>,
     pub deduplicate_label: Option<String>,
     pub user_agent: Option<String>,
     pub trace_parent: Option<String>,
@@ -43,6 +47,7 @@ impl HttpQueryContext {
         session: Arc<Session>,
         query_id: String,
         node_id: String,
+        expected_node_id: Option<String>,
         deduplicate_label: Option<String>,
         user_agent: Option<String>,
         trace_parent: Option<String>,
@@ -55,6 +60,7 @@ impl HttpQueryContext {
             session,
             query_id,
             node_id,
+            expected_node_id,
             deduplicate_label,
             user_agent,
             trace_parent,
@@ -101,11 +107,29 @@ impl HttpQueryContext {
     pub fn set_fail(&self) {
         self.session.txn_mgr().lock().set_fail();
     }
+
+    pub fn check_node_id(&self, query_id: &str) -> poem::Result<()> {
+        if let Some(expected_node_id) = self.expected_node_id.as_ref() {
+            if expected_node_id != &self.node_id {
+                let manager = HttpQueryManager::instance();
+                let start_time = manager.server_info.start_time.clone();
+                let uptime = (Instant::now() - manager.start_instant).as_seconds_f32();
+                let msg = format!(
+                    "route error: query {query_id} SHOULD be on server {expected_node_id}, but current server is {}, which started at {start_time}({uptime} secs ago)",
+                    self.node_id
+                );
+                warn!("{msg}");
+                return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND));
+            }
+        }
+
+        Ok(())
+    }
 }
 
 impl<'a> FromRequest<'a> for &'a HttpQueryContext {
     #[async_backtrace::framed]
-    async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult<Self> {
+    async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result<Self> {
         Ok(req.extensions().get::<HttpQueryContext>().expect(
             "To use the `HttpQueryContext` extractor, the `HTTPSessionMiddleware` is required",
         ))
diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs
index 3afc2cec31134..296f74f67194c 100644
--- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs
@@ -30,6 +30,7 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_storages_common_txn::TxnManagerRef;
 use parking_lot::Mutex;
+use time::Instant;
 use tokio::task;
 
 use super::expiring_map::ExpiringMap;
@@ -77,6 +78,7 @@ impl<T> LimitedQueue<T> {
 }
 
 pub struct HttpQueryManager {
+    pub(crate) start_instant: Instant,
     pub(crate) server_info: ServerInfo,
     #[allow(clippy::type_complexity)]
     pub(crate) queries: Arc<DashMap<String, Arc<HttpQuery>>>,
@@ -90,9 +92,10 @@ impl HttpQueryManager {
     #[async_backtrace::framed]
     pub async fn init(cfg: &InnerConfig) -> Result<()> {
         GlobalInstance::set(Arc::new(HttpQueryManager {
+            start_instant: Instant::now(),
             server_info: ServerInfo {
                 id: cfg.query.node_id.clone(),
-                start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false),
+                start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Millis, false),
             },
             queries: Arc::new(DashMap::new()),
             sessions: Mutex::new(ExpiringMap::default()),
diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs
index f9a54bb2a2d5d..7ffd1ba305a47 100644
--- a/src/query/service/tests/it/servers/http/http_query_handlers.rs
+++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs
@@ -31,9 +31,7 @@ use databend_query::auth::AuthMgr;
 use databend_query::servers::http::middleware::get_client_ip;
 use databend_query::servers::http::middleware::HTTPSessionEndpoint;
 use databend_query::servers::http::middleware::HTTPSessionMiddleware;
-use databend_query::servers::http::v1::make_final_uri;
 use databend_query::servers::http::v1::make_page_uri;
-use databend_query::servers::http::v1::make_state_uri;
 use databend_query::servers::http::v1::query_route;
 use databend_query::servers::http::v1::ExecuteStateKind;
 use databend_query::servers::http::v1::HttpSessionConf;
@@ -276,7 +274,7 @@ async fn test_simple_sql() -> Result<()> {
     assert!(result.error.is_none(), "{:?}", result.error);
 
     let query_id = &result.id;
-    let final_uri = make_final_uri(query_id);
+    let final_uri = result.final_uri.clone().unwrap();
 
     assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result);
     assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result);
@@ -284,7 +282,7 @@ async fn test_simple_sql() -> Result<()> {
     assert_eq!(result.schema.len(), 19, "{:?}", result);
 
     // get state
-    let uri = make_state_uri(query_id);
+    let uri = result.stats_uri.unwrap();
     let (status, result) = get_uri_checked(&ep, &uri).await?;
     assert_eq!(status, StatusCode::OK, "{:?}", result);
     assert!(result.error.is_none(), "{:?}", result);
@@ -597,7 +595,7 @@ async fn test_pagination() -> Result<()> {
         assert!(!result.schema.is_empty(), "{:?}", result);
         if page == 5 {
             // get state
-            let uri = make_state_uri(query_id);
+            let uri = result.stats_uri.clone().unwrap();
             let (status, _state_result) = get_uri_checked(&ep, &uri).await?;
             assert_eq!(status, StatusCode::OK);
 
diff --git a/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result
new file mode 100644
index 0000000000000..b6d7c05744ffd
--- /dev/null
+++ b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result
@@ -0,0 +1,15 @@
+# error
+## page
+{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}
+## kill
+{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}
+## final
+{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}
+
+# ok
+## page
+[["1"]]
+## kill
+200
+## final
+null
diff --git a/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh
new file mode 100755
index 0000000000000..f55ffd5832ad7
--- /dev/null
+++ b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. "$CURDIR"/../../../shell_env.sh
+
+QID="my_query_for_route_${RANDOM}"
+NODE=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}"  -H 'Content-Type: application/json' -d '{"sql": "select 1;"}' | jq -r ".session.last_server_info.id")
+echo "# error"
+echo "## page"
+curl -s -u root: -XGET -H "x-databend-node-id:XXX"  -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'
+echo "## kill"
+curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n"  "http://localhost:8000/v1/query/${QID}/kill" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'
+echo "## final"
+curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/final" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'
+
+echo ""
+
+echo "# ok"
+echo "## page"
+curl -s -u root: -XGET -H "x-databend-node-id:${NODE}"  -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | jq -c ".data"
+echo "## kill"
+curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "%{http_code}\n"  "http://localhost:8000/v1/query/${QID}/kill"
+echo "## final"
+curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "\n" "http://localhost:8000/v1/query/${QID}/final" | jq ".next_uri"