Merged
@@ -43,6 +43,7 @@ import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
import java.io.{DataInputStream, DataOutputStream}
import java.util.concurrent.atomic.AtomicBoolean

import scala.annotation.nowarn
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

@@ -362,7 +363,7 @@ case class ColumnarArrowEvalPythonExec(
StructField(s"_$i", dt)
}.toSeq)

val contextAwareIterator = new ContextAwareIterator(context, iter)
@nowarn val contextAwareIterator = new ContextAwareIterator(context, iter)
val inputCbCache = new ArrayBuffer[ColumnarBatch]()
var start_time: Long = 0
val inputBatchIter = contextAwareIterator.map {
@@ -33,6 +33,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil
import java.lang.{Class, Long => JLong}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

import scala.annotation.nowarn
import scala.collection.JavaConverters._

object GlutenIcebergSourceUtil {
@@ -50,6 +51,7 @@
}
}

@nowarn
Contributor:
What's the warning message? Can we fix the warning?

Contributor Author (@baibaichen), Dec 30, 2025:

org.apache.iceberg.ContentFile::path() is marked as deprecated. Without introducing a shim, it's hard to fix properly, so we'll suppress the warning for now.

        asFileScanTask(tasks.toList).foreach {
          task =>
            paths.add(
              BackendsApiManager.getTransformerApiInstance
                .encodeFilePathIfNeed(task.file().path().toString))  // => deprecated

Additionally, @nowarn can only be attached to a definition such as a method; it cannot be placed in front of an individual statement.
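
For illustration only (not code from this PR), here is a minimal, self-contained sketch of the pattern being described: the annotation goes on the enclosing definition, which suppresses the deprecation warning for every call inside it. All names below are hypothetical stand-ins for the Iceberg API.

  import scala.annotation.nowarn

  object NowarnPlacementSketch {
    // Hypothetical stand-in for org.apache.iceberg.ContentFile#path(),
    // which is deprecated upstream in favor of location().
    @deprecated("use location() instead", "1.7.0")
    def path(): CharSequence = "s3://bucket/warehouse/part-00000.parquet"

    // The annotation is attached to the whole definition rather than to the
    // individual call, so the deprecation warning raised by path() inside
    // this method is suppressed.
    @nowarn("cat=deprecation")
    def encodePath(): String = path().toString
  }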

Contributor:
As the deprecation comment indicates, please use location() instead:

  /**
   * Returns fully qualified path to the file, suitable for constructing a Hadoop Path.
   *
   * @deprecated since 1.7.0, will be removed in 2.0.0; use {@link #location()} instead.
   */
  @Deprecated
  CharSequence path();

  /** Return the fully qualified path to the file. */
  default String location() {
    return path().toString();
  }
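
A hedged sketch of what that call-site change would look like, assuming an Iceberg version (1.7.0 or later) where ContentFile#location() exists. ContentFileLike is a hypothetical stand-in, not the real Iceberg interface:

  object LocationSuggestionSketch {
    // Hypothetical stand-in for org.apache.iceberg.ContentFile; only the two
    // accessors relevant to this thread are sketched.
    trait ContentFileLike {
      @deprecated("use location() instead", "1.7.0")
      def path(): CharSequence
      // Non-deprecated accessor available from Iceberg 1.7.0 onwards.
      def location(): String
    }

    // Before (deprecation warning): file.path().toString
    // After (suggested):            file.location()
    def suggestedFilePath(file: ContentFileLike): String = file.location()
  }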

Contributor:
There is a shim for Iceberg, as in PR #11146; we can accept the current workaround and fix this after #11146 is merged.
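
Purely for illustration, a shim along these lines could isolate the call per build profile. The names below are hypothetical and not taken from #11146; each implementation would live in a source tree compiled against the matching Iceberg version:

  import org.apache.iceberg.ContentFile

  // Illustrative shim sketch; IcebergShim and both implementations are
  // hypothetical names, not the actual API introduced by #11146.
  trait IcebergShim {
    def fileLocation(file: ContentFile[_]): String
  }

  // Built against the older Iceberg releases used by the Spark 3.3 and
  // earlier profiles, where location() does not exist and path() is not
  // yet deprecated.
  class LegacyIcebergShim extends IcebergShim {
    override def fileLocation(file: ContentFile[_]): String =
      file.path().toString
  }

  // Built against Iceberg 1.7.0+, where the non-deprecated location()
  // replaces path().
  class ModernIcebergShim extends IcebergShim {
    override def fileLocation(file: ContentFile[_]): String =
      file.location()
  }

A call site such as encodeFilePathIfNeed(task.file().path().toString) could then go through the shim's fileLocation instead, and the @nowarn workaround would no longer be needed.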

Contributor Author:
> As the comment indicates, please use location()

No, location() doesn't work in the build profiles for Spark 3.3 and earlier.

def genSplitInfo(
partition: SparkDataSourceRDDPartition,
readPartitionSchema: StructType): SplitInfo = {
@@ -29,8 +29,8 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.TimestampTypes
@@ -40,6 +40,7 @@ import org.apache.spark.util.Utils

import java.io.File
import java.net.URI
import java.nio.file.Files
import java.util.Locale

import scala.collection.mutable
@@ -320,7 +321,7 @@ class GlutenSQLQueryTestSuite
newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER")
}

val input = fileToString(new File(testCase.inputFile))
val input = Files.readString(new File(testCase.inputFile).toPath)

val (comments, code) = splitCommentsAndCodes(input)

@@ -331,7 +332,7 @@
testCaseName =>
listTestCases.find(_.name == testCaseName).map {
testCase =>
val input = fileToString(new File(testCase.inputFile))
val input = Files.readString(new File(testCase.inputFile).toPath)
val (_, code) = splitCommentsAndCodes(input)
code
}
@@ -738,7 +739,7 @@
makeOutput: (String, Option[String], String) => QueryTestOutput): Unit = {
// Read back the golden file.
val expectedOutputs: Seq[QueryTestOutput] = {
val goldenOutput = fileToString(new File(resultFile))
val goldenOutput = Files.readString(new File(resultFile).toPath)
val segments = goldenOutput.split("-- !query.*\n")

val numSegments = outputs.map(_.numSegments).sum + 1
@@ -21,6 +21,7 @@ import org.apache.gluten.execution.SortExecTransformer
import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}
import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.internal.SQLConf

@@ -38,25 +39,25 @@ class GlutenDataSourceV2Suite extends DataSourceV2Suite with GlutenSQLTestsBaseT
val df = spark.read.format(cls.getName).load()
checkAnswer(df, Seq(Row(1, 4), Row(1, 4), Row(3, 6), Row(2, 6), Row(4, 2), Row(4, 2)))

val groupByColA = df.groupBy('i).agg(sum('j))
val groupByColA = df.groupBy(col("i")).agg(sum(col("j")))
checkAnswer(groupByColA, Seq(Row(1, 8), Row(2, 6), Row(3, 6), Row(4, 4)))
assert(collectFirst(groupByColA.queryExecution.executedPlan) {
case e: ColumnarShuffleExchangeExec => e
}.isEmpty)

val groupByColAB = df.groupBy('i, 'j).agg(count("*"))
val groupByColAB = df.groupBy(col("i"), col("j")).agg(count("*"))
checkAnswer(groupByColAB, Seq(Row(1, 4, 2), Row(2, 6, 1), Row(3, 6, 1), Row(4, 2, 2)))
assert(collectFirst(groupByColAB.queryExecution.executedPlan) {
case e: ColumnarShuffleExchangeExec => e
}.isEmpty)

val groupByColB = df.groupBy('j).agg(sum('i))
val groupByColB = df.groupBy(col("j")).agg(sum(col("i")))
checkAnswer(groupByColB, Seq(Row(2, 8), Row(4, 2), Row(6, 5)))
assert(collectFirst(groupByColB.queryExecution.executedPlan) {
case e: ColumnarShuffleExchangeExec => e
}.isDefined)

val groupByAPlusB = df.groupBy('i + 'j).agg(count("*"))
val groupByAPlusB = df.groupBy(col("i") + col("j")).agg(count("*"))
checkAnswer(groupByAPlusB, Seq(Row(5, 2), Row(6, 2), Row(8, 1), Row(9, 1)))
assert(collectFirst(groupByAPlusB.queryExecution.executedPlan) {
case e: ColumnarShuffleExchangeExec => e
@@ -138,7 +139,7 @@ class GlutenDataSourceV2Suite extends DataSourceV2Suite with GlutenSQLTestsBaseT
{
val windowPartByColIOrderByColJ = df.withColumn(
"no",
row_number().over(Window.partitionBy(Symbol("i")).orderBy(Symbol("j"))))
row_number().over(Window.partitionBy(col("i")).orderBy(col("j"))))
checkAnswer(
windowPartByColIOrderByColJ,
Seq(
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestData.TestData
@@ -254,13 +255,13 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName
) {
val df1 = spark.range(10).withColumn("a", 'id)
val df2 = spark.range(10).withColumn("b", 'id)
val df1 = spark.range(10).withColumn("a", col("id"))
val df2 = spark.range(10).withColumn("b", col("id"))
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val testDf = df1
.where('a > 10)
.join(df2.where('b > 10), Seq("id"), "left_outer")
.groupBy('a)
.where(col("a") > 10)
.join(df2.where(col("b") > 10), Seq("id"), "left_outer")
.groupBy(col("a"))
.count()
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
@@ -269,9 +270,9 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
val testDf = df1
.where('a > 10)
.join(df2.where('b > 10), Seq("id"), "left_outer")
.groupBy('a)
.where(col("a") > 10)
.join(df2.where(col("b") > 10), Seq("id"), "left_outer")
.groupBy(col("a"))
.count()
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
@@ -346,8 +347,8 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
// +- ShuffleExchange

// After applied the 'OptimizeShuffleWithLocalRead' rule, we can convert all the four
// shuffle read to local shuffle read in the bottom two 'BroadcastHashJoin'.
// For the top level 'BroadcastHashJoin', the probe side is not shuffle query stage
// and the build side shuffle query stage is also converted to local shuffle read.
checkNumLocalShuffleReads(adaptivePlan, 0)
}
@@ -652,19 +653,19 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
spark
.range(0, 1000, 1, 10)
.select(
when('id < 250, 249)
.when('id >= 750, 1000)
.otherwise('id)
when(col("id") < 250, 249)
.when(col("id") >= 750, 1000)
.otherwise(col("id"))
.as("key1"),
'id.as("value1"))
col("id").as("value1"))
.createOrReplaceTempView("skewData1")
spark
.range(0, 1000, 1, 10)
.select(
when('id < 250, 249)
.otherwise('id)
when(col("id") < 250, 249)
.otherwise(col("id"))
.as("key2"),
'id.as("value2"))
col("id").as("value2"))
.createOrReplaceTempView("skewData2")

def checkSkewJoin(
@@ -780,19 +781,19 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
spark
.range(0, 1000, 1, 10)
.select(
when('id < 250, 249)
.when('id >= 750, 1000)
.otherwise('id)
when(col("id") < 250, 249)
.when(col("id") >= 750, 1000)
.otherwise(col("id"))
.as("key1"),
'id.as("value1"))
col("id").as("value1"))
.createOrReplaceTempView("skewData1")
spark
.range(0, 1000, 1, 10)
.select(
when('id < 250, 249)
.otherwise('id)
when(col("id") < 250, 249)
.otherwise(col("id"))
.as("key2"),
'id.as("value2"))
col("id").as("value2"))
.createOrReplaceTempView("skewData2")
val (_, adaptivePlan) =
runAdaptiveAndVerifyResult("SELECT * FROM skewData1 join skewData2 ON key1 = key2")
@@ -940,7 +941,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
// Repartition with no partition num specified.
checkBHJ(
df.repartition('b),
df.repartition(col("b")),
// The top shuffle from repartition is optimized out.
optimizeOutRepartition = true,
probeSideLocalRead = false,
@@ -949,7 +950,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

// Repartition with default partition num (5 in test env) specified.
checkBHJ(
df.repartition(5, 'b),
df.repartition(5, col("b")),
// The top shuffle from repartition is optimized out
// The final plan must have 5 partitions, no optimization can be made to the probe side.
optimizeOutRepartition = true,
@@ -959,7 +960,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

// Repartition with non-default partition num specified.
checkBHJ(
df.repartition(4, 'b),
df.repartition(4, col("b")),
// The top shuffle from repartition is not optimized out
optimizeOutRepartition = false,
probeSideLocalRead = true,
@@ -968,7 +969,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

// Repartition by col and project away the partition cols
checkBHJ(
df.repartition('b).select('key),
df.repartition(col("b")).select(col("key")),
// The top shuffle from repartition is not optimized out
optimizeOutRepartition = false,
probeSideLocalRead = true,
@@ -986,15 +987,16 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
) {
// Repartition with no partition num specified.
checkSMJ(
df.repartition('b),
df.repartition(col("b")),
// The top shuffle from repartition is optimized out.
optimizeOutRepartition = true,
optimizeSkewJoin = false,
coalescedRead = true)
coalescedRead = true
)

// Repartition with default partition num (5 in test env) specified.
checkSMJ(
df.repartition(5, 'b),
df.repartition(5, col("b")),
// The top shuffle from repartition is optimized out.
// The final plan must have 5 partitions, can't do coalesced read.
optimizeOutRepartition = true,
@@ -1004,15 +1006,16 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

// Repartition with non-default partition num specified.
checkSMJ(
df.repartition(4, 'b),
df.repartition(4, col("b")),
// The top shuffle from repartition is not optimized out.
optimizeOutRepartition = false,
optimizeSkewJoin = true,
coalescedRead = false)
coalescedRead = false
)

// Repartition by col and project away the partition cols
checkSMJ(
df.repartition('b).select('key),
df.repartition(col("b")).select(col("key")),
// The top shuffle from repartition is not optimized out.
optimizeOutRepartition = false,
optimizeSkewJoin = true,