Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Lambda(_)
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(TreeNodeRecursion::Stop)
Expand Down
93 changes: 92 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ pub enum Expr {
OuterReferenceColumn(DataType, Column),
/// Unnest expression
Unnest(Unnest),
/// Lambda expression
Lambda(LambdaFunction),
}

impl Default for Expr {
Expand Down Expand Up @@ -1466,6 +1468,7 @@ impl Expr {
#[expect(deprecated)]
Expr::Wildcard { .. } => "Wildcard",
Expr::Unnest { .. } => "Unnest",
Expr::Lambda(..) => "LambdaFunction",
}
}

Expand Down Expand Up @@ -2020,7 +2023,8 @@ impl Expr {
| Expr::Wildcard { .. }
| Expr::WindowFunction(..)
| Expr::Literal(..)
| Expr::Placeholder(..) => false,
| Expr::Placeholder(..)
| Expr::Lambda(..) => false,
}
}

Expand Down Expand Up @@ -2607,6 +2611,12 @@ impl HashNode for Expr {
column.hash(state);
}
Expr::Unnest(Unnest { expr: _expr }) => {}
Expr::Lambda(LambdaFunction {
params: arguments,
body: _body,
}) => {
arguments.hash(state);
}
};
}
}
Expand Down Expand Up @@ -2911,6 +2921,17 @@ impl Display for SchemaDisplay<'_> {
}
}
}
Expr::Lambda(LambdaFunction {
params: arguments,
body,
}) => {
write!(
f,
"({arguments}) -> {body}",
arguments = arguments.join(", "),
body = SchemaDisplay(body)
)
}
}
}
}
Expand Down Expand Up @@ -3156,6 +3177,41 @@ fn schema_name_from_exprs_inner(exprs: &[Expr], sep: &str) -> Result<String, fmt
Ok(s)
}

/// Builds a schema name from a slice of borrowed expressions.
///
/// Each expression is rendered in its schema-name form and the pieces are
/// joined with `", "`. This is the by-reference counterpart used when the
/// caller only holds `&Expr` values and does not want to clone them.
///
/// # Arguments
/// * `exprs` - The borrowed expressions to render, in output order
///
/// # Returns
/// The comma-separated schema name, or a [`fmt::Error`] if formatting any
/// expression fails.
pub fn schema_name_from_exprs_ref(exprs: &[&Expr]) -> Result<String, fmt::Error> {
    // Separator placed between consecutive expression names.
    const SEPARATOR: &str = ", ";
    schema_name_from_exprs_inner_ref(exprs, SEPARATOR)
}

/// Joins the schema-name rendering of each borrowed expression with `sep`.
///
/// # Arguments
/// * `exprs` - The borrowed expressions to render, in output order
/// * `sep` - Text inserted between consecutive expressions (never leading or
///   trailing)
///
/// # Returns
/// The joined string; an empty slice yields an empty string. A [`fmt::Error`]
/// is returned if formatting any expression fails.
fn schema_name_from_exprs_inner_ref(
    exprs: &[&Expr],
    sep: &str,
) -> Result<String, fmt::Error> {
    let mut name = String::new();
    let mut remaining = exprs.iter();

    // Emit the first expression bare, then prefix every later one with the
    // separator so that no separator precedes the first or follows the last.
    if let Some(first) = remaining.next() {
        write!(&mut name, "{}", SchemaDisplay(first))?;
        for expr in remaining {
            name.push_str(sep);
            write!(&mut name, "{}", SchemaDisplay(expr))?;
        }
    }

    Ok(name)
}

pub fn schema_name_from_sorts(sorts: &[Sort]) -> Result<String, fmt::Error> {
let mut s = String::new();
for (i, e) in sorts.iter().enumerate() {
Expand Down Expand Up @@ -3393,10 +3449,45 @@ impl Display for Expr {
Expr::Unnest(Unnest { expr }) => {
write!(f, "{UNNEST_COLUMN_PREFIX}({expr})")
}
Expr::Lambda(LambdaFunction { params, body }) => {
write!(
f,
"({params}) -> {body}",
params = params.join(", "),
body = SchemaDisplay(body)
)
}
}
}
}

/// A lambda (anonymous function) expression: a list of parameter names plus
/// a body expression.
///
/// Lambdas are used as arguments to higher-order functions such as
/// `array_filter`; the body may refer to the parameters by name.
///
/// Both `Display for Expr` and `SchemaDisplay` render a lambda as
/// `(p1, p2) -> body`, joining the parameter names with `", "`.
///
/// # Example
/// In SQL: `x -> x > 3` represents a lambda with parameter `x` and body `x > 3`
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct LambdaFunction {
/// The parameter names for this lambda function, kept in the order given.
pub params: Vec<String>,
/// The expression body that references the parameters.
///
/// Boxed because `Expr` itself contains `LambdaFunction` (through
/// `Expr::Lambda`), so the field must be behind a pointer.
pub body: Box<Expr>,
}

impl LambdaFunction {
/// Creates a new lambda function with the given parameters and body.
///
/// # Arguments
/// * `params` - The parameter names for the lambda function
/// * `body` - The expression body that can reference the parameters
pub fn new(params: Vec<String>, body: Box<Expr>) -> Self {
Self { params, body }
}
}

fn fmt_function(
f: &mut Formatter,
fun: &str,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl ExprSchemable for Expr {
// Grouping sets do not really have a type and do not appear in projections
Ok(DataType::Null)
}
Expr::Lambda(..) => Ok(DataType::Null),
}
}

Expand Down Expand Up @@ -342,6 +343,7 @@ impl ExprSchemable for Expr {
// in projections
Ok(true)
}
Expr::Lambda(..) => Ok(true),
}
}

Expand Down Expand Up @@ -559,6 +561,7 @@ impl ExprSchemable for Expr {
| Expr::Wildcard { .. }
| Expr::GroupingSet(_)
| Expr::Placeholder(_)
| Expr::Lambda(..)
| Expr::Unnest(_) => Ok(Arc::new(Field::new(
&schema_name,
self.get_type(schema)?,
Expand Down
66 changes: 66 additions & 0 deletions datafusion/expr/src/lambda.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;

use arrow::array::RecordBatch;
use datafusion_common::{DFSchema, Result};
use datafusion_expr_common::columnar_value::ColumnarValue;

use crate::expr::LambdaFunction;

/// Converts a logical [`LambdaFunction`] into an executable [`PhysicalLambda`].
///
/// The trait has a single method and places no bounds on implementors.
/// NOTE(review): presumably implemented by the physical/query planner; no
/// implementation is visible in this file, so confirm against the callers.
pub trait LambdaPlanner {
/// Plans a logical lambda function into a physical lambda implementation.
///
/// # Arguments
/// * `lambda` - The logical lambda function (parameter names and body) to plan
/// * `df_schema` - Schema passed alongside the lambda; presumably the schema
///   against which column references in the body are resolved (TODO confirm)
///
/// # Returns
/// A boxed [`PhysicalLambda`] trait object on success.
///
/// # Errors
/// Returns a DataFusion error when the implementation cannot plan the lambda;
/// the specific failure conditions are defined by each implementor.
fn plan_lambda(
&self,
lambda: &LambdaFunction,
df_schema: &DFSchema,
) -> Result<Box<dyn PhysicalLambda>>;
}

/// Runtime form of a lambda expression that can be evaluated on record batches.
///
/// Values of this trait are produced by [`LambdaPlanner::plan_lambda`].
/// Implementors must be `Send + Sync` and implement `Debug`, as required by
/// the supertrait bounds.
pub trait PhysicalLambda: Send + Sync + Debug {
/// Returns the parameter names for this lambda function.
///
/// # Returns
/// A slice of the parameter names, borrowed from `self`.
fn params(&self) -> &[String];

/// Evaluates the lambda function against a record batch.
///
/// # Arguments
/// * `batch` - The record batch containing the input data.
///   NOTE(review): how the names from [`Self::params`] map onto the columns
///   of `batch` is not specified by this trait; confirm with implementors.
///
/// # Returns
/// The result of evaluating the lambda function as a [`ColumnarValue`].
///
/// # Errors
/// Returns a DataFusion error if evaluation fails; the conditions are
/// implementation-defined.
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
}
2 changes: 2 additions & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
//!
//! The [expr_fn] module contains functions for creating expressions.

mod lambda;
mod literal;
mod operation;
mod partition_evaluator;
Expand Down Expand Up @@ -97,6 +98,7 @@ pub use function::{
AccumulatorFactoryFunction, PartitionEvaluatorFactory, ReturnTypeFunction,
ScalarFunctionImplementation, StateTypeFunction,
};
pub use lambda::{LambdaPlanner, PhysicalLambda};
pub use literal::{
lit, lit_timestamp_nano, lit_with_metadata, Literal, TimestampLiteral,
};
Expand Down
13 changes: 11 additions & 2 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

use crate::expr::{
AggregateFunction, AggregateFunctionParams, Alias, Between, BinaryExpr, Case, Cast,
GroupingSet, InList, InSubquery, Like, Placeholder, ScalarFunction, TryCast, Unnest,
WindowFunction, WindowFunctionParams,
GroupingSet, InList, InSubquery, LambdaFunction, Like, Placeholder, ScalarFunction,
TryCast, Unnest, WindowFunction, WindowFunctionParams,
};
use crate::{Expr, ExprFunctionExt};

Expand Down Expand Up @@ -105,6 +105,9 @@ impl TreeNode for Expr {
Expr::InList(InList { expr, list, .. }) => {
(expr, list).apply_ref_elements(f)
}
Expr::Lambda(LambdaFunction { body, .. }) => {
body.apply_elements(f)
}
}
}

Expand Down Expand Up @@ -312,6 +315,12 @@ impl TreeNode for Expr {
.update_data(|(new_expr, new_list)| {
Expr::InList(InList::new(new_expr, new_list, negated))
}),
Expr::Lambda(LambdaFunction {
params: arguments,
body,
}) => body.map_elements(f)?.update_data(|new_body| {
Expr::Lambda(LambdaFunction::new(arguments, new_body))
}),
})
}
}
Loading