Spark 4.0 integration #12494
Conversation
build.gradle
Outdated
mavenCentral()
mavenLocal()
maven {
  url "https://repository.apache.org/content/repositories/snapshots/"
Need this for now because this PR uses 4.0.1-SNAPSHOT.
Will remove this in the future.
aliyun-sdk-oss = "3.10.2"
analyticsaccelerator = "1.0.0"
antlr = "4.9.3"
antlr413 = "4.13.1" # For Spark 4.0 support
To be consistent with the ANTLR version used by Spark 4.0.
build.gradle
Outdated
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation libs.awaitility
testImplementation 'com.zaxxer:HikariCP:5.1.0'
Spark 4.0 removed the dependency on com.jolbox:bonecp. In TestHiveMetastore.java, I explicitly set the connection pool with conf.set("datanucleus.connectionPoolingType", "HikariCP"); otherwise it defaults to com.jolbox:bonecp and we get:
Exception in thread "TThreadPoolServer WorkerProcess-5" java.lang.NoClassDefFoundError: com/jolbox/bonecp/BoneCPConfig
at org.apache.hadoop.hive.metastore.txn.TxnHandler.setupJdbcConnectionPool(TxnHandler.java:3156)
at org.apache.hadoop.hive.metastore.txn.TxnHandler.setConf(TxnHandler.java:260)
Since the connection pool is explicitly set to HikariCP, we need to add the test dependency.
DBCP (already pulled in by Spark, see SPARK-48538) can be used to avoid adding the HikariCP dependency.
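A minimal sketch of that alternative, assuming Hive's metastore accepts DBCP as the DataNucleus pooling type (the exact value spelling may differ):

import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical alternative to the HikariCP setting: rely on the DBCP pool
// that Spark already pulls in (SPARK-48538), so no extra test dependency is needed.
HiveConf conf = new HiveConf();
conf.set("datanucleus.connectionPoolingType", "DBCP");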
Somehow after I changed to DBCP, I started to get
> Task :iceberg-spark:iceberg-spark-runtime-4.0_2.13:integrationTest FAILED
without any additional information.
If I run ./gradlew :iceberg-spark:iceberg-spark-runtime-4.0_2.13:integrationTest locally, it always passes.
Even after I switched back to HikariCP, Task :iceberg-spark:iceberg-spark-runtime-4.0_2.13:integrationTest still fails.
That should be irrelevant. Could you try uploading the reports for the failing test?
.github/workflows/spark-ci.yml
- uses: actions/upload-artifact@v4
if: failure()
with:
name: test logs
path: |
**/build/testlogs
+ **/build/reports
what's the root cause?
Caused by: org.junit.platform.commons.JUnitException: OutputDirectoryProvider not available; probably due to unaligned versions of the junit-platform-engine and junit-platform-launcher jars on the classpath/module path.
I guess junit-platform-engine was upgraded upstream. I rebased and the problem went away.
public static final String SPEC_ID_COLUMN_DOC = "Spec ID used to track the file containing a row";
public static final NestedField SPEC_ID =
    NestedField.required(
    NestedField.optional(
In Spark 4.0, the metadata columns are nullable, so this field needs to change to optional.
apache/spark#50246 (comment)
conf.set(
    HiveConf.ConfVars.HIVE_IN_TEST.varname, HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue());
conf.set("datanucleus.connectionPoolingType", "HikariCP");
Spark 4.0 removed the dependency on com.jolbox:bonecp. If the connection pooling type is not set, it defaults to com.jolbox:bonecp and we get:
Exception in thread "TThreadPoolServer WorkerProcess-5" java.lang.NoClassDefFoundError: com/jolbox/bonecp/BoneCPConfig
at org.apache.hadoop.hive.metastore.txn.TxnHandler.setupJdbcConnectionPool(TxnHandler.java:3156)
at org.apache.hadoop.hive.metastore.txn.TxnHandler.setConf(TxnHandler.java:260)
val newCyclePath = cyclePath :+ currentViewIdent
if (currentViewIdent == viewIdent) {
  throw new AnalysisException(String.format("Recursive cycle in view detected: %s (cycle: %s)",
  throw new IcebergAnalysisException(String.format("Recursive cycle in view detected: %s (cycle: %s)",
Spark 4.0 no longer allows constructing an AnalysisException from an error message alone; an error class must be provided. To get around this, I added IcebergAnalysisException, which only needs an error message.
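A minimal sketch of what such a message-only exception could look like; the actual class added in this PR may differ (for example, it may still extend Spark's AnalysisException through an error-class-aware constructor), so the shape below is an assumption:

// Shape assumed for illustration; not necessarily the PR's implementation.
public class IcebergAnalysisException extends RuntimeException {

  public IcebergAnalysisException(String message) {
    super(message);
  }

  public IcebergAnalysisException(String message, Throwable cause) {
    super(message, cause);
  }
}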
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.iceberg.catalog.Procedure
case class IcebergCall(procedure: Procedure, args: Seq[Expression]) extends LeafCommand {
Spark 4.0 has its own implementation of Call. In this Spark 4.0/Iceberg integration PR, I simply rename Call to IcebergCall to avoid the name collision. In the future, we will need to migrate to Spark's native Call framework.
TestBase.hiveConf = metastore.hiveConf();
TestBase.spark.close();
TestBase.spark.stop();
We don't have sql.SparkSession.close() any more, so replace it with stop().
});
public void testDelegateUnsupportedProcedure() throws ParseException {
  LogicalPlan plan = parser.parsePlan("CALL cat.d.t()");
  assertThat(plan.toString().contains("CALL cat.d.t()"));
Since Spark 4.0 has stored procedure support, this no longer throws a ParseException.
Shouldn't we just remove this test then? I'm not sure what we are checking here any more?
Column[] sortSpec = sortSpec(df, repartitionSpec, false);
StructType schema = df.schema();
String[] identifierFields =
    Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
I can't use Column::toString any more because it now returns a fully qualified column name, which causes fieldIndex(name: String) to fail later.
We already have a String array of identifierFields in computeUpdateImages (we only need to append MetadataColumns.CHANGE_ORDINAL.name() at the end). So instead of calling
override def toString: String = node.sql
I think it's more efficient to pass in the String array of identifierFields.
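A rough sketch of that idea, assuming identifierFields is the String[] already available in computeUpdateImages (variable names are illustrative):

import java.util.Arrays;
import java.util.stream.Stream;
import org.apache.iceberg.MetadataColumns;

// Reuse the identifier field names we already have and append the change
// ordinal column, instead of deriving names from Column.toString(), which is
// fully qualified in Spark 4.0 and breaks the later fieldIndex(name) lookup.
String[] repartitionColumnNames =
    Stream.concat(
            Arrays.stream(identifierFields),
            Stream.of(MetadataColumns.CHANGE_ORDINAL.name()))
        .toArray(String[]::new);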
SparkTable sparkTable = new SparkTable(table, snapshotId, false);
DataSourceV2Relation relation = createRelation(sparkTable, ImmutableMap.of());
return Dataset.ofRows(spark, relation);
if (!(spark instanceof org.apache.spark.sql.classic.SparkSession)) {
Spark 4.0 has an abstract SparkSession class and classic.SparkSession. We always try to use the general one, and only cast to classic.SparkSession when the private APIs are needed.
The same rule applies to Dataset.
I think I noted this in other places too, but let's change all of these checks to a Precondition if we can.
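Something like this hedged sketch, using Iceberg's relocated Guava Preconditions (message wording is illustrative only):

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Fail fast with a clear message instead of a bare instanceof branch.
Preconditions.checkArgument(
    spark instanceof org.apache.spark.sql.classic.SparkSession,
    "Expected a classic SparkSession but got: %s", spark.getClass().getName());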
final Configuration conf = spark.sessionState().newHadoopConf();
spark
    .sqlContext()
The generic abstract SparkSession class doesn't have the sqlContext() API any more.
cc @RussellSpitzer @szehon-ho @amogh-jahagirdar @nastra
}
if (sparkVersions.contains("4.0")) {
  include ":iceberg-spark:spark-4.0_2.13"
Does this theoretically create Spark 4.0 without error if Scala 2.12 is set? Feels like we should still use scalaVersion here but assert > 2.12 or something.
The problem is that Flink doesn't work with Scala 2.13. I use 2.13 to build the Spark module only. I think we can't assert > 2.12 because we still need 2.12 to build Flink.
Doesn't that mean when Scala is set to 2.12 we just should not allow building Spark 4.0?
spark/v4.0/build.gradle
Outdated
testImplementation libs.parquet.hadoop
testImplementation libs.awaitility
testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
testImplementation "org.apache.datafusion:comet-spark-spark3.5_2.13:0.5.0"
Why are we using the old version of comet here? Just because the new one isn't available?
Right, we don't have a Spark 4.0 Comet yet.
extensions.injectResolutionRule { spark => ResolveViews(spark) }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
extensions.injectCheckRule(_ => CheckViews)
extensions.injectResolutionRule { _ => RewriteUpdateTableForRowLineage }
Discussed offline with @huaxingao: since row lineage in 4.0 will have a very different implementation (no need for custom analyzer rules given the changes done in apache/spark#49493), we will remove the changes from https://github.com/apache/iceberg/pull/12736/files that are included in these changes.
)" This reverts commit 829ae7a.
…pache#12736)"" This reverts commit 5170f8c.
@amogh-jahagirdar I have removed the changes from https://github.com/apache/iceberg/pull/12736/files for Spark 4.0. I have checked all the recent commits since my rebase yesterday; the only one with Spark changes is my Comet version bump PR. I will manually port those changes into this PR. After all the checks pass, we can merge.
Ok, thank you @huaxingao for your diligence in keeping up with this PR. I'll go ahead and merge, and we can iterate on any follow-ups. When the official Spark 4.0 release is out, we will do the dependency changes then. Thank you @RussellSpitzer @pan3793 @szehon-ho for reviewing.
Thank you all! @amogh-jahagirdar @RussellSpitzer @pan3793 @szehon-ho @aihuaxu
Congrats on merging to trunk, but alright, the commit history is lost eventually.
Sorry, I forgot to squash the commits. I will fix this.
I will revert the PR for now.
This reverts commit ad7f5c4.
This reverts commit a5bcacd.