Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
import java.io.{DataInputStream, DataOutputStream}
import java.util.concurrent.atomic.AtomicBoolean

import scala.annotation.nowarn
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -362,7 +363,7 @@ case class ColumnarArrowEvalPythonExec(
StructField(s"_$i", dt)
}.toSeq)

val contextAwareIterator = new ContextAwareIterator(context, iter)
@nowarn val contextAwareIterator = new ContextAwareIterator(context, iter)
val inputCbCache = new ArrayBuffer[ColumnarBatch]()
var start_time: Long = 0
val inputBatchIter = contextAwareIterator.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil
import java.lang.{Class, Long => JLong}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

import scala.annotation.nowarn
import scala.collection.JavaConverters._

object GlutenIcebergSourceUtil {
Expand All @@ -50,6 +51,7 @@ object GlutenIcebergSourceUtil {
}
}

@nowarn

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the warning message? Can we fix the warning?

@baibaichen baibaichen Dec 30, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.iceberg.ContentFile::path() is marked as deprecated. Without introducing a shim, it's hard to fix properly, so we'll suppress the warning for now.

        asFileScanTask(tasks.toList).foreach {
          task =>
            paths.add(
              BackendsApiManager.getTransformerApiInstance
                .encodeFilePathIfNeed(task.file().path().toString))  // => deprecated

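Additionally, @nowarn has to be attached to a definition (such as the enclosing method) or to an expression through a type ascription like `(expr: @nowarn)`; it cannot simply be placed in front of an individual statement.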

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the comment indicates, please use location

  /**
   * Returns fully qualified path to the file, suitable for constructing a Hadoop Path.
   *
   * @deprecated since 1.7.0, will be removed in 2.0.0; use {@link #location()} instead.
   */
  @Deprecated
  CharSequence path();

  /** Return the fully qualified path to the file. */
  default String location() {
    return path().toString();
  }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a shim for Iceberg in PR #11146; we can accept the current workaround and fix it after #11146 is merged.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the comment indicates, please use location

  /**
   * Returns fully qualified path to the file, suitable for constructing a Hadoop Path.
   *
   * @deprecated since 1.7.0, will be removed in 2.0.0; use {@link #location()} instead.
   */
  @Deprecated
  CharSequence path();

  /** Return the fully qualified path to the file. */
  default String location() {
    return path().toString();
  }

No, `location()` doesn't work with the profiles for Spark 3.3 and earlier.

def genSplitInfo(
partition: SparkDataSourceRDDPartition,
readPartitionSchema: StructType): SplitInfo = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.TimestampTypes
Expand All @@ -40,6 +40,7 @@ import org.apache.spark.util.Utils

import java.io.File
import java.net.URI
import java.nio.file.Files
import java.util.Locale

import scala.collection.mutable
Expand Down Expand Up @@ -320,7 +321,7 @@ class GlutenSQLQueryTestSuite
newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER")
}

val input = fileToString(new File(testCase.inputFile))
val input = Files.readString(new File(testCase.inputFile).toPath)

val (comments, code) = splitCommentsAndCodes(input)

Expand All @@ -331,7 +332,7 @@ class GlutenSQLQueryTestSuite
testCaseName =>
listTestCases.find(_.name == testCaseName).map {
testCase =>
val input = fileToString(new File(testCase.inputFile))
val input = Files.readString(new File(testCase.inputFile).toPath)
val (_, code) = splitCommentsAndCodes(input)
code
}
Expand Down Expand Up @@ -738,7 +739,7 @@ class GlutenSQLQueryTestSuite
makeOutput: (String, Option[String], String) => QueryTestOutput): Unit = {
// Read back the golden file.
val expectedOutputs: Seq[QueryTestOutput] = {
val goldenOutput = fileToString(new File(resultFile))
val goldenOutput = Files.readString(new File(resultFile).toPath)
val segments = goldenOutput.split("-- !query.*\n")

val numSegments = outputs.map(_.numSegments).sum + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.gluten.execution.SortExecTransformer
import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}
import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.internal.SQLConf

Expand All @@ -38,25 +39,25 @@ class GlutenDataSourceV2Suite extends DataSourceV2Suite with GlutenSQLTestsBaseT
val df = spark.read.format(cls.getName).load()
checkAnswer(df, Seq(Row(1, 4), Row(1, 4), Row(3, 6), Row(2, 6), Row(4, 2), Row(4, 2)))

val groupByColA = df.groupBy('i).agg(sum('j))
val groupByColA = df.groupBy(col("i")).agg(sum(col("j")))
checkAnswer(groupByColA, Seq(Row(1, 8), Row(2, 6), Row(3, 6), Row(4, 4)))
assert(collectFirst(groupByColA.queryExecution.executedPlan) {
case e: ColumnarShuffleExchangeExec => e
}.isEmpty)

val groupByColAB = df.groupBy('i, 'j).agg(count("*"))
val groupByColAB = df.groupBy(col("i"), col("j")).agg(count("*"))
checkAnswer(groupByColAB, Seq(Row(1, 4, 2), Row(2, 6, 1), Row(3, 6, 1), Row(4, 2, 2)))
assert(collectFirst(groupByColAB.queryExecution.executedPlan) {
case e: ColumnarShuffleExchangeExec => e
}.isEmpty)

val groupByColB = df.groupBy('j).agg(sum('i))
val groupByColB = df.groupBy(col("j")).agg(sum(col("i")))
checkAnswer(groupByColB, Seq(Row(2, 8), Row(4, 2), Row(6, 5)))
assert(collectFirst(groupByColB.queryExecution.executedPlan) {
case e: ColumnarShuffleExchangeExec => e
}.isDefined)

val groupByAPlusB = df.groupBy('i + 'j).agg(count("*"))
val groupByAPlusB = df.groupBy(col("i") + col("j")).agg(count("*"))
checkAnswer(groupByAPlusB, Seq(Row(5, 2), Row(6, 2), Row(8, 1), Row(9, 1)))
assert(collectFirst(groupByAPlusB.queryExecution.executedPlan) {
case e: ColumnarShuffleExchangeExec => e
Expand Down Expand Up @@ -138,7 +139,7 @@ class GlutenDataSourceV2Suite extends DataSourceV2Suite with GlutenSQLTestsBaseT
{
val windowPartByColIOrderByColJ = df.withColumn(
"no",
row_number().over(Window.partitionBy(Symbol("i")).orderBy(Symbol("j"))))
row_number().over(Window.partitionBy(col("i")).orderBy(col("j"))))
checkAnswer(
windowPartByColIOrderByColJ,
Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestData.TestData
Expand Down Expand Up @@ -254,13 +255,13 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName
) {
val df1 = spark.range(10).withColumn("a", 'id)
val df2 = spark.range(10).withColumn("b", 'id)
val df1 = spark.range(10).withColumn("a", col("id"))
val df2 = spark.range(10).withColumn("b", col("id"))
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val testDf = df1
.where('a > 10)
.join(df2.where('b > 10), Seq("id"), "left_outer")
.groupBy('a)
.where(col("a") > 10)
.join(df2.where(col("b") > 10), Seq("id"), "left_outer")
.groupBy(col("a"))
.count()
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
Expand All @@ -269,9 +270,9 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
val testDf = df1
.where('a > 10)
.join(df2.where('b > 10), Seq("id"), "left_outer")
.groupBy('a)
.where(col("a") > 10)
.join(df2.where(col("b") > 10), Seq("id"), "left_outer")
.groupBy(col("a"))
.count()
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
Expand Down Expand Up @@ -346,8 +347,8 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
// +- ShuffleExchange

// After applied the 'OptimizeShuffleWithLocalRead' rule, we can convert all the four
// shuffle read to local shuffle read in the bottom two 'BroadcastHashJoin'.
// For the top level 'BroadcastHashJoin', the probe side is not shuffle query stage
// shuffle read to local shuffle read in the bottom two 'BroadcastHashJoin'.
// For the top level 'BroadcastHashJoin', the probe side is not shuffle query stage
// and the build side shuffle query stage is also converted to local shuffle read.
checkNumLocalShuffleReads(adaptivePlan, 0)
}
Expand Down Expand Up @@ -652,19 +653,19 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
spark
.range(0, 1000, 1, 10)
.select(
when('id < 250, 249)
.when('id >= 750, 1000)
.otherwise('id)
when(col("id") < 250, 249)
.when(col("id") >= 750, 1000)
.otherwise(col("id"))
.as("key1"),
'id.as("value1"))
col("id").as("value1"))
.createOrReplaceTempView("skewData1")
spark
.range(0, 1000, 1, 10)
.select(
when('id < 250, 249)
.otherwise('id)
when(col("id") < 250, 249)
.otherwise(col("id"))
.as("key2"),
'id.as("value2"))
col("id").as("value2"))
.createOrReplaceTempView("skewData2")

def checkSkewJoin(
Expand Down Expand Up @@ -780,19 +781,19 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
spark
.range(0, 1000, 1, 10)
.select(
when('id < 250, 249)
.when('id >= 750, 1000)
.otherwise('id)
when(col("id") < 250, 249)
.when(col("id") >= 750, 1000)
.otherwise(col("id"))
.as("key1"),
'id.as("value1"))
col("id").as("value1"))
.createOrReplaceTempView("skewData1")
spark
.range(0, 1000, 1, 10)
.select(
when('id < 250, 249)
.otherwise('id)
when(col("id") < 250, 249)
.otherwise(col("id"))
.as("key2"),
'id.as("value2"))
col("id").as("value2"))
.createOrReplaceTempView("skewData2")
val (_, adaptivePlan) =
runAdaptiveAndVerifyResult("SELECT * FROM skewData1 join skewData2 ON key1 = key2")
Expand Down Expand Up @@ -940,7 +941,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
// Repartition with no partition num specified.
checkBHJ(
df.repartition('b),
df.repartition(col("b")),
// The top shuffle from repartition is optimized out.
optimizeOutRepartition = true,
probeSideLocalRead = false,
Expand All @@ -949,7 +950,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

// Repartition with default partition num (5 in test env) specified.
checkBHJ(
df.repartition(5, 'b),
df.repartition(5, col("b")),
// The top shuffle from repartition is optimized out
// The final plan must have 5 partitions, no optimization can be made to the probe side.
optimizeOutRepartition = true,
Expand All @@ -959,7 +960,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

// Repartition with non-default partition num specified.
checkBHJ(
df.repartition(4, 'b),
df.repartition(4, col("b")),
// The top shuffle from repartition is not optimized out
optimizeOutRepartition = false,
probeSideLocalRead = true,
Expand All @@ -968,7 +969,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

// Repartition by col and project away the partition cols
checkBHJ(
df.repartition('b).select('key),
df.repartition(col("b")).select(col("key")),
// The top shuffle from repartition is not optimized out
optimizeOutRepartition = false,
probeSideLocalRead = true,
Expand All @@ -986,15 +987,16 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute
) {
// Repartition with no partition num specified.
checkSMJ(
df.repartition('b),
df.repartition(col("b")),
// The top shuffle from repartition is optimized out.
optimizeOutRepartition = true,
optimizeSkewJoin = false,
coalescedRead = true)
coalescedRead = true
)

// Repartition with default partition num (5 in test env) specified.
checkSMJ(
df.repartition(5, 'b),
df.repartition(5, col("b")),
// The top shuffle from repartition is optimized out.
// The final plan must have 5 partitions, can't do coalesced read.
optimizeOutRepartition = true,
Expand All @@ -1004,15 +1006,16 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute

// Repartition with non-default partition num specified.
checkSMJ(
df.repartition(4, 'b),
df.repartition(4, col("b")),
// The top shuffle from repartition is not optimized out.
optimizeOutRepartition = false,
optimizeSkewJoin = true,
coalescedRead = false)
coalescedRead = false
)

// Repartition by col and project away the partition cols
checkSMJ(
df.repartition('b).select('key),
df.repartition(col("b")).select(col("key")),
// The top shuffle from repartition is not optimized out.
optimizeOutRepartition = false,
optimizeSkewJoin = true,
Expand Down
Loading