
Conversation

Contributor

@alexeykudinkin alexeykudinkin commented Feb 6, 2023

Change Logs

NOTE: This is a squashed version of the original PR #6361

This PR cleans up a considerable amount of Spark's (internal) resolution logic that has been copied into Hudi components even though there's no actual need for it: instead, we can rely on Spark itself to resolve most of these constructs and plug in our custom implementations (post hoc) after resolution has been performed.

Issues this will be addressing (among others):

Impact

Resolves many Spark SQL issues stemming from the inconsistent implementation of Hudi's bespoke resolution rules.

Risk level (write none, low, medium or high below)

Medium

This is being verified through a combination of:

  • CI (GH/Azure)
  • Long running Jenkins job (verifying Spark DS aspect of it)
  • Benchmarking

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

* of the [[LogicalRelation]] resolving into Hudi table. Note that, it's a safe operation since we
* actually need to ignore these values anyway
*/
case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends Rule[LogicalPlan] {
Contributor Author

This is the core of the change:

  • Here we remove the implementation of the bespoke resolution rules for Hudi components and instead rely on Spark to resolve these constructs (most of them don't carry any custom logic relative to vanilla Spark SQL and are therefore perfectly well served by the standard Spark resolution rules)
  • These rules are repurposed to serve as a conversion point from Spark's standard constructs (like MergeIntoTable) into Hudi's counterparts that implement Hudi-specific semantics (MergeIntoHoodieTableCommand). Note that we require these constructs to be fully resolved before we attempt to convert them; see the sketch after this list
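A minimal sketch of what such a post-resolution conversion rule looks like (rule name and matching logic are simplified here; the actual rule also verifies that the target is a Hudi table):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: convert Spark's already-resolved MergeIntoTable into the
// Hudi-specific command instead of re-implementing resolution for it
case class ConvertIntoHoodieCommands(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Only convert once Spark has fully resolved the statement
    case mit: MergeIntoTable if mit.resolved =>
      MergeIntoHoodieTableCommand(mit)
  }
}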

*
* @param sparkSession
*/
case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan]
Contributor Author

This is an example of a custom rule that has been removed completely.

*
* TODO explain workflow for MOR tables
*/
case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends HoodieLeafRunnableCommand
Contributor Author

Changes to this command were required due to

  • Switching from bespoke resolution to Spark's standard one: we need to abide by Spark's semantics and had to drop some of the customizations implemented previously
  • Cleaning up the code considerably, simplifying the implementation by getting rid of custom utilities and replacing them with Spark's standard ones (for example, to resolve and bind expressions); a sketch follows this list
  • Adding documentation to elaborate on the overall workflow
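For instance, binding a resolved expression against a plan's output can lean on Spark's standard utilities rather than a bespoke helper. A hedged sketch, not the exact code from the command:

import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences, Expression}

// Sketch: rely on Spark's standard BindReferences instead of a custom binding
// utility; `expr` must already be resolved against `output`
def bindExpression(expr: Expression, output: Seq[Attribute]): Expression =
  BindReferences.bindReference(expr, output)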

private val resolver = spark.sessionState.conf.resolver

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
Contributor Author

These rules were borrowed from Spark 3.1.3 to bring support for the MERGE INTO statement to Spark 2.x, which doesn't have it out of the box.


public static ColumnUpdateChange get(InternalSchema schema) {
return new ColumnUpdateChange(schema);
private ColumnUpdateChange(InternalSchema schema) {
Contributor Author

These changes are necessary to properly handle various case-sensitivity settings
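The general pattern is simply to pick the field-name comparator based on that setting. A generic, hypothetical illustration (not Hudi's actual API):

// Hypothetical sketch of case-sensitivity handling: choose how field names are
// compared depending on the caseSensitive flag
def fieldNameMatcher(caseSensitive: Boolean): (String, String) => Boolean =
  if (caseSensitive) (a, b) => a == b
  else (a, b) => a.equalsIgnoreCase(b)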

@alexeykudinkin alexeykudinkin removed the priority:blocker Production down; release blocker label Feb 7, 2023
@codope codope added the priority:high Significant impact; potential bugs label Feb 7, 2023
@alexeykudinkin alexeykudinkin added priority:critical Production degraded; pipelines stalled and removed priority:high Significant impact; potential bugs labels Feb 7, 2023
@xiarixiaoyao xiarixiaoyao self-assigned this Feb 8, 2023
}

object HoodieCatalystExpressionUtils {
object HoodieCatalystExpressionUtils extends SparkAdapterSupport {
Member

Why does it need to extend SparkAdapterSupport? Is there something that changes across Spark versions?

Contributor Author

Yes, the adapter is needed to match the Cast expression (the MatchCast object below).
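For context, a simplified sketch of what such an extractor can look like; the real MatchCast goes through the SparkAdapter so that cast variants whose shape differs across Spark releases (for example ANSI casts) are also covered:

import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.types.DataType

// Simplified sketch (not the actual Hudi implementation): match cast-like
// expressions uniformly regardless of the Spark version's Cast arity
object MatchCast {
  def unapply(expr: Expression): Option[(Expression, DataType)] = expr match {
    case c: Cast => Some((c.child, c.dataType))
    case _       => None
  }
}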

*/
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan
// TODO scala-docs
Member

add docs or remove the comment?
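For illustration only, a hedged sketch of the kind of scaladoc that could fill that TODO; the parameter descriptions are inferred from the signature, not the author's wording:

/**
 * Creates a Spark version-appropriate INSERT INTO logical plan.
 *
 * @param table                 resolved logical plan of the target table
 * @param partition             static partition spec (partition column -> optional literal value)
 * @param query                 logical plan producing the rows to insert
 * @param overwrite             whether existing data (or matching partitions) should be overwritten
 * @param ifPartitionNotExists  insert only if the target static partition does not already exist
 */
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
                     query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan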

*/
public static <T, U> U reduce(Collection<T> c, U identity, BiFunction<U, T, U> reducer) {
return c.stream()
.sequential()
Member

Does it have to be strictly sequential? I mean, the elements of the collection should be independent of each other. Is there any value in parameterizing this behavior, say by adding a boolean shouldReduceParallelly?

Contributor Author

Reducing is an inherently sequential operation. In this case I'm just creating a convenience wrapper that enforces this in exchange for simplifying the API provided by streams (if someone wants to do it in parallel, they can use the Streams API directly).
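Put in Scala terms, a reduce with an identity and an accumulator of type (U, T) => U (and no combiner) is just a left fold, which makes the sequential, order-preserving nature explicit. A hedged analogue, not the Java helper itself:

// Scala analogue of the Java helper above: without a combiner, reduction
// degenerates to a left fold and is inherently sequential
def reduce[T, U](c: Iterable[T], identity: U)(reducer: (U, T) => U): U =
  c.foldLeft(identity)(reducer)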

protected final boolean caseSensitive;

BaseColumnChange(InternalSchema schema) {
this(schema, false);
Member

Why is the default caseSensitive false?

Contributor Author

To keep things compatible with how they are today.

val resolver = sparkSession.sessionState.analyzer.resolver
val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField

// TODO elaborate
Member

I think the code below speaks for itself, but please remove the TODO and add a comment if you think it needs a description.

//updateActions.foreach(update =>
// assert(update.assignments.length == targetTableSchema.length,
// s"The number of update assignments[${update.assignments.length}] must equal to the " +
// s"targetTable field size[${targetTableSchema.length}]"))
Member

remove the commented-out part?

}

def toStructType(attrs: Seq[Attribute]): StructType =
StructType(attrs.map(a => StructField(a.qualifiedName.replace('.', '_'), a.dataType, a.nullable, a.metadata)))
Member

Why do we need to replace periods with underscores?

Contributor Author

Here we combine the output of the joining operation back into a schema so that we can assert that the records we receive in ExpressionPayload still adhere to the expected schema. We need to replace the dots because:

  • names could be qualified (in the case of nested structs; note that this schema is essentially a flattened one)
  • field names can't contain dots (see the illustration below)
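A hedged illustration of the renaming (the attribute and its qualifier are made up):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Hypothetical example: an attribute qualified as `t.price` has
// qualifiedName == "t.price"; after the replacement the resulting schema
// contains a field named "t_price" instead of a dotted name that would be
// read as a nested-field reference
val attr = AttributeReference("price", DoubleType)(qualifier = Seq("t"))
val schema = StructType(Seq(
  StructField(attr.qualifiedName.replace('.', '_'), attr.dataType, attr.nullable, attr.metadata)))
// schema.fieldNames => Array("t_price")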


// NOTE: We need to set "spark.testing" property to make sure Spark can appropriately
// recognize environment as testing
System.setProperty("spark.testing", "true")
Member

Interesting! So is this property there for all Spark versions, and what are the benefits of setting it?

Contributor Author

It lets us set some configs that aren't configurable in a production environment.

*
* PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY NECESSARY
*/
object HoodieSpark2Analysis {
Member

I thought MIT (MERGE INTO) is supported in Spark 2.x. We have unit tests for MIT that run for Spark 2.x as well, don't we? If it's not supported, should we make that clear in the quickstart guide as well? And should we add this support in a separate PR?

Contributor Author

It's supported.

The difference is that previously Hudi's bespoke MIT resolution logic was applied to all Spark versions, not only Spark 2.x. Now:

  • For Spark 3.x we rely on Spark's own resolution logic
  • For Spark 2.x we use the code back-ported from Spark 3.1.x

*
* Check out HUDI-4178 for more details
*/
case class HoodieDataSourceV2ToV1Fallback(sparkSession: SparkSession) extends Rule[LogicalPlan]
Member

Can you help me understand why this is only needed for Spark 3.2.x?

Contributor Author

Because we're using DSv2 only in HoodieCatalog, which is only available for Spark >= 3.2.

Member

@codope codope left a comment

@alexeykudinkin Overall, looks good to me. Thanks for responding to my queries. Please address some of the minor comments.
@xiarixiaoyao Could you also review this PR?

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-runs the last Azure build

@codope codope merged commit 812950b into apache:master Feb 23, 2023
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023
…7871)

Cleans up a considerable amount of Spark's (internal) resolution logic
that has been copied into Hudi components even though there's no actual
need for it: instead, we can rely on Spark itself to resolve most of
these constructs and plug in our custom implementation (post hoc) after
resolution has been performed.

Labels

  • area:sql SQL interfaces
  • engine:spark Spark integration
  • priority:critical Production degraded; pipelines stalled
