diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs index 5c44c8ecd3b86..76fd27103e4a8 100644 --- a/src/common/base/src/runtime/runtime_tracker.rs +++ b/src/common/base/src/runtime/runtime_tracker.rs @@ -101,6 +101,7 @@ pub struct ThreadTracker { #[derive(Clone)] pub struct TrackingPayload { + pub query_id: Option, pub profile: Option>, pub mem_stat: Option>, pub metrics: Option>, @@ -164,6 +165,7 @@ impl ThreadTracker { profile: None, metrics: None, mem_stat: None, + query_id: None, node_error: None, }, } @@ -283,6 +285,17 @@ impl ThreadTracker { Ok(Err(oom)) => Err(oom), } } + + pub fn query_id() -> Option<&'static String> { + TRACKER.with(|tracker| { + tracker + .borrow() + .payload + .query_id + .as_ref() + .map(|query_id| unsafe { &*(query_id as *const String) }) + }) + } } pin_project! { diff --git a/src/common/tracing/src/loggers.rs b/src/common/tracing/src/loggers.rs index 2f2f1b1b7175d..483eafcd5db4c 100644 --- a/src/common/tracing/src/loggers.rs +++ b/src/common/tracing/src/loggers.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; +use databend_common_base::runtime::ThreadTracker; use fern::FormatCallback; use opentelemetry::logs::AnyValue; use opentelemetry::logs::Severity; @@ -185,12 +186,25 @@ fn format_json_log(out: FormatCallback, message: &fmt::Arguments, record: &log:: }; record.key_values().visit(&mut visitor).ok(); - out.finish(format_args!( - r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#, - humantime::format_rfc3339_micros(SystemTime::now()), - record.level(), - serde_json::to_string(&fields).unwrap_or_default(), - )); + match ThreadTracker::query_id() { + None => { + out.finish(format_args!( + r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#, + humantime::format_rfc3339_micros(SystemTime::now()), + record.level(), + serde_json::to_string(&fields).unwrap_or_default(), + )); + } + Some(query_id) => { + out.finish(format_args!( + r#"{{"timestamp":"{}","level":"{}","query_id":"{}","fields":{}}}"#, + humantime::format_rfc3339_micros(SystemTime::now()), + record.level(), + query_id, + serde_json::to_string(&fields).unwrap_or_default(), + )); + } + } struct KvCollector<'a> { fields: &'a mut Map, @@ -210,16 +224,33 @@ fn format_json_log(out: FormatCallback, message: &fmt::Arguments, record: &log:: } fn format_text_log(out: FormatCallback, message: &fmt::Arguments, record: &log::Record) { - out.finish(format_args!( - "{} {:>5} {}: {}:{} {}{}", - humantime::format_rfc3339_micros(SystemTime::now()), - record.level(), - record.module_path().unwrap_or(""), - record.file().unwrap_or(""), - record.line().unwrap_or(0), - message, - KvDisplay::new(record.key_values()), - )); + match ThreadTracker::query_id() { + None => { + out.finish(format_args!( + "{} {:>5} {}: {}:{} {}{}", + humantime::format_rfc3339_micros(SystemTime::now()), + record.level(), + record.module_path().unwrap_or(""), + record.file().unwrap_or(""), + record.line().unwrap_or(0), + message, + KvDisplay::new(record.key_values()), + )); + } + Some(query_id) => { + out.finish(format_args!( + "{} {} {:>5} {}: {}:{} {}{}", + query_id, + humantime::format_rfc3339_micros(SystemTime::now()), + record.level(), + record.module_path().unwrap_or(""), + record.file().unwrap_or(""), + record.line().unwrap_or(0), + message, + KvDisplay::new(record.key_values()), + )); + } + } } pub struct KvDisplay<'kvs> { diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware.rs index 2c84c26a18256..f02fc00f9f966 100644 --- a/src/query/service/src/servers/http/middleware.rs +++ b/src/query/service/src/servers/http/middleware.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; +use databend_common_base::runtime::ThreadTracker; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::tenant::Tenant; @@ -219,7 +220,7 @@ pub struct HTTPSessionEndpoint { impl HTTPSessionEndpoint { #[async_backtrace::framed] - async fn auth(&self, req: &Request) -> Result { + async fn auth(&self, req: &Request, query_id: String) -> Result { let credential = get_credential(req, self.kind)?; let session_manager = SessionManager::instance(); let session = session_manager.create_session(SessionType::Dummy).await?; @@ -245,12 +246,6 @@ impl HTTPSessionEndpoint { .get(USER_AGENT) .map(|id| id.to_str().unwrap().to_string()); - let query_id = req - .headers() - .get(QUERY_ID) - .map(|id| id.to_str().unwrap().to_string()) - .unwrap_or_else(|| Uuid::new_v4().to_string()); - let trace_parent = req .headers() .get(TRACE_PARENT) @@ -282,49 +277,62 @@ impl Endpoint for HTTPSessionEndpoint { let uri = req.uri().clone(); let headers = req.headers().clone(); - let res = match self.auth(&req).await { - Ok(ctx) => { - req.extensions_mut().insert(ctx); - self.ep.call(req).await - } - Err(err) => match err.code() { - ErrorCode::AUTHENTICATE_FAILURE => { - warn!( - "http auth failure: {method} {uri}, headers={:?}, error={}", - sanitize_request_headers(&headers), - err - ); - Err(PoemError::from_string( - err.message(), - StatusCode::UNAUTHORIZED, - )) - } - _ => { - error!( - "http request err: {method} {uri}, headers={:?}, error={}", - sanitize_request_headers(&headers), - err - ); - Err(PoemError::from_string( - err.message(), - StatusCode::INTERNAL_SERVER_ERROR, - )) + let query_id = req + .headers() + .get(QUERY_ID) + .map(|id| id.to_str().unwrap().to_string()) + .unwrap_or_else(|| Uuid::new_v4().to_string()); + + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + tracking_payload.query_id = Some(query_id.clone()); + let _guard = ThreadTracker::tracking(tracking_payload); + + ThreadTracker::tracking_future(async move { + let res = match self.auth(&req, query_id).await { + Ok(ctx) => { + req.extensions_mut().insert(ctx); + self.ep.call(req).await } - }, - }; - match res { - Err(err) => { - let body = Body::from_json(serde_json::json!({ - "error": { - "code": err.status().as_str(), - "message": err.to_string(), + Err(err) => match err.code() { + ErrorCode::AUTHENTICATE_FAILURE => { + warn!( + "http auth failure: {method} {uri}, headers={:?}, error={}", + sanitize_request_headers(&headers), + err + ); + Err(PoemError::from_string( + err.message(), + StatusCode::UNAUTHORIZED, + )) + } + _ => { + error!( + "http request err: {method} {uri}, headers={:?}, error={}", + sanitize_request_headers(&headers), + err + ); + Err(PoemError::from_string( + err.message(), + StatusCode::INTERNAL_SERVER_ERROR, + )) } - })) - .unwrap(); - Ok(Response::builder().status(err.status()).body(body)) + }, + }; + match res { + Err(err) => { + let body = Body::from_json(serde_json::json!({ + "error": { + "code": err.status().as_str(), + "message": err.to_string(), + } + })) + .unwrap(); + Ok(Response::builder().status(err.status()).body(body)) + } + Ok(res) => Ok(res.into_response()), } - Ok(res) => Ok(res.into_response()), - } + }) + .await } }