diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index 8cd301612b..1605acf6c0 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -120,6 +120,7 @@ use datafusion_physical_expr::LexOrdering;
 use itertools::Itertools;
 use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
+use parquet::arrow::ProjectionMask;
 use parquet::schema::parser::parse_message_type;
 use std::cmp::max;
 use std::{collections::HashMap, sync::Arc};
@@ -950,7 +951,6 @@ impl PhysicalPlanner {
             }
             OpStruct::NativeScan(scan) => {
                 let data_schema = parse_message_type(&scan.data_schema).unwrap();
-                let required_schema = parse_message_type(&scan.required_schema).unwrap();
 
                 let data_schema_descriptor =
                     parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
@@ -959,11 +959,17 @@ impl PhysicalPlanner {
                         .unwrap(),
                 );
 
-                let required_schema_descriptor =
-                    parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
+                // Add projection vector to FileScanConfig.
+                let projection_vector: Vec<usize> = scan
+                    .projection_vector
+                    .iter()
+                    .map(|offset| *offset as usize)
+                    .collect();
+
                 let required_schema_arrow = Arc::new(
-                    parquet::arrow::schema::parquet_to_arrow_schema(
-                        &required_schema_descriptor,
+                    parquet::arrow::schema::parquet_to_arrow_schema_by_columns(
+                        &data_schema_descriptor,
+                        ProjectionMask::leaves(&data_schema_descriptor, projection_vector.clone()),
                         None,
                     )
                     .unwrap(),
@@ -979,7 +985,7 @@ impl PhysicalPlanner {
                 // Create a conjunctive form of the vector because ParquetExecBuilder takes
                 // a single expression
                 let data_filters = data_filters?;
-                let test_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
+                let cnf_data_filters = data_filters.into_iter().reduce(|left, right| {
                     Arc::new(BinaryExpr::new(
                         left,
                         datafusion::logical_expr::Operator::And,
@@ -1017,21 +1023,13 @@ impl PhysicalPlanner {
                 // TODO: I think we can remove partition_count in the future, but leave for testing.
                 assert_eq!(file_groups.len(), partition_count);
 
+                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
+
                 let object_store_url = ObjectStoreUrl::local_filesystem();
-                let mut file_scan_config =
+                let file_scan_config =
                     FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
-                        .with_file_groups(file_groups);
-
-                // Check for projection, if so generate the vector and add to FileScanConfig.
-                let mut projection_vector: Vec<usize> =
-                    Vec::with_capacity(required_schema_arrow.fields.len());
-                // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
-                required_schema_arrow.fields.iter().for_each(|field| {
-                    projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
-                });
-
-                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
-                file_scan_config = file_scan_config.with_projection(Some(projection_vector));
+                        .with_file_groups(file_groups)
+                        .with_projection(Some(projection_vector));
 
                 let mut table_parquet_options = TableParquetOptions::new();
                 // TODO: Maybe these are configs?
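
For reference, the schema-projection path added above can be exercised in isolation. The sketch below is not part of the patch: it parses an illustrative flat message type, selects two leaf columns with ProjectionMask::leaves, and converts only those columns to an Arrow schema with parquet_to_arrow_schema_by_columns, mirroring what the planner now does with scan.data_schema and scan.projection_vector. For a flat schema like this one, leaf indices coincide with top-level field indices, which is the assumption behind reusing field offsets computed on the JVM side.

```rust
use std::sync::Arc;

use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;

fn main() {
    // Illustrative schema string; in the patch this arrives as scan.data_schema,
    // serialized from the Parquet footer on the JVM side.
    let message_type = "
        message spark_schema {
            required int32 id;
            optional binary name (UTF8);
            optional double price;
        }
    ";
    let data_schema = parse_message_type(message_type).unwrap();
    let data_schema_descriptor =
        parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));

    // Illustrative projection: keep columns 0 and 2 (id, price).
    let projection_vector: Vec<usize> = vec![0, 2];
    let required_schema_arrow = parquet::arrow::schema::parquet_to_arrow_schema_by_columns(
        &data_schema_descriptor,
        ProjectionMask::leaves(&data_schema_descriptor, projection_vector),
        None,
    )
    .unwrap();

    // The projected Arrow schema contains only `id` and `price`.
    println!("{:?}", required_schema_arrow.fields());
}
```
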
@@ -1041,7 +1039,7 @@ impl PhysicalPlanner {
 
                 let mut builder = ParquetExecBuilder::new(file_scan_config)
                     .with_table_parquet_options(table_parquet_options);
 
-                if let Some(filter) = test_data_filters {
+                if let Some(filter) = cnf_data_filters {
                     builder = builder.with_predicate(filter);
                 }
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index f50389dbe5..c54bc7cee5 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -74,8 +74,8 @@ message NativeScan {
   // is purely for informational purposes when viewing native query plans in
   // debug mode.
   string source = 2;
-  string required_schema = 3;
-  string data_schema = 4;
+  string data_schema = 3;
+  repeated int64 projection_vector = 4;
   repeated spark.spark_expression.Expr data_filters = 5;
   repeated SparkFilePartition file_partitions = 6;
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index b8a780e608..741405b873 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -29,14 +29,13 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, Normalize
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometNativeScanExec, CometScanExec, CometSinkPlaceHolder, CometSparkToColumnarExec, DecimalPrecision}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometScanExec, CometSinkPlaceHolder, CometSparkToColumnarExec, DecimalPrecision}
 import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
-import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
@@ -44,10 +43,12 @@ import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.{isCometScan, isSpark34Plus, withInfo}
 import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, RegExp, Unsupported}
+import org.apache.comet.parquet.FooterReader
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo}
 import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, BuildSide, JoinType, Operator}
@@ -2507,23 +2508,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
             partitions.foreach(p => {
               val inputPartitions = p.asInstanceOf[DataSourceRDDPartition].inputPartitions
               inputPartitions.foreach(partition => {
-                partition2Proto(partition.asInstanceOf[FilePartition], nativeScanBuilder)
+                partition2Proto(partition.asInstanceOf[FilePartition], nativeScanBuilder, scan)
               })
             })
           case rdd: FileScanRDD =>
             rdd.filePartitions.foreach(partition => {
-              partition2Proto(partition, nativeScanBuilder)
+              partition2Proto(partition, nativeScanBuilder, scan)
             })
           case _ =>
+            assert(false)
         }
 
-        val requiredSchemaParquet =
-          new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)
-        val dataSchemaParquet =
-          new SparkToParquetSchemaConverter(conf).convert(scan.relation.dataSchema)
+        val projection_vector: Array[java.lang.Long] = scan.requiredSchema.fields.map(field => {
+          scan.relation.dataSchema.fieldIndex(field.name).toLong.asInstanceOf[java.lang.Long]
+        })
 
-        nativeScanBuilder.setRequiredSchema(requiredSchemaParquet.toString)
-        nativeScanBuilder.setDataSchema(dataSchemaParquet.toString)
+        nativeScanBuilder.addAllProjectionVector(projection_vector.toIterable.asJava)
 
         Some(result.setNativeScan(nativeScanBuilder).build())
 
@@ -3191,9 +3191,34 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
   private def partition2Proto(
       partition: FilePartition,
-      nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
+      nativeScanBuilder: OperatorOuterClass.NativeScan.Builder,
+      scan: CometScanExec): Unit = {
     val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+    val sparkContext = scan.session.sparkContext
+    var schema_saved: Boolean = false;
     partition.files.foreach(file => {
+      if (!schema_saved) {
+        // TODO: This code shouldn't be here, but for POC it's fine.
+        // Extract the schema and stash it.
+        val hadoopConf =
+          scan.relation.sparkSession.sessionState.newHadoopConfWithOptions(scan.relation.options)
+        val broadcastedHadoopConf =
+          sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+        val sharedConf = broadcastedHadoopConf.value.value
+        val footer = FooterReader.readFooter(sharedConf, file)
+        val footerFileMetaData = footer.getFileMetaData
+        val schema = footerFileMetaData.getSchema
+        nativeScanBuilder.setDataSchema(schema.toString)
+
+//        val clippedSchema = CometParquetReadSupport.clipParquetSchema(
+//          schema,
+//          scan.requiredSchema,
+//          scan.session.sessionState.conf.caseSensitiveAnalysis,
+//          CometParquetUtils.readFieldId(scan.session.sessionState.conf),
+//          CometParquetUtils.ignoreMissingIds(scan.session.sessionState.conf))
+
+        schema_saved = true
+      }
       val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
       fileBuilder
         .setFilePath(file.pathUri.toString)
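
Taken together, the proto change and the Scala side above define a new contract between the JVM plan and the native plan: data_schema now carries the Parquet message type read from the file footer (schema.toString), and projection_vector carries the offsets of the required columns within that schema. A rough sketch of how those two fields look to the native side is below; the struct is a simplified, hand-written stand-in for the prost-generated NativeScan (prost attributes and unrelated fields omitted), and the helper mirrors the i64-to-usize conversion and length assertion in planner.rs. All values are illustrative.

```rust
/// Simplified stand-in for the prost-generated NativeScan message
/// (only the two fields touched by this change; attributes omitted).
struct NativeScan {
    /// Parquet message type of the file, written on the JVM side via
    /// nativeScanBuilder.setDataSchema(schema.toString).
    data_schema: String,
    /// Offsets of the required columns within data_schema, written via
    /// nativeScanBuilder.addAllProjectionVector(...).
    projection_vector: Vec<i64>,
}

/// Convert the proto offsets into the usize indices that FileScanConfig and
/// ProjectionMask::leaves expect, with the same length check as the planner.
fn projection_indices(scan: &NativeScan, required_field_count: usize) -> Vec<usize> {
    let projection: Vec<usize> = scan
        .projection_vector
        .iter()
        .map(|offset| *offset as usize)
        .collect();
    assert_eq!(projection.len(), required_field_count);
    projection
}

fn main() {
    // Illustrative values only.
    let scan = NativeScan {
        data_schema: "message spark_schema { required int32 id; optional double price; }"
            .to_string(),
        projection_vector: vec![0, 1],
    };
    assert_eq!(projection_indices(&scan, 2), vec![0, 1]);
    println!("data_schema: {}", scan.data_schema);
}
```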