
Commit eafda43

[comet-parquet-exec] Pass Spark's partitions to DF's ParquetExec (#1081)
* I think serde works. Gonna try removing the old stuff.
* Fixes after merging in upstream.
* Remove previous file_config logic. Clippy.
* Temporary assertion for testing.
* Remove old path proto value.
* Selectively generate projection vector.
1 parent 33d2b23 commit eafda43

3 files changed: 76 additions and 41 deletions

native/core/src/execution/datafusion/planner.rs

Lines changed: 41 additions & 34 deletions
@@ -949,8 +949,8 @@ impl PhysicalPlanner {
                 ))
             }
             OpStruct::NativeScan(scan) => {
-                let data_schema = parse_message_type(&*scan.data_schema).unwrap();
-                let required_schema = parse_message_type(&*scan.required_schema).unwrap();
+                let data_schema = parse_message_type(&scan.data_schema).unwrap();
+                let required_schema = parse_message_type(&scan.required_schema).unwrap();

                 let data_schema_descriptor =
                     parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
@@ -968,16 +968,6 @@
                     )
                     .unwrap(),
                 );
-                assert!(!required_schema_arrow.fields.is_empty());
-
-                let mut projection_vector: Vec<usize> =
-                    Vec::with_capacity(required_schema_arrow.fields.len());
-                // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
-                required_schema_arrow.fields.iter().for_each(|field| {
-                    projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
-                });
-
-                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());

                 // Convert the Spark expressions to Physical expressions
                 let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
@@ -997,39 +987,56 @@ impl PhysicalPlanner {
                     ))
                 });

-                let object_store_url = ObjectStoreUrl::local_filesystem();
-                let paths: Vec<Url> = scan
-                    .path
-                    .iter()
-                    .map(|path| Url::parse(path).unwrap())
-                    .collect();
-
                 let object_store = object_store::local::LocalFileSystem::new();
                 // register the object store with the runtime environment
                 let url = Url::try_from("file://").unwrap();
                 self.session_ctx
                     .runtime_env()
                     .register_object_store(&url, Arc::new(object_store));

-                let files: Vec<PartitionedFile> = paths
-                    .iter()
-                    .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
-                    .collect();
-
-                // partition the files
-                // TODO really should partition the row groups
-
-                let mut file_groups = vec![vec![]; partition_count];
-                files.iter().enumerate().for_each(|(idx, file)| {
-                    file_groups[idx % partition_count].push(file.clone());
+                // Generate file groups
+                let mut file_groups: Vec<Vec<PartitionedFile>> =
+                    Vec::with_capacity(partition_count);
+                scan.file_partitions.iter().for_each(|partition| {
+                    let mut files = Vec::with_capacity(partition.partitioned_file.len());
+                    partition.partitioned_file.iter().for_each(|file| {
+                        assert!(file.start + file.length <= file.file_size);
+                        files.push(PartitionedFile::new_with_range(
+                            Url::parse(file.file_path.as_ref())
+                                .unwrap()
+                                .path()
+                                .to_string(),
+                            file.file_size as u64,
+                            file.start,
+                            file.start + file.length,
+                        ));
+                    });
+                    file_groups.push(files);
                 });

-                let file_scan_config =
+                // TODO: I think we can remove partition_count in the future, but leave for testing.
+                assert_eq!(file_groups.len(), partition_count);
+
+                let object_store_url = ObjectStoreUrl::local_filesystem();
+                let mut file_scan_config =
                     FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
-                        .with_file_groups(file_groups)
-                        .with_projection(Some(projection_vector));
+                        .with_file_groups(file_groups);
+
+                // Check for projection, if so generate the vector and add to FileScanConfig.
+                if !required_schema_arrow.fields.is_empty() {
+                    let mut projection_vector: Vec<usize> =
+                        Vec::with_capacity(required_schema_arrow.fields.len());
+                    // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
+                    required_schema_arrow.fields.iter().for_each(|field| {
+                        projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
+                    });
+
+                    assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
+                    file_scan_config = file_scan_config.with_projection(Some(projection_vector));
+                }

                 let mut table_parquet_options = TableParquetOptions::new();
+                // TODO: Maybe these are configs?
                 table_parquet_options.global.pushdown_filters = true;
                 table_parquet_options.global.reorder_filters = true;

@@ -1041,7 +1048,7 @@ impl PhysicalPlanner {
                 }

                 let scan = builder.build();
-                return Ok((vec![], Arc::new(scan)));
+                Ok((vec![], Arc::new(scan)))
             }
             OpStruct::Scan(scan) => {
                 let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

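To make the new behavior concrete, here is a standalone Rust sketch (not Comet code; the import paths target a recent DataFusion release and may differ by version, and the file name, sizes, and single-column schema are assumptions). It shows how two byte-range splits of one Parquet file, one per Spark partition, would be expressed as DataFusion file groups using the same FileScanConfig / PartitionedFile::new_with_range calls the planner now makes:

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::datasource::physical_plan::FileScanConfig;
    use datafusion::execution::object_store::ObjectStoreUrl;

    fn example_config() -> FileScanConfig {
        // Hypothetical file: 2048 bytes, split by Spark into two 1024-byte ranges.
        let path = "/tmp/part-00000.parquet".to_string();
        let file_size: u64 = 2048;

        // (path, size, range start, range end) -- mirrors PartitionedFile::new_with_range
        // as called in planner.rs above.
        let split_a = PartitionedFile::new_with_range(path.clone(), file_size, 0, 1024);
        let split_b = PartitionedFile::new_with_range(path, file_size, 1024, 2048);

        // One inner Vec per Spark partition => one DataFusion output partition each.
        let file_groups = vec![vec![split_a], vec![split_b]];

        // Minimal stand-in for data_schema_arrow.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
            .with_file_groups(file_groups)
            // Project only column 0, analogous to the required_schema projection.
            .with_projection(Some(vec![0]))
    }

Because each inner Vec becomes one DataFusion output partition, the native scan now follows Spark's own partitioning instead of round-robin assigning whole files across partition_count, as the removed code did.
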
native/proto/src/proto/operator.proto

Lines changed: 17 additions & 4 deletions
@@ -47,6 +47,19 @@ message Operator {
   }
 }

+message SparkPartitionedFile {
+  string file_path = 1;
+  int64 start = 2;
+  int64 length = 3;
+  int64 file_size = 4;
+}
+
+// This name and the one above are not great, but they correspond to the (unfortunate) Spark names.
+// I prepended "Spark" since I think there's a name collision on the native side, but we can revisit.
+message SparkFilePartition {
+  repeated SparkPartitionedFile partitioned_file = 1;
+}
+
 message Scan {
   repeated spark.spark_expression.DataType fields = 1;
   // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
@@ -61,10 +74,10 @@ message NativeScan {
   // is purely for informational purposes when viewing native query plans in
   // debug mode.
   string source = 2;
-  repeated string path = 3;
-  string required_schema = 4;
-  string data_schema = 5;
-  repeated spark.spark_expression.Expr data_filters = 6;
+  string required_schema = 3;
+  string data_schema = 4;
+  repeated spark.spark_expression.Expr data_filters = 5;
+  repeated SparkFilePartition file_partitions = 6;
 }

 message Projection {

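For reference, this is roughly the Rust-side shape that prost derives from the two new messages (a hand-written sketch, not the generated code; Comet's actual generated module path is not shown), together with the per-file invariant that planner.rs asserts:

    // Field names follow the proto fields above; types follow prost's int64 -> i64 mapping.
    #[derive(Clone, Debug, Default)]
    struct SparkPartitionedFile {
        file_path: String, // file URI, e.g. "file:///tmp/part-00000.parquet" (hypothetical)
        start: i64,        // byte offset where this split begins
        length: i64,       // number of bytes in this split
        file_size: i64,    // total size of the underlying file
    }

    #[derive(Clone, Debug, Default)]
    struct SparkFilePartition {
        partitioned_file: Vec<SparkPartitionedFile>, // all splits for one Spark partition
    }

    // Mirrors the assertion in planner.rs: every split must stay inside its file.
    fn validate(partition: &SparkFilePartition) {
        for f in &partition.partitioned_file {
            assert!(f.start + f.length <= f.file_size);
        }
    }
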
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 18 additions & 3 deletions
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
+import org.apache.spark.sql.execution.datasources.FileScanRDD
 import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
@@ -2496,16 +2497,30 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         val dataFilters = scan.dataFilters.map(exprToProto(_, scan.output))
         nativeScanBuilder.addAllDataFilters(dataFilters.map(_.get).asJava)

+        // Eventually we'll want to modify CometNativeScan to generate the file partitions
+        // for us without instantiating the RDD.
+        val file_partitions = scan.inputRDD.asInstanceOf[FileScanRDD].filePartitions;
+        file_partitions.foreach(partition => {
+          val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+          partition.files.foreach(file => {
+            val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
+            fileBuilder
+              .setFilePath(file.pathUri.toString)
+              .setStart(file.start)
+              .setLength(file.length)
+              .setFileSize(file.fileSize)
+            partitionBuilder.addPartitionedFile(fileBuilder.build())
+          })
+          nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+        })
+
         val requiredSchemaParquet =
           new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)
         val dataSchemaParquet =
           new SparkToParquetSchemaConverter(conf).convert(scan.relation.dataSchema)

         nativeScanBuilder.setRequiredSchema(requiredSchemaParquet.toString)
         nativeScanBuilder.setDataSchema(dataSchemaParquet.toString)
-        scan.relation.location.inputFiles.foreach { f =>
-          nativeScanBuilder.addPath(f)
-        }

         Some(result.setNativeScan(nativeScanBuilder).build())

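On the wire, the Scala side sends file.pathUri.toString (a file: URI) and planner.rs recovers a local filesystem path from it with Url::parse(...).path() before building the PartitionedFile. A tiny Rust sketch of that round trip, using a hypothetical file URI:

    use url::Url;

    // e.g. "file:///tmp/part-00000.parquet" -> "/tmp/part-00000.parquet"
    fn uri_to_local_path(uri: &str) -> String {
        Url::parse(uri).unwrap().path().to_string()
    }

    fn main() {
        assert_eq!(
            uri_to_local_path("file:///tmp/part-00000.parquet"),
            "/tmp/part-00000.parquet"
        );
    }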