
Conversation

@huaxingao (Contributor)

What changes were proposed in this pull request?

Fix how nullability is computed for the row-level operation columns.

Why are the changes needed?

In the Iceberg/Spark 4.0 integration, there are a few test failures because nullability is not computed correctly.

TestMergeOnReadUpdate > testUpdateWithMultiColumnInSubquery() > catalogName = spark_catalog, implementation = org.apache.iceberg.spark.SparkSessionCatalog, config = {type=hive, default-namespace=default, clients=1, parquet-enabled=false, cache-enabled=false}, format = AVRO, vectorized = false, distributionMode = range, fanout = false, branch = test, planningMode = DISTRIBUTED, formatVersion = 3 FAILED
    java.lang.IllegalArgumentException: Provided metadata schema is incompatible with expected schema:
    table {
      2147483643: _spec_id: required int (Spec ID used to track the file containing a row)
      2147483642: _partition: optional struct<> (Partition to which a row belongs to)
    }
    Provided schema:
    table {
      2147483643: _spec_id: optional int
      2147483642: _partition: optional struct<>
    }
    Problems:
    * _spec_id should be required, but is optional
        at org.apache.iceberg.types.TypeUtil.checkSchemaCompatibility(TypeUtil.java:493)

Does this PR introduce any user-facing change?

no

How was this patch tested?

new test

Was this patch authored or co-authored using generative AI tooling?

no

The github-actions bot added the SQL label on Mar 12, 2025.
@dongjoon-hyun (Member) previously approved these changes Mar 13, 2025

+1, LGTM. Thank you, @huaxingao.

cc @aokolnychyi , @szehon-ho , @cloud-fan

// Before: nullability derived from all outputs at the column's ordinal
val nullable = outputs.exists(output => output(colOrdinals(index)).nullable)
StructField(attr.name, attr.dataType, nullable, attr.metadata)

// After: nullability taken from the attribute itself
val schema = StructType(attrs.zipWithIndex.map { case (attr, _) =>
  StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
@huaxingao (Contributor Author)

@aokolnychyi I got quite a few test failures in the Iceberg/Spark 4.0 integration because the nullability doesn't match. If I change the code to use attr.nullable, those tests pass. Does the above change look correct to you? Thanks!

@amogh-jahagirdar

I also encountered the same issues when testing an Iceberg v3 feature with the Spark 4.0 integration. That said, I vaguely recall there was some reasoning for deriving this nullability from the output instead of the attribute itself. Let me see if I can dig through PRs and find that info; maybe it no longer holds true.


https://github.com/apache/iceberg/blob/main/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelIcebergCommand.scala#L111 is the older Spark 3.4 extension we had in Iceberg before the plans moved into Spark. The comment there reads:

output attr is nullable if at least one output projection may produce null for that attr
but row ID and metadata attrs are projected only for update/delete records and
row attrs are projected only in insert/update records
that's why the projection schema must rely only on relevant outputs
instead of blindly inheriting the output attr nullability
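
A minimal sketch of that rationale, with illustrative names rather than the real planner types: deriving a metadata column's nullability from every output flips it to optional as soon as an insert-style output (which projects null metadata literals) is included, while deriving it only from the outputs that actually carry metadata keeps it required.

// Illustrative sketch only: Op and Output are made-up stand-ins, not planner types.
sealed trait Op
case object Delete extends Op
case object Update extends Op
case object Insert extends Op

// per-operation output with the nullability its projection would report for _spec_id
case class Output(op: Op, specIdNullable: Boolean)

val outputs = Seq(
  Output(Delete, specIdNullable = false), // projects the real _spec_id attribute
  Output(Update, specIdNullable = false), // projects the real _spec_id attribute
  Output(Insert, specIdNullable = true)   // projects a null literal for _spec_id
)

// deriving from all outputs wrongly marks _spec_id as nullable
val fromAllOutputs = outputs.exists(_.specIdNullable)                         // true
// deriving only from the outputs that project metadata keeps it required
val fromRelevant = outputs.filterNot(_.op == Insert).exists(_.specIdNullable) // false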

@huaxingao (Contributor Author)

Thanks @amogh-jahagirdar for your comment! I took a closer look at why the test passes with the Spark 3.4 extension but fails with Spark 4.0.

In the Spark 3.4 extension, when building the metadataProjection, we use updateAndDeleteOutputs, which does not contain the INSERT_OPERATION, so there is a single output in which _spec_id has nullable false and _partition has nullable true.

[screenshot: the single update/delete output]

In Spark 4.0, when building the metadataProjection, we use outputsWithMetadata, which contains the REINSERT_OPERATION, so the outputs contain two rows.

[screenshot: the two outputs, including the reinsert row]

Since the second row has null for both _spec_id and _partition, the computed nullability for both metadata columns is true, which makes the metadata schema verification fail.
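
A self-contained sketch of that computation, with hand-built stand-ins for the two projections (the attribute and literal here are illustrative, not pulled from the actual plan): once the reinsert output's null literal is included, the exists-based nullability flips to true.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.types.IntegerType

// update/delete output: projects the real _spec_id attribute, which is non-nullable
val updateDeleteOutput: Seq[Expression] =
  Seq(AttributeReference("_spec_id", IntegerType, nullable = false)())
// reinsert output: projects a null literal for _spec_id, which is nullable
val reinsertOutput: Seq[Expression] = Seq(Literal(null, IntegerType))

val outputs = Seq(updateDeleteOutput, reinsertOutput)
// the exists-based computation from the diff above: true, because the
// reinsert output may produce null for _spec_id
val specIdNullable = outputs.exists(output => output(0).nullable)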

@huaxingao (Contributor Author)

I can try to make the metadata attrs be projected only for update/delete records, matching the Spark 3.4 extension's behavior, but I am not sure that is the correct fix.

@huaxingao (Contributor Author)

@aokolnychyi Do you have any suggestions?

@aokolnychyi (Contributor)

Hmmm, let me take a closer look tomorrow.

@aokolnychyi (Contributor)

The original logic was here on purpose. I have to validate whether our recent work on nullable metadata columns triggers this behavior.

@aokolnychyi (Contributor)

@huaxingao, I think the Spark behavior here is correct but Iceberg would need to relax its check.

PR #49493 added a notion of reinsert to DeltaWriter to support row lineage, and Iceberg leverages reinserts. Previously, Spark never passed metadata with reinsert, so the metadata attributes preserved their nullability. This is no longer the case in 4.0: Spark now passes metadata with reinsert, and the metadata attributes are actually nullified. Therefore, Spark seems to pass correct schema info, as the metadata attributes are now nullable.

  /**
   * Reinserts a row with metadata.
   * <p>
   * This method handles the insert portion of updated rows split into deletes and inserts.
   *
   * @param metadata values for metadata columns
   * @param row a row to reinsert
   * @throws IOException if failure happens during disk/network IO like writing files
   *
   * @since 4.0.0
   */
  default void reinsert(T metadata, T row) throws IOException {
    insert(row);
  }

@aokolnychyi (Contributor)

Row ID information will be part of metadata in reinsert.
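
To make that concrete, here is a hedged Scala sketch of a writer that consumes the metadata now passed with reinsert; the ordinal and the helper are hypothetical, not Iceberg's actual writer.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.DeltaWriter

// Hypothetical writer: the field ordinal and helper names are illustrative.
abstract class LineageAwareWriter extends DeltaWriter[InternalRow] {
  // hypothetical ordinal of the row-ID field inside the metadata struct
  protected def rowIdOrdinal: Int

  override def reinsert(metadata: InternalRow, row: InternalRow): Unit = {
    // metadata attrs are nullable, so guard before reading the row ID
    val rowId =
      if (metadata.isNullAt(rowIdOrdinal)) None
      else Some(metadata.getLong(rowIdOrdinal))
    insertWithLineage(rowId, row) // hypothetical helper carrying lineage info
  }

  // hypothetical: write a row, preserving its lineage row ID when present
  protected def insertWithLineage(rowId: Option[Long], row: InternalRow): Unit
}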

@szehon-ho (Member)

Thanks, this makes sense to me, but I guess @aokolnychyi knows more.

@aokolnychyi (Contributor)

Sorry about the delay. I will check tomorrow.

@huaxingao (Contributor Author)

@aokolnychyi Thanks for the explanation! I will close this PR and relax the check on Iceberg side.
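
For illustration, a minimal sketch of what relaxing the check could mean (this is not Iceberg's actual TypeUtil API): accept a provided field that is optional where the expected metadata field is required, since Spark 4.0 legitimately nullifies metadata attributes for reinserts.

// Illustrative stand-in for a schema field; not Iceberg's types.
case class Field(name: String, required: Boolean)

// Relaxed compatibility: optionally ignore the required/optional mismatch.
def compatible(expected: Field, provided: Field, relaxNullability: Boolean): Boolean =
  expected.name == provided.name &&
    (relaxNullability || expected.required == provided.required)

val expected = Field("_spec_id", required = true)
val provided = Field("_spec_id", required = false) // what Spark 4.0 now passes

compatible(expected, provided, relaxNullability = false) // false: the current failure
compatible(expected, provided, relaxNullability = true)  // true: the relaxed check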
