
Commit cf8c232

Sola Richard Olorunfemi authored and committed
Fix schema evolution issue with nested struct (within a map) and column renaming

Resolves the issue described in [Bug delta-io#3227](delta-io#3227), where adding a field inside a struct nested within a map, while simultaneously renaming a top-level column, caused the write to fail. The fix handles these schema changes without affecting the integrity of existing data structures, specifically avoiding issues with nested fields within a map when columns are renamed.

Signed-off-by: Sola Richard Olorunfemi <[email protected]>

fix!: renamed the added DeltaWriteExample to EvolutionWithMap
1 parent 3a98b8a commit cf8c232

File tree: 2 files changed (+156 −0)
example/EvolutionWithMap.scala (new file)

@@ -0,0 +1,98 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package example

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object EvolutionWithMap {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("EvolutionWithMap")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    import spark.implicits._

    val tableName = "insert_map_schema_evolution"

    try {
      // Define the initial schema
      val initialSchema = StructType(Seq(
        StructField("key", IntegerType, nullable = false),
        StructField("metrics", MapType(StringType, StructType(Seq(
          StructField("id", IntegerType, nullable = false),
          StructField("value", IntegerType, nullable = false)
        ))))
      ))

      val data = Seq(
        Row(1, Map("event" -> Row(1, 1)))
      )

      val rdd = spark.sparkContext.parallelize(data)

      val initialDf = spark.createDataFrame(rdd, initialSchema)

      initialDf.write
        .option("overwriteSchema", "true")
        .mode("overwrite")
        .format("delta")
        .saveAsTable(s"$tableName")

      // Define the evolved schema: a simultaneous change of a StructField name
      // and an additional field in a map column
      val evolvedSchema = StructType(Seq(
        StructField("renamed_key", IntegerType, nullable = false),
        StructField("metrics", MapType(StringType, StructType(Seq(
          StructField("id", IntegerType, nullable = false),
          StructField("value", IntegerType, nullable = false),
          StructField("comment", StringType, nullable = true)
        ))))
      ))

      val evolvedData = Seq(
        Row(1, Map("event" -> Row(1, 1, "deprecated")))
      )

      val evolvedRDD = spark.sparkContext.parallelize(evolvedData)

      val modifiedDf = spark.createDataFrame(evolvedRDD, evolvedSchema)

      // The write below would fail without schema evolution for map types
      modifiedDf.write
        .mode("append")
        .option("mergeSchema", "true")
        .format("delta")
        .insertInto(s"$tableName")

      spark.sql(s"SELECT * FROM $tableName").show(false)

    } finally {
      // Cleanup
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
      spark.stop()
    }
  }
}
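Not part of the commit, but a quick way to sanity-check the example after it runs: with the fix in place, the append with mergeSchema should leave the table's map value struct carrying the new nullable comment field. A minimal sketch, assuming the same Spark session as above:

// Hypothetical verification (not in the commit): after the append, the map's
// value struct should have gained the nullable `comment` field.
val evolved = spark.table("insert_map_schema_evolution")
evolved.printSchema()
// Indicative shape of the metrics column:
//  |-- metrics: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: struct (valueContainsNull = true)
//  |    |    |-- id: integer
//  |    |    |-- value: integer
//  |    |    |-- comment: string (nullable = true)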

spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

+58
@@ -69,6 +69,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

+
 /**
  * Analysis rules for Delta. Currently, these rules enable schema enforcement / evolution with
  * INSERT INTO.
@@ -930,6 +931,18 @@ class DeltaAnalysis(session: SparkSession)
         // Keep the type from the query, the target schema will be updated to widen the existing
         // type to match it.
         attr
+      case (s: MapType, t: MapType) if s != t =>
+        // Handle only the specific case where the value type of the MapType is a StructType.
+        // This could be revisited and expanded in the future when more nested complex
+        // operations are needed vis-a-vis ALTER TABLE COLUMN operations for deeply
+        // nested fields.
+        (s.valueType, t.valueType) match {
+          case (structS: StructType, structT: StructType) if structS != structT =>
+            addCastsToMaps(tblName, attr, s, t, allowTypeWidening)
+          case _ =>
+            // Default for all other MapType cases
+            getCastFunction(attr, targetAttr.dataType, targetAttr.name)
+        }
       case _ =>
         getCastFunction(attr, targetAttr.dataType, targetAttr.name)
     }
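For concreteness, here is a small sketch (built from the schemas in the EvolutionWithMap example above, not code from this commit) of a source/target pair the new case matches: both sides are MapType, both value types are StructType, and the two structs differ.

import org.apache.spark.sql.types._

// Source map type: the value struct has two fields.
val sourceType = MapType(StringType, StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("value", IntegerType, nullable = false))))

// Target map type: the value struct gains a nullable `comment` field.
val targetType = MapType(StringType, StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("value", IntegerType, nullable = false),
  StructField("comment", StringType, nullable = true))))

// sourceType != targetType and both value types are structs, so the new case
// defers to addCastsToMaps instead of emitting a single top-level cast.
assert(sourceType != targetType)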
@@ -1049,6 +1062,7 @@ class DeltaAnalysis(session: SparkSession)
   /**
    * Recursively casts structs in case it contains null types.
    * TODO: Support other complex types like MapType and ArrayType
+   * The MapType case in addCastsToColumn, which calls addCastsToMaps, addresses the MapType TODO.
    */
   private def addCastsToStructs(
       tableName: String,
@@ -1067,6 +1081,8 @@ class DeltaAnalysis(session: SparkSession)
           val subField = Alias(GetStructField(parent, i, Option(name)), target(i).name)(
             explicitMetadata = Option(metadata))
           addCastsToStructs(tableName, subField, nested, t, allowTypeWidening)
+          // We could also handle MapType within a struct here, but there is a restriction
+          // on deeply nested operations that may result in a max-iterations error.
         case o =>
           val field = parent.qualifiedName + "." + name
           val targetName = parent.qualifiedName + "." + target(i).name
@@ -1124,6 +1140,48 @@ class DeltaAnalysis(session: SparkSession)
     DeltaViewHelper.stripTempViewForMerge(plan, conf)
   }

+  /**
+   * Recursively casts maps in case they contain null types.
+   */
+  private def addCastsToMaps(
+      tableName: String,
+      parent: NamedExpression,
+      sourceMapType: MapType,
+      targetMapType: MapType,
+      allowTypeWidening: Boolean): Expression = {
+    // First get the keys from the map
+    val keysExpr = MapKeys(parent)
+
+    // Create a transformation for the values
+    val transformLambdaFunc = {
+      val elementVar = NamedLambdaVariable(
+        "elementVar", sourceMapType.valueType, sourceMapType.valueContainsNull)
+      val castedValue = sourceMapType.valueType match {
+        case structType: StructType =>
+          // Handle StructType values
+          addCastsToStructs(
+            tableName,
+            elementVar,
+            structType,
+            targetMapType.valueType.asInstanceOf[StructType],
+            allowTypeWidening
+          )
+        case _ =>
+          // Not expected to get here: see addCastsToColumn
+          throw new IllegalArgumentException("Target type must be a StructType")
+      }
+
+      LambdaFunction(castedValue, Seq(elementVar))
+    }
+
+    val transformedValues = ArrayTransform(
+      MapValues(parent), transformLambdaFunc)
+
+    // Create a new map from the keys and transformed values
+    MapFromArrays(keysExpr, transformedValues)
+  }
+
   /**
    * Verify the input plan for a SINGLE streaming query with the following:
    * 1. Schema location must be under checkpoint location, if not lifted by flag
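For intuition, the Catalyst expression assembled by addCastsToMaps (MapKeys, then ArrayTransform over MapValues, recombined with MapFromArrays) corresponds roughly to the DataFrame-level rewrite below. This is an illustrative sketch, not code from the commit; the column name and struct DDL come from the EvolutionWithMap example above:

import org.apache.spark.sql.functions._

// Rebuild the map: keep the keys, and cast each value struct to the evolved
// target struct (roughly what ArrayTransform + LambdaFunction do at the
// expression level).
val recast = modifiedDf.withColumn(
  "metrics",
  map_from_arrays(
    map_keys(col("metrics")),
    transform(
      map_values(col("metrics")),
      value => value.cast("struct<id:int,value:int,comment:string>"))))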
