diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 716ecc4d3..3d13bc0cb 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -19,17 +19,13 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; -use bytes::Bytes; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; -use datafusion::common::Column; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; -use datafusion::logical_expr::expr::Alias; -use datafusion::logical_expr::{Aggregate, LogicalPlan, Projection}; -use datafusion::prelude::Expr; use futures_util::Future; use http::StatusCode; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; use std::pin::Pin; @@ -45,7 +41,7 @@ use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; -use crate::query::Query as LogicalQuery; +use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -58,7 +54,7 @@ use crate::utils::user_auth_for_query; use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage; /// Query Request through http endpoint. -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Query { pub query: String, @@ -72,30 +68,6 @@ pub struct Query { pub filter_tags: Option>, } -/// DateBin Request. 
-#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct DateBin { - pub stream: String, - pub start_time: String, - pub end_time: String, - pub num_bins: u64, -} - -/// DateBinRecord -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] -pub struct DateBinRecord { - pub date_bin_timestamp: String, - pub log_count: u64, -} - -/// DateBin Response. -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] -pub struct DateBinResponse { - pub fields: Vec, - pub records: Vec, -} - pub async fn query(req: HttpRequest, query_request: Query) -> Result { let session_state = QUERY_SESSION.state(); let raw_logical_plan = match session_state @@ -130,8 +102,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result Result { - let date_bin_request: DateBin = - serde_json::from_slice(&body).map_err(|err| anyhow::Error::msg(err.to_string()))?; - +pub async fn get_date_bin( + req: HttpRequest, + date_bin: Json, +) -> Result { let creds = extract_session_key_from_req(&req)?; let permissions = Users.get_permissions(&creds); // does user have access to table? - user_auth_for_query(&permissions, &[date_bin_request.stream.clone()])?; + user_auth_for_query(&permissions, &[date_bin.stream.clone()])?; - let date_bin_records = date_bin_request.get_bin_density().await?; + let date_bin_records = date_bin.get_bin_density().await?; Ok(web::Json(DateBinResponse { fields: vec!["date_bin_timestamp".into(), "log_count".into()], @@ -226,35 +198,6 @@ pub async fn create_streams_for_querier() { } } -fn is_logical_plan_aggregate_without_filters(plan: &LogicalPlan) -> (bool, String) { - match plan { - LogicalPlan::Projection(Projection { input, expr, .. }) => { - if let LogicalPlan::Aggregate(Aggregate { input, .. }) = &**input { - if matches!(&**input, LogicalPlan::TableScan { .. }) && expr.len() == 1 { - return match &expr[0] { - Expr::Column(Column { name, .. 
}) => (name == "count(*)", name.clone()), - Expr::Alias(Alias { - expr: inner_expr, - name, - .. - }) => { - let alias_name = name; - if let Expr::Column(Column { name, .. }) = &**inner_expr { - (name == "count(*)", alias_name.to_string()) - } else { - (false, "".to_string()) - } - } - _ => (false, "".to_string()), - }; - } - } - } - _ => return (false, "".to_string()), - } - (false, "".to_string()) -} - impl FromRequest for Query { type Error = actix_web::Error; type Future = Pin>>>; diff --git a/src/query/mod.rs b/src/query/mod.rs index bad753073..80c4e9fe7 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -28,11 +28,15 @@ use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tr use datafusion::error::DataFusionError; use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::SessionStateBuilder; -use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan}; +use datafusion::logical_expr::expr::Alias; +use datafusion::logical_expr::{ + Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan, +}; use datafusion::prelude::*; use itertools::Itertools; use once_cell::sync::Lazy; use relative_path::RelativePathBuf; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; use std::ops::Bound; @@ -44,11 +48,12 @@ use sysinfo::System; use self::error::ExecuteError; use self::stream_schema_provider::GlobalSchemaProvider; pub use self::stream_schema_provider::PartialTimeFilter; +use crate::catalog::column::{Int64Type, TypedStatistics}; use crate::catalog::manifest::Manifest; use crate::catalog::snapshot::Snapshot; use crate::catalog::Snapshot as CatalogSnapshot; use crate::event; -use crate::handlers::http::query::{DateBin, DateBinRecord, QueryError}; +use crate::handlers::http::query::QueryError; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; use crate::storage::{ObjectStorageProvider, 
 ObjectStoreFormat, StorageDir, STREAM_ROOT_DIRECTORY};
@@ -199,9 +204,69 @@ impl Query {
         let _ = self.raw_logical_plan.visit(&mut visitor);
         visitor.into_inner().pop()
     }
+
+    /// Evaluates to Some("count(*)") | Some("column_name") if the logical plan is a Projection: SELECT COUNT(*) | SELECT COUNT(*) as column_name
+    pub fn is_logical_plan_count_without_filters(&self) -> Option<&String> {
+        // Check if the raw logical plan is a Projection: SELECT
+        let LogicalPlan::Projection(Projection { input, expr, .. }) = &self.raw_logical_plan else {
+            return None;
+        };
+        // Check if the input of the Projection is an Aggregate: COUNT(*)
+        let LogicalPlan::Aggregate(Aggregate { input, .. }) = &**input else {
+            return None;
+        };
+
+        // Ensure the input of the Aggregate is a TableScan and there is exactly one expression: SELECT COUNT(*)
+        if !matches!(&**input, LogicalPlan::TableScan { .. }) || expr.len() != 1 {
+            return None;
+        }
+
+        // Check if the expression is a column or an alias for COUNT(*)
+        match &expr[0] {
+            // Direct column check
+            Expr::Column(Column { name, .. }) if name == "count(*)" => Some(name),
+            // Alias for COUNT(*)
+            Expr::Alias(Alias {
+                expr: inner_expr,
+                name: alias_name,
+                ..
+            }) => {
+                if let Expr::Column(Column { name, .. }) = &**inner_expr {
+                    if name == "count(*)" {
+                        return Some(alias_name);
+                    }
+                }
+                None
+            }
+            // Unsupported expression type
+            _ => None,
+        }
+    }
+}
+
+/// DateBinRecord
+#[derive(Debug, Serialize, Clone)]
+pub struct DateBinRecord {
+    pub date_bin_timestamp: String,
+    pub log_count: u64,
 }
 
-impl DateBin {
+struct DateBinBounds {
+    start: DateTime<Utc>,
+    end: DateTime<Utc>,
+}
+
+/// DateBin Request.
+#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct DateBinRequest { + pub stream: String, + pub start_time: String, + pub end_time: String, + pub num_bins: u64, +} + +impl DateBinRequest { /// This function is supposed to read maninfest files for the given stream, /// get the sum of `num_rows` between the `startTime` and `endTime`, /// divide that by number of bins and return in a manner acceptable for the console @@ -225,39 +290,29 @@ impl DateBin { let mut date_bin_records = Vec::new(); for bin in final_date_bins { - let date_bin_timestamp = match &bin[0] { - PartialTimeFilter::Low(Bound::Included(ts)) => ts.and_utc().timestamp_millis(), - _ => unreachable!(), - }; - // extract start and end time to compare - let bin_start = match &bin[0] { - PartialTimeFilter::Low(Bound::Included(ts)) => ts, - _ => unreachable!(), - }; - let bin_end = match &bin[1] { - PartialTimeFilter::High(Bound::Included(ts) | Bound::Excluded(ts)) => ts, - _ => unreachable!(), - }; + let date_bin_timestamp = bin.start.timestamp_millis(); + // Sum up the number of rows that fall within the bin let total_num_rows: u64 = all_manifest_files .iter() .flat_map(|m| &m.files) - .filter(|f| { - f.columns.iter().any(|c| { + .filter_map(|f| { + if f.columns.iter().any(|c| { c.name == time_partition - && match &c.stats { - Some(crate::catalog::column::TypedStatistics::Int(int64_type)) => { - let min = DateTime::from_timestamp_millis(int64_type.min) - .unwrap() - .naive_utc(); - bin_start <= &min && bin_end >= &min + && c.stats.as_ref().is_some_and(|stats| match stats { + TypedStatistics::Int(Int64Type { min, .. }) => { + let min = DateTime::from_timestamp_millis(*min).unwrap(); + bin.start <= min && bin.end >= min // Determines if a column matches the bin's time range. 
                                 }
                                 _ => false,
-                            }
-                    })
+                            })
+                    }) {
+                        Some(f.num_rows)
+                    } else {
+                        None
+                    }
                 })
-                .map(|f| f.num_rows)
                 .sum();
 
             date_bin_records.push(DateBinRecord {
@@ -270,17 +325,16 @@ impl DateBin {
         Ok(date_bin_records)
     }
 
-    /// calculate the endTime for each bin based on num bins
-    fn get_bins(&self, time_range: &TimeRange) -> Vec<[PartialTimeFilter; 2]> {
-        // get total minutes elapsed between start and end time
+    /// Calculate the end time for each bin based on the number of bins
+    fn get_bins(&self, time_range: &TimeRange) -> Vec<DateBinBounds> {
         let total_minutes = time_range
             .end
             .signed_duration_since(time_range.start)
             .num_minutes() as u64;
 
         // divide minutes by num bins to get minutes per bin
-        let quotient = (total_minutes / self.num_bins) as i64;
-        let remainder = (total_minutes % self.num_bins) as i64;
+        let quotient = total_minutes / self.num_bins;
+        let remainder = total_minutes % self.num_bins;
         let have_remainder = remainder > 0;
 
         // now create multiple bins [startTime, endTime)
@@ -295,39 +349,37 @@ impl DateBin {
             self.num_bins - 1
         };
 
+        // Create bins for all but the last date
         for _ in 0..loop_end {
-            let bin_end = start + Duration::minutes(quotient);
-            final_date_bins.push([
-                PartialTimeFilter::Low(Bound::Included(start.naive_utc())),
-                PartialTimeFilter::High(Bound::Excluded(bin_end.naive_utc())),
-            ]);
-
-            start = bin_end;
+            let end = start + Duration::minutes(quotient as i64);
+            final_date_bins.push(DateBinBounds { start, end });
+            start = end;
         }
 
-        // construct the last bin
-        // if we have remainder, then the last bin will be as long as the remainder
-        // else it will be as long as the quotient
+        // Add the last bin; it spans the remainder if there is one, otherwise a full quotient-length interval
         if have_remainder {
-            final_date_bins.push([
-                PartialTimeFilter::Low(Bound::Included(start.naive_utc())),
-                PartialTimeFilter::High(Bound::Excluded(
-                    (start + Duration::minutes(remainder)).naive_utc(),
-                )),
-            ]);
+            final_date_bins.push(DateBinBounds {
+                start,
+                end: start + Duration::minutes(remainder as i64),
+            });
         } else {
-            final_date_bins.push([
-                PartialTimeFilter::Low(Bound::Included(start.naive_utc())),
-                PartialTimeFilter::High(Bound::Excluded(
-                    (start + Duration::minutes(quotient)).naive_utc(),
-                )),
-            ]);
+            final_date_bins.push(DateBinBounds {
+                start,
+                end: start + Duration::minutes(quotient as i64),
+            });
         }
 
         final_date_bins
     }
 }
 
+/// DateBin Response.
+#[derive(Debug, Serialize, Clone)]
+pub struct DateBinResponse {
+    pub fields: Vec<String>,
+    pub records: Vec<DateBinRecord>,
+}
+
 #[derive(Debug, Default)]
 pub(crate) struct TableScanVisitor {
     tables: Vec<String>,