Skip to content

Commit 529adf3

Browse files
szehon-hodongjoon-hyun
authored andcommitted
[SPARK-54525][SQL] Disable nested struct coercion in MERGE INTO under a config
### What changes were proposed in this pull request? #52225 allow MERGE INTO to support case where assignment value is a struct with less fields than the assignment key, ie UPDATE SET big_struct = source.small_struct. This makes this feature off by default, and turned on via a config. ### Why are the changes needed? The change brought some interesting question, for example there is some ambiguity in user intent. Does the UPDATE SET * mean set all nested fields or top level columns? In the first case, missing fields are kept. In the second case, missing fields are nullified. I tried to make a choice in #53149 but after some feedback, it may be a bit controversial, choosing one interpretation over another. A SQLConf may not be the right choice, and instead we may need to introduce some new syntax, which require more discussion. ### Does this PR introduce _any_ user-facing change? No this feature is unreleased ### How was this patch tested? Existing unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #53229 from szehon-ho/disable_merge_update_source_coercion. Authored-by: Szehon Ho <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 23d9253) Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent e3ae7a0 commit 529adf3

File tree

3 files changed

+1379
-1256
lines changed

3 files changed

+1379
-1256
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala

Lines changed: 6 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import scala.collection.mutable
2121

2222
import org.apache.spark.sql.catalyst.SQLConfHelper
2323
import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode.{NONE, RECURSE}
24-
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal}
25-
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
24+
import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Expression, GetStructField, IsNull, Literal}
2625
import org.apache.spark.sql.catalyst.plans.logical.Assignment
2726
import org.apache.spark.sql.catalyst.types.DataTypeUtils
2827
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -182,31 +181,11 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
182181
} else if (exactAssignments.isEmpty && fieldAssignments.isEmpty) {
183182
TableOutputResolver.checkNullability(colExpr, col, conf, colPath)
184183
} else if (exactAssignments.nonEmpty) {
185-
if (updateStar) {
186-
val value = exactAssignments.head.value
187-
col.dataType match {
188-
case structType: StructType =>
189-
// Expand assignments to leaf fields
190-
val structAssignment =
191-
applyNestedFieldAssignments(col, colExpr, value, addError, colPath,
192-
coerceNestedTypes)
193-
194-
// Wrap with null check for missing source fields
195-
fixNullExpansion(col, value, structType, structAssignment,
196-
colPath, addError)
197-
case _ =>
198-
// For non-struct types, resolve directly
199-
val coerceMode = if (coerceNestedTypes) RECURSE else NONE
200-
TableOutputResolver.resolveUpdate("", value, col, conf, addError, colPath,
201-
coerceMode)
202-
}
203-
} else {
204-
val value = exactAssignments.head.value
205-
val coerceMode = if (coerceNestedTypes) RECURSE else NONE
206-
val resolvedValue = TableOutputResolver.resolveUpdate("", value, col, conf, addError,
207-
colPath, coerceMode)
208-
resolvedValue
209-
}
184+
val value = exactAssignments.head.value
185+
val coerceMode = if (coerceNestedTypes) RECURSE else NONE
186+
val resolvedValue = TableOutputResolver.resolveUpdate("", value, col, conf, addError,
187+
colPath, coerceMode)
188+
resolvedValue
210189
} else {
211190
applyFieldAssignments(col, colExpr, fieldAssignments, addError, colPath, coerceNestedTypes)
212191
}
@@ -240,63 +219,6 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
240219
}
241220
}
242221

243-
private def applyNestedFieldAssignments(
244-
col: Attribute,
245-
colExpr: Expression,
246-
value: Expression,
247-
addError: String => Unit,
248-
colPath: Seq[String],
249-
coerceNestedTyptes: Boolean): Expression = {
250-
251-
col.dataType match {
252-
case structType: StructType =>
253-
val fieldAttrs = DataTypeUtils.toAttributes(structType)
254-
255-
val updatedFieldExprs = fieldAttrs.zipWithIndex.map { case (fieldAttr, ordinal) =>
256-
val fieldPath = colPath :+ fieldAttr.name
257-
val targetFieldExpr = GetStructField(colExpr, ordinal, Some(fieldAttr.name))
258-
259-
// Try to find a corresponding field in the source value by name
260-
val sourceFieldValue: Expression = value.dataType match {
261-
case valueStructType: StructType =>
262-
valueStructType.fields.find(f => conf.resolver(f.name, fieldAttr.name)) match {
263-
case Some(matchingField) =>
264-
// Found matching field in source, extract it
265-
val fieldIndex = valueStructType.fieldIndex(matchingField.name)
266-
GetStructField(value, fieldIndex, Some(matchingField.name))
267-
case None =>
268-
// Field doesn't exist in source, use target's current value with null check
269-
TableOutputResolver.checkNullability(targetFieldExpr, fieldAttr, conf, fieldPath)
270-
}
271-
case _ =>
272-
// Value is not a struct, cannot extract field
273-
addError(s"Cannot assign non-struct value to struct field '${fieldPath.quoted}'")
274-
Literal(null, fieldAttr.dataType)
275-
}
276-
277-
// Recurse or resolve based on field type
278-
fieldAttr.dataType match {
279-
case nestedStructType: StructType =>
280-
// Field is a struct, recurse
281-
applyNestedFieldAssignments(fieldAttr, targetFieldExpr, sourceFieldValue,
282-
addError, fieldPath, coerceNestedTyptes)
283-
case _ =>
284-
// Field is not a struct, resolve with TableOutputResolver
285-
val coerceMode = if (coerceNestedTyptes) RECURSE else NONE
286-
TableOutputResolver.resolveUpdate("", sourceFieldValue, fieldAttr, conf, addError,
287-
fieldPath, coerceMode)
288-
}
289-
}
290-
toNamedStruct(structType, updatedFieldExprs)
291-
292-
case otherType =>
293-
addError(
294-
"Updating nested fields is only supported for StructType but " +
295-
s"'${colPath.quoted}' is of type $otherType")
296-
colExpr
297-
}
298-
}
299-
300222
private def toNamedStruct(structType: StructType, fieldExprs: Seq[Expression]): Expression = {
301223
val namedStructExprs = structType.fields.zip(fieldExprs).flatMap { case (field, expr) =>
302224
Seq(Literal(field.name), expr)
@@ -350,55 +272,6 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
350272
IsNull(currentExpr)
351273
}
352274

353-
/**
354-
* As UPDATE SET * can assign struct fields individually (preserving existing fields),
355-
* this will lead to null expansion, ie, a struct is created where all fields are null.
356-
* Wraps a struct assignment with null checks for the source and missing source fields.
357-
* Return null if all are null.
358-
*
359-
* @param col the target column attribute
360-
* @param value the source value expression
361-
* @param structType the target struct type
362-
* @param structAssignment the struct assignment result to wrap
363-
* @param colPath the column path for error reporting
364-
* @param addError error reporting function
365-
* @return the wrapped expression with null checks
366-
*/
367-
private def fixNullExpansion(
368-
col: Attribute,
369-
value: Expression,
370-
structType: StructType,
371-
structAssignment: Expression,
372-
colPath: Seq[String],
373-
addError: String => Unit): Expression = {
374-
// As StoreAssignmentPolicy.LEGACY is not allowed in DSv2, always add null check for
375-
// non-nullable column
376-
if (!col.nullable) {
377-
AssertNotNull(value)
378-
} else {
379-
// Check if source struct is null
380-
val valueIsNull = IsNull(value)
381-
382-
// Check if missing source paths (paths in target but not in source) are not null
383-
// These will be null for the case of UPDATE SET * and
384-
val missingSourcePaths = getMissingSourcePaths(structType, value.dataType, colPath, addError)
385-
val condition = if (missingSourcePaths.nonEmpty) {
386-
// Check if all target attributes at missing source paths are null
387-
val missingFieldNullChecks = missingSourcePaths.map { path =>
388-
createNullCheckForFieldPath(col, path)
389-
}
390-
// Combine all null checks with AND
391-
val allMissingFieldsNull = missingFieldNullChecks.reduce[Expression]((a, b) => And(a, b))
392-
And(valueIsNull, allMissingFieldsNull)
393-
} else {
394-
valueIsNull
395-
}
396-
397-
// Return: If (condition) THEN NULL ELSE structAssignment
398-
If(condition, Literal(null, structAssignment.dataType), structAssignment)
399-
}
400-
}
401-
402275
/**
403276
* Checks whether assignments are aligned and compatible with table columns.
404277
*

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6615,10 +6615,11 @@ object SQLConf {
66156615
buildConf("spark.sql.merge.nested.type.coercion.enabled")
66166616
.internal()
66176617
.doc("If enabled, allow MERGE INTO to coerce source nested types if they have less" +
6618-
"nested fields than the target table's nested types.")
6618+
"nested fields than the target table's nested types. This is experimental and" +
6619+
"the semantics may change.")
66196620
.version("4.1.0")
66206621
.booleanConf
6621-
.createWithDefault(true)
6622+
.createWithDefault(false)
66226623

66236624
/**
66246625
* Holds information about keys that have been deprecated.

0 commit comments

Comments
 (0)