
[Spark] Fix schema evolution issue with nested struct (within a map) and column renamed #3886

Open · wants to merge 1 commit into master from feature/schema-evolution-with-map-fix

Conversation

@Richard-code-gig commented Nov 16, 2024

This PR fixes an issue with schema evolution in Delta Lake where adding a new field to a struct within a map and renaming an existing top-level field caused the operation to fail.

The fix includes logic to handle these transformations properly, ensuring that new fields are added without conflicts.

It also resolves a TODO about casting map types in DeltaAnalysis.scala.

Changes:

  • Updated schema evolution logic to support complex map transformations.
  • Enabled schema evolution for map keys and for both simple and nested map values.
  • Added case statements to handle MapType in the addCastToColumn method in DeltaAnalysis.scala.
  • Modified TypeWideningInsertSchemaEvolutionSuite to support schema evolution of maps.
  • Added a new method (addCastsToMaps) to DeltaAnalysis.scala.
  • Changed the argument type of addCastToColumn from Attribute to NamedExpression.
  • Added EvolutionWithMap to the example modules to demonstrate the use case.
  • Modified the nested struct type evolution with field upcast in map test in TypeWideningInsertSchemaEvolutionSuite.scala.
  • Added new test cases for maps to DeltaInsertIntoTableSuite.scala.

Related Issues: #3227

Which Delta project/connector is this regarding?

  • [✓] Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

How was this patch tested?

Tested through new and modified unit tests in DeltaInsertIntoTableSuite.scala and TypeWideningInsertSchemaEvolutionSuite.scala, plus the new EvolutionWithMap example.

Does this PR introduce any user-facing changes?

No, it doesn't introduce any user-facing changes. It only resolves an issue that exists even in released versions of Delta Lake.

Previously, operations that added extra fields to a struct inside a map failed with:
[DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION] Cannot resolve "metrics" due to data type mismatch: cannot cast "MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>>" to "MAP<STRING, STRUCT<id: INT, value: INT>>".
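
For reference, a minimal reproduction sketch of the failing scenario, assuming a SparkSession with Delta Lake enabled; the table name, schema, and data are illustrative, not taken from the PR:

import org.apache.spark.sql.functions.{col, lit, map, struct}

// Hypothetical table shaped like the one in the error message above.
spark.sql(
  "CREATE TABLE events (id INT, metrics MAP<STRING, STRUCT<id: INT, value: INT>>) USING delta")

// The source data adds a `comment` field to the struct nested in the map
// (issue #3227 also renames a top-level column at the same time).
val df = spark.range(1).select(
  col("id").cast("int").as("id"),
  map(lit("m1"), struct(lit(1).as("id"), lit(2).as("value"), lit("c").as("comment")))
    .as("metrics"))

// With schema auto-merge enabled, this append used to fail with
// DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION; with this fix the schema evolves.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
df.write.format("delta").mode("append").saveAsTable("events")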

@johanl-db (Collaborator) left a comment


Thanks @Richard-code-gig for taking the time to work on this!

The code looks good, two main points I see to address before we can merge this:

  • Support casting map keys and not just map values. It could be surprising for users otherwise to see that one works but not the other.
  • Add a few more test cases, I provided some ideas.

nice to have but not required:

  • Support deeply nested maps.
    That's arguably a less common use case, and I don't know if it's going to be straightforward, so it's OK if we don't support it.

Also, ignore the Delta Spark Master job failure; it has been broken for quite some time and won't block merging this change.

@@ -69,6 +69,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap


Collaborator:

nit: remove

@@ -930,6 +931,18 @@ class DeltaAnalysis(session: SparkSession)
// Keep the type from the query, the target schema will be updated to widen the existing
// type to match it.
attr
case (s: MapType, t: MapType) if s != t =>
Collaborator:

It should be possible to handle casting for arbitrarily nested maps/arrays by recursively calling addCastToColumn inside addCastsToMaps instead of addCastsToStructs.

We should also try to support casting for map keys and not just values.

Thinking of the following:

case (s: MapType, t: MapType) if s != t =>
  addCastsToMaps(tblName, attr, s, t, allowTypeWidening)

private def addCastsToMaps(
    tableName: String,
    parent: NamedExpression,
    source: MapType,
    target: MapType,
    allowTypeWidening: Boolean): Expression = {
  val transformedKeys =
    if (source.keyType != target.keyType) {
      // Map keys are never nullable
      val keyVar = NamedLambdaVariable("keyVar", source.keyType, nullable = false)
      val castedKeys = addCastsToColumn(
        tableName,
        keyVar,
        source.keyType,
        target.keyType,
        allowTypeWidening
      )
      ArrayTransform(MapKeys(parent), LambdaFunction(castedKeys, Seq(keyVar)))
    } else {
      MapKeys(parent)
    }

  // ... same for values

  // Create a new map from the transformed keys and values
  MapFromArrays(transformedKeys, transformedValues)
}

Author:

Hi @johanl-db,
Should we be calling addCastsToColumn here, especially as addCastsToColumn takes an Attribute type for both the parent and targetAttr arguments rather than a NamedLambdaVariable?

Looking forward to your thoughts.

Collaborator:

It should be safe to update addCastsToColumn to accept NamedExpression instead of Attribute.
At least the current code doesn't seem to particularly care about getting an actual attribute, just an expression plus a name that is used when throwing exceptions and as the alias for the result of the cast.

Since the name can be surfaced in exceptions, we should probably follow the convention of naming the lambda variables element, key, and value respectively, as that's how array elements and map keys/values are referenced.

It would be good then to have a test that covers such an exception, for example notEnoughColumnsInInsert, by inserting data with a missing field in a struct nested in a map (using insertInto()).
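
A minimal sketch of such a test, assuming standard Delta suite helpers (withTable, sql, intercept); the table name, schema, and asserted message fragment are assumptions:

// Sketch: insertInto() with a struct nested in a map that is missing a field
// should surface the notEnoughColumnsInInsert error.
test("insert into map with missing nested struct field") {
  withTable("target") {
    sql("CREATE TABLE target (m MAP<STRING, STRUCT<x: INT, y: INT>>) USING delta")
    // The inserted value struct only has `x`; field `y` is missing.
    val df = sql("SELECT map('k', named_struct('x', 1)) AS m")
    val e = intercept[org.apache.spark.sql.AnalysisException] {
      df.write.format("delta").insertInto("target")
    }
    // Could also be asserted with checkError and the proper error class.
    assert(e.getMessage.contains("not enough"))
  }
}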

@@ -930,6 +931,18 @@ class DeltaAnalysis(session: SparkSession)
// Keep the type from the query, the target schema will be updated to widen the existing
// type to match it.
attr
case (s: MapType, t: MapType) if s != t =>
Collaborator:

Move the Map case right below the Array case, before the type widening one; Map and Array usually belong together.

@@ -1124,6 +1140,48 @@ class DeltaAnalysis(session: SparkSession)
DeltaViewHelper.stripTempViewForMerge(plan, conf)
}

/**
* Recursively casts maps in case it contains null types.
Collaborator:

"Recursively casts map data types in case the key/value type differs"

I'm not entirely sure why the struct method mentions in case it contains null types but I don't think that's accurate

@@ -1049,6 +1062,7 @@ class DeltaAnalysis(session: SparkSession)
/**
* Recursively casts structs in case it contains null types.
* TODO: Support other complex types like MapType and ArrayType
* The case mapType that calls addCastsToMaps addresses the MapType todo
Collaborator:

I would just remove the TODO at this point. Both arrays and maps are now (partially) handled. We can call out their respective limitations on the dedicated methods if needed.

Comment on lines 1084 to 1073
// We could also handle MapType within StructType here, but there is a restriction
// on deeply nested operations that may result in a maxIteration error
Collaborator:

I'm curious, do you have the stack trace for the error you're seeing?

It should be possible to handle arbitrary nesting of maps/arrays/structs, although that might require a pretty deep refactor of this code, with a high risk of introducing regressions or breaking changes that would likely negate the benefits.

@@ -297,8 +297,7 @@ trait TypeWideningInsertSchemaEvolutionTests
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))))
)

// The next two tests document inconsistencies when handling maps. Using SQL or INSERT by position
// doesn't allow type evolution but using dataframe INSERT by name does.
// maps now allow type evolution for INSERT by position and name in SQL and dataframe.
Collaborator:

It would be good to add more tests, for example in DeltaInsertIntoSQLSuite (a sketch of the second idea follows this list):

  • add a test covering the initial issue from [BUG][Spark] INSERT INTO struct evolution in map/arrays breaks when a column is renamed #3227 - essentially a version of the example you provide, except that I don't think we run examples as part of PR tests (I might be wrong)
  • add tests covering inserting data into a map type using keys/values that have (1) a different type than in the table, (2) more columns than in the table, with schema evolution enabled/disabled
  • if supporting maps/arrays nested inside maps: add a test covering 2-level-deep nested maps
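
A minimal sketch of the second idea, assuming DeltaInsertIntoSQLSuite-style helpers (withSQLConf, withTable, checkAnswer) and suite imports; the config key, table, and schema are assumptions:

// Sketch: inserting a map value whose struct carries an extra field, with
// schema evolution enabled, should add the field to the table schema.
test("insert map value with extra struct field and schema evolution enabled") {
  withSQLConf("spark.databricks.delta.schema.autoMerge.enabled" -> "true") {
    withTable("t") {
      sql("CREATE TABLE t (m MAP<STRING, STRUCT<x: INT>>) USING delta")
      sql("INSERT INTO t SELECT map('k', named_struct('x', 1, 'y', 2))")
      checkAnswer(sql("SELECT m['k'].y FROM t"), Row(2))
    }
  }
}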

.add("m", MapType(StringType, new StructType()
.add("x", IntegerType)
.add("y", ShortType)))),
.add("y", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))))),
Collaborator:

You can actually remove this test case altogether and update the one below to remove includeInserts so that it will cover all insert types.

@Richard-code-gig (Author):

Thanks for the insight @johanl-db.
I will try to get the stack trace for the maxIteration error too.

@Richard-code-gig Richard-code-gig force-pushed the feature/schema-evolution-with-map-fix branch 2 times, most recently from d25adc2 to 6fad5cc Compare November 22, 2024 01:28
@johanl-db (Collaborator) left a comment

@Richard-code-gig Thanks for addressing my previous comments, I have a couple of follow-up ones.

I expect this should be the last iteration before we can merge the PR. I'll be on PTO next week though and won't have a chance to look again before 12/03.

If you address the remaining comments by then, I'll do a final pass and approve when I'm back, and we can aim to merge the change then.

@@ -930,6 +931,8 @@ class DeltaAnalysis(session: SparkSession)
// Keep the type from the query, the target schema will be updated to widen the existing
// type to match it.
attr
case (s: MapType, t: MapType) if s != t =>
Collaborator:

One remaining concern here:
I expect addCastsToMaps to be slower than getCastFunction, since it uses a number of Spark expressions to extract and transform values, whereas getCastFunction uses Cast, which directly converts values: https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L1083

I would add a condition to only use addCastsToMaps when needed, e.g.:

Suggested change:
- case (s: MapType, t: MapType) if s != t =>
+ case (s: MapType, t: MapType)
+     if !DataType.equalsStructurally(s, t, ignoreNullability = true) || allowTypeWidening =>

That way we cover both schema evolution (a struct somewhere contains an extra field) and type widening.

If type widening is disabled and there are missing/extra struct fields, then we use getCastFunction, which will work correctly and be faster. That also limits the blast radius if there's a bug in addCastsToMaps.

"key", sourceMapType.keyType, nullable = false)

val targetKeyAttr = AttributeReference(
"targetKey", targetMapType.keyType, nullable = false)()
Collaborator:

Wondering if this should be key instead of targetKey. Same for targetValue below.

You should be able to use key.toAttribute, btw, which would settle that discussion.
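
For illustration, a minimal sketch of that suggestion; the variable names are taken from the diff context above, everything else is assumed:

// Name the lambda variable "key" and derive the target attribute from it,
// so the source and target sides share the same name.
val keyVar = NamedLambdaVariable("key", sourceMapType.keyType, nullable = false)
val targetKeyAttr = keyVar.toAttribute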

Comment on lines 328 to 333
assert(intercept[AnalysisException] {
  evolvedDf.write
    .mode("append")
    .format("delta")
    .insertInto(tableName)
}.getMessage.contains("A schema mismatch detected when writing to the Delta table"))
Collaborator:

Best practice for tests that expect an exception is to use checkError instead of directly checking the error message.
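
A minimal sketch of the checkError pattern for this assertion; the error class and parameters below are placeholders, not the actual ones for this case:

checkError(
  exception = intercept[AnalysisException] {
    evolvedDf.write
      .mode("append")
      .format("delta")
      .insertInto(tableName)
  },
  errorClass = "DELTA_FAILED_TO_MERGE_FIELDS", // placeholder error class
  parameters = Map("currentField" -> "m", "updateField" -> "m") // placeholder parameters
)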

Author:

Hi Johan,
Yeah, I agree with you on the comments and suggestions. Hopefully it's ready now for merging when you're back.
Enjoy your holidays :)

Fix schema evolution issue with nested struct (within a map) and column renaming

Resolved the issue described in [Bug delta-io#3227](delta-io#3227) where adding a field inside a struct (nested within a map) while renaming a top-level column caused the operation to fail.

The fix focuses on handling schema changes without affecting the integrity of existing data structures, specifically avoiding issues with nested fields within a map and renamed columns.

fix!: renamed the added DeltaWriteExample to EvolutionWithMap

fix!: Modified TypeWideningInsertSchemaEvolutionSuite to accommodate that schema evolution is now allowed for maps

Signed-off-by: Sola Richard Olorunfemi <[email protected]>

fix!: addCastToMap to handle complex types. Added tests to cover new abilities

fix: resolved scalaStyle error

fix: yet another scalaStyle issue

fix!: made some schema evolution for maps tests in DeltaInsertIntoTableSuite more flexible

fix: DeltaAnalysis
@Richard-code-gig Richard-code-gig force-pushed the feature/schema-evolution-with-map-fix branch from 0b8fa40 to b1ab9ef Compare November 23, 2024 22:26
Successfully merging this pull request may close these issues.

[BUG][Spark] INSERT INTO struct evolution in map/arrays breaks when a column is renamed