diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index d9f3e1b7196d..cef11525921c 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -894,7 +894,7 @@ pub async fn from_substrait_rex( Some(RexType::ScalarFunction(f)) => { let fn_name = extensions.get(&f.function_reference).ok_or_else(|| { DataFusionError::NotImplemented(format!( - "Aggregated function not found: function reference = {:?}", + "Scalar function not found: function reference = {:?}", f.function_reference )) })?; @@ -912,8 +912,8 @@ pub async fn from_substrait_rex( Some(ArgType::Value(e)) => { from_substrait_rex(ctx, e, input_schema, extensions).await } - _ => not_impl_err!( - "Aggregated function argument non-Value type not supported" + t => not_impl_err!( + "Only Value arguments are supported for scalar functions, got: {t:?}" ), }?; args.push(arg_expr.as_ref().clone()); @@ -938,7 +938,7 @@ pub async fn from_substrait_rex( ScalarFunctionType::Op(op) => { if f.arguments.len() != 2 { return not_impl_err!( - "Expect two arguments for binary operator {op:?}" + "Expected two arguments for binary operator {op:?}" ); } let lhs = &f.arguments[0].arg_type; @@ -1464,13 +1464,15 @@ fn from_substrait_literal_with_type( }?; let builder = s.fields.iter().zip(fields).try_fold( ScalarStructBuilder::new(), - |b, (sv, field)| -> Result<_> { + |b, (lit, field)| -> Result<_> { let sv = from_substrait_literal_with_type( - sv, + lit, field.to_owned().map(|f| f.data_type().to_owned()).as_ref(), )?; Ok(b.with_scalar( - field.unwrap_or_else(|| Field::new("col", sv.data_type(), true)), + field.unwrap_or_else(|| { + Field::new("col", sv.data_type(), lit.nullable) + }), sv, )) },