diff --git a/pom.xml b/pom.xml
index ddcac283d4..25dc863314 100644
--- a/pom.xml
+++ b/pom.xml
@@ -741,12 +741,16 @@ under the License.
             -deprecation
             -unchecked
             -feature
-            -Xlint:_
-            -Ywarn-dead-code
             -Ywarn-numeric-widen
             -Ywarn-value-discard
             -Ywarn-unused:imports,patvars,privates,locals,params,-implicits
             -Xfatal-warnings
+
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index 6ef92d0a67..f0b1524c36 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -21,11 +21,15 @@ package org.apache.comet
 
 import java.nio.ByteBuffer
 
+import scala.annotation.nowarn
+
 import org.apache.spark.CometTaskMemoryManager
 import org.apache.spark.sql.comet.CometMetricNode
 
 import org.apache.comet.parquet.CometFileKeyUnwrapper
 
+// Suppress unused warnings since native methods have no Scala impl
+@nowarn("msg=never used")
 class Native extends NativeBase {
 
   // scalastyle:off
diff --git a/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala
index 8dc0c828c7..a4c4db9c38 100644
--- a/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala
+++ b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala
@@ -19,9 +19,12 @@ package org.apache.comet.expressions
 
+import scala.annotation.nowarn
+
 object RegExp {
 
   /** Determine whether the regexp pattern is supported natively and compatible with Spark */
+  @nowarn // suppress unused param warning since 'pattern' is only a placeholder here
   def isSupportedPattern(pattern: String): Boolean = {
     // this is a placeholder for implementing logic to determine if the pattern
     // is known to be compatible with Spark, so that we can enable regexp automatically
diff --git a/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala b/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala
index ac6a89ca3b..e5a59511d3 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala
@@ -50,11 +50,12 @@ object SourceFilterSerde extends Logging {
           .setDatatype(dataType.get)
           .build()
       Some(
-        field.dataType,
-        ExprOuterClass.Expr
-          .newBuilder()
-          .setBound(boundExpr)
-          .build())
+        (
+          field.dataType,
+          ExprOuterClass.Expr
+            .newBuilder()
+            .setBound(boundExpr)
+            .build()))
     } else {
       None
     }
@@ -80,8 +81,8 @@ object SourceFilterSerde extends Logging {
     // refer to org.apache.spark.sql.catalyst.CatalystTypeConverters.CatalystTypeConverter#toScala
     dataType match {
       case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
-      case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
-      case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
+      case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
+      case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
       case _: IntegerType => exprBuilder.setIntVal(value.asInstanceOf[Int])
       case _: LongType => exprBuilder.setLongVal(value.asInstanceOf[Long])
       case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float])
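Note (commentary, not part of the patch): the Native.scala and RegExp.scala hunks above rely on scala.annotation.nowarn to keep -Ywarn-unused and -Xfatal-warnings from failing the build on declarations that are intentionally unused. A minimal sketch of the same pattern follows; NativeStub, executePlan, and RegexSupport are hypothetical names for illustration only, not Comet APIs.

    import scala.annotation.nowarn

    // JNI-style stub: the parameters only describe the native signature, so the
    // whole class suppresses "never used" warnings, mirroring Native.scala above.
    @nowarn("msg=never used")
    class NativeStub {
      @native def executePlan(planBytes: Array[Byte], numPartitions: Int): Long
    }

    object RegexSupport {
      // A placeholder method can suppress its own unused-parameter warning locally.
      @nowarn
      def isSupportedPattern(pattern: String): Boolean = false
    }

The annotation scopes the suppression to a single symbol (and its members), which is why the patch prefers it over relaxing the compiler flags globally.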
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 950d0e9d37..231e833766 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -445,7 +445,7 @@ object CometScanRule extends Logging {
             val reason = "Object store config not supported by " +
               s"$SCAN_NATIVE_ICEBERG_COMPAT: ${e.getMessage}"
             fallbackReasons += reason
-            configValidityMap.put(cacheKey, Some(reason))
+            configValidityMap.update(cacheKey, Some(reason))
         }
       }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 0a4b61fce2..d2523008d8 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -19,6 +19,7 @@ package org.apache.comet.serde
 
+import scala.annotation.unused
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
@@ -1804,7 +1805,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
           .setFileSize(file.fileSize)
         partitionBuilder.addPartitionedFile(fileBuilder.build())
       })
-      nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+      nativeScanBuilder.addFilePartitions(partitionBuilder.build()); ()
     }
   }
 
@@ -1886,7 +1887,7 @@ trait CometExpressionSerde[T <: Expression] {
   * @return
   *   Support level (Compatible, Incompatible, or Unsupported).
   */
-  def getSupportLevel(expr: T): SupportLevel = Compatible(None)
+  def getSupportLevel(@unused expr: T): SupportLevel = Compatible(None)
 
  /**
   * Convert a Spark expression into a protocol buffer representation that can be passed into
diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
index d6e129a3c7..6eea1878e9 100644
--- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -19,6 +19,7 @@ package org.apache.comet.serde
 
+import scala.annotation.unused
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode}
@@ -414,7 +415,7 @@ trait CometCovBase {
       statsType: Int,
       inputs: Seq[Attribute],
       binding: Boolean,
-      conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
+      @unused conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
     val child1Expr = exprToProto(cov.left, inputs, binding)
     val child2Expr = exprToProto(cov.right, inputs, binding)
     val dataType = serializeDataType(cov.dataType)
diff --git a/spark/src/main/scala/org/apache/comet/serde/literals.scala b/spark/src/main/scala/org/apache/comet/serde/literals.scala
index 312f12a4c5..d75b68c23b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/literals.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/literals.scala
@@ -72,8 +72,8 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging {
         exprBuilder.setIsNull(false)
         dataType match {
           case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
-          case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
-          case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
+          case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
+          case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
           case _: IntegerType | _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int])
           case _: LongType | _: TimestampType | _: TimestampNTZType =>
             exprBuilder.setLongVal(value.asInstanceOf[Long])
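Note (commentary, not part of the patch): the .toInt edits in SourceFilterSerde.scala and literals.scala exist because the generated protobuf setters take an Int, and letting scalac widen a Byte or Short implicitly trips -Ywarn-numeric-widen once -Xfatal-warnings is on. A small self-contained sketch, with LiteralBuilder as a hypothetical stand-in for the generated builder:

    // Hypothetical stand-in for the generated ExprOuterClass.Literal builder.
    class LiteralBuilder {
      private var intVal: Int = 0
      def setIntVal(v: Int): LiteralBuilder = { intVal = v; this }
      override def toString: String = s"LiteralBuilder($intVal)"
    }

    object WideningExample {
      def main(args: Array[String]): Unit = {
        val b: Byte = 7
        val s: Short = 42
        val builder = new LiteralBuilder
        // builder.setIntVal(b) compiles via implicit Byte-to-Int widening, but
        // -Ywarn-numeric-widen flags it; spelling out .toInt keeps the build clean.
        builder.setIntVal(b.toInt)
        builder.setIntVal(s.toInt)
        println(builder)
      }
    }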
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala
index 1f3d4bba84..4e18962095 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -100,13 +100,13 @@ object CometDriverPlugin extends Logging {
     val extensions = conf.get(extensionKey, "")
     if (extensions.isEmpty) {
       logInfo(s"Setting $extensionKey=$extensionClass")
-      conf.set(extensionKey, extensionClass)
+      conf.set(extensionKey, extensionClass); ()
     } else {
       val currentExtensions = extensions.split(",").map(_.trim)
       if (!currentExtensions.contains(extensionClass)) {
         val newValue = s"$extensions,$extensionClass"
         logInfo(s"Setting $extensionKey=$newValue")
-        conf.set(extensionKey, newValue)
+        conf.set(extensionKey, newValue); ()
       }
     }
   }
diff --git a/spark/src/main/scala/org/apache/spark/shuffle/sort/RowPartition.scala b/spark/src/main/scala/org/apache/spark/shuffle/sort/RowPartition.scala
index d3536850f6..dec66e71cc 100644
--- a/spark/src/main/scala/org/apache/spark/shuffle/sort/RowPartition.scala
+++ b/spark/src/main/scala/org/apache/spark/shuffle/sort/RowPartition.scala
@@ -26,8 +26,8 @@ class RowPartition(initialSize: Int) {
   private var rowSizes: ArrayBuffer[Int] = new ArrayBuffer[Int](initialSize)
 
   def addRow(addr: Long, size: Int): Unit = {
-    rowAddresses += addr
-    rowSizes += size
+    rowAddresses.append(addr)
+    rowSizes.append(size)
   }
 
   def getNumRows: Int = if (rowAddresses == null) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
index 8c779e8dc9..a6feee4eea 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
@@ -66,7 +66,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
       override def next(): ColumnarBatch = {
         val batch = batches.next()
-        numOutputRows += batch.numRows()
+        numOutputRows += batch.numRows().toLong
         batch
       }
     }
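Note (commentary, not part of the patch): the `; ()` suffix in Plugins.scala and the switch from `+=` to `append` in RowPartition.scala both address -Ywarn-value-discard, which warns when a non-Unit result is silently adapted to Unit at the end of a Unit-typed method. A minimal sketch of the two ways the patch handles it; here mutable.Map merely plays the role of SparkConf:

    object ValueDiscardExample {
      private val conf = scala.collection.mutable.Map[String, String]()

      // put(...) returns Option[String]; because this method's result type is Unit,
      // the Option would be discarded implicitly, and -Ywarn-value-discard (made
      // fatal by -Xfatal-warnings) rejects that. update(...) returns Unit instead.
      def setWithUpdate(key: String, value: String): Unit =
        conf.update(key, value)

      // Alternatively, keep the original call and discard its result explicitly
      // with a trailing unit value, as the patch does for conf.set(...).
      def setWithPut(key: String, value: String): Unit = {
        conf.put(key, value); ()
      }
    }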
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 95770592fd..fa3733f98b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -146,7 +146,7 @@ case class CometBroadcastExchangeExec(
         longMetric("numOutputRows") += numRows
         if (numRows >= maxBroadcastRows) {
           throw QueryExecutionErrors.cannotBroadcastTableOverMaxTableRowsError(
-            maxBroadcastRows,
+            maxBroadcastRows.toLong,
             numRows)
         }
 
@@ -198,7 +198,7 @@ case class CometBroadcastExchangeExec(
   override protected def doPrepare(): Unit = {
     // Materialize the future.
-    relationFuture
+    relationFuture; ()
   }
 
   override protected def doExecute(): RDD[InternalRow] = {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
index 09794e8e26..202ff63a96 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
@@ -88,7 +88,7 @@ case class CometCollectLimitExec(
           outputPartitioning,
           serializer,
           metrics)
-        metrics("numPartitions").set(dep.partitioner.numPartitions)
+        metrics("numPartitions").set(dep.partitioner.numPartitions.toDouble)
 
         new CometShuffledBatchRDD(dep, readMetrics)
       }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
index d965a6ff7b..c6f1dd14d5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
@@ -82,7 +82,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
       val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
       batches.flatMap { batch =>
         numInputBatches += 1
-        numOutputRows += batch.numRows()
+        numOutputRows += batch.numRows().toLong
         batch.rowIterator().asScala.map(toUnsafe)
       }
     }
@@ -120,7 +120,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
         .flatMap(CometUtils.decodeBatches(_, this.getClass.getSimpleName))
         .flatMap { batch =>
           numInputBatches += 1
-          numOutputRows += batch.numRows()
+          numOutputRows += batch.numRows().toLong
           batch.rowIterator().asScala.map(toUnsafe)
         }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index d4cb11ac62..70cd6e26a5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -242,7 +242,7 @@ case class CometScanExec(
       driverMetrics("staticFilesSize") = filesSize
     }
     if (relation.partitionSchema.nonEmpty) {
-      driverMetrics("numPartitions") = partitions.length
+      driverMetrics("numPartitions") = partitions.length.toLong
     }
   }
 
@@ -284,7 +284,7 @@ case class CometScanExec(
       override def next(): ColumnarBatch = {
         val batch = batches.next()
-        numOutputRows += batch.numRows()
+        numOutputRows += batch.numRows().toLong
         batch
       }
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index bcf8918575..630a33172f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -83,7 +83,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
         val startNs = System.nanoTime()
         val batch = iter.next()
         conversionTime += System.nanoTime() - startNs
-        numInputRows += batch.numRows()
+        numInputRows += batch.numRows().toLong
         numOutputBatches += 1
         batch
       }
@@ -123,7 +123,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
           CometArrowConverters.rowToArrowBatchIter(
             sparkBatches,
             schema,
-            maxRecordsPerBatch,
+            maxRecordsPerBatch.toLong,
             timeZoneId,
             context)
         createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
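Note (commentary, not part of the patch): the repeated `numRows().toLong` edits above follow the same -Ywarn-numeric-widen rule as the earlier .toInt changes: a batch's row count is an Int while the SQL metrics accumulate Longs, so the widening is written out. A sketch below; LongCounter is a hypothetical stand-in for Spark's SQLMetric, not its actual API.

    // Hypothetical Long-based counter playing the role of SQLMetric.
    final class LongCounter {
      private var total: Long = 0L
      def +=(v: Long): Unit = total += v
      def value: Long = total
    }

    object MetricWideningExample {
      def main(args: Array[String]): Unit = {
        val numOutputRows = new LongCounter
        val batchRows: Int = 8192 // a batch's numRows() is an Int
        // numOutputRows += batchRows would rely on implicit Int-to-Long widening,
        // which -Ywarn-numeric-widen reports; the conversion is made explicit.
        numOutputRows += batchRows.toLong
        println(numOutputRows.value)
      }
    }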
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index aa89dec137..daf5420e46 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -96,7 +96,7 @@ case class CometTakeOrderedAndProjectExec(
             outputPartitioning,
             serializer,
             metrics)
-          metrics("numPartitions").set(dep.partitioner.numPartitions)
+          metrics("numPartitions").set(dep.partitioner.numPartitions.toDouble)
 
           new CometShuffledBatchRDD(dep, readMetrics)
         }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala
index 1283a745a6..0a9c0bed12 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala
@@ -111,7 +111,7 @@ class CometBlockStoreShuffleReader[K, C](
     // Update the context task metrics for each record read.
    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
       recordIter.map { record =>
-        readMetrics.incRecordsRead(record._2.numRows())
+        readMetrics.incRecordsRead(record._2.numRows().toLong)
         record
       },
       context.taskMetrics().mergeShuffleReadMetrics())
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
index 43a1e5b9a0..632a6792c0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
@@ -60,7 +60,9 @@ class CometNativeShuffleWriter[K, V](
   private val OFFSET_LENGTH = 8
 
   var partitionLengths: Array[Long] = _
-  var mapStatus: MapStatus = _
+  // Store MapStatus opaquely as AnyRef,
+  // to avoid private[spark] visibility issues; cast back when needed.
+  var mapStatus: AnyRef = _
 
   override def write(inputs: Iterator[Product2[K, V]]): Unit = {
     val shuffleBlockResolver =
@@ -302,7 +304,7 @@ class CometNativeShuffleWriter[K, V](
   override def stop(success: Boolean): Option[MapStatus] = {
     if (success) {
-      Some(mapStatus)
+      Some(mapStatus.asInstanceOf[MapStatus])
     } else {
       None
     }
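Note (commentary, not part of the patch): the mapStatus change above keeps a Spark-internal type out of the writer's field type, as the added comment explains, and casts back only where the value is consumed. A rough sketch of that pattern, with HiddenStatus as a made-up stand-in for MapStatus:

    object OpaqueFieldExample {
      // Imagine HiddenStatus had restricted visibility (like private[spark] MapStatus);
      // here it is an ordinary class so the sketch compiles on its own.
      final class HiddenStatus(val mapId: Long)

      class Writer {
        // Typed as AnyRef so the field's signature does not mention HiddenStatus.
        var status: AnyRef = _

        def finish(mapId: Long): Unit = {
          status = new HiddenStatus(mapId)
        }

        // Cast back to the concrete type at the one place it is actually needed.
        def stop(success: Boolean): Option[HiddenStatus] =
          if (success) Some(status.asInstanceOf[HiddenStatus]) else None
      }
    }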
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 1f7d37a108..b36b17c3eb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -137,7 +137,7 @@ case class CometShuffleExchangeExec(
         outputPartitioning,
         serializer,
         metrics)
-      metrics("numPartitions").set(dep.partitioner.numPartitions)
+      metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
       val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
       SQLMetrics.postDriverMetricUpdates(
         sparkContext,
@@ -151,7 +151,7 @@ case class CometShuffleExchangeExec(
         outputPartitioning,
         serializer,
         metrics)
-      metrics("numPartitions").set(dep.partitioner.numPartitions)
+      metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
       val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
       SQLMetrics.postDriverMetricUpdates(
         sparkContext,
@@ -385,7 +385,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
        // end up being almost the same regardless of the index. substantially scrambling the
        // seed by hashing will help. Refer to SPARK-21782 for more details.
        val partitionId = TaskContext.get().partitionId()
-       var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
+       var position = new XORShiftRandom(partitionId.toLong).nextInt(numPartitions).toLong
        (_: InternalRow) => {
          // The HashPartitioner will handle the `mod` by the number of partitions
          position += 1
@@ -432,7 +432,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
           row: InternalRow): UnsafeExternalRowSorter.PrefixComputer.Prefix = {
         // The hashcode generated from the binary form of a [[UnsafeRow]] should not be null.
         result.isNull = false
-        result.value = row.hashCode()
+        result.value = row.hashCode().toLong
         result
       }
     }
diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index ed9b79165a..1db412392c 100644
--- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -21,7 +21,6 @@ package org.apache.spark.sql.comet.shims
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
@@ -32,6 +31,8 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.SPARK_VERSION_SHORT
 import org.apache.spark.util.VersionUtils
+
+import scala.annotation.nowarn
 import scala.math.Ordering.Implicits._
 
 trait ShimCometScanExec {
@@ -42,7 +43,7 @@ trait ShimCometScanExec {
 
   def isSparkVersionAtLeast355: Boolean = {
     VersionUtils.majorMinorPatchVersion(SPARK_VERSION_SHORT) match {
-      case Some((major, minor, patch)) => (major, minor, patch) >= (3, 5, 5)
+      case Some((major, minor, patch)) => (major, minor, patch) >= ((3, 5, 5))
       case None =>
         throw new IllegalArgumentException(s"Malformed Spark version: $SPARK_VERSION_SHORT")
     }
@@ -62,7 +63,7 @@ trait ShimCometScanExec {
       fsRelation.fileFormat.fileConstantMetadataExtractors,
       options)
 
-  // see SPARK-39634
+  @nowarn // Temporary implementation, see SPARK-39634
   protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
 
   protected def getPartitionedFile(f: FileStatusWithMetadata, p: PartitionDirectory): PartitionedFile =
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
index 3198035446..b68aa3952e 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleExchangeExec, S
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.types.{StructField, StructType}
 
+import scala.annotation.nowarn
+
 trait ShimCometShuffleExchangeExec {
   // TODO: remove after dropping Spark 3.4 support
   def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): CometShuffleExchangeExec = {
@@ -46,5 +48,6 @@ trait ShimCometShuffleExchangeExec {
     StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
 
   // TODO: remove after dropping Spark 3.x support
+  @nowarn // Suppress parameter never used warning.
   protected def getShuffleId(shuffleDependency: ShuffleDependency[Int, _, _]): Int = 0
 }
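Note (commentary, not part of the patch): two of the edits above, `Some((field.dataType, ...))` in SourceFilterSerde.scala and `>= ((3, 5, 5))` in ShimCometScanExec.scala, add parentheses so the argument is an explicit tuple rather than one scalac has to build by adapting the argument list, which the adapted-args lint reports and -Xfatal-warnings would turn into an error. A compact illustration of both spots:

    object ExplicitTuplingExample {
      import scala.math.Ordering.Implicits._

      // Some(1, "a") would make the compiler adapt the two arguments into a tuple
      // (auto-tupling); writing the tuple yourself is equivalent and warning-free.
      val pair: Some[(Int, String)] = Some((1, "a"))

      // The same applies to operators: >= here takes one tuple argument, so the
      // inner parentheses build the tuple and the outer ones delimit the call.
      def atLeast355(major: Int, minor: Int, patch: Int): Boolean =
        (major, minor, patch) >= ((3, 5, 5))
    }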