Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
da2bd5c
[Scala 2.13][IntelliJ] Remove suppression for lint-multiarg-infix war…
baibaichen Dec 1, 2025
091bde8
[Scala 2.13][IntelliJ] Suppress warning for `ContentFile::path`
Dec 15, 2025
3613026
[Scala 2.13][IntelliJ] Suppress warning for ContextAwareIterator init…
baibaichen Dec 1, 2025
40a596e
[Scala 2.13][IntelliJ] Refactor to use Symbol for column references t…
baibaichen Dec 1, 2025
a87e851
[Fix] Replace deprecated fileToString with Files.readString for file …
baibaichen Dec 22, 2025
94478cb
[Scala 2.13][IntelliJ] Update the Java compiler release version from …
baibaichen Dec 24, 2025
fdb0a26
[Feat] Update spark.version to 4.1.0 in pom.xml
baibaichen Dec 17, 2025
d33198a
[Fix] Update Scala version to 2.13.17 in pom.xml to fix `java.lang.No…
baibaichen Dec 17, 2025
b1693c6
[Fix] Update Spark and Scala versions in pom.xml to 4.1.0 and 2.13.17
baibaichen Dec 18, 2025
dd020be
[Fix] Import `StoragePartitionJoinParams` in batch scan classes
baibaichen Dec 4, 2025
fa7e601
[Fix] Remove unused MDC import from FileSourceScanExecShim.scala
baibaichen Dec 4, 2025
0008967
[Fix] Use `.key` for sparkConf.set in test suites.
baibaichen Dec 4, 2025
3fd9753
[Fix] Add printOutputColumns parameter to generateTreeString methods
Dec 4, 2025
c60dccf
[Fix] Update imports to reflect streaming runtime package refactoring…
baibaichen Dec 16, 2025
1af7ce8
[Fix] Using new interface of ParquetFooterReader
baibaichen Dec 16, 2025
81ad11c
[Fix] Adapt to DataSourceV2Relation interface change
baibaichen Dec 16, 2025
2cc6581
[Fix] Adapt to QueryExecution.createSparkPlan interface change
baibaichen Dec 16, 2025
5006feb
[Draft] Remove TimeAdd from ExpressionConverter and ExpressionMapping…
baibaichen Dec 4, 2025
334ee4e
[Feat] Bump Spark 4.0 dependency to 4.1.0 in install script
baibaichen Dec 18, 2025
e59c5b5
[Fix] Update Parquet dependency version to 1.16.0 in pom.xml
baibaichen Dec 18, 2025
0bf988f
[Fix] Refactor Spark version checks in VeloxHashJoinSuite to improve …
baibaichen Dec 26, 2025
7d88ad9
[Fix] Fix MiscOperatorSuite to support OneRowRelationExec plan Spark 4.1
baibaichen Dec 26, 2025
3b32bda
[Fix] Refactor Spark version checks in VeloxHashJoinSuite to improve …
baibaichen Dec 30, 2025
a451fff
[4.1.0] Exclude SPLIT test in VeloxStringFunctionsSuite
baibaichen Dec 26, 2025
928721e
[4.1.0] Only Run ArrowEvalPythonExecSuite tests up to Spark 4.0, we n…
baibaichen Dec 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/util/install-spark-resources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ case "$1" in
cd ${INSTALL_DIR} && \
install_spark "3.5.5" "3" "2.13"
;;
4.0)
# Spark-4.0, scala 2.12 // using 2.12 as a hack as 4.0 does not have 2.13 suffix
4.1)
# Spark-4.x, scala 2.12 // using 2.12 as a hack as 4.x does not have a 2.13 suffix
cd ${INSTALL_DIR} && \
install_spark "4.0.1" "3" "2.12"
install_spark "4.1.0" "3" "2.12"
;;
*)
echo "Spark version is expected to be specified."
Expand Down
24 changes: 12 additions & 12 deletions .github/workflows/velox_backend_x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ jobs:
ccache -s
"

spark-test-spark40:
spark-test-spark41:
needs: build-native-lib-centos-7
runs-on: ubuntu-22.04
env:
Expand All @@ -1403,11 +1403,11 @@ jobs:
pip3 install setuptools==77.0.3 && \
pip3 install pyspark==3.5.5 cython && \
pip3 install pandas==2.2.3 pyarrow==20.0.0
- name: Prepare Spark Resources for Spark 4.0.1 #TODO remove after image update
- name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image update
run: |
rm -rf /opt/shims/spark40
bash .github/workflows/util/install-spark-resources.sh 4.0
mv /opt/shims/spark40/spark_home/assembly/target/scala-2.12 /opt/shims/spark40/spark_home/assembly/target/scala-2.13
rm -rf /opt/shims/spark41
bash .github/workflows/util/install-spark-resources.sh 4.1
mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 /opt/shims/spark41/spark_home/assembly/target/scala-2.13
- name: Build and Run unit test for Spark 4.1.0 with scala-2.13 (other tests)
run: |
cd $GITHUB_WORKSPACE/
Expand All @@ -1417,7 +1417,7 @@ jobs:
export PATH=$JAVA_HOME/bin:$PATH
java -version
$MVN_CMD clean test -Pspark-4.0 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark40/spark_home/" \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
if: always()
Expand All @@ -1435,7 +1435,7 @@ jobs:
**/gluten-ut/**/hs_err_*.log
**/gluten-ut/**/core.*

spark-test-spark40-slow:
spark-test-spark41-slow:
needs: build-native-lib-centos-7
runs-on: ubuntu-22.04
env:
Expand All @@ -1453,11 +1453,11 @@ jobs:
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Prepare Spark Resources for Spark 4.0.1 #TODO remove after image update
- name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image update
run: |
rm -rf /opt/shims/spark40
bash .github/workflows/util/install-spark-resources.sh 4.0
mv /opt/shims/spark40/spark_home/assembly/target/scala-2.12 /opt/shims/spark40/spark_home/assembly/target/scala-2.13
rm -rf /opt/shims/spark41
bash .github/workflows/util/install-spark-resources.sh 4.1
mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 /opt/shims/spark41/spark_home/assembly/target/scala-2.13
- name: Build and Run unit test for Spark 4.1 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
Expand All @@ -1466,7 +1466,7 @@ jobs:
export PATH=$JAVA_HOME/bin:$PATH
java -version
$MVN_CMD clean test -Pspark-4.0 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark40/spark_home/" \
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: Upload test report
if: always()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.PermissiveMode
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2RelationShim.CSVTableExtractor
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkArrowUtil

Expand All @@ -56,25 +55,15 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
l.copy(relation = r.copy(fileFormat = new ArrowCSVFileFormat(csvOptions))(session))
case _ => l
}
case d @ DataSourceV2Relation(
t @ CSVTable(
name,
sparkSession,
options,
paths,
userSpecifiedSchema,
fallbackFileFormat),
_,
_,
_,
_) if validate(session, t.dataSchema, options.asCaseSensitiveMap().toMap) =>
case CSVTableExtractor(d, t)
if validate(session, t.dataSchema, t.options.asCaseSensitiveMap().toMap) =>
d.copy(table = ArrowCSVTable(
"arrow" + name,
sparkSession,
options,
paths,
userSpecifiedSchema,
fallbackFileFormat))
"arrow" + t.name,
t.sparkSession,
t.options,
t.paths,
t.userSpecifiedSchema,
t.fallbackFileFormat))
case r =>
r
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader, ParquetOptions}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
Expand Down Expand Up @@ -135,7 +135,7 @@ object ParquetMetadataUtils extends Logging {
parquetOptions: ParquetOptions): Option[String] = {
val footer =
try {
ParquetFooterReader.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER)
ParquetFooterReaderShim.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER)
} catch {
case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
return Some("Encrypted Parquet footer detected.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
import java.io.{DataInputStream, DataOutputStream}
import java.util.concurrent.atomic.AtomicBoolean

import scala.annotation.nowarn
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -362,7 +363,7 @@ case class ColumnarArrowEvalPythonExec(
StructField(s"_$i", dt)
}.toSeq)

val contextAwareIterator = new ContextAwareIterator(context, iter)
@nowarn val contextAwareIterator = new ContextAwareIterator(context, iter)
val inputCbCache = new ArrayBuffer[ColumnarBatch]()
var start_time: Long = 0
val inputBatchIter = contextAwareIterator.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,11 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
val df = sql("SELECT 1")
checkAnswer(df, Row(1))
val plan = df.queryExecution.executedPlan
assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined)
if (isSparkVersionGE("4.1")) {
assert(plan.find(_.getClass.getSimpleName == "OneRowRelationExec").isDefined)
} else {
assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined)
}
assert(plan.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
assert(plan.find(_.isInstanceOf[RowToVeloxColumnarExec]).isDefined)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,9 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {

// The computing is combined into one single whole stage transformer.
val wholeStages = plan.collect { case wst: WholeStageTransformer => wst }
if (SparkShimLoader.getSparkVersion.startsWith("3.2.")) {
if (isSparkVersionLE("3.2")) {
assert(wholeStages.length == 1)
} else if (
SparkShimLoader.getSparkVersion.startsWith("3.5.") ||
SparkShimLoader.getSparkVersion.startsWith("4.0.")
) {
} else if (isSparkVersionGE("3.5")) {
assert(wholeStages.length == 5)
} else {
assert(wholeStages.length == 3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,8 @@ class VeloxStringFunctionsSuite extends VeloxWholeStageTransformerSuite {
s"from $LINEITEM_TABLE limit 5") { _ => }
}

testWithMinSparkVersion("split", "3.4") {
// TODO: the `split` test fails on Spark 4.1; fix the failure and re-enable it there
testWithSpecifiedSparkVersion("split", "3.4", "3.5") {
runQueryAndCompare(
s"select l_orderkey, l_comment, split(l_comment, '') " +
s"from $LINEITEM_TABLE limit 5") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
.set("spark.executor.cores", "1")
}

test("arrow_udf test: without projection") {
testWithMaxSparkVersion("arrow_udf test: without projection", "4.0") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
Expand All @@ -59,7 +59,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
checkAnswer(df2, expected)
}

test("arrow_udf test: with unrelated projection") {
testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "4.0") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.gluten.extension.caller

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.util.SparkVersionUtil

/**
* Helper API that stores information about the call site of the columnar rule. Specific columnar
Expand Down Expand Up @@ -70,7 +70,12 @@ object CallerInfo {
}

private def inStreamingCall(stack: Seq[StackTraceElement]): Boolean = {
stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head))
val streamName = if (SparkVersionUtil.gteSpark41) {
"org.apache.spark.sql.execution.streaming.runtime.StreamExecution"
} else {
"org.apache.spark.sql.execution.streaming.StreamExecution"
}
stack.exists(_.getClassName.equals(streamName))
}

private def inBloomFilterStatFunctionCall(stack: Seq[StackTraceElement]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ object SparkVersionUtil {
val gteSpark33: Boolean = comparedWithSpark33 >= 0
val gteSpark35: Boolean = comparedWithSpark35 >= 0
val gteSpark40: Boolean = compareMajorMinorVersion((4, 0)) >= 0
val gteSpark41: Boolean = compareMajorMinorVersion((4, 1)) >= 0

// Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > other.
def compareMajorMinorVersion(other: (Int, Int)): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil
import java.lang.{Class, Long => JLong}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

import scala.annotation.nowarn
import scala.collection.JavaConverters._

object GlutenIcebergSourceUtil {
Expand All @@ -50,6 +51,7 @@ object GlutenIcebergSourceUtil {
}
}

@nowarn
def genSplitInfo(
partition: SparkDataSourceRDDPartition,
readPartitionSchema: StructType): SplitInfo = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,13 +856,6 @@ object ExpressionConverter extends SQLConfHelper with Logging {
dateAdd.children,
dateAdd
)
case timeAdd: TimeAdd =>
BackendsApiManager.getSparkPlanExecApiInstance.genDateAddTransformer(
attributeSeq,
substraitExprName,
timeAdd.children,
timeAdd
)
case ss: StringSplit =>
BackendsApiManager.getSparkPlanExecApiInstance.genStringSplitTransformer(
substraitExprName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ object ExpressionMappings {
Sig[Second](EXTRACT),
Sig[FromUnixTime](FROM_UNIXTIME),
Sig[DateAdd](DATE_ADD),
Sig[TimeAdd](TIMESTAMP_ADD),
Sig[DateSub](DATE_SUB),
Sig[DateDiff](DATE_DIFF),
Sig[ToUnixTimestamp](TO_UNIX_TIMESTAMP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
import org.apache.gluten.utils.PlanUtil

import org.apache.spark.sql.{Column, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
Expand Down Expand Up @@ -130,10 +129,8 @@ object GlutenImplicits {
val (innerNumGlutenNodes, innerFallbackNodeToReason) =
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// re-plan manually to skip cached data
val newSparkPlan = QueryExecution.createSparkPlan(
spark,
spark.sessionState.planner,
p.inputPlan.logicalLink.get)
val newSparkPlan = QueryExecutionShim.createSparkPlan(
spark, spark.sessionState.planner, p.inputPlan.logicalLink.get)
val newExecutedPlan = QueryExecution.prepareExecutedPlan(
spark,
newSparkPlan
Expand Down
8 changes: 6 additions & 2 deletions gluten-ut/spark40/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
<packaging>jar</packaging>
<name>Gluten Unit Test Spark40</name>

<properties>
<parquet.version>1.16.0</parquet.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
Expand All @@ -23,14 +27,14 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.15.2</version>
<version>${parquet.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>1.15.2</version>
<version>${parquet.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.TimestampTypes
Expand All @@ -40,6 +40,7 @@ import org.apache.spark.util.Utils

import java.io.File
import java.net.URI
import java.nio.file.Files
import java.util.Locale

import scala.collection.mutable
Expand Down Expand Up @@ -320,7 +321,7 @@ class GlutenSQLQueryTestSuite
newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER")
}

val input = fileToString(new File(testCase.inputFile))
val input = Files.readString(new File(testCase.inputFile).toPath)

val (comments, code) = splitCommentsAndCodes(input)

Expand All @@ -331,7 +332,7 @@ class GlutenSQLQueryTestSuite
testCaseName =>
listTestCases.find(_.name == testCaseName).map {
testCase =>
val input = fileToString(new File(testCase.inputFile))
val input = Files.readString(new File(testCase.inputFile).toPath)
val (_, code) = splitCommentsAndCodes(input)
code
}
Expand Down Expand Up @@ -738,7 +739,7 @@ class GlutenSQLQueryTestSuite
makeOutput: (String, Option[String], String) => QueryTestOutput): Unit = {
// Read back the golden file.
val expectedOutputs: Seq[QueryTestOutput] = {
val goldenOutput = fileToString(new File(resultFile))
val goldenOutput = Files.readString(new File(resultFile).toPath)
val segments = goldenOutput.split("-- !query.*\n")

val numSegments = outputs.map(_.numSegments).sum + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.internal.SQLConf

abstract class GlutenDynamicPartitionPruningSuiteBase
Expand Down
Loading
Loading