209 changes: 135 additions & 74 deletions native/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion native/Cargo.toml
@@ -38,7 +38,7 @@ arrow = { version = "54.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.10.0" }
parquet = { version = "54.2.0", default-features = false, features = ["experimental"] }
datafusion = { version = "46.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion = { git = "https://github.com/apache/datafusion", rev = "7c902def35601a003f91744ba2829eb451e792d7", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-comet-spark-expr = { path = "spark-expr", version = "0.8.0" }
datafusion-comet-proto = { path = "proto", version = "0.8.0" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
2 changes: 1 addition & 1 deletion native/core/Cargo.toml
@@ -77,7 +77,7 @@ jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "7"
hex = "0.4.3"
datafusion-functions-nested = "46.0.0"
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "7c902def35601a003f91744ba2829eb451e792d7" }

[features]
default = []
@@ -22,6 +22,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::common::{internal_err, Result, ScalarValue};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ColumnarValue;
use std::fmt::Formatter;
use std::hash::Hash;
use std::{any::Any, fmt::Display, sync::Arc};

@@ -140,4 +141,8 @@ impl PhysicalExpr for BloomFilterMightContain {
Arc::clone(&children[1]),
)?))
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}
Comment on lines +145 to +147
@andygrove (Member, Author) commented on Apr 14, 2025:

DataFusion now supports a new EXPLAIN output, but this is not exposed in Comet, so we do not need to implement these new methods.
}
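For reference, if Comet ever did expose the new EXPLAIN format, a minimal non-panicking `fmt_sql` body could render a SQL-like form along these lines. This is only a sketch: the field names `bloom_filter_expr` and `value_expr` are assumed for illustration, not necessarily the struct's real fields.

```rust
use std::fmt::Formatter;

// Hedged sketch of the trait method body: render a SQL-like representation
// instead of panicking. `bloom_filter_expr` and `value_expr` are hypothetical
// field names; PhysicalExpr implements Display, so `{}` formatting works.
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
    write!(f, "might_contain({}, {})", self.bloom_filter_expr, self.value_expr)
}
```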
4 changes: 4 additions & 0 deletions native/core/src/execution/expressions/subquery.rs
@@ -67,6 +67,10 @@ impl PhysicalExpr for Subquery {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, _: &Schema) -> datafusion::common::Result<DataType> {
Ok(self.data_type.clone())
}
1 change: 1 addition & 0 deletions native/core/src/execution/operators/copy.rs
@@ -99,6 +99,7 @@ impl DisplayAs for CopyExec {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "CopyExec [{:?}]", self.mode)
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}
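`DisplayFormatType::TreeRender` is the variant DataFusion added for its new tree-style EXPLAIN. Since Comet never requests that format, the new arms in these operators simply panic. As a hedged sketch (not what the PR does), an operator could instead fall back to its default text so tree rendering never aborts:

```rust
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use std::fmt::Formatter;

impl DisplayAs for CopyExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "CopyExec [{:?}]", self.mode)
            }
            // Reuse the default text as a safe placeholder; a real TreeRender
            // implementation would typically emit only operator-specific
            // details, since the tree layout already shows the operator name.
            DisplayFormatType::TreeRender => write!(f, "CopyExec [{:?}]", self.mode),
        }
    }
}
```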
1 change: 1 addition & 0 deletions native/core/src/execution/operators/expand.rs
@@ -85,6 +85,7 @@ impl DisplayAs for ExpandExec {

Ok(())
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}
1 change: 1 addition & 0 deletions native/core/src/execution/operators/filter.rs
@@ -312,6 +312,7 @@ impl DisplayAs for FilterExec {
self.predicate, display_projections
)
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}
1 change: 1 addition & 0 deletions native/core/src/execution/operators/scan.rs
@@ -384,6 +384,7 @@ impl DisplayAs for ScanExec {
.collect();
write!(f, "schema=[{}]", fields.join(", "))?;
}
DisplayFormatType::TreeRender => unimplemented!(),
}
Ok(())
}
1 change: 1 addition & 0 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -120,6 +120,7 @@ impl DisplayAs for ShuffleWriterExec {
self.partitioning, self.enable_fast_encoding, self.codec
)
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}
53 changes: 1 addition & 52 deletions native/core/src/parquet/schema_adapter.rs
@@ -18,7 +18,7 @@
//! Custom schema adapter that uses Spark-compatible conversions

use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
use arrow::array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion::physical_plan::ColumnarValue;
@@ -216,57 +216,6 @@ impl SchemaMapper for SchemaMapping {
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}

/// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
/// contains the fields that exist in both the file schema and table schema.
///
/// Unlike `map_batch` this method also preserves the columns that
/// may not appear in the final output (`projected_table_schema`) but may
/// appear in push down predicates
fn map_partial_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {
@andygrove (Member, Author) commented:

map_partial_batch has been removed in DataFusion.

@andygrove (Member, Author) commented:

@mbutrovich @parthchandra FYI, I am not sure of the impact, but I figured I would first see if any tests fail in CI.
let batch_cols = batch.columns().to_vec();
let schema = batch.schema();

// for each field in the batch's schema (which is based on a file, not a table)...
let (cols, fields) = schema
.fields()
.iter()
.zip(batch_cols.iter())
.flat_map(|(field, batch_col)| {
self.table_schema
// try to get the same field from the table schema that we have stored in self
.field_with_name(field.name())
// and if we don't have it, that's fine, ignore it. This may occur when we've
// created an external table whose fields are a subset of the fields in this
// file, then tried to read data from the file into this table. If that is the
// case here, it's fine to ignore because we don't care about this field
// anyways
.ok()
// but if we do have it,
.map(|table_field| {
// try to cast it into the correct output type. we don't want to ignore this
// error, though, so it's propagated.
spark_parquet_convert(
ColumnarValue::Array(Arc::clone(batch_col)),
table_field.data_type(),
&self.parquet_options,
)?
.into_array(batch_col.len())
// and if that works, return the field and column.
.map(|new_col| (new_col, table_field.clone()))
})
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
}
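With `map_partial_batch` gone, all batch adaptation now flows through `map_batch` alone. A hedged usage sketch of the surviving path follows; the names track the DataFusion `SchemaAdapter` API as of this rev, and the factory construction is illustrative rather than Comet's exact call site:

```rust
// Build an adapter for this file's schema, then adapt each batch it yields.
// `factory` is assumed to be Comet's Spark-compatible SchemaAdapterFactory;
// `projected_table_schema`, `table_schema`, `file_schema`, and `file_batch`
// stand in for values the scan already has.
let adapter = factory.create(projected_table_schema, table_schema);
let (mapper, _projection) = adapter.map_schema(&file_schema)?;
let adapted_batch = mapper.map_batch(file_batch)?;
```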

#[cfg(test)]
4 changes: 4 additions & 0 deletions native/spark-expr/src/array_funcs/array_insert.rs
@@ -98,6 +98,10 @@ impl PhysicalExpr for ArrayInsert {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
self.array_type(&self.src_array_expr.data_type(input_schema)?)
}
4 changes: 4 additions & 0 deletions native/spark-expr/src/array_funcs/get_array_struct_fields.rs
@@ -80,6 +80,10 @@ impl PhysicalExpr for GetArrayStructFields {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
let struct_field = self.child_field(input_schema)?;
match self.child.data_type(input_schema)? {
4 changes: 4 additions & 0 deletions native/spark-expr/src/array_funcs/list_extract.rs
@@ -92,6 +92,10 @@ impl PhysicalExpr for ListExtract {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
Ok(self.child_field(input_schema)?.data_type().clone())
}
5 changes: 5 additions & 0 deletions native/spark-expr/src/bitwise_funcs/bitwise_not.rs
@@ -23,6 +23,7 @@ use arrow::{
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::{error::DataFusionError, logical_expr::ColumnarValue};
use std::fmt::Formatter;
use std::hash::Hash;
use std::{any::Any, sync::Arc};

@@ -121,6 +122,10 @@
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(BitwiseNotExpr::new(Arc::clone(&children[0]))))
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}
}

pub fn bitwise_not(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
7 changes: 4 additions & 3 deletions native/spark-expr/src/comet_scalar_funcs.rs
@@ -25,7 +25,8 @@ use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::{
ScalarFunctionImplementation, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
ScalarFunctionArgs, ScalarFunctionImplementation, ScalarUDF, ScalarUDFImpl, Signature,
Volatility,
};
use datafusion::physical_plan::ColumnarValue;
use std::any::Any;
@@ -197,7 +198,7 @@ impl ScalarUDFImpl for CometScalarFunction {
Ok(self.data_type.clone())
}

fn invoke(&self, args: &[ColumnarValue]) -> DataFusionResult<ColumnarValue> {
(self.func)(args)
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
(self.func)(&args.args)
}
}
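The old `invoke` method was deprecated upstream in favor of `invoke_with_args`, which bundles the inputs into a `ScalarFunctionArgs` struct. A hedged sketch of the shape this change relies on (field set as assumed from the DataFusion 46 API; check the pinned rev for the exact definition):

```rust
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs};

// Sketch of the new calling convention inside a ScalarUDFImpl.
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
    // `args.args` carries the evaluated ColumnarValue inputs that `invoke`
    // used to receive directly; `args.number_rows` and `args.return_type`
    // add context the old signature never provided.
    (self.func)(&args.args)
}
```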
5 changes: 5 additions & 0 deletions native/spark-expr/src/conditional_funcs/if_expr.rs
@@ -22,6 +22,7 @@ use arrow::{
use datafusion::common::Result;
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr::{expressions::CaseExpr, PhysicalExpr};
use std::fmt::Formatter;
use std::hash::Hash;
use std::{any::Any, sync::Arc};

@@ -87,6 +88,10 @@ impl PhysicalExpr for IfExpr {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
let data_type = self.true_expr.data_type(input_schema)?;
Ok(data_type)
4 changes: 4 additions & 0 deletions native/spark-expr/src/conversion_funcs/cast.rs
@@ -1731,6 +1731,10 @@ impl PhysicalExpr for Cast {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, _: &Schema) -> DataFusionResult<DataType> {
Ok(self.data_type.clone())
}
4 changes: 4 additions & 0 deletions native/spark-expr/src/datetime_funcs/date_trunc.rs
@@ -70,6 +70,10 @@ impl PhysicalExpr for DateTruncExpr {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> {
self.child.data_type(input_schema)
}
4 changes: 4 additions & 0 deletions native/spark-expr/src/datetime_funcs/hour.rs
@@ -71,6 +71,10 @@ impl PhysicalExpr for HourExpr {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> {
match self.child.data_type(input_schema).unwrap() {
DataType::Dictionary(key_type, _) => {
4 changes: 4 additions & 0 deletions native/spark-expr/src/datetime_funcs/minute.rs
@@ -71,6 +71,10 @@ impl PhysicalExpr for MinuteExpr {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> {
match self.child.data_type(input_schema).unwrap() {
DataType::Dictionary(key_type, _) => {
4 changes: 4 additions & 0 deletions native/spark-expr/src/datetime_funcs/second.rs
@@ -71,6 +71,10 @@ impl PhysicalExpr for SecondExpr {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> {
match self.child.data_type(input_schema).unwrap() {
DataType::Dictionary(key_type, _) => {
4 changes: 4 additions & 0 deletions native/spark-expr/src/datetime_funcs/timestamp_trunc.rs
@@ -89,6 +89,10 @@ impl PhysicalExpr for TimestampTruncExpr {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> {
match self.child.data_type(input_schema)? {
DataType::Dictionary(key_type, _) => Ok(DataType::Dictionary(
4 changes: 4 additions & 0 deletions native/spark-expr/src/json_funcs/to_json.rs
@@ -83,6 +83,10 @@ impl PhysicalExpr for ToJson {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, _: &Schema) -> Result<DataType> {
Ok(DataType::Utf8)
}
4 changes: 4 additions & 0 deletions native/spark-expr/src/math_funcs/internal/checkoverflow.rs
@@ -83,6 +83,10 @@ impl PhysicalExpr for CheckOverflow {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, _: &Schema) -> datafusion::common::Result<DataType> {
Ok(self.data_type.clone())
}
4 changes: 4 additions & 0 deletions native/spark-expr/src/math_funcs/internal/normalize_nan.rs
@@ -60,6 +60,10 @@ impl PhysicalExpr for NormalizeNaNAndZero {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> {
self.child.data_type(input_schema)
}
5 changes: 5 additions & 0 deletions native/spark-expr/src/math_funcs/negative.rs
@@ -27,6 +27,7 @@ use datafusion::{
logical_expr::{interval_arithmetic::Interval, ColumnarValue},
physical_expr::PhysicalExpr,
};
use std::fmt::Formatter;
use std::hash::Hash;
use std::{any::Any, sync::Arc};

@@ -258,4 +259,8 @@ impl PhysicalExpr for NegativeExpr {
let properties = children[0].clone().with_order(children[0].sort_properties);
Ok(properties)
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}
}
4 changes: 4 additions & 0 deletions native/spark-expr/src/predicate_funcs/rlike.rs
@@ -162,4 +162,8 @@ impl PhysicalExpr for RLike {
&self.pattern_str,
)?))
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}
}
8 changes: 5 additions & 3 deletions native/spark-expr/src/string_funcs/chr.rs
@@ -26,7 +26,9 @@
};

use datafusion::common::{cast::as_int64_array, exec_err, Result, ScalarValue};
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};

fn chr(args: &[ArrayRef]) -> Result<ArrayRef> {
let integer_array = as_int64_array(&args[0])?;
@@ -91,8 +93,8 @@ impl ScalarUDFImpl for SparkChrFunc {
Ok(Utf8)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
spark_chr(args)
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
spark_chr(&args.args)
}
}
