From 76339729701caaf7eaecd9287831cbf64a99a206 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Thu, 12 Dec 2024 10:35:11 -0800
Subject: [PATCH 1/4] fix: CometScanExec was created for unsupported cases if
 only COMET_NATIVE_SCAN is enabled

---
 .../main/java/org/apache/comet/parquet/BatchReader.java   | 2 +-
 .../java/org/apache/comet/parquet/NativeBatchReader.java  | 8 ++++----
 .../src/main/scala/org/apache/comet/DataTypeSupport.scala | 5 ++++-
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 11c6d14dca..675dae9e78 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -272,7 +272,7 @@ public void init() throws URISyntaxException, IOException {
     requestedSchema =
         CometParquetReadSupport.clipParquetSchema(
             requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
-    if (requestedSchema.getColumns().size() != sparkSchema.size()) {
+    if (requestedSchema.getFieldCount() != sparkSchema.size()) {
       throw new IllegalArgumentException(
           String.format(
               "Spark schema has %d columns while " + "Parquet schema has %d columns",
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 17fab47e54..3ac55ba4d9 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -245,7 +245,7 @@ public void init() throws URISyntaxException, IOException {
     requestedSchema =
         CometParquetReadSupport.clipParquetSchema(
             requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
-    if (requestedSchema.getColumns().size() != sparkSchema.size()) {
+    if (requestedSchema.getFieldCount() != sparkSchema.size()) {
       throw new IllegalArgumentException(
           String.format(
               "Spark schema has %d columns while " + "Parquet schema has %d columns",
@@ -267,9 +267,9 @@ public void init() throws URISyntaxException, IOException {
     //      ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
     for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
       Type t = requestedSchema.getFields().get(i);
-      Preconditions.checkState(
-          t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
-          "Complex type is not supported");
+      //      Preconditions.checkState(
+      //          t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
+      //          "Complex type is not supported");
       String[] colPath = paths.get(i);
       if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
         // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index 09c062b8b7..eb524af906 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -39,7 +39,10 @@ trait DataTypeSupport {
           BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
         true
       case t: DataType if t.typeName == "timestamp_ntz" => true
-      case _: StructType => true
+      case _: StructType
+          if CometConf.COMET_FULL_NATIVE_SCAN_ENABLED
+            .get() || CometConf.COMET_NATIVE_ARROW_SCAN_ENABLED.get() =>
+        true
       case _ => false
     }
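Illustrative note, not part of the patch: after the DataTypeSupport hunk above, a
struct column passes the type check only when COMET_FULL_NATIVE_SCAN_ENABLED or
COMET_NATIVE_ARROW_SCAN_ENABLED is set, so a scan that the old code wrongly claimed
for CometScanExec now falls back to Spark. A minimal sketch of the expected
behavior, assuming the trait's isTypeSupported(dt: DataType) entry point and a
`support` object mixing in DataTypeSupport (both names are assumptions and may
differ from the actual code):

    import org.apache.spark.sql.types._

    val nested = StructType(Seq(StructField("x", IntegerType)))

    // With only COMET_NATIVE_SCAN enabled, struct columns are now rejected...
    assert(!support.isTypeSupported(nested))
    // ...while primitive columns remain supported.
    assert(support.isTypeSupported(LongType))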
From bfaecb0bf89831fe3c4ccdf9b1e98d57fc77d335 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Thu, 12 Dec 2024 18:29:04 -0800
Subject: [PATCH 2/4] fix: Another try to fix test("Comet native metrics:
 BroadcastHashJoin")

---
 .../org/apache/spark/sql/comet/CometScanExec.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 49f7694bcf..5d28b4b729 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -131,8 +131,15 @@ case class CometScanExec(
   // exposed for testing
   lazy val bucketedScan: Boolean = wrapped.bucketedScan

-  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
-    (wrapped.outputPartitioning, wrapped.outputOrdering)
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+    if (bucketedScan) {
+      (wrapped.outputPartitioning, wrapped.outputOrdering)
+    } else {
+      val files = selectedPartitions.flatMap(partition => partition.files)
+      val numPartitions = files.length
+      (UnknownPartitioning(numPartitions), wrapped.outputOrdering)
+    }
+  }

   @transient
   private lazy val pushedDownFilters = getPushedDownFilters(relation, dataFilters)
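Illustrative note, not part of the patch: for non-bucketed reads, the wrapped
FileSourceScanExec derives its output partitioning from Spark's own scan behavior,
which need not match what the Comet scan actually produces; advertising
UnknownPartitioning (one partition per selected file) makes no distribution claim,
so downstream operators cannot rely on a guarantee Comet does not honor. A sketch
of the fallback branch above, assuming `selectedPartitions` exposes `files` as in
the hunk:

    import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning

    // One partition per input file; no row-distribution guarantee is claimed.
    val files = selectedPartitions.flatMap(partition => partition.files)
    val partitioning = UnknownPartitioning(files.length)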
From a7fad04fbcf9b2f4442b130d561b4646a0980427 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Thu, 12 Dec 2024 18:29:56 -0800
Subject: [PATCH 3/4] fix: some tests are valid only when full native scan is
 enabled

---
 .../test/scala/org/apache/comet/CometExpressionSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 85ac6138bb..e65feb6b2f 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2217,6 +2217,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       withSQLConf(
         SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
         CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
         val df = spark.read.parquet(dir.toString())
@@ -2249,6 +2250,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       withSQLConf(
         SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
         CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
         val df = spark.read.parquet(dir.toString())

From 1f3a3ee9f2a6f226e345e1319eb1024c4b3a9ebb Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Thu, 12 Dec 2024 18:35:23 -0800
Subject: [PATCH 4/4] Merge pull request #1 from
 andygrove/fix-tests-spark-cast-options

---
 native/spark-expr/src/cast.rs | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs
index 17ab73b72b..a6d139716f 100644
--- a/native/spark-expr/src/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -571,7 +571,7 @@ impl SparkCastOptions {
             eval_mode,
             timezone: timezone.to_string(),
             allow_incompat,
-            is_adapting_schema: false,
+            is_adapting_schema: false
         }
     }

@@ -583,6 +583,7 @@ impl SparkCastOptions {
             is_adapting_schema: false,
         }
     }
+
 }

 /// Spark-compatible cast implementation. Defers to DataFusion's cast where that is known
@@ -2087,7 +2088,7 @@ mod tests {
         let timezone = "UTC".to_string();

         // test casting string dictionary array to timestamp array
-        let cast_options = SparkCastOptions::new(EvalMode::Legacy, timezone.clone(), false);
+        let cast_options = SparkCastOptions::new(EvalMode::Legacy, &timezone, false);
         let result = cast_array(
             dict_array,
             &DataType::Timestamp(TimeUnit::Microsecond, Some(timezone.clone().into())),
@@ -2296,7 +2297,7 @@
     fn test_cast_unsupported_timestamp_to_date() {
         // Since datafusion uses chrono::Datetime internally not all dates representable by TimestampMicrosecondType are supported
         let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
-        let cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC".to_string(), false);
+        let cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);
         let result = cast_array(
             Arc::new(timestamps.with_timezone("Europe/Copenhagen")),
             &DataType::Date32,
@@ -2309,7 +2310,7 @@
     fn test_cast_invalid_timezone() {
         let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
         let cast_options =
-            SparkCastOptions::new(EvalMode::Legacy, "Not a valid timezone".to_string(), false);
+            SparkCastOptions::new(EvalMode::Legacy, "Not a valid timezone", false);
         let result = cast_array(
             Arc::new(timestamps.with_timezone("Europe/Copenhagen")),
             &DataType::Date32,
@@ -2335,7 +2336,7 @@
         let string_array = cast_array(
             c,
             &DataType::Utf8,
-            &SparkCastOptions::new(EvalMode::Legacy, "UTC".to_owned(), false),
+            &SparkCastOptions::new(EvalMode::Legacy, "UTC", false),
         )
         .unwrap();
         let string_array = string_array.as_string::<i32>();
@@ -2400,10 +2401,9 @@
         let cast_array = spark_cast(
             ColumnarValue::Array(c),
             &DataType::Struct(fields),
-            EvalMode::Legacy,
+            &SparkCastOptions::new(EvalMode::Legacy,
             "UTC",
-            false,
-            false,
+            false)
         )
         .unwrap();
         if let ColumnarValue::Array(cast_array) = cast_array {
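Illustrative note, not part of the patch: after this change the tests pass the
timezone to SparkCastOptions::new as a borrowed &str; the constructor owns the
value internally via to_string(), so call sites no longer need to allocate or
clone. A sketch of both call styles, assuming EvalMode and SparkCastOptions as
defined in cast.rs:

    // A string literal coerces directly to &str.
    let opts = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);

    // An owned String is passed by reference; deref coercion yields &str.
    let tz = "UTC".to_string();
    let opts_from_string = SparkCastOptions::new(EvalMode::Legacy, &tz, false);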