diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index bbe452cf8c..d892a50b8a 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -122,6 +122,7 @@ use datafusion_physical_expr::LexOrdering;
 use itertools::Itertools;
 use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
+use object_store::path::Path;
 use std::cmp::max;
 use std::{collections::HashMap, sync::Arc};
 use url::Url;
@@ -993,15 +994,18 @@ impl PhysicalPlanner {
                     assert!(file.start + file.length <= file.file_size);
 
                     let mut partitioned_file = PartitionedFile::new_with_range(
-                        Url::parse(file.file_path.as_ref())
-                            .unwrap()
-                            .path()
-                            .to_string(),
+                        String::new(), // Dummy file path.
                         file.file_size as u64,
                         file.start,
                         file.start + file.length,
                     );
 
+                    // Spark sends the path over as URL-encoded, parse that first.
+                    let url = Url::parse(file.file_path.as_ref()).unwrap();
+                    // Convert that to a Path object to use in the PartitionedFile.
+                    let path = Path::from_url_path(url.path()).unwrap();
+                    partitioned_file.object_meta.location = path;
+
                     // Process partition values
                     // Create an empty input schema for partition values because they are all literals.
                     let empty_schema = Arc::new(Schema::empty());
diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs
index ce858f65be..2c6032a0a2 100644
--- a/native/core/src/execution/datafusion/schema_adapter.rs
+++ b/native/core/src/execution/datafusion/schema_adapter.rs
@@ -114,7 +114,7 @@ impl SchemaAdapter for CometSchemaAdapter {
                 required_schema: Arc::<Schema>::clone(&self.required_schema),
                 field_mappings,
                 table_schema: Arc::<Schema>::clone(&self.table_schema),
-                cast_options
+                cast_options,
             }),
             projection,
         ))
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index b30ad13968..5ee16bd7b0 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -3243,7 +3243,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
             val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
             partitionVals.foreach(fileBuilder.addPartitionValues)
             fileBuilder
-              .setFilePath(file.filePath.toUri.toString)
+              .setFilePath(file.filePath.toString)
               .setStart(file.start)
               .setLength(file.length)
               .setFileSize(file.fileSize)