native/core/src/execution/datafusion/planner.rs (18 additions, 20 deletions)
@@ -120,6 +120,7 @@ use datafusion_physical_expr::LexOrdering;
 use itertools::Itertools;
 use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
+use parquet::arrow::ProjectionMask;
 use parquet::schema::parser::parse_message_type;
 use std::cmp::max;
 use std::{collections::HashMap, sync::Arc};
@@ -950,7 +951,6 @@ impl PhysicalPlanner {
             }
             OpStruct::NativeScan(scan) => {
                 let data_schema = parse_message_type(&scan.data_schema).unwrap();
-                let required_schema = parse_message_type(&scan.required_schema).unwrap();
 
                 let data_schema_descriptor =
                     parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
@@ -959,11 +959,17 @@
                     .unwrap(),
                 );
 
-                let required_schema_descriptor =
-                    parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
+                // Add projection vector to FileScanConfig.
+                let projection_vector: Vec<usize> = scan
+                    .projection_vector
+                    .iter()
+                    .map(|offset| *offset as usize)
+                    .collect();
 
                 let required_schema_arrow = Arc::new(
-                    parquet::arrow::schema::parquet_to_arrow_schema(
-                        &required_schema_descriptor,
+                    parquet::arrow::schema::parquet_to_arrow_schema_by_columns(
+                        &data_schema_descriptor,
+                        ProjectionMask::leaves(&data_schema_descriptor, projection_vector.clone()),
                         None,
                     )
                     .unwrap(),
@@ -979,7 +985,7 @@
                 // Create a conjunctive form of the vector because ParquetExecBuilder takes
                 // a single expression
                 let data_filters = data_filters?;
-                let test_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
+                let cnf_data_filters = data_filters.into_iter().reduce(|left, right| {
                     Arc::new(BinaryExpr::new(
                         left,
                         datafusion::logical_expr::Operator::And,
@@ -1017,21 +1023,13 @@
                 // TODO: I think we can remove partition_count in the future, but leave for testing.
                 assert_eq!(file_groups.len(), partition_count);
 
+                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
+
                 let object_store_url = ObjectStoreUrl::local_filesystem();
-                let mut file_scan_config =
+                let file_scan_config =
                     FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
-                        .with_file_groups(file_groups);
-
-                // Check for projection, if so generate the vector and add to FileScanConfig.
-                let mut projection_vector: Vec<usize> =
-                    Vec::with_capacity(required_schema_arrow.fields.len());
-                // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
-                required_schema_arrow.fields.iter().for_each(|field| {
-                    projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
-                });
-
-                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
-                file_scan_config = file_scan_config.with_projection(Some(projection_vector));
+                        .with_file_groups(file_groups)
+                        .with_projection(Some(projection_vector));
 
                 let mut table_parquet_options = TableParquetOptions::new();
                 // TODO: Maybe these are configs?
@@ -1041,7 +1039,7 @@
                 let mut builder = ParquetExecBuilder::new(file_scan_config)
                     .with_table_parquet_options(table_parquet_options);
 
-                if let Some(filter) = test_data_filters {
+                if let Some(filter) = cnf_data_filters {
                     builder = builder.with_predicate(filter);
                 }
 
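For reference, the hunks above replace the serialized required_schema with a projection_vector of leaf offsets into the file's data schema. Below is a minimal, self-contained sketch of that flow, assuming the same parquet crate APIs the planner uses; the message type and offsets are invented for illustration.

```rust
use std::sync::Arc;

use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> parquet::errors::Result<()> {
    // Hypothetical full Parquet data schema, standing in for scan.data_schema.
    let message_type = "
        message spark_schema {
            required int64 id;
            optional binary name (UTF8);
            optional double price;
        }";
    let data_schema = parse_message_type(message_type)?;
    let data_schema_descriptor = SchemaDescriptor::new(Arc::new(data_schema));

    // Leaf offsets into the data schema, standing in for scan.projection_vector.
    let projection_vector: Vec<usize> = vec![2, 0]; // price, id

    // Arrow schema restricted to the projected leaves. The resulting fields follow
    // the file schema's order (id, price here); the planner also hands the same
    // offsets to FileScanConfig::with_projection.
    let required_schema_arrow = parquet::arrow::schema::parquet_to_arrow_schema_by_columns(
        &data_schema_descriptor,
        ProjectionMask::leaves(&data_schema_descriptor, projection_vector.clone()),
        None,
    )?;
    assert_eq!(projection_vector.len(), required_schema_arrow.fields().len());
    println!("{required_schema_arrow:#?}");
    Ok(())
}
```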
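The "conjunctive form" comment in the diff refers to folding the list of per-column filters into a single AND expression, since ParquetExecBuilder::with_predicate takes one predicate. A standalone sketch of the same fold, with a made-up schema and filters, assuming DataFusion's col and lit physical-expression helpers:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion::physical_expr::PhysicalExpr;

fn main() -> datafusion::error::Result<()> {
    // Hypothetical scan schema and per-column filters.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("price", DataType::Float64, true),
    ]);
    let data_filters: Vec<Arc<dyn PhysicalExpr>> = vec![
        Arc::new(BinaryExpr::new(col("id", &schema)?, Operator::Gt, lit(10_i64))),
        Arc::new(BinaryExpr::new(col("price", &schema)?, Operator::Lt, lit(100.0_f64))),
    ];

    // Same fold as the planner: left AND right, producing one predicate that can
    // be passed to ParquetExecBuilder::with_predicate.
    let cnf_data_filters = data_filters
        .into_iter()
        .reduce(|left, right| Arc::new(BinaryExpr::new(left, Operator::And, right)));

    if let Some(filter) = cnf_data_filters {
        println!("combined predicate: {filter}");
    }
    Ok(())
}
```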
native/proto/src/proto/operator.proto (2 additions, 2 deletions)
@@ -74,8 +74,8 @@ message NativeScan {
   // is purely for informational purposes when viewing native query plans in
   // debug mode.
   string source = 2;
-  string required_schema = 3;
-  string data_schema = 4;
+  string data_schema = 3;
+  repeated int64 projection_vector = 4;
   repeated spark.spark_expression.Expr data_filters = 5;
   repeated SparkFilePartition file_partitions = 6;
 }
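To illustrate what the new projection_vector field carries: for each field in Spark's required schema, the JVM side records that field's ordinal in the full file data schema, so the native side can project by position instead of parsing a second Parquet message type. A hypothetical sketch of that mapping, with invented column names:

```rust
fn main() {
    // Hypothetical column names standing in for scan.relation.dataSchema and
    // scan.requiredSchema on the JVM side.
    let data_schema = ["id", "name", "price", "ts"];
    let required_schema = ["price", "id"];

    // Mirrors the Scala side's dataSchema.fieldIndex(field.name) for each required
    // field, serialized as the repeated int64 projection_vector.
    let projection_vector: Vec<i64> = required_schema
        .iter()
        .map(|name| {
            data_schema
                .iter()
                .position(|column| column == name)
                .expect("required column must exist in the data schema") as i64
        })
        .collect();

    assert_eq!(projection_vector, vec![2, 0]);
}
```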
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala (36 additions, 11 deletions)
@@ -29,25 +29,26 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, Normalize
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometNativeScanExec, CometScanExec, CometSinkPlaceHolder, CometSparkToColumnarExec, DecimalPrecision}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometScanExec, CometSinkPlaceHolder, CometSparkToColumnarExec, DecimalPrecision}
 import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
-import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.{isCometScan, isSpark34Plus, withInfo}
 import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, RegExp, Unsupported}
+import org.apache.comet.parquet.FooterReader
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo}
 import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, BuildSide, JoinType, Operator}
@@ -2507,23 +2508,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         partitions.foreach(p => {
           val inputPartitions = p.asInstanceOf[DataSourceRDDPartition].inputPartitions
           inputPartitions.foreach(partition => {
-            partition2Proto(partition.asInstanceOf[FilePartition], nativeScanBuilder)
+            partition2Proto(partition.asInstanceOf[FilePartition], nativeScanBuilder, scan)
           })
         })
       case rdd: FileScanRDD =>
         rdd.filePartitions.foreach(partition => {
-          partition2Proto(partition, nativeScanBuilder)
+          partition2Proto(partition, nativeScanBuilder, scan)
         })
       case _ =>
         assert(false)
     }
 
-    val requiredSchemaParquet =
-      new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)
-    val dataSchemaParquet =
-      new SparkToParquetSchemaConverter(conf).convert(scan.relation.dataSchema)
+    val projection_vector: Array[java.lang.Long] = scan.requiredSchema.fields.map(field => {

[Review thread on the projection_vector line above]

Contributor: This change essentially means that any schema "adaptation" made in
SparkToParquetSchemaConverter.convert to support legacy timestamps and decimals will not
be supported. If so, we will probably see tests fail with incorrect results. Also,
Comet's Parquet file reader uses CometParquetReadSupport.clipParquetSchema to do a
similar conversion, and it includes support for the Parquet field_id, which is desirable
for delta sources like Iceberg. Basically, a field_id, if present, identifies a field
more precisely (in the event of field name changes) in a schema.

Contributor Author: The SchemaConverter seems like it could be handled in DataFusion's
SchemaAdapter. I'll look at clipParquetSchema as well, thanks!

Member: If you just need Arrow types, can you just convert Spark types to Arrow types?
For example, if a column is treated as timestamp type in Spark, its Arrow type is
timestamp too.

Contributor Author: My concern is that this pipeline:

  1. Java side parses Parquet metadata, generates a Spark schema
  2. Java side converts the Spark schema to an Arrow schema (following Comet conversion rules)
  3. Serialize the Arrow types; native side feeds this into ParquetExec as the data schema

...may yield different results than:

  1. Java side serializes the original Parquet metadata
  2. Serialize the schema message
  3. Native side parses the message, generates an Arrow schema, and feeds this into
     ParquetExec as the data schema

I guess I could exhaustively test this hypothesis with all types. (A sketch of that kind
of comparison appears after this file's diff.)

[End of review thread]

+      scan.relation.dataSchema.fieldIndex(field.name).toLong.asInstanceOf[java.lang.Long]
+    })
 
-    nativeScanBuilder.setRequiredSchema(requiredSchemaParquet.toString)
-    nativeScanBuilder.setDataSchema(dataSchemaParquet.toString)
+    nativeScanBuilder.addAllProjectionVector(projection_vector.toIterable.asJava)
 
     Some(result.setNativeScan(nativeScanBuilder).build())
 
@@ -3191,9 +3191,34 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

   private def partition2Proto(
       partition: FilePartition,
-      nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
+      nativeScanBuilder: OperatorOuterClass.NativeScan.Builder,
+      scan: CometScanExec): Unit = {
     val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+    val sparkContext = scan.session.sparkContext
+    var schema_saved: Boolean = false;
     partition.files.foreach(file => {
+      if (!schema_saved) {
+        // TODO: This code shouldn't be here, but for POC it's fine.
+        // Extract the schema and stash it.
+        val hadoopConf =
+          scan.relation.sparkSession.sessionState.newHadoopConfWithOptions(scan.relation.options)
+        val broadcastedHadoopConf =
+          sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+        val sharedConf = broadcastedHadoopConf.value.value
+        val footer = FooterReader.readFooter(sharedConf, file)

[Review thread on the FooterReader.readFooter call above]

Contributor: You're right, this can never be in production code. For one, it is
expensive.

Contributor Author (@mbutrovich, Nov 20, 2024): In theory it's just replacing the call
that currently takes place during the fileReader instantiation, but yeah, I'm still
curious whether it's already cached somewhere. I see references within Spark to a
footersCache, so I'm curious to look for that as well.

Contributor: I didn't know about a footersCache in Spark. Could you share a link maybe?

Contributor Author: (link)

Contributor: Ah, thanks. I don't think we ever travel this path.

[End of review thread]

+        val footerFileMetaData = footer.getFileMetaData
+        val schema = footerFileMetaData.getSchema
+        nativeScanBuilder.setDataSchema(schema.toString)
+
+        // val clippedSchema = CometParquetReadSupport.clipParquetSchema(
+        //   schema,
+        //   scan.requiredSchema,
+        //   scan.session.sessionState.conf.caseSensitiveAnalysis,
+        //   CometParquetUtils.readFieldId(scan.session.sessionState.conf),
+        //   CometParquetUtils.ignoreMissingIds(scan.session.sessionState.conf))
+
+        schema_saved = true
+      }
       val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
       fileBuilder
         .setFilePath(file.pathUri.toString)
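To make the schema-equivalence concern in the first review thread concrete, here is a hedged sketch of the kind of comparison the author proposes to test: derive an Arrow type by parsing a Parquet message type, as the native side now does, and compare it with an Arrow type one might pick for the corresponding Spark type. The INT96 column, the Spark-side mapping, and the use of the arrow and parquet crates are assumptions for illustration only.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, TimeUnit};
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> parquet::errors::Result<()> {
    // Path 2 from the thread: serialize the Parquet metadata and let the native
    // side derive Arrow types from it.
    let message_type = "
        message spark_schema {
            optional int96 event_time;
        }";
    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message_type)?));
    let from_parquet = parquet::arrow::schema::parquet_to_arrow_schema(&descriptor, None)?;

    // Path 1: one plausible Arrow type for a Spark TimestampType column (Comet's
    // actual conversion rules, e.g. time zone handling, may differ).
    let from_spark = DataType::Timestamp(TimeUnit::Microsecond, None);

    // Whether the two paths always agree, e.g. for INT96 timestamps or legacy
    // decimals, is exactly the hypothesis the author wants to test.
    println!(
        "parquet-derived: {:?}, spark-derived: {:?}, equal: {}",
        from_parquet.field(0).data_type(),
        from_spark,
        from_parquet.field(0).data_type() == &from_spark
    );
    Ok(())
}
```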