Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 13 additions & 70 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::common::Column;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{Aggregate, LogicalPlan, Projection};
use datafusion::prelude::Expr;
use futures_util::Future;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::pin::Pin;
Expand All @@ -45,7 +41,7 @@ use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::QueryResponse;
Expand All @@ -58,7 +54,7 @@ use crate::utils::user_auth_for_query;
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub query: String,
Expand All @@ -72,30 +68,6 @@ pub struct Query {
pub filter_tags: Option<Vec<String>>,
}

/// DateBin Request.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DateBin {
pub stream: String,
pub start_time: String,
pub end_time: String,
pub num_bins: u64,
}

/// DateBinRecord
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
pub struct DateBinRecord {
pub date_bin_timestamp: String,
pub log_count: u64,
}

/// DateBin Response.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
pub struct DateBinResponse {
pub fields: Vec<String>,
pub records: Vec<DateBinRecord>,
}

pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
let session_state = QUERY_SESSION.state();
let raw_logical_plan = match session_state
Expand Down Expand Up @@ -130,8 +102,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
let time = Instant::now();

if let (true, column_name) = is_logical_plan_aggregate_without_filters(&raw_logical_plan) {
let date_bin_request = DateBin {
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let date_bin_request = DateBinRequest {
stream: table_name.clone(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
Expand All @@ -141,10 +113,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
let response = if query_request.fields {
json!({
"fields": vec![&column_name],
"records": vec![json!({&column_name: date_bin_records[0].log_count})]
"records": vec![json!({column_name: date_bin_records[0].log_count})]
})
} else {
Value::Array(vec![json!({&column_name: date_bin_records[0].log_count})])
Value::Array(vec![json!({column_name: date_bin_records[0].log_count})])
};

let time = time.elapsed().as_secs_f64();
Expand Down Expand Up @@ -177,17 +149,17 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
Ok(response)
}

pub async fn get_date_bin(req: HttpRequest, body: Bytes) -> Result<impl Responder, QueryError> {
let date_bin_request: DateBin =
serde_json::from_slice(&body).map_err(|err| anyhow::Error::msg(err.to_string()))?;

pub async fn get_date_bin(
req: HttpRequest,
date_bin: Json<DateBinRequest>,
) -> Result<impl Responder, QueryError> {
let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);

// does user have access to table?
user_auth_for_query(&permissions, &[date_bin_request.stream.clone()])?;
user_auth_for_query(&permissions, &[date_bin.stream.clone()])?;

let date_bin_records = date_bin_request.get_bin_density().await?;
let date_bin_records = date_bin.get_bin_density().await?;

Ok(web::Json(DateBinResponse {
fields: vec!["date_bin_timestamp".into(), "log_count".into()],
Expand Down Expand Up @@ -226,35 +198,6 @@ pub async fn create_streams_for_querier() {
}
}

fn is_logical_plan_aggregate_without_filters(plan: &LogicalPlan) -> (bool, String) {
match plan {
LogicalPlan::Projection(Projection { input, expr, .. }) => {
if let LogicalPlan::Aggregate(Aggregate { input, .. }) = &**input {
if matches!(&**input, LogicalPlan::TableScan { .. }) && expr.len() == 1 {
return match &expr[0] {
Expr::Column(Column { name, .. }) => (name == "count(*)", name.clone()),
Expr::Alias(Alias {
expr: inner_expr,
name,
..
}) => {
let alias_name = name;
if let Expr::Column(Column { name, .. }) = &**inner_expr {
(name == "count(*)", alias_name.to_string())
} else {
(false, "".to_string())
}
}
_ => (false, "".to_string()),
};
}
}
}
_ => return (false, "".to_string()),
}
(false, "".to_string())
}

impl FromRequest for Query {
type Error = actix_web::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
Expand Down
160 changes: 106 additions & 54 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tr
use datafusion::error::DataFusionError;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
};
use datafusion::prelude::*;
use itertools::Itertools;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::ops::Bound;
Expand All @@ -44,11 +48,12 @@ use sysinfo::System;
use self::error::ExecuteError;
use self::stream_schema_provider::GlobalSchemaProvider;
pub use self::stream_schema_provider::PartialTimeFilter;
use crate::catalog::column::{Int64Type, TypedStatistics};
use crate::catalog::manifest::Manifest;
use crate::catalog::snapshot::Snapshot;
use crate::catalog::Snapshot as CatalogSnapshot;
use crate::event;
use crate::handlers::http::query::{DateBin, DateBinRecord, QueryError};
use crate::handlers::http::query::QueryError;
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, StorageDir, STREAM_ROOT_DIRECTORY};
Expand Down Expand Up @@ -199,9 +204,69 @@ impl Query {
let _ = self.raw_logical_plan.visit(&mut visitor);
visitor.into_inner().pop()
}

/// Evaluates to `Some(column_name)` when the raw logical plan is exactly
/// `SELECT COUNT(*)` (optionally aliased) over a bare table scan — i.e.
/// `Projection -> Aggregate -> TableScan` with a single `count(*)` expression
/// and no intervening Filter.
///
/// Returns the projected column name (`"count(*)"` or its alias) so the
/// caller can label the fast-path response; `None` for any other plan shape.
pub fn is_logical_plan_count_without_filters(&self) -> Option<&String> {
    // The plan must be a Projection: SELECT <exprs>
    let LogicalPlan::Projection(Projection { input, expr, .. }) = &self.raw_logical_plan else {
        return None;
    };
    // The Projection's input must be an Aggregate: COUNT(*)
    let LogicalPlan::Aggregate(Aggregate { input, .. }) = &**input else {
        return None;
    };

    // The Aggregate must read straight from a TableScan (no Filter in
    // between) and the projection must contain exactly one expression.
    // BUG FIX: the previous refactor wrote `expr.len() == 1` here, which
    // rejected the very shape this function exists to detect (and accepted
    // multi-expression projections). De Morgan of the original
    // `matches!(..) && expr.len() == 1` guard requires `!= 1`.
    if !matches!(&**input, LogicalPlan::TableScan { .. }) || expr.len() != 1 {
        return None;
    }

    // The single expression must be the `count(*)` column, either directly
    // or behind an alias.
    match &expr[0] {
        // Direct column: SELECT COUNT(*)
        Expr::Column(Column { name, .. }) if name == "count(*)" => Some(name),
        // Aliased column: SELECT COUNT(*) AS <alias_name>
        Expr::Alias(Alias {
            expr: inner_expr,
            name: alias_name,
            ..
        }) => {
            if let Expr::Column(Column { name, .. }) = &**inner_expr {
                if name == "count(*)" {
                    return Some(alias_name);
                }
            }
            None
        }
        // Any other expression type is not a bare COUNT(*)
        _ => None,
    }
}
}

/// One histogram bucket in a date-bin response: the bin's start timestamp
/// and the number of log rows counted into that bin.
#[derive(Debug, Serialize, Clone)]
pub struct DateBinRecord {
// Bin start time; populated from `bin.start.timestamp_millis()` in
// `get_bin_density` — presumably stringified millis, TODO confirm format.
pub date_bin_timestamp: String,
// Sum of `num_rows` of all manifest files whose time-partition minimum
// falls within this bin (see `get_bin_density`).
pub log_count: u64,
}

impl DateBin {
/// Time window of a single bin produced by `get_bins`.
/// NOTE(review): the row-counting check in `get_bin_density`
/// (`bin.start <= min && bin.end >= min`) treats both bounds as inclusive,
/// so adjacent bins share their boundary instant — confirm this is intended.
struct DateBinBounds {
start: DateTime<Utc>,
end: DateTime<Utc>,
}

/// Request body for the date-bin (log-count histogram) endpoint.
/// Deserialized from camelCase JSON keys (`stream`, `startTime`, `endTime`,
/// `numBins`) per the `rename_all` attribute.
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DateBinRequest {
// Stream (table) name; checked against the caller's permissions in
// `get_date_bin` before any data is read.
pub stream: String,
// Start of the query window as a string; parsed into a `TimeRange`
// elsewhere — exact accepted format not visible here, TODO confirm.
pub start_time: String,
// End of the query window, same format as `start_time`.
pub end_time: String,
// Number of bins to split the window into.
// NOTE(review): a value of 0 causes a divide-by-zero in `get_bins`
// (`total_minutes / self.num_bins`) — should be validated on ingress.
pub num_bins: u64,
}

impl DateBinRequest {
/// This function is supposed to read maninfest files for the given stream,
/// get the sum of `num_rows` between the `startTime` and `endTime`,
/// divide that by number of bins and return in a manner acceptable for the console
Expand All @@ -225,39 +290,29 @@ impl DateBin {
let mut date_bin_records = Vec::new();

for bin in final_date_bins {
let date_bin_timestamp = match &bin[0] {
PartialTimeFilter::Low(Bound::Included(ts)) => ts.and_utc().timestamp_millis(),
_ => unreachable!(),
};

// extract start and end time to compare
let bin_start = match &bin[0] {
PartialTimeFilter::Low(Bound::Included(ts)) => ts,
_ => unreachable!(),
};
let bin_end = match &bin[1] {
PartialTimeFilter::High(Bound::Included(ts) | Bound::Excluded(ts)) => ts,
_ => unreachable!(),
};
let date_bin_timestamp = bin.start.timestamp_millis();

// Sum up the number of rows that fall within the bin
let total_num_rows: u64 = all_manifest_files
.iter()
.flat_map(|m| &m.files)
.filter(|f| {
f.columns.iter().any(|c| {
.filter_map(|f| {
if f.columns.iter().any(|c| {
c.name == time_partition
&& match &c.stats {
Some(crate::catalog::column::TypedStatistics::Int(int64_type)) => {
let min = DateTime::from_timestamp_millis(int64_type.min)
.unwrap()
.naive_utc();
bin_start <= &min && bin_end >= &min
&& c.stats.as_ref().is_some_and(|stats| match stats {
TypedStatistics::Int(Int64Type { min, .. }) => {
let min = DateTime::from_timestamp_millis(*min).unwrap();
bin.start <= min && bin.end >= min // Determines if a column matches the bin's time range.
}
_ => false,
}
})
})
}) {
Some(f.num_rows)
} else {
None
}
})
.map(|f| f.num_rows)
.sum();

date_bin_records.push(DateBinRecord {
Expand All @@ -270,17 +325,16 @@ impl DateBin {
Ok(date_bin_records)
}

/// calculate the endTime for each bin based on num bins
fn get_bins(&self, time_range: &TimeRange) -> Vec<[PartialTimeFilter; 2]> {
// get total minutes elapsed between start and end time
/// Calculate the end time for each bin based on the number of bins
fn get_bins(&self, time_range: &TimeRange) -> Vec<DateBinBounds> {
let total_minutes = time_range
.end
.signed_duration_since(time_range.start)
.num_minutes() as u64;

// divide minutes by num bins to get minutes per bin
let quotient = (total_minutes / self.num_bins) as i64;
let remainder = (total_minutes % self.num_bins) as i64;
let quotient = total_minutes / self.num_bins;
let remainder = total_minutes % self.num_bins;
let have_remainder = remainder > 0;

// now create multiple bins [startTime, endTime)
Expand All @@ -295,39 +349,37 @@ impl DateBin {
self.num_bins - 1
};

// Create bins for all but the last date
for _ in 0..loop_end {
let bin_end = start + Duration::minutes(quotient);
final_date_bins.push([
PartialTimeFilter::Low(Bound::Included(start.naive_utc())),
PartialTimeFilter::High(Bound::Excluded(bin_end.naive_utc())),
]);

start = bin_end;
let end = start + Duration::minutes(quotient as i64);
final_date_bins.push(DateBinBounds { start, end });
start = end;
}

// construct the last bin
// if we have remainder, then the last bin will be as long as the remainder
// else it will be as long as the quotient
// Add the last bin, accounting for any remainder, should we include it?
if have_remainder {
final_date_bins.push([
PartialTimeFilter::Low(Bound::Included(start.naive_utc())),
PartialTimeFilter::High(Bound::Excluded(
(start + Duration::minutes(remainder)).naive_utc(),
)),
]);
final_date_bins.push(DateBinBounds {
start,
end: start + Duration::minutes(remainder as i64),
});
} else {
final_date_bins.push([
PartialTimeFilter::Low(Bound::Included(start.naive_utc())),
PartialTimeFilter::High(Bound::Excluded(
(start + Duration::minutes(quotient)).naive_utc(),
)),
]);
final_date_bins.push(DateBinBounds {
start,
end: start + Duration::minutes(quotient as i64),
});
}

final_date_bins
}
}

/// Response body for the date-bin endpoint: the field names exposed to the
/// console (`date_bin_timestamp`, `log_count` — see `get_date_bin`) plus one
/// `DateBinRecord` per bin.
#[derive(Debug, Serialize, Clone)]
pub struct DateBinResponse {
// Column labels for the records, in display order.
pub fields: Vec<String>,
// One entry per bin, in ascending time order as produced by `get_bins`.
pub records: Vec<DateBinRecord>,
}

#[derive(Debug, Default)]
pub(crate) struct TableScanVisitor {
tables: Vec<String>,
Expand Down
Loading