Changes from all commits
8 changes: 6 additions & 2 deletions pom.xml
@@ -741,12 +741,16 @@ under the License.
<arg>-deprecation</arg>
<arg>-unchecked</arg>
<arg>-feature</arg>
<arg>-Xlint:_</arg>
<arg>-Ywarn-dead-code</arg>
<arg>-Ywarn-numeric-widen</arg>
<arg>-Ywarn-value-discard</arg>
<arg>-Ywarn-unused:imports,patvars,privates,locals,params,-implicits</arg>
<arg>-Xfatal-warnings</arg>
<!--
Private trait reference under class CometShuffleManager
<arg>-Xlint:_</arg>
UnsupportedException being thrown on SparkPlan default.
<arg>-Ywarn-dead-code</arg>
-->
Contributor: Not sure if this comment is necessary, or maybe I am missing something here.

</args>
</configuration>
</plugin>
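Context for the rest of the diff: with -Xfatal-warnings enabled, every -Ywarn-value-discard, -Ywarn-numeric-widen, and -Ywarn-unused finding fails the build, which is what drives the trailing "; ()" statements, the explicit .toInt/.toLong/.toDouble conversions, and the @nowarn/@unused annotations in the files below. A minimal sketch of the value-discard case (illustrative code, not taken from this PR):

    object ValueDiscardSketch {
      private val sb = new StringBuilder

      // StringBuilder.append returns the builder itself. As the result of a
      // Unit method, that non-Unit value is silently discarded, which
      // -Ywarn-value-discard reports and -Xfatal-warnings turns into an error.
      def appendWarns(s: String): Unit = sb.append(s)

      // The idiom used throughout this PR: end the statement with an explicit ().
      def appendExplicit(s: String): Unit = { sb.append(s); () }

      // An alternative with the same effect, discarding via a wildcard binding.
      def appendBound(s: String): Unit = { val _ = sb.append(s) }
    }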
4 changes: 4 additions & 0 deletions spark/src/main/scala/org/apache/comet/Native.scala
@@ -21,11 +21,15 @@ package org.apache.comet

import java.nio.ByteBuffer

import scala.annotation.nowarn

import org.apache.spark.CometTaskMemoryManager
import org.apache.spark.sql.comet.CometMetricNode

import org.apache.comet.parquet.CometFileKeyUnwrapper

// Suppress unused warnings since native methods have no Scala impl
@nowarn("msg=never used")
class Native extends NativeBase {

// scalastyle:off
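For reference, scala.annotation.nowarn accepts a filter so that only matching diagnostics are suppressed; "msg=never used" limits the suppression to the "... is never used" warnings that -Ywarn-unused would raise for the parameters of these bodiless native methods. A rough sketch under that assumption (the method shown is illustrative, not the actual Native API):

    import scala.annotation.nowarn

    // Parameters of @native methods have no Scala body that uses them, so the
    // unused-parameter warning would otherwise fire for every declaration here.
    @nowarn("msg=never used")
    class NativeSketch {
      @native def executePlan(planBytes: Array[Byte], numPartitions: Int): Long
    }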
@@ -19,9 +19,12 @@

package org.apache.comet.expressions

import scala.annotation.nowarn

object RegExp {

/** Determine whether the regexp pattern is supported natively and compatible with Spark */
@nowarn // suppress unused param warning since 'pattern' is only a placeholder here
def isSupportedPattern(pattern: String): Boolean = {
// this is a placeholder for implementing logic to determine if the pattern
// is known to be compatible with Spark, so that we can enable regexp automatically
@@ -50,11 +50,12 @@ object SourceFilterSerde extends Logging {
.setDatatype(dataType.get)
.build()
Some(
field.dataType,
ExprOuterClass.Expr
.newBuilder()
.setBound(boundExpr)
.build())
(
field.dataType,
ExprOuterClass.Expr
.newBuilder()
.setBound(boundExpr)
.build()))
Contributor: Not sure if this formatting error is intended?

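Likely intentional rather than a formatting error: Some.apply takes a single argument, so the old code relied on the compiler adapting the two arguments into a tuple (auto-tupling). That adaptation is flagged under stricter lint settings (e.g. -Xlint:adapted-args), so the tuple is now written explicitly. A small sketch of the difference (illustrative types only):

    object TuplingSketch {
      // Relies on the compiler wrapping the two arguments into a tuple;
      // stricter settings warn about this adaptation.
      // val adapted: Option[(Int, String)] = Some(1, "a")

      // Explicit tuple: same meaning, no adaptation, no warning.
      val explicit: Option[(Int, String)] = Some((1, "a"))
    }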
} else {
None
}
@@ -80,8 +81,8 @@
// refer to org.apache.spark.sql.catalyst.CatalystTypeConverters.CatalystTypeConverter#toScala
dataType match {
case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
case _: IntegerType => exprBuilder.setIntVal(value.asInstanceOf[Int])
case _: LongType => exprBuilder.setLongVal(value.asInstanceOf[Long])
case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float])
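The .toInt additions above (and the .toLong/.toDouble conversions in later files) address -Ywarn-numeric-widen: the setters take an Int (or Long), so passing a Byte or Short would be widened implicitly, which the flag reports. A minimal sketch with a stand-in setter (not the generated protobuf API):

    object WideningSketch {
      // Stand-in for a generated setter that accepts Int.
      def setByteVal(v: Int): Unit = println(v)

      val b: Byte = 1
      // setByteVal(b)     // implicit Byte -> Int widening, reported by -Ywarn-numeric-widen
      setByteVal(b.toInt)  // explicit conversion, no warning
    }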
@@ -445,7 +445,7 @@ object CometScanRule extends Logging {
val reason = "Object store config not supported by " +
s"$SCAN_NATIVE_ICEBERG_COMPAT: ${e.getMessage}"
fallbackReasons += reason
configValidityMap.put(cacheKey, Some(reason))
configValidityMap.update(cacheKey, Some(reason))
}
}

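The put -> update switch is another value-discard fix: mutable.Map.put returns an Option holding the previous value, which would be discarded here, while update returns Unit. A small sketch assuming a cache shaped like the one above:

    import scala.collection.mutable

    object MapPutSketch {
      private val configValidity = mutable.Map.empty[String, Option[String]]

      // put returns Option[Option[String]]; discarding it trips -Ywarn-value-discard.
      def recordWithPut(key: String, reason: Option[String]): Unit =
        configValidity.put(key, reason) // warned under the new flags

      // update has the same effect and returns Unit.
      def recordWithUpdate(key: String, reason: Option[String]): Unit =
        configValidity.update(key, reason)
    }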
@@ -19,6 +19,7 @@

package org.apache.comet.serde

import scala.annotation.unused
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

@@ -1804,7 +1805,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
.setFileSize(file.fileSize)
partitionBuilder.addPartitionedFile(fileBuilder.build())
})
nativeScanBuilder.addFilePartitions(partitionBuilder.build())
nativeScanBuilder.addFilePartitions(partitionBuilder.build()); ()
Member: This seems incomprehensible.

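Some context for the comment above: protobuf builders return this so calls can be chained, so addFilePartitions(...) as the final expression of this block is a discarded non-Unit value under -Ywarn-value-discard; the trailing () makes the discard explicit. A sketch with a stand-in builder (not the generated class), including an arguably clearer alternative:

    // Stand-in for a protobuf-style builder whose mutators return `this`.
    final class ScanBuilderSketch {
      def addFilePartitions(p: String): ScanBuilderSketch = this
    }

    object BuilderDiscardSketch {
      def fill(builder: ScanBuilderSketch): Unit = {
        // builder.addFilePartitions("part-0")      // discarded builder -> warning
        builder.addFilePartitions("part-0"); ()     // the pattern used in this PR
      }

      def fillAlternative(builder: ScanBuilderSketch): Unit = {
        val _ = builder.addFilePartitions("part-0") // equivalent, reads less cryptically
      }
    }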
}
}

@@ -1886,7 +1887,7 @@ trait CometExpressionSerde[T <: Expression] {
* @return
* Support level (Compatible, Incompatible, or Unsupported).
*/
def getSupportLevel(expr: T): SupportLevel = Compatible(None)
def getSupportLevel(@unused expr: T): SupportLevel = Compatible(None)

/**
* Convert a Spark expression into a protocol buffer representation that can be passed into
3 changes: 2 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -19,6 +19,7 @@

package org.apache.comet.serde

import scala.annotation.unused
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode}
@@ -414,7 +415,7 @@ trait CometCovBase {
statsType: Int,
inputs: Seq[Attribute],
binding: Boolean,
conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
@unused conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
Member: If this is unused can we just remove it?

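A possible reason to keep it (an assumption, not stated in the PR): dropping conf would change the signature of a method that existing call sites and Spark-version shims may rely on, so annotating it with @unused satisfies -Ywarn-unused:params without touching callers. A minimal sketch of the annotation:

    import scala.annotation.unused

    object UnusedParamSketch {
      // The parameter stays for signature stability; @unused marks the non-use
      // as intentional so -Ywarn-unused:params stays quiet.
      def convertSomething(value: Int, @unused conf: String): Int = value * 2
    }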
val child1Expr = exprToProto(cov.left, inputs, binding)
val child2Expr = exprToProto(cov.right, inputs, binding)
val dataType = serializeDataType(cov.dataType)
4 changes: 2 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/literals.scala
@@ -72,8 +72,8 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging {
exprBuilder.setIsNull(false)
dataType match {
case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
case _: IntegerType | _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int])
case _: LongType | _: TimestampType | _: TimestampNTZType =>
exprBuilder.setLongVal(value.asInstanceOf[Long])
4 changes: 2 additions & 2 deletions spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -100,13 +100,13 @@ object CometDriverPlugin extends Logging {
val extensions = conf.get(extensionKey, "")
if (extensions.isEmpty) {
logInfo(s"Setting $extensionKey=$extensionClass")
conf.set(extensionKey, extensionClass)
conf.set(extensionKey, extensionClass); ()
} else {
Contributor: Perhaps an unintended ()?

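Likely intentional rather than accidental: SparkConf.set returns the SparkConf itself, and this branch appears to be the tail expression of a Unit method, so the returned conf would be a discarded non-Unit value under -Ywarn-value-discard; the trailing () here and a few lines below makes each branch explicitly Unit, as in the value-discard sketch near the top of the diff.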
val currentExtensions = extensions.split(",").map(_.trim)
if (!currentExtensions.contains(extensionClass)) {
val newValue = s"$extensions,$extensionClass"
logInfo(s"Setting $extensionKey=$newValue")
conf.set(extensionKey, newValue)
conf.set(extensionKey, newValue); ()
}
}
}
@@ -26,8 +26,8 @@ class RowPartition(initialSize: Int) {
private var rowSizes: ArrayBuffer[Int] = new ArrayBuffer[Int](initialSize)

def addRow(addr: Long, size: Int): Unit = {
rowAddresses += addr
rowSizes += size
rowAddresses.append(addr)
rowSizes.append(size)
}

def getNumRows: Int = if (rowAddresses == null) {
@@ -66,7 +66,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres

override def next(): ColumnarBatch = {
val batch = batches.next()
numOutputRows += batch.numRows()
numOutputRows += batch.numRows().toLong
batch
}
}
@@ -146,7 +146,7 @@ case class CometBroadcastExchangeExec(
longMetric("numOutputRows") += numRows
if (numRows >= maxBroadcastRows) {
throw QueryExecutionErrors.cannotBroadcastTableOverMaxTableRowsError(
maxBroadcastRows,
maxBroadcastRows.toLong,
numRows)
}

@@ -198,7 +198,7 @@

override protected def doPrepare(): Unit = {
// Materialize the future.
relationFuture
relationFuture; ()
Contributor: Perhaps an unintended ()?

}

override protected def doExecute(): RDD[InternalRow] = {
@@ -88,7 +88,7 @@ case class CometCollectLimitExec(
outputPartitioning,
serializer,
metrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
metrics("numPartitions").set(dep.partitioner.numPartitions.toDouble)

new CometShuffledBatchRDD(dep, readMetrics)
}
@@ -82,7 +82,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batches.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
numOutputRows += batch.numRows().toLong
batch.rowIterator().asScala.map(toUnsafe)
}
}
@@ -120,7 +120,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
.flatMap(CometUtils.decodeBatches(_, this.getClass.getSimpleName))
.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
numOutputRows += batch.numRows().toLong
batch.rowIterator().asScala.map(toUnsafe)
}

@@ -242,7 +242,7 @@ case class CometScanExec(
driverMetrics("staticFilesSize") = filesSize
}
if (relation.partitionSchema.nonEmpty) {
driverMetrics("numPartitions") = partitions.length
driverMetrics("numPartitions") = partitions.length.toLong
}
}

@@ -284,7 +284,7 @@

override def next(): ColumnarBatch = {
val batch = batches.next()
numOutputRows += batch.numRows()
numOutputRows += batch.numRows().toLong
batch
}
}
@@ -83,7 +83,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
val startNs = System.nanoTime()
val batch = iter.next()
conversionTime += System.nanoTime() - startNs
numInputRows += batch.numRows()
numInputRows += batch.numRows().toLong
numOutputBatches += 1
batch
}
@@ -123,7 +123,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
CometArrowConverters.rowToArrowBatchIter(
sparkBatches,
schema,
maxRecordsPerBatch,
maxRecordsPerBatch.toLong,
timeZoneId,
context)
createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
@@ -96,7 +96,7 @@ case class CometTakeOrderedAndProjectExec(
outputPartitioning,
serializer,
metrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
metrics("numPartitions").set(dep.partitioner.numPartitions.toDouble)

new CometShuffledBatchRDD(dep, readMetrics)
}
@@ -111,7 +111,7 @@ class CometBlockStoreShuffleReader[K, C](
// Update the context task metrics for each record read.
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(record._2.numRows())
readMetrics.incRecordsRead(record._2.numRows().toLong)
record
},
context.taskMetrics().mergeShuffleReadMetrics())
@@ -60,7 +60,9 @@ class CometNativeShuffleWriter[K, V](
private val OFFSET_LENGTH = 8

var partitionLengths: Array[Long] = _
var mapStatus: MapStatus = _
// Store MapStatus opaquely as AnyRef,
// to avoid private[spark] visibility issues; cast back when needed.
var mapStatus: AnyRef = _
Contributor: Any reason why we are changing the type annotation to AnyRef?


override def write(inputs: Iterator[Product2[K, V]]): Unit = {
val shuffleBlockResolver =
@@ -302,7 +304,7 @@

override def stop(success: Boolean): Option[MapStatus] = {
if (success) {
Some(mapStatus)
Some(mapStatus.asInstanceOf[MapStatus])
} else {
None
}
@@ -137,7 +137,7 @@ case class CometShuffleExchangeExec(
outputPartitioning,
serializer,
metrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
@@ -151,7 +151,7 @@
outputPartitioning,
serializer,
metrics)
metrics("numPartitions").set(dep.partitioner.numPartitions)
metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
@@ -385,7 +385,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
// end up being almost the same regardless of the index. substantially scrambling the
// seed by hashing will help. Refer to SPARK-21782 for more details.
val partitionId = TaskContext.get().partitionId()
var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
var position = new XORShiftRandom(partitionId.toLong).nextInt(numPartitions).toLong
(_: InternalRow) => {
// The HashPartitioner will handle the `mod` by the number of partitions
position += 1
@@ -432,7 +432,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
row: InternalRow): UnsafeExternalRowSorter.PrefixComputer.Prefix = {
// The hashcode generated from the binary form of a [[UnsafeRow]] should not be null.
result.isNull = false
result.value = row.hashCode()
result.value = row.hashCode().toLong
result
}
}
@@ -21,7 +21,6 @@ package org.apache.spark.sql.comet.shims


import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
Contributor: Unintended formatting issue?

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
@@ -32,6 +31,8 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.util.VersionUtils

import scala.annotation.nowarn
import scala.math.Ordering.Implicits._

trait ShimCometScanExec {
@@ -42,7 +43,7 @@

def isSparkVersionAtLeast355: Boolean = {
VersionUtils.majorMinorPatchVersion(SPARK_VERSION_SHORT) match {
case Some((major, minor, patch)) => (major, minor, patch) >= (3, 5, 5)
case Some((major, minor, patch)) => (major, minor, patch) >= ((3, 5, 5))
Contributor: Unintended formatting issue?

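The doubled parentheses appear deliberate: with infix syntax, (major, minor, patch) >= (3, 5, 5) would be read as calling >= with three arguments that the compiler then adapts into a tuple, the same adapted-args/auto-tupling case as the Some((...)) change earlier; ((3, 5, 5)) passes the tuple explicitly. A brief sketch:

    import scala.math.Ordering.Implicits._

    object TupleCompareSketch {
      val version: (Int, Int, Int) = (3, 5, 6)
      // version >= (3, 5, 5)   // three args adapted into a tuple -> lint warning
      val atLeast355: Boolean = version >= ((3, 5, 5))
    }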
case None =>
throw new IllegalArgumentException(s"Malformed Spark version: $SPARK_VERSION_SHORT")
}
@@ -62,7 +63,7 @@ trait ShimCometScanExec {
fsRelation.fileFormat.fileConstantMetadataExtractors,
options)

// see SPARK-39634
@nowarn // Temporary implementation, see SPARK-39634
protected def isNeededForSchema(sparkSchema: StructType): Boolean = false

protected def getPartitionedFile(f: FileStatusWithMetadata, p: PartitionDirectory): PartitionedFile =
@@ -25,6 +25,8 @@ import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleExchangeExec, S
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.types.{StructField, StructType}

import scala.annotation.nowarn

trait ShimCometShuffleExchangeExec {
// TODO: remove after dropping Spark 3.4 support
def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): CometShuffleExchangeExec = {
@@ -46,5 +48,6 @@ trait ShimCometShuffleExchangeExec {
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))

// TODO: remove after dropping Spark 3.x support
@nowarn // Suppress parameter never used warning.
protected def getShuffleId(shuffleDependency: ShuffleDependency[Int, _, _]): Int = 0
}
Loading