From 04c068d09c7863b0aade890a9752b1fb71225580 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 14 Nov 2024 16:42:33 -0500
Subject: [PATCH 1/2] DataSourceRDD handling (seems to be related to
 prefetching, so maybe not relevant for our ParquetExec).

---
 .../apache/comet/serde/QueryPlanSerde.scala   | 58 +++++++++++++------
 1 file changed, 41 insertions(+), 17 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index cce48204ba..aab50e683e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -35,8 +35,9 @@ import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
-import org.apache.spark.sql.execution.datasources.FileScanRDD
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
 import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.window.WindowExec
@@ -2497,22 +2498,45 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         val dataFilters = scan.dataFilters.map(exprToProto(_, scan.output))
         nativeScanBuilder.addAllDataFilters(dataFilters.map(_.get).asJava)
 
-        // Eventually we'll want to modify CometNativeScan to generate the file partitions
-        // for us without instantiating the RDD.
-        val file_partitions = scan.inputRDD.asInstanceOf[FileScanRDD].filePartitions;
-        file_partitions.foreach(partition => {
-          val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
-          partition.files.foreach(file => {
-            val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
-            fileBuilder
-              .setFilePath(file.pathUri.toString)
-              .setStart(file.start)
-              .setLength(file.length)
-              .setFileSize(file.fileSize)
-            partitionBuilder.addPartitionedFile(fileBuilder.build())
-          })
-          nativeScanBuilder.addFilePartitions(partitionBuilder.build())
-        })
+        // TODO: modify CometNativeScan to generate the file partitions without instantiating the RDD.
+        scan.inputRDD match {
+          case rdd: DataSourceRDD =>
+            val partitions = rdd.partitions
+            // scalastyle:off println
+            partitions.foreach(p => {
+              val inputPartitions = p.asInstanceOf[DataSourceRDDPartition].inputPartitions
+              inputPartitions.foreach(f => {
+                val partition = f.asInstanceOf[FilePartition]
+                val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+                partition.files.foreach(file => {
+                  val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
+                  fileBuilder
+                    .setFilePath(file.pathUri.toString)
+                    .setStart(file.start)
+                    .setLength(file.length)
+                    .setFileSize(file.fileSize)
+                  partitionBuilder.addPartitionedFile(fileBuilder.build())
+                })
+                nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+              })
+            })
+            // scalastyle:on println
+          case rdd: FileScanRDD =>
+            rdd.filePartitions.foreach(partition => {
+              val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+              partition.files.foreach(file => {
+                val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
+                fileBuilder
+                  .setFilePath(file.pathUri.toString)
+                  .setStart(file.start)
+                  .setLength(file.length)
+                  .setFileSize(file.fileSize)
+                partitionBuilder.addPartitionedFile(fileBuilder.build())
+              })
+              nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+            })
+          case _ =>
+        }
 
         val requiredSchemaParquet =
           new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)

From 3c1ea06ffc2198101310691a9c6cde4df9d85665 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Thu, 14 Nov 2024 16:56:19 -0500
Subject: [PATCH 2/2] Refactor to reduce duplicate code.

---
 .../apache/comet/serde/QueryPlanSerde.scala   | 45 ++++++++-----------
 1 file changed, 19 insertions(+), 26 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index aab50e683e..f9a2546693 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2502,38 +2502,15 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         scan.inputRDD match {
           case rdd: DataSourceRDD =>
             val partitions = rdd.partitions
-            // scalastyle:off println
             partitions.foreach(p => {
               val inputPartitions = p.asInstanceOf[DataSourceRDDPartition].inputPartitions
-              inputPartitions.foreach(f => {
-                val partition = f.asInstanceOf[FilePartition]
-                val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
-                partition.files.foreach(file => {
-                  val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
-                  fileBuilder
-                    .setFilePath(file.pathUri.toString)
-                    .setStart(file.start)
-                    .setLength(file.length)
-                    .setFileSize(file.fileSize)
-                  partitionBuilder.addPartitionedFile(fileBuilder.build())
-                })
-                nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+              inputPartitions.foreach(partition => {
+                partition2Proto(partition.asInstanceOf[FilePartition], nativeScanBuilder)
               })
             })
-            // scalastyle:on println
          case rdd: FileScanRDD =>
            rdd.filePartitions.foreach(partition => {
-              val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
-              partition.files.foreach(file => {
-                val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
-                fileBuilder
-                  .setFilePath(file.pathUri.toString)
-                  .setStart(file.start)
-                  .setLength(file.length)
-                  .setFileSize(file.fileSize)
-                partitionBuilder.addPartitionedFile(fileBuilder.build())
-              })
-              nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+              partition2Proto(partition, nativeScanBuilder)
            })
          case _ =>
        }
@@ -3209,4 +3186,20 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
 
     true
   }
+
+  private def partition2Proto(
+      partition: FilePartition,
+      nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
+    val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+    partition.files.foreach(file => {
+      val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
+      fileBuilder
+        .setFilePath(file.pathUri.toString)
+        .setStart(file.start)
+        .setLength(file.length)
+        .setFileSize(file.fileSize)
+      partitionBuilder.addPartitionedFile(fileBuilder.build())
+    })
+    nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+  }
 }
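
Note on the final shape: the v2 (DataSourceRDD) and v1 (FileScanRDD) branches of the
scan.inputRDD match both reduce to "find the FilePartitions, then hand each one to
partition2Proto", which writes one SparkFilePartition message containing a
SparkPartitionedFile (path URI, start offset, length, file size) per file. A minimal
standalone sketch of that dispatch, using only the accessors the patches themselves
rely on; the helper name filePartitionsOf is hypothetical and not part of the patch:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}

// Hypothetical helper (not in the patch): collect the FilePartitions behind
// either RDD flavor; each result would then be passed to partition2Proto.
def filePartitionsOf(inputRDD: RDD[_]): Seq[FilePartition] = inputRDD match {
  // DataSource V2 path: each DataSourceRDDPartition wraps one or more
  // InputPartitions, which are FilePartitions for file-based scans.
  case rdd: DataSourceRDD =>
    rdd.partitions.toSeq.flatMap { p =>
      p.asInstanceOf[DataSourceRDDPartition].inputPartitions
        .map(_.asInstanceOf[FilePartition])
    }
  // DataSource V1 path: FileScanRDD exposes its FilePartitions directly.
  case rdd: FileScanRDD =>
    rdd.filePartitions
  // Any other input RDD is not a file scan; nothing to serialize, mirroring
  // the patch's empty "case _ =>" fallback.
  case _ =>
    Seq.empty
}

Centralizing the protobuf construction in partition2Proto (the point of PATCH 2/2)
keeps the two paths serializing identical SparkPartitionedFile fields; the
asInstanceOf casts encode the patch's assumption that a file-based scan's input
partitions are always FilePartitions.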