Skip to content

Commit

Permalink
refactor(query): add query id for query log (#15466)
Browse files Browse the repository at this point in the history
* refactor(query): add query id for query log

* refactor(query): add query id for query log

* refactor(query): add query id for query log

* refactor(query): add query id for query log

* refactor(query): add query id for query log
  • Loading branch information
zhang2014 authored May 10, 2024
1 parent 96f3646 commit ca4015f
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 63 deletions.
13 changes: 13 additions & 0 deletions src/common/base/src/runtime/runtime_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub struct ThreadTracker {

#[derive(Clone)]
pub struct TrackingPayload {
pub query_id: Option<String>,
pub profile: Option<Arc<Profile>>,
pub mem_stat: Option<Arc<MemStat>>,
pub metrics: Option<Arc<ScopedRegistry>>,
Expand Down Expand Up @@ -164,6 +165,7 @@ impl ThreadTracker {
profile: None,
metrics: None,
mem_stat: None,
query_id: None,
node_error: None,
},
}
Expand Down Expand Up @@ -283,6 +285,17 @@ impl ThreadTracker {
Ok(Err(oom)) => Err(oom),
}
}

pub fn query_id() -> Option<&'static String> {
TRACKER.with(|tracker| {
tracker
.borrow()
.payload
.query_id
.as_ref()
.map(|query_id| unsafe { &*(query_id as *const String) })
})
}
}

pin_project! {
Expand Down
63 changes: 47 additions & 16 deletions src/common/tracing/src/loggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use databend_common_base::runtime::ThreadTracker;
use fern::FormatCallback;
use opentelemetry::logs::AnyValue;
use opentelemetry::logs::Severity;
Expand Down Expand Up @@ -185,12 +186,25 @@ fn format_json_log(out: FormatCallback, message: &fmt::Arguments, record: &log::
};
record.key_values().visit(&mut visitor).ok();

out.finish(format_args!(
r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#,
humantime::format_rfc3339_micros(SystemTime::now()),
record.level(),
serde_json::to_string(&fields).unwrap_or_default(),
));
match ThreadTracker::query_id() {
None => {
out.finish(format_args!(
r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#,
humantime::format_rfc3339_micros(SystemTime::now()),
record.level(),
serde_json::to_string(&fields).unwrap_or_default(),
));
}
Some(query_id) => {
out.finish(format_args!(
r#"{{"timestamp":"{}","level":"{}","query_id":"{}","fields":{}}}"#,
humantime::format_rfc3339_micros(SystemTime::now()),
record.level(),
query_id,
serde_json::to_string(&fields).unwrap_or_default(),
));
}
}

struct KvCollector<'a> {
fields: &'a mut Map<String, serde_json::Value>,
Expand All @@ -210,16 +224,33 @@ fn format_json_log(out: FormatCallback, message: &fmt::Arguments, record: &log::
}

fn format_text_log(out: FormatCallback, message: &fmt::Arguments, record: &log::Record) {
out.finish(format_args!(
"{} {:>5} {}: {}:{} {}{}",
humantime::format_rfc3339_micros(SystemTime::now()),
record.level(),
record.module_path().unwrap_or(""),
record.file().unwrap_or(""),
record.line().unwrap_or(0),
message,
KvDisplay::new(record.key_values()),
));
match ThreadTracker::query_id() {
None => {
out.finish(format_args!(
"{} {:>5} {}: {}:{} {}{}",
humantime::format_rfc3339_micros(SystemTime::now()),
record.level(),
record.module_path().unwrap_or(""),
record.file().unwrap_or(""),
record.line().unwrap_or(0),
message,
KvDisplay::new(record.key_values()),
));
}
Some(query_id) => {
out.finish(format_args!(
"{} {} {:>5} {}: {}:{} {}{}",
query_id,
humantime::format_rfc3339_micros(SystemTime::now()),
record.level(),
record.module_path().unwrap_or(""),
record.file().unwrap_or(""),
record.line().unwrap_or(0),
message,
KvDisplay::new(record.key_values()),
));
}
}
}

pub struct KvDisplay<'kvs> {
Expand Down
102 changes: 55 additions & 47 deletions src/query/service/src/servers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use databend_common_base::runtime::ThreadTracker;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::tenant::Tenant;
Expand Down Expand Up @@ -219,7 +220,7 @@ pub struct HTTPSessionEndpoint<E> {

impl<E> HTTPSessionEndpoint<E> {
#[async_backtrace::framed]
async fn auth(&self, req: &Request) -> Result<HttpQueryContext> {
async fn auth(&self, req: &Request, query_id: String) -> Result<HttpQueryContext> {
let credential = get_credential(req, self.kind)?;
let session_manager = SessionManager::instance();
let session = session_manager.create_session(SessionType::Dummy).await?;
Expand All @@ -245,12 +246,6 @@ impl<E> HTTPSessionEndpoint<E> {
.get(USER_AGENT)
.map(|id| id.to_str().unwrap().to_string());

let query_id = req
.headers()
.get(QUERY_ID)
.map(|id| id.to_str().unwrap().to_string())
.unwrap_or_else(|| Uuid::new_v4().to_string());

let trace_parent = req
.headers()
.get(TRACE_PARENT)
Expand Down Expand Up @@ -282,49 +277,62 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
let uri = req.uri().clone();
let headers = req.headers().clone();

let res = match self.auth(&req).await {
Ok(ctx) => {
req.extensions_mut().insert(ctx);
self.ep.call(req).await
}
Err(err) => match err.code() {
ErrorCode::AUTHENTICATE_FAILURE => {
warn!(
"http auth failure: {method} {uri}, headers={:?}, error={}",
sanitize_request_headers(&headers),
err
);
Err(PoemError::from_string(
err.message(),
StatusCode::UNAUTHORIZED,
))
}
_ => {
error!(
"http request err: {method} {uri}, headers={:?}, error={}",
sanitize_request_headers(&headers),
err
);
Err(PoemError::from_string(
err.message(),
StatusCode::INTERNAL_SERVER_ERROR,
))
let query_id = req
.headers()
.get(QUERY_ID)
.map(|id| id.to_str().unwrap().to_string())
.unwrap_or_else(|| Uuid::new_v4().to_string());

let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(query_id.clone());
let _guard = ThreadTracker::tracking(tracking_payload);

ThreadTracker::tracking_future(async move {
let res = match self.auth(&req, query_id).await {
Ok(ctx) => {
req.extensions_mut().insert(ctx);
self.ep.call(req).await
}
},
};
match res {
Err(err) => {
let body = Body::from_json(serde_json::json!({
"error": {
"code": err.status().as_str(),
"message": err.to_string(),
Err(err) => match err.code() {
ErrorCode::AUTHENTICATE_FAILURE => {
warn!(
"http auth failure: {method} {uri}, headers={:?}, error={}",
sanitize_request_headers(&headers),
err
);
Err(PoemError::from_string(
err.message(),
StatusCode::UNAUTHORIZED,
))
}
_ => {
error!(
"http request err: {method} {uri}, headers={:?}, error={}",
sanitize_request_headers(&headers),
err
);
Err(PoemError::from_string(
err.message(),
StatusCode::INTERNAL_SERVER_ERROR,
))
}
}))
.unwrap();
Ok(Response::builder().status(err.status()).body(body))
},
};
match res {
Err(err) => {
let body = Body::from_json(serde_json::json!({
"error": {
"code": err.status().as_str(),
"message": err.to_string(),
}
}))
.unwrap();
Ok(Response::builder().status(err.status()).body(body))
}
Ok(res) => Ok(res.into_response()),
}
Ok(res) => Ok(res.into_response()),
}
})
.await
}
}

Expand Down

0 comments on commit ca4015f

Please sign in to comment.