Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,8 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
outputs: Seq[Seq[Expression]],
colOrdinals: Seq[Int],
attrs: Seq[Attribute]): ProjectingInternalRow = {
val schema = StructType(attrs.zipWithIndex.map { case (attr, index) =>
val nullable = outputs.exists(output => output(colOrdinals(index)).nullable)
StructField(attr.name, attr.dataType, nullable, attr.metadata)
val schema = StructType(attrs.zipWithIndex.map { case (attr, _) =>
StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aokolnychyi I got quite some test failures in iceberg/spark 4.0 integration because the nullable don't match. If I change the code to use attr.nullable, those tests will pass. Does the above change look correct to you? Thanks!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also encountered the same issues when testing out an Iceberg v3 feature with the spark 4.0 integration. That said, I vaguely recall there was some reasoning for this nullability being derived from the output instead of the attribute itself. Let me see if I can dig through PRs and find that info. Maybe it no longer holds true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/apache/iceberg/blob/main/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelIcebergCommand.scala#L111 in the older Spark 3.4 extension we had in Iceberg before plans were in Spark.

output attr is nullable if at least one output projection may produce null for that attr
but row ID and metadata attrs are projected only for update/delete records and
row attrs are projected only in insert/update records
that's why the projection schema must rely only on relevant outputs
instead of blindly inheriting the output attr nullability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @amogh-jahagirdar for your comment! I took a closer look at why the test passed in Spark 3.4 extension, but failed with Spark 4.0.
In Spark 3.4 extension, when building the metadataProjection, we are using updateAndDeleteOutputs, which does not contain the INSERT_OPERATION
Screenshot 2025-03-31 at 2 06 21 PM
in which _spec_id has nullable false, and _partition has nullable true.

In Spark4.0, when building metadataProjection, we are using outputsWithMetadata, which contains REINSERT_OPERATION, so the outputs contains two rows
Screenshot 2025-03-31 at 2 32 31 PM
Since the second row has null for both _spec_id and _partition, the calculated nullable for both the metadata columns are true, which led the schema verification for MetadataSchema failed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can try to make metadata attrs only be projected for update/delete records, the same behavior as Spark 3.4 extension, but I am not sure it's the correct fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aokolnychyi Do you have any suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, let me take a closer look tomorrow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original logic was here on purpose. I have to validate whether our recent work on nullable metadata columns triggers this behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huaxingao, I think the Spark behavior here is correct but Iceberg would need to relax its check.

PR #49493 added a notion of reinsert to DeltaWriter to support row lineage. Iceberg leverages reinserts. Previously, Spark never passed metadata with reinsert and the metadata attributes preserved its nullability. This is no longer the case in 4.0. Spark now passes metadata with reinsert and the metadata attributes are actually nullified. Therefore, Spark seems to pass a correct schema info as metadata attributes are now nullable.

  /**
   * Reinserts a row with metadata.
   * <p>
   * This method handles the insert portion of updated rows split into deletes and inserts.
   *
   * @param metadata values for metadata columns
   * @param row a row to reinsert
   * @throws IOException if failure happens during disk/network IO like writing files
   *
   * @since 4.0.0
   */
  default void reinsert(T metadata, T row) throws IOException {
    insert(row);
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Row ID information will be part of metadata in reinsert.

})
ProjectingInternalRow(schema, colOrdinals)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,55 @@ class DeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite
insertWriteLogEntry(data = Row(6, 0, "new")))
}
}

test("SPARK-51479: Test Column Nullable") {
withTempView("source") {
createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
"""{ "pk": 1, "salary": 100, "dep": "hr" }
|{ "pk": 2, "salary": 200, "dep": "software" }
|{ "pk": 3, "salary": 300, "dep": "hr" }
|{ "pk": 4, "salary": 400, "dep": "hr" }
|{ "pk": 5, "salary": 500, "dep": "hr" }
|""".stripMargin)

val sourceDF = Seq(3, 4, 5, 6).toDF("pk")
sourceDF.createOrReplaceTempView("source")

sql(
s"""MERGE INTO $tableNameAsString t
|USING source s
|ON t.pk = s.pk
|WHEN MATCHED THEN
| UPDATE SET t.salary = 1000
|WHEN NOT MATCHED THEN
| INSERT (pk, salary, dep) VALUES (s.pk, 0, 'new')
|WHEN NOT MATCHED BY SOURCE AND pk = 1 THEN
| DELETE
|""".stripMargin)

checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
Row(2, 200, "software"), // unchanged
Row(3, 1000, "hr"), // update
Row(4, 1000, "hr"), // update
Row(5, 1000, "hr"), // update
Row(6, 0, "new"))) // insert

checkLastWriteInfo(
expectedRowSchema = table.schema,
expectedRowIdSchema = Some(StructType(Array(PK_FIELD))),
expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE))))

checkLastWriteLog(
deleteWriteLogEntry(id = 1, metadata = Row("hr", null)),
deleteWriteLogEntry(id = 3, metadata = Row("hr", null)),
reinsertWriteLogEntry(metadata = Row("hr", null), data = Row(3, 1000, "hr")),
deleteWriteLogEntry(id = 4, metadata = Row("hr", null)),
reinsertWriteLogEntry(metadata = Row("hr", null), data = Row(4, 1000, "hr")),
deleteWriteLogEntry(id = 5, metadata = Row("hr", null)),
reinsertWriteLogEntry(metadata = Row("hr", null), data = Row(5, 1000, "hr")),
insertWriteLogEntry(data = Row(6, 0, "new")))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ class DeltaBasedUpdateAsDeleteAndInsertTableSuite extends DeltaBasedUpdateTableS
Row(1, -1, "hr") :: Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil)

checkLastWriteInfo(
expectedRowSchema = StructType(table.schema.map {
case attr if attr.name == "id" => attr.copy(nullable = false) // input is a constant
case attr => attr
}),
expectedRowSchema = table.schema,
expectedRowIdSchema = Some(StructType(Array(PK_FIELD))),
expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE))))

Expand Down Expand Up @@ -82,10 +79,7 @@ class DeltaBasedUpdateAsDeleteAndInsertTableSuite extends DeltaBasedUpdateTableS
Row(1, -1, "hr") :: Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil)

checkLastWriteInfo(
expectedRowSchema = StructType(table.schema.map {
case attr if attr.name == "id" => attr.copy(nullable = false) // input is a constant
case attr => attr
}),
expectedRowSchema = table.schema,
expectedRowIdSchema = Some(StructType(Array(PK_FIELD))),
expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE))))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ class DeltaBasedUpdateTableSuite extends DeltaBasedUpdateTableSuiteBase {
Row(1, -1, "hr") :: Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil)

checkLastWriteInfo(
expectedRowSchema = StructType(table.schema.map {
case attr if attr.name == "id" => attr.copy(nullable = false) // input is a constant
case attr => attr
}),
expectedRowSchema = table.schema,
expectedRowIdSchema = Some(StructType(Array(PK_FIELD))),
expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE))))

Expand Down Expand Up @@ -80,10 +77,7 @@ class DeltaBasedUpdateTableSuite extends DeltaBasedUpdateTableSuiteBase {
Row(1, -1, "hr") :: Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil)

checkLastWriteInfo(
expectedRowSchema = StructType(table.schema.map {
case attr if attr.name == "id" => attr.copy(nullable = false) // input is a constant
case attr => attr
}),
expectedRowSchema = table.schema,
expectedRowIdSchema = Some(StructType(Array(PK_FIELD))),
expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE))))

Expand Down