Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/util/install-spark-resources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ case "$1" in
cd ${INSTALL_DIR} && \
install_spark "4.0.1" "3" "2.12"
;;
4.1)
# Spark-4.x, scala 2.12 // using 2.12 as a hack as 4.1 does not have 2.13 suffix
cd ${INSTALL_DIR} && \
install_spark "4.1.0" "3" "2.12"
;;
*)
echo "Spark version is expected to be specified."
exit 1
Expand Down
106 changes: 106 additions & 0 deletions .github/workflows/velox_backend_x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1481,3 +1481,109 @@ jobs:
**/target/*.log
**/gluten-ut/**/hs_err_*.log
**/gluten-ut/**/core.*

spark-test-spark41:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @zhouyuan
I understand that we need to add this here. Spark 4.1 has a new option spark.sql.unionOutputPartitioning introduced in apache/spark#51623. Currently, it needs to be set to false for successful execution. I plan to submit a separate PR later to address this, which will make the review process more convenient.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs: build-native-lib-centos-7
runs-on: ubuntu-22.04
env:
SPARK_TESTING: true
container: apache/gluten:centos-8-jdk17
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
uses: actions/download-artifact@v4
with:
name: velox-native-lib-centos-7-${{github.sha}}
path: ./cpp/build/releases
- name: Download Arrow Jars
uses: actions/download-artifact@v4
with:
name: arrow-jars-centos-7-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Prepare
run: |
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools==77.0.3 && \
pip3 install pyspark==3.5.5 cython && \
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pyspark version should be 4.1.0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jackylee-ch

Interesting, it was copied from Spark 4.0, cc @zhouyuan

However, starting with Spark 4.1(apache/spark#51259), the minimum supported Python version is 3.10. I'm not familiar with how to configure the Python environment, so I've excluded these two unit tests for now, see (2ef147c).

pip3 install pandas==2.2.3 pyarrow==20.0.0
- name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image update
run: |
rm -rf /opt/shims/spark41
bash .github/workflows/util/install-spark-resources.sh 4.1
mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 /opt/shims/spark41/spark_home/assembly/target/scala-2.13
- name: Build and Run unit test for Spark 4.1.0 with scala-2.13 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.13
yum install -y java-17-openjdk-devel
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
if: always()
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}-report
path: '**/surefire-reports/TEST-*.xml'
- name: Upload unit tests log files
if: ${{ !success() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}-test-log
path: |
**/target/*.log
**/gluten-ut/**/hs_err_*.log
**/gluten-ut/**/core.*

spark-test-spark41-slow:
  needs: build-native-lib-centos-7
  runs-on: ubuntu-22.04
  env:
    SPARK_TESTING: true
  container: apache/gluten:centos-8-jdk17
  steps:
    - uses: actions/checkout@v2
    - name: Download All Artifacts
      uses: actions/download-artifact@v4
      with:
        name: velox-native-lib-centos-7-${{github.sha}}
        path: ./cpp/build/releases
    - name: Download Arrow Jars
      uses: actions/download-artifact@v4
      with:
        name: arrow-jars-centos-7-${{github.sha}}
        path: /root/.m2/repository/org/apache/arrow/
    - name: Prepare Spark Resources for Spark 4.1.0 # TODO remove after image update
      run: |
        rm -rf /opt/shims/spark41
        bash .github/workflows/util/install-spark-resources.sh 4.1
        # The 4.1 distribution ships a scala-2.12 assembly directory; rename it so the
        # scala-2.13 build resolves spark.test.home jars (see install-spark-resources.sh).
        mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 /opt/shims/spark41/spark_home/assembly/target/scala-2.13
    # Step name fixed: this job runs against Spark 4.1.0 (was mislabeled "Spark 4.0",
    # copied from the 4.0 job definition).
    - name: Build and Run unit test for Spark 4.1.0 (slow tests)
      run: |
        cd $GITHUB_WORKSPACE/
        yum install -y java-17-openjdk-devel
        export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
        export PATH=$JAVA_HOME/bin:$PATH
        java -version
        $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut \
          -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
          -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
    - name: Upload test report
      if: always()
      uses: actions/upload-artifact@v4
      with:
        name: ${{ github.job }}-report
        path: '**/surefire-reports/TEST-*.xml'
    - name: Upload unit tests log files
      if: ${{ !success() }}
      uses: actions/upload-artifact@v4
      with:
        name: ${{ github.job }}-test-log
        path: |
          **/target/*.log
          **/gluten-ut/**/hs_err_*.log
          **/gluten-ut/**/core.*
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,24 @@ import java.nio.charset.StandardCharsets

import scala.collection.convert.ImplicitConversions.`map AsScala`

/**
 * Extractor that pairs a [[DataSourceV2Relation]] with its underlying [[CSVTable]].
 *
 * Only the `table` member of [[DataSourceV2Relation]] is accessed, which keeps this
 * extractor compatible across Spark versions whose `DataSourceV2Relation` constructor
 * signatures differ (e.g. Spark 4.1).
 */
private object CSVTableExtractor {
  // Returns Some((relation, csvTable)) when the relation wraps a CSVTable, None otherwise.
  def unapply(relation: DataSourceV2Relation): Option[(DataSourceV2Relation, CSVTable)] =
    Some(relation.table).collect { case csv: CSVTable => (relation, csv) }
}

@Experimental
case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
Expand All @@ -56,25 +74,15 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
l.copy(relation = r.copy(fileFormat = new ArrowCSVFileFormat(csvOptions))(session))
case _ => l
}
case d @ DataSourceV2Relation(
t @ CSVTable(
name,
sparkSession,
options,
paths,
userSpecifiedSchema,
fallbackFileFormat),
_,
_,
_,
_) if validate(session, t.dataSchema, options.asCaseSensitiveMap().toMap) =>
case CSVTableExtractor(d, t)
if validate(session, t.dataSchema, t.options.asCaseSensitiveMap().toMap) =>
d.copy(table = ArrowCSVTable(
"arrow" + name,
sparkSession,
options,
paths,
userSpecifiedSchema,
fallbackFileFormat))
"arrow" + t.name,
t.sparkSession,
t.options,
t.paths,
t.userSpecifiedSchema,
t.fallbackFileFormat))
case r =>
r
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader, ParquetOptions}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
Expand Down Expand Up @@ -135,7 +135,7 @@ object ParquetMetadataUtils extends Logging {
parquetOptions: ParquetOptions): Option[String] = {
val footer =
try {
ParquetFooterReader.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER)
ParquetFooterReaderShim.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER)
} catch {
case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
return Some("Encrypted Parquet footer detected.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,11 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
val df = sql("SELECT 1")
checkAnswer(df, Row(1))
val plan = df.queryExecution.executedPlan
assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined)
if (isSparkVersionGE("4.1")) {
assert(plan.find(_.getClass.getSimpleName == "OneRowRelationExec").isDefined)
} else {
assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined)
}
assert(plan.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
assert(plan.find(_.isInstanceOf[RowToVeloxColumnarExec]).isDefined)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,9 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {

// The computing is combined into one single whole stage transformer.
val wholeStages = plan.collect { case wst: WholeStageTransformer => wst }
if (SparkShimLoader.getSparkVersion.startsWith("3.2.")) {
if (isSparkVersionLE("3.2")) {
assert(wholeStages.length == 1)
} else if (
SparkShimLoader.getSparkVersion.startsWith("3.5.") ||
SparkShimLoader.getSparkVersion.startsWith("4.0.")
) {
} else if (isSparkVersionGE("3.5")) {
assert(wholeStages.length == 5)
} else {
assert(wholeStages.length == 3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,8 @@ class VeloxStringFunctionsSuite extends VeloxWholeStageTransformerSuite {
s"from $LINEITEM_TABLE limit 5") { _ => }
}

testWithMinSparkVersion("split", "3.4") {
// TODO: fix on spark-4.1
testWithSpecifiedSparkVersion("split", "3.4", "3.5") {
runQueryAndCompare(
s"select l_orderkey, l_comment, split(l_comment, '') " +
s"from $LINEITEM_TABLE limit 5") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
.set("spark.executor.cores", "1")
}

test("arrow_udf test: without projection") {
// TODO: fix on spark-4.1
testWithMaxSparkVersion("arrow_udf test: without projection", "4.0") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
Expand All @@ -59,7 +60,8 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
checkAnswer(df2, expected)
}

test("arrow_udf test: with unrelated projection") {
// TODO: fix on spark-4.1
testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "4.0") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
.toDF("a", "b")
Expand Down
2 changes: 1 addition & 1 deletion dev/format-scala-code.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ MVN_CMD="${BASEDIR}/../build/mvn"
# If a new profile is introduced for new modules, please add it here to ensure
# the new modules are covered.
PROFILES="-Pbackends-velox -Pceleborn,uniffle -Piceberg,delta,hudi,paimon \
-Pspark-3.2,spark-3.3,spark-3.4,spark-3.5,spark-4.0 -Pspark-ut"
-Pspark-3.2,spark-3.3,spark-3.4,spark-3.5,spark-4.0,spark-4.1 -Pspark-ut"

COMMAND=$1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.gluten.extension.caller

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.util.SparkVersionUtil

/**
* Helper API that stores information about the call site of the columnar rule. Specific columnar
Expand Down Expand Up @@ -70,7 +70,12 @@ object CallerInfo {
}

/**
 * Returns true when the given call stack contains a frame originating from Spark's
 * `StreamExecution`, i.e. the columnar rule was invoked from a structured-streaming query.
 */
private def inStreamingCall(stack: Seq[StackTraceElement]): Boolean = {
  // Spark 4.1 relocated StreamExecution into the `runtime` subpackage, so the
  // fully-qualified class name to search for depends on the running Spark version.
  val streamExecutionClass =
    if (SparkVersionUtil.gteSpark41) {
      "org.apache.spark.sql.execution.streaming.runtime.StreamExecution"
    } else {
      "org.apache.spark.sql.execution.streaming.StreamExecution"
    }
  stack.exists(frame => frame.getClassName == streamExecutionClass)
}

private def inBloomFilterStatFunctionCall(stack: Seq[StackTraceElement]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ object SparkVersionUtil {
val gteSpark33: Boolean = comparedWithSpark33 >= 0
val gteSpark35: Boolean = comparedWithSpark35 >= 0
val gteSpark40: Boolean = compareMajorMinorVersion((4, 0)) >= 0
val gteSpark41: Boolean = compareMajorMinorVersion((4, 1)) >= 0

// Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > other.
def compareMajorMinorVersion(other: (Int, Int)): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,13 +856,6 @@ object ExpressionConverter extends SQLConfHelper with Logging {
dateAdd.children,
dateAdd
)
case timeAdd: TimeAdd =>
BackendsApiManager.getSparkPlanExecApiInstance.genDateAddTransformer(
attributeSeq,
substraitExprName,
timeAdd.children,
timeAdd
)
case ss: StringSplit =>
BackendsApiManager.getSparkPlanExecApiInstance.genStringSplitTransformer(
substraitExprName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ object ExpressionMappings {
Sig[Second](EXTRACT),
Sig[FromUnixTime](FROM_UNIXTIME),
Sig[DateAdd](DATE_ADD),
Sig[TimeAdd](TIMESTAMP_ADD),
Sig[DateSub](DATE_SUB),
Sig[DateDiff](DATE_DIFF),
Sig[ToUnixTimestamp](TO_UNIX_TIMESTAMP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package org.apache.spark.sql.execution

import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PlanUtil

import org.apache.spark.sql.{Column, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
Expand Down Expand Up @@ -130,10 +130,8 @@ object GlutenImplicits {
val (innerNumGlutenNodes, innerFallbackNodeToReason) =
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// re-plan manually to skip cached data
val newSparkPlan = QueryExecution.createSparkPlan(
spark,
spark.sessionState.planner,
p.inputPlan.logicalLink.get)
val newSparkPlan = SparkShimLoader.getSparkShims.createSparkPlan(
spark, spark.sessionState.planner, p.inputPlan.logicalLink.get)
val newExecutedPlan = QueryExecution.prepareExecutedPlan(
spark,
newSparkPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with GlutenSQLTest
import testImplicits._

private def readRowGroupRowCounts(path: String): Seq[Long] = {
ParquetFooterReader
ParquetFooterReaderShim
.readFooter(
spark.sessionState.newHadoopConf(),
new Path(path),
Expand Down
Loading