Conversation

@alexeykudinkin
Contributor

@alexeykudinkin alexeykudinkin commented Aug 31, 2022

Change Logs

As part of adding support for Spark 3.3 in Hudi 0.12, a lot of the logic from the Spark 3.2 module was simply copied over.

This PR rectifies that by

  1. Creating a new module "hudi-spark3.2plus-common" (shared across Spark 3.2 and Spark 3.3)
  2. Moving the shared components under "hudi-spark3.2plus-common"

Impact

Low

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@alexeykudinkin alexeykudinkin changed the title [WIP][HUDI-4690] Cleaning up duplicated classes in Spark 3.3 module [HUDI-4690] Cleaning up duplicated classes in Spark 3.3 module Aug 31, 2022
@alexeykudinkin alexeykudinkin changed the title [HUDI-4690] Cleaning up duplicated classes in Spark 3.3 module [HUDI-4691] Cleaning up duplicated classes in Spark 3.3 module Sep 1, 2022
@codope codope self-assigned this Sep 1, 2022
@codope
Member

codope commented Sep 1, 2022

@alexeykudinkin The build for the spark3 profile failed for me locally. Did it pass for you?
It fails for all Spark 3 profiles.

[ERROR] Failed to execute goal on project hudi-utilities_2.12: Could not resolve dependencies for project org.apache.hudi:hudi-utilities_2.12:jar:0.13.0-SNAPSHOT: Failure to find io.streamnative.connectors:pulsar-spark-connector_2.12:jar:2.4.5 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of Maven Central has elapsed or updates are forced

@alexeykudinkin
Contributor Author

alexeykudinkin commented Sep 1, 2022

@codope I've realized what the issue you've stumbled upon is: you need to specify the -Dscala-2.12 profile explicitly to make sure the correct version of the Pulsar connector is picked up.

I'm gonna address this issue in this PR as well.
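
For example, a local build invocation with the Scala profile made explicit would look like this (an illustrative command; pick the Spark profile matching your target version):

mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12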

</dependency>

<!-- Hoodie -->
<dependency>
Contributor Author

These are not necessary -- we can just depend on the terminal "hudi_spark" module

<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-common</artifactId>
+ <artifactId>hudi-spark_${scala.binary.version}</artifactId>
Contributor Author

GH really weirdly shows re-ordering of some of these.

Changes:

  • Removed unnecessary deps (see comment above)
  • Re-ordered Spark deps to be grouped together (for easier discovery)


} else if (HoodieSparkUtils.gteqSpark3_1) {
- val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.Spark312ResolveHudiAlterTableCommand"
+ val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.Spark31ResolveHudiAlterTableCommand"
Member

Is this a problem if not all writers and readers use the same Hudi version? Maybe not in this case, but just calling it out to think through the change of class names. I had encountered one issue while doing the HBase upgrade (however, that was because we actually wrote the KV comparator class name in data files).

Contributor Author

Yeah, in this case, class renames don't have any impact.

sparkAdapter.getCatalogUtils.asInstanceOf[HoodieSpark3CatalogUtils]
.unapplyBucketTransform(t)
}

Member

nit: extra newline

* </ol>
*/
def unapplyBucketTransform(t: Transform): Option[(Int, Seq[NamedReference], Seq[NamedReference])]

Member

nit: extra newline

override def output: Seq[Attribute] = Nil
override def unapplyBucketTransform(t: Transform): Option[(Int, Seq[NamedReference], Seq[NamedReference])] =
t match {
case BucketTransform(numBuckets, ref) => Some(numBuckets, Seq(ref), Seq.empty)
Member

Why is sorted refs an empty sequence? Did BucketTransform not support a sorted refs argument in Spark 3.1 or 3.2?

Contributor Author

Correct, for whatever reason they just submit a single ref.


// TODO: we should remove this file when we support datasourceV2 for hoodie on spark3.1x
- case class AlterTableCommand312(table: CatalogTable, changes: Seq[TableChange], changeType: ColumnChangeID) extends RunnableCommand with Logging {
+ case class Spark31AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], changeType: ColumnChangeID) extends RunnableCommand with Logging {
Member

I see that the refactoring has been done with the assumption that Spark won't break things between patch versions. I think it's a fair assumption. However, it cannot be guaranteed. Just something to be cautious about in future.

Contributor Author

We did refine our Spark compatibility model in 0.11: we now promise we'd stay compatible w/ ALL versions w/in a minor branch.

import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.util.Utils

object Spark32DataSourceUtils {
Member

Perhaps due to the removal of this, we need that legacy behavior policy in the Avro deserializer. It would be good to move some of the comments there.

Contributor Author

@alexeykudinkin alexeykudinkin Sep 2, 2022

Nope, not propagating the config was a miss before (b/c of the duplication of the classes, it was handled in 3.2, but not in 3.1).
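
For reference, a minimal sketch of the propagation being fixed here, mirroring the hunk shown further down in this review (this sits inside the deserializer wrapper where rootAvroType and rootCatalystType are fields; AvroDeserializer's constructor differs across Spark versions, so the exact trailing argument is illustrative):

import org.apache.spark.sql.avro.AvroDeserializer
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

private val avroDeserializer = {
  // Resolve the datetime rebase mode from the session config rather than
  // falling back to the deserializer's defaults (the miss described above)
  val avroRebaseModeInRead = LegacyBehaviorPolicy.withName(
    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
  new AvroDeserializer(rootAvroType, rootCatalystType, avroRebaseModeInRead.toString)
}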

* Please check out HUDI-4178 for more details
*/
- class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {
+ class Spark32PlusDefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {
Member

We can remove all the commented-out parts in this class.

Contributor Author

The plan is to restore it once we migrate to DSv2

pom.xml Outdated
<hudi.spark.module>hudi-spark3.2.x</hudi.spark.module>
<hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
<!-- This glob has to include hudi-spark3-common, hudi-spark3.2plus-common -->
<hudi.spark.common.modules.glob>hudi-spark3*-common</hudi.spark.common.modules.glob>
Member

Does it make sense to avoid the glob pattern and define separate parameters like hudi.spark2.common.module, hudi.spark3.common.module, hudi.spark32plus.common.module, hudi.spark33plus.common.module (in the future)? It makes it easier to cherry-pick; otherwise we need to maintain the glob pattern every so often.

Contributor Author

@alexeykudinkin alexeykudinkin Sep 2, 2022

Yeah, not a big fan of globbing (it's quite brittle). Let me try to have a separate property (the only reason I opted for globbing initially was b/c I wasn't sure if Maven would be able to handle an empty clause, since, for ex, this parameter would be empty for Spark 2 and 3.1).
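
A hypothetical shape of the explicit-property approach (profile and property names here are illustrative, not necessarily what was ultimately merged):

<!-- Default in the parent pom: empty for profiles w/o such a module (e.g. Spark 2, Spark 3.1) -->
<hudi.spark32plus.common.module></hudi.spark32plus.common.module>

<!-- Overridden in the profiles that do have it -->
<profile>
  <id>spark3.2</id>
  <properties>
    <hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
    <hudi.spark32plus.common.module>hudi-spark3.2plus-common</hudi.spark32plus.common.module>
  </properties>
</profile>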

Contributor

+1 prefer to explicitly add

@nsivabalan nsivabalan self-assigned this Sep 3, 2022
@yihua yihua added engine:spark Spark integration priority:blocker Production down; release blocker labels Sep 5, 2022
Contributor

@nsivabalan nsivabalan left a comment

Few minor comments; mostly looks OK to me.
I have 2 asks though:

  • Let's run integration tests for all diff Spark versions and ensure a green report.
  • Let's test out slim bundles as well (as I see some changes in the slim bundle poms as well).

val spark3Analysis: RuleBuilder =
session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]]

val spark3ResolveReferencesClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
Contributor

Maybe not related to the changes in this patch, but "HoodieSpark3Analysis" actually refers to 3.2 or greater, right? The naming is not right; I don't see it being used for 3.1 below.

Contributor

Maybe you can name the variables in L78, 79 accordingly.

Contributor Author

This class is actually deleted



- org.apache.hudi.Spark3xDefaultSource
+ org.apache.hudi.Spark31DefaultSource
Contributor

Should we keep it as Spark31xDefaultSource?

Contributor Author

We're using 2-digit nomenclature to designate the Spark version in class names (Spark31, Spark32, etc.).


- private val avroDeserializer = new AvroDeserializer(rootAvroType, rootCatalystType)
+ private val avroDeserializer = {
+ val avroRebaseModeInRead = LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
Contributor

As per master, I see this conf is of interest only for Spark 3.2 and above. But now we are also adding it for Spark 3.1. Is that intentional?

case IdentityTransform(FieldReference(Seq(col))) =>
identityCols += col

case MatchBucketTransform(numBuckets, col, sortCol) =>
Contributor

Is this some bug fix? It doesn't look like pure refactoring.

Contributor Author

This is a fix for BucketTransform having changed b/w Spark 3.2 and 3.3.
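
For context, Spark 3.3 extended BucketTransform to carry both the bucket columns and the sorted columns, so the 3.3-side extractor destructures three fields instead of two. A sketch, assuming Spark 3.3's BucketTransform(numBuckets: Literal[Int], columns: Seq[NamedReference], sortedColumns: Seq[NamedReference]):

import org.apache.spark.sql.connector.expressions.{BucketTransform, NamedReference, Transform}

def unapplyBucketTransform(t: Transform): Option[(Int, Seq[NamedReference], Seq[NamedReference])] =
  t match {
    // Spark 3.3: bucket and sorted columns are both sequences; numBuckets is a Literal[Int]
    case BucketTransform(numBuckets, cols, sortedCols) => Some((numBuckets.value, cols, sortedCols))
    case _ => None
  }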

import java.net.URI
import java.util
- import scala.collection.JavaConverters.{mapAsScalaMapConverter, setAsJavaSetConverter}
+ import scala.jdk.CollectionConverters.{mapAsScalaMapConverter, setAsJavaSetConverter}
Contributor

Was this an intentional change? I see we widely use scala.collection.JavaConverters. If you prefer to replace all usages, can we file a tracking JIRA and do it separately?

Contributor Author

JavaConverters are not available in Scala 2.11
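
For reference, a minimal usage sketch of the new-style import as used in this diff (on Scala 2.12, the scala.jdk.CollectionConverters package is supplied by the scala-collection-compat shim this PR adds as a dependency; the example mirrors the converter names from the import above):

import scala.jdk.CollectionConverters.mapAsScalaMapConverter

val javaProps = new java.util.HashMap[String, String]()
javaProps.put("hoodie.table.name", "trips")
// asScala wraps the Java map in place rather than copying it
val tableName = javaProps.asScala("hoodie.table.name")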

<exclude>ch.qos.logback:logback-classic</exclude>
<!-- NOTE: We're banning any HBase deps versions other than the approved ${hbase.version},
which is aimed at preventing the classpath collisions w/ transitive deps usually) -->
<exclude>org.apache.hbase:hbase-common:*</exclude>
Contributor

@yihua: can you review the HBase-related changes? They mostly should be good, just wanted to double-confirm.

Contributor

LGTM. In terms of the actual impact, does this only resolve issues for Spark 3 due to transitive dependencies of Hive? I assume without this change, Spark 2 should still work.

Contributor Author

These should resolve issues for all Spark versions


<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
<artifactId>hudi-common</artifactId>
Contributor

hudi-spark pulls in hudi-common as well, right? Do we need to explicitly depend on it here?

Member

@nsivabalan the note above explained it


variables:
- BUILD_PROFILES: '-Dscala-2.11 -Dspark2 -Dflink1.14'
+ BUILD_PROFILES: '-Dscala-2.11 -Dspark2.4 -Dflink1.14'
Member

Shall we update the docs/readme as well?

<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
<exclusions>
Member

hive-* and hbase-* dependencies are already shaded. Wouldn't that be sufficient? Why do we need to exclude them here?

Contributor Author

Hive brings its own HBase deps that collide w/ the ones we depend on (we need 2.4.9, Hive brings 1.1.1).
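
A sketch of the exclusion on the hive-service dependency shown above (the actual PR may exclude more HBase artifacts than just hbase-common):

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-service</artifactId>
  <version>${hive.version}</version>
  <scope>${utilities.bundle.hive.scope}</scope>
  <exclusions>
    <!-- Keep Hive's transitive HBase 1.1.1 off the classpath; we pin ${hbase.version} (2.4.9) -->
    <exclusion>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
    </exclusion>
  </exclusions>
</dependency>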

Member

@xushiyan xushiyan left a comment

LGTM, except the modules.glob part. Also, it's hard to spot any further issues from the pom diff; this requires more bundle testing with different combinations.

@hudi-bot
Collaborator

CI report:

Bot commands

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

Member

@codope codope left a comment

LGTM. Thanks for addressing the comments.

@codope codope merged commit 8c296e0 into apache:master Sep 14, 2022

<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
Contributor

Hey, these 2 dependencies affect the hudi-flink module, and the local tests cannot run now because Flink 1.15.x requires Scala 2.12.x. Can you fix that?
