Merged

Changes from all commits (25 commits)
706 changes: 427 additions & 279 deletions native/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions native/Cargo.toml
@@ -34,11 +34,11 @@ edition = "2021"
rust-version = "1.85"

[workspace.dependencies]
arrow = { version = "54.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow = { version = "55.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.10.0" }
parquet = { version = "54.2.0", default-features = false, features = ["experimental"] }
datafusion = { version = "46.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
parquet = { version = "55.0.0", default-features = false, features = ["experimental"] }
datafusion = { git = "https://github.com/apache/datafusion", rev = "47.0.0-rc1", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-comet-spark-expr = { path = "spark-expr", version = "0.8.0" }
datafusion-comet-proto = { path = "proto", version = "0.8.0" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
@@ -48,7 +48,7 @@ num = "0.4"
rand = "0.8"
regex = "1.9.6"
thiserror = "1"
object_store = { version = "0.11.0", features = ["gcp", "azure", "aws", "http"] }
object_store = { version = "0.12.0", features = ["gcp", "azure", "aws", "http"] }
url = "2.2"

[profile.release]
2 changes: 1 addition & 1 deletion native/core/Cargo.toml
@@ -77,7 +77,7 @@ jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "7"
hex = "0.4.3"
datafusion-functions-nested = "46.0.0"
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "47.0.0-rc1" }

[features]
default = []
native/core/src/execution/expressions/bloom_filter_might_contain.rs
@@ -22,6 +22,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::common::{internal_err, Result, ScalarValue};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ColumnarValue;
use std::fmt::Formatter;
use std::hash::Hash;
use std::{any::Any, fmt::Display, sync::Arc};

@@ -140,4 +141,8 @@ impl PhysicalExpr for BloomFilterMightContain {
Arc::clone(&children[1]),
)?))
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

Comment on lines +145 to +147 from @andygrove (Member, Author), Apr 14, 2025:
DataFusion now supports a new EXPLAIN output, but this is not exposed in Comet, so we do not need to implement these new methods.

}
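
For context, the fmt_sql stub above and the DisplayFormatType::TreeRender arms added to the operator files below are both driven by DataFusion 47's new tree-style EXPLAIN rendering. The following minimal, self-contained sketch shows the DisplayAs pattern those stubs slot into; it is not code from this PR, and ExampleExec plus its rendered text are made up purely for illustration.

use std::fmt::{self, Formatter};

use datafusion::physical_plan::{DisplayAs, DisplayFormatType};

/// Hypothetical operator, used only to illustrate the trait.
struct ExampleExec;

impl DisplayAs for ExampleExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
        match t {
            // The two pre-existing EXPLAIN formats.
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ExampleExec")
            }
            // New in DataFusion 47: a compact rendering used by the tree-style EXPLAIN.
            // Comet's operators stub this arm with unimplemented!() because that output
            // is never requested through Comet.
            DisplayFormatType::TreeRender => write!(f, "ExampleExec"),
        }
    }
}

A real operator would emit a short one-line summary in the TreeRender arm; panicking there simply documents that the code path is unreachable from Comet.
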
4 changes: 4 additions & 0 deletions native/core/src/execution/expressions/subquery.rs
@@ -67,6 +67,10 @@ impl PhysicalExpr for Subquery {
self
}

fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
unimplemented!()
}

fn data_type(&self, _: &Schema) -> datafusion::common::Result<DataType> {
Ok(self.data_type.clone())
}
1 change: 1 addition & 0 deletions native/core/src/execution/operators/copy.rs
@@ -99,6 +99,7 @@ impl DisplayAs for CopyExec {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "CopyExec [{:?}]", self.mode)
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}
1 change: 1 addition & 0 deletions native/core/src/execution/operators/expand.rs
@@ -85,6 +85,7 @@ impl DisplayAs for ExpandExec {

Ok(())
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}
1 change: 1 addition & 0 deletions native/core/src/execution/operators/filter.rs
@@ -312,6 +312,7 @@ impl DisplayAs for FilterExec {
self.predicate, display_projections
)
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}
1 change: 1 addition & 0 deletions native/core/src/execution/operators/scan.rs
@@ -384,6 +384,7 @@ impl DisplayAs for ScanExec {
.collect();
write!(f, "schema=[{}]", fields.join(", "))?;
}
DisplayFormatType::TreeRender => unimplemented!(),
}
Ok(())
}
1 change: 1 addition & 0 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -120,6 +120,7 @@ impl DisplayAs for ShuffleWriterExec {
self.partitioning, self.enable_fast_encoding, self.codec
)
}
DisplayFormatType::TreeRender => unimplemented!(),
}
}
}
46 changes: 29 additions & 17 deletions native/core/src/parquet/parquet_exec.rs
@@ -21,7 +21,9 @@ use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use arrow::datatypes::{Field, SchemaRef};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
use datafusion::datasource::physical_plan::{
FileGroup, FileScanConfigBuilder, FileSource, ParquetSource,
};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_expr::expressions::BinaryExpr;
@@ -80,23 +82,33 @@ pub(crate) fn init_datasource_exec(
parquet_source = parquet_source.with_predicate(Arc::clone(data_schema), filter);
}
}

let file_groups = file_groups
.iter()
.map(|files| FileGroup::new(files.clone()))
.collect();

let file_scan_config = match (data_schema, projection_vector, partition_fields) {
(Some(data_schema), Some(projection_vector), Some(partition_fields)) => get_file_config(
data_schema,
partition_schema,
file_groups,
object_store_url,
Arc::new(parquet_source),
)
.with_projection(Some(projection_vector))
.with_table_partition_cols(partition_fields),
_ => get_file_config(
(Some(data_schema), Some(projection_vector), Some(partition_fields)) => {
get_file_config_builder(

Comment from @andygrove (Member, Author): this now uses a builder

data_schema,
partition_schema,
file_groups,
object_store_url,
Arc::new(parquet_source),
)
.with_projection(Some(projection_vector))
.with_table_partition_cols(partition_fields)
.build()
}
_ => get_file_config_builder(
required_schema,
partition_schema,
file_groups,
object_store_url,
Arc::new(parquet_source),
),
)
.build(),
};

Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
@@ -113,13 +125,13 @@ fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOpt
(table_parquet_options, spark_parquet_options)
}

fn get_file_config(
fn get_file_config_builder(
schema: SchemaRef,
partition_schema: Option<SchemaRef>,
file_groups: Vec<Vec<PartitionedFile>>,
file_groups: Vec<FileGroup>,
object_store_url: ObjectStoreUrl,
file_source: Arc<dyn FileSource>,
) -> FileScanConfig {
) -> FileScanConfigBuilder {
match partition_schema {
Some(partition_schema) => {
let partition_fields: Vec<Field> = partition_schema
@@ -129,11 +141,11 @@ fn get_file_config(
Field::new(field.name(), field.data_type().clone(), field.is_nullable())
})
.collect_vec();
FileScanConfig::new(object_store_url, Arc::clone(&schema), file_source)
FileScanConfigBuilder::new(object_store_url, Arc::clone(&schema), file_source)
.with_file_groups(file_groups)
.with_table_partition_cols(partition_fields)
}
_ => FileScanConfig::new(object_store_url, Arc::clone(&schema), file_source)
_ => FileScanConfigBuilder::new(object_store_url, Arc::clone(&schema), file_source)
.with_file_groups(file_groups),
}
}
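
To make the builder migration above easier to follow, here is a minimal, self-contained sketch of constructing a scan with the DataFusion 47 FileGroup/FileScanConfigBuilder API used in this file. It is not Comet code: the one-column schema, the local file path, and the use of ParquetSource::default() are assumptions made purely for illustration.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::Result;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;

fn build_example_scan() -> Result<Arc<DataSourceExec>> {
    // Hypothetical schema and Parquet source, for illustration only.
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, true)]));
    let parquet_source = Arc::new(ParquetSource::default());

    // Files are now wrapped in FileGroup instead of being passed as Vec<Vec<PartitionedFile>>.
    let files = FileGroup::new(vec![PartitionedFile::from_path(
        "/tmp/example.parquet".to_string(),
    )?]);

    // FileScanConfig::new(..).with_*(..) becomes a builder that is finalized with .build().
    let file_scan_config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema, parquet_source)
            .with_file_groups(vec![files])
            .build();

    Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
}

Relative to DataFusion 46, the structural changes are the FileGroup wrapper and the explicit .build(); with_projection and with_table_partition_cols carry over onto the builder unchanged.
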
80 changes: 7 additions & 73 deletions native/core/src/parquet/schema_adapter.rs
@@ -18,7 +18,7 @@
//! Custom schema adapter that uses Spark-compatible conversions

use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
use arrow::array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion::physical_plan::ColumnarValue;
@@ -50,11 +50,10 @@ impl SchemaAdapterFactory for SparkSchemaAdapterFactory {
fn create(
&self,
required_schema: SchemaRef,
table_schema: SchemaRef,
_table_schema: SchemaRef,
) -> Box<dyn SchemaAdapter> {
Box::new(SparkSchemaAdapter {
required_schema,
table_schema,
parquet_options: self.parquet_options.clone(),
})
}
@@ -67,12 +66,6 @@ pub struct SparkSchemaAdapter {
/// The schema for the table, projected to include only the fields being output (projected) by the
/// associated ParquetExec
required_schema: SchemaRef,
/// The entire table schema for the table we're using this to adapt.
///
/// This is used to evaluate any filters pushed down into the scan
/// which may refer to columns that are not referred to anywhere
/// else in the plan.
table_schema: SchemaRef,
/// Spark cast options
parquet_options: SparkParquetOptions,
}
@@ -139,7 +132,6 @@ impl SchemaAdapter for SparkSchemaAdapter {
Arc::new(SchemaMapping {
required_schema: Arc::<Schema>::clone(&self.required_schema),
field_mappings,
table_schema: Arc::<Schema>::clone(&self.table_schema),
parquet_options: self.parquet_options.clone(),
}),
projection,
@@ -186,11 +178,6 @@ pub struct SchemaMapping {
/// They are Options instead of just plain `usize`s because the table could
/// have fields that don't exist in the file.
field_mappings: Vec<Option<usize>>,
/// The entire table schema, as opposed to the projected_table_schema (which
/// only contains the columns that we are projecting out of this query).
/// This contains all fields in the table, regardless of if they will be
/// projected out or not.
table_schema: SchemaRef,
/// Spark cast options
parquet_options: SparkParquetOptions,
}
@@ -239,59 +226,6 @@ impl SchemaMapper for SchemaMapping {
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}

/// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
/// contains the fields that exist in both the file schema and table schema.
///
/// Unlike `map_batch` this method also preserves the columns that
/// may not appear in the final output (`projected_table_schema`) but may
/// appear in push down predicates
fn map_partial_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {

Comment from @andygrove (Member, Author): map_partial_batch has been removed in DataFusion

Comment from @andygrove (Member, Author): @mbutrovich @parthchandra fyi, I am not sure of the impact, but I figured I would first see if any tests fail in CI

let batch_cols = batch.columns().to_vec();
let schema = batch.schema();

// for each field in the batch's schema (which is based on a file, not a table)...
let (cols, fields) = schema
.fields()
.iter()
.zip(batch_cols.iter())
.flat_map(|(field, batch_col)| {
self.table_schema
.fields()
.iter()
.enumerate()
.find(|(_, b)| {
if self.parquet_options.case_sensitive {
b.name() == field.name()
} else {
b.name().to_lowercase() == field.name().to_lowercase()
}
})
// but if we do have it,
.map(|(_, table_field)| {
// try to cast it into the correct output type. we don't want to ignore this
// error, though, so it's propagated.
spark_parquet_convert(
ColumnarValue::Array(Arc::clone(batch_col)),
table_field.data_type(),
&self.parquet_options,
)?
.into_array(batch_col.len())
// and if that works, return the field and column.
.map(|new_col| (new_col, table_field.as_ref().clone()))
})
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
}
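
With map_partial_batch gone, map_batch is the only adaptation path left. As a rough, pure-Arrow illustration of what that adaptation amounts to, the sketch below matches file columns to the required schema case-insensitively and backfills columns missing from the file with nulls. It is a simplified stand-in, not Comet's SchemaMapping code, and it deliberately skips the Spark-compatible type conversions that spark_parquet_convert performs.

use std::sync::Arc;

use arrow::array::{new_null_array, Array, ArrayRef, Int32Array, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::ArrowError;

// Simplified adaptation: for every required field, take the matching file column
// (matched case-insensitively) or fill with nulls if the file does not have it.
fn adapt(batch: &RecordBatch, required: &SchemaRef) -> Result<RecordBatch, ArrowError> {
    let cols: Vec<ArrayRef> = required
        .fields()
        .iter()
        .map(|req_field| {
            batch
                .schema()
                .fields()
                .iter()
                .position(|file_field| file_field.name().eq_ignore_ascii_case(req_field.name()))
                .map(|i| Arc::clone(batch.column(i)))
                .unwrap_or_else(|| new_null_array(req_field.data_type(), batch.num_rows()))
        })
        .collect();
    // Preserve the row count explicitly so empty or zero-column batches still round-trip.
    let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
    RecordBatch::try_new_with_options(Arc::clone(required), cols, &options)
}

fn main() -> Result<(), ArrowError> {
    // The file has an upper-cased "ID" column; the required schema also wants "extra".
    let file_schema = Arc::new(Schema::new(vec![Field::new("ID", DataType::Int32, true)]));
    let required: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("extra", DataType::Int32, true),
    ]));
    let batch = RecordBatch::try_new(
        file_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    )?;
    let adapted = adapt(&batch, &required)?;
    assert_eq!(adapted.num_columns(), 2);
    assert_eq!(adapted.column(1).null_count(), 2);
    Ok(())
}
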

#[cfg(test)]
@@ -306,7 +240,7 @@ mod test {
use datafusion::common::config::TableParquetOptions;
use datafusion::common::DataFusionError;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::TaskContext;
@@ -378,11 +312,11 @@ mod test {
)),
);

let files = FileGroup::new(vec![PartitionedFile::from_path(filename.to_string())?]);
let file_scan_config =
FileScanConfig::new(object_store_url, required_schema, parquet_source)
.with_file_groups(vec![vec![PartitionedFile::from_path(
filename.to_string(),
)?]]);
FileScanConfigBuilder::new(object_store_url, required_schema, parquet_source)
.with_file_groups(vec![files])
.build();

let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
