Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions src/log-query/src/log_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ impl Filters {
Filters::Single(filter)
}
}
/// Aggregation function with optional range and alias.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggFunc {
/// Function name, e.g., "count", "sum", etc.
pub name: String,
/// Arguments to the function. e.g., column references or literals. LogExpr::NamedIdent("column1".to_string())
pub args: Vec<LogExpr>,
pub alias: Option<String>,
}

impl AggFunc {
pub fn new(name: String, args: Vec<LogExpr>, alias: Option<String>) -> Self {
Self { name, args, alias }
}
}

/// Expression to calculate on log after filtering.
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -103,13 +118,11 @@ pub enum LogExpr {
args: Vec<LogExpr>,
alias: Option<String>,
},
/// Aggregation function with optional grouping.
AggrFunc {
name: String,
args: Vec<LogExpr>,
/// Optional range function parameter. Stands for the time range for both step and align.
range: Option<String>,
/// Function name, arguments, and optional alias.
expr: Vec<AggFunc>,
by: Vec<LogExpr>,
alias: Option<String>,
},
Decompose {
expr: Box<LogExpr>,
Expand Down
75 changes: 41 additions & 34 deletions src/query/src/log_query/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion_expr::{
};
use datafusion_sql::TableReference;
use datatypes::schema::Schema;
use log_query::{BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
use log_query::{AggFunc, BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;

Expand Down Expand Up @@ -312,26 +312,40 @@ impl LogQueryPlanner {
fn build_aggr_func(
&self,
schema: &DFSchema,
fn_name: &str,
args: &[LogExpr],
expr: &[AggFunc],
by: &[LogExpr],
) -> Result<(Expr, Vec<Expr>)> {
let aggr_fn = self
.session_state
.aggregate_functions()
.get(fn_name)
.context(UnknownAggregateFunctionSnafu {
name: fn_name.to_string(),
})?;
let args = args
) -> Result<(Vec<Expr>, Vec<Expr>)> {
let aggr_expr = expr
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.map(|agg_func| {
let AggFunc {
name: fn_name,
args,
alias,
} = agg_func;
let aggr_fn = self
.session_state
.aggregate_functions()
.get(fn_name)
.context(UnknownAggregateFunctionSnafu {
name: fn_name.to_string(),
})?;
let args = args
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.try_collect::<Vec<_>>()?;
if let Some(alias) = alias {
Ok(aggr_fn.call(args).alias(alias))
} else {
Ok(aggr_fn.call(args))
}
})
.try_collect::<Vec<_>>()?;

let group_exprs = by
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.try_collect::<Vec<_>>()?;
let aggr_expr = aggr_fn.call(args);

Ok((aggr_expr, group_exprs))
}
Expand Down Expand Up @@ -490,21 +504,12 @@ impl LogQueryPlanner {
let mut plan_builder = plan_builder;

match expr {
LogExpr::AggrFunc {
name,
args,
by,
range: _range,
alias,
} => {
LogExpr::AggrFunc { expr, by } => {
let schema = plan_builder.schema();
let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?;
if let Some(alias) = alias {
aggr_expr = aggr_expr.alias(alias);
}
let (aggr_expr, group_exprs) = self.build_aggr_func(schema, expr, by)?;

plan_builder = plan_builder
.aggregate(group_exprs, [aggr_expr.clone()])
.aggregate(group_exprs, aggr_expr)
.context(DataFusionPlanningSnafu)?;
}
LogExpr::Filter { filter } => {
Expand Down Expand Up @@ -917,11 +922,12 @@ mod tests {
context: Context::None,
columns: vec![],
exprs: vec![LogExpr::AggrFunc {
name: "count".to_string(),
args: vec![LogExpr::NamedIdent("message".to_string())],
expr: vec![AggFunc::new(
"count".to_string(),
vec![LogExpr::NamedIdent("message".to_string())],
Some("count_result".to_string()),
)],
by: vec![LogExpr::NamedIdent("host".to_string())],
range: None,
alias: Some("count_result".to_string()),
}],
};

Expand Down Expand Up @@ -1036,13 +1042,14 @@ mod tests {
alias: Some("2__date_histogram__time_bucket".to_string()),
},
LogExpr::AggrFunc {
name: "count".to_string(),
args: vec![LogExpr::PositionalIdent(0)],
expr: vec![AggFunc::new(
"count".to_string(),
vec![LogExpr::PositionalIdent(0)],
Some("count_result".to_string()),
)],
by: vec![LogExpr::NamedIdent(
"2__date_histogram__time_bucket".to_string(),
)],
range: None,
alias: Some("count_result".to_string()),
},
],
};
Expand Down
Loading