Conversation

@aokolnychyi (Contributor) commented Jan 15, 2025

What changes were proposed in this pull request?

This PR introduces conditional nullification of metadata columns in DELETE, UPDATE, and MERGE operations. Previously, connectors could project metadata columns in a row-level operation, but the metadata values were always preserved and could not be nullified. After this change, connectors control which metadata columns are preserved and when.

The new behavior is implemented via flags in `metadataInJSON` that are surfaced through `MetadataAttribute`. This PR also extends the existing `DataWriter` and `DeltaWriter` interfaces.
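
As a hedged illustration (the flag key below is a made-up stand-in, not the exact name this PR adds), a connector could attach such a flag to a column's metadata:

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

// Hypothetical flag key; the PR stores similar booleans in the column's
// metadata JSON so Spark knows which metadata values to keep per operation.
// A missing flag defaults to preserving the value, i.e. the old behavior.
val PreserveOnUpdate = "__preserve_on_update"

// _row_lineage_version must be re-assigned on every change, so the connector
// opts it out of preservation; _row_lineage_id simply keeps the default and
// survives updates unchanged.
val rowVersionMetadata = new MetadataBuilder()
  .putBoolean(PreserveOnUpdate, false)
  .build()

val rowVersionField =
  StructField("_row_lineage_version", StringType, nullable = true, rowVersionMetadata)
```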

Why are the changes needed?

These changes are essential to support row lineage in Iceberg and Delta Lake. Both projects define a row ID and a row version as part of their metadata concepts. The row ID is a synthetic metadata column that is null when a record is first inserted and is later assigned through inheritance. Once assigned, the row ID must remain constant and unaltered. In contrast, the row version is updated with every modification and must be re-assigned. The existing implementation of DELETE, UPDATE, and MERGE operations in Spark doesn't support the conditional metadata column nullification required for row lineage.

Suppose there is a table containing the following rows:

```
 dep |   name    | salary | _row_lineage_id | _row_lineage_version |     _file      | _pos
-----+-----------+--------+-----------------+----------------------+----------------+------
 hr  | Alice     |  200   | 101             | v1                   | fileA.parquet  | 0
 hr  | Robert    |  240   | 102             | v1                   | fileA.parquet  | 1
 it  | Charlie   |  260   | 103             | v1                   | fileA.parquet  | 2
 it  | Bob       |  220   | 104             | v1                   | fileA.parquet  | 3
```

Then `UPDATE t SET salary = salary + 10 WHERE dep = 'hr'` should produce:

```
 operation | row_id (_file, _pos)        | row (dep, name, salary)  | metadata (_row_lineage_id, _row_lineage_version)
-----------+-----------------------------+--------------------------+-------------------------------------------------
 update    | (fileA.parquet, 0)          | (hr, Alice, 210)         | (101, null)
 update    | (fileA.parquet, 1)          | (hr, Robert, 250)        | (102, null)
```

Note that `_row_lineage_id` values are preserved while `_row_lineage_version` values are nullified.
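
As a minimal sketch of the idea (assumed helper name, not the exact rewrite rule in this PR), the output projection for an UPDATE can pass a preserved metadata attribute through unchanged and replace a non-preserved one with a typed null:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}

// Sketch only: choose the output expression for a metadata column based on
// whether the connector flagged it as preserved for this operation.
def metadataOutput(attr: Attribute, preserved: Boolean): Expression =
  if (preserved) attr else Literal(null, attr.dataType)
```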

Does this PR introduce any user-facing change?

Yes, but the changes are backward compatible because the added flags default to the old behavior.

How was this patch tested?

This PR comes with unit tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Jan 15, 2025
@aokolnychyi (Contributor Author):

Follows exactly what we have in Column.

Contributor:

shall we simply name the new write function in the base class as insert?

@aokolnychyi (Contributor Author) replied on Jan 15, 2025:

Actually, it may not always be a new record to insert in DataWriter. In group-based DELETE, UPDATE, and MERGE operations that replace entire files in Delta and Iceberg, certain records have to be copied over. That means those records aren't really inserts. Leaving the method name as write in DeltaWriter keeps its purpose fairly generic and allows us to use it beyond simple inserts.

@aokolnychyi (Contributor Author):

This may change too. Trying an idea.

@dongjoon-hyun (Member) left a comment:

Could you rebase this PR onto the master branch, @aokolnychyi? According to the CI failure, it seems to be affected by a bug that is already fixed in master. I hope it doesn't hide other real bugs.


```scala
import org.apache.spark.sql.catalyst.ProjectingInternalRow

case class ReplaceDataProjections(
```
@aokolnychyi (Contributor Author):

Similar to the `WriteDeltaProjections` we already have, but specific to `ReplaceData`.
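
Since the hunk above is truncated in the review view, here is a sketch of the plausible shape by analogy with `WriteDeltaProjections`; the field names are assumptions, not the definition from the PR:

```scala
import org.apache.spark.sql.catalyst.ProjectingInternalRow

// Assumed field names, by analogy with WriteDeltaProjections: ReplaceData
// rewrites whole groups of files, so it needs a projection for the data
// columns and, optionally, one for metadata columns that may be nullified.
case class ReplaceDataProjections(
    dataProjection: ProjectingInternalRow,
    metadataProjection: Option[ProjectingInternalRow])
```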

@cloud-fan (Contributor):

thanks, merging to master/4.0!

@cloud-fan closed this in 8313320 on Jan 22, 2025
cloud-fan pushed a commit that referenced this pull request Jan 22, 2025
…s in DML

Closes #49493 from aokolnychyi/spark-50820.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8313320)
Signed-off-by: Wenchen Fan <[email protected]>
@aokolnychyi (Contributor Author):

Thank you, @cloud-fan @dongjoon-hyun!

```scala
    colOrdinals: Seq[Int],
    attrs: Seq[Attribute]): ProjectingInternalRow = {
  val schema = StructType(attrs.zipWithIndex.map { case (attr, index) =>
    val nullable = outputs.exists(output => output(colOrdinals(index)).nullable)
```
Contributor:

@aokolnychyi Do we only need this for metadata columns? For regular columns, shall we use attr.nullable instead?
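
A sketch of what this suggestion could look like in the method above (hedged: it assumes `MetadataAttribute.isValid` is the appropriate detector and reuses the surrounding `outputs`, `colOrdinals`, and `index`):

```scala
import org.apache.spark.sql.catalyst.expressions.MetadataAttribute

// Sketch of the suggestion: only metadata columns can be conditionally
// nullified, so only they need nullability derived from the outputs;
// regular columns can trust the attribute's own nullability.
val nullable = if (MetadataAttribute.isValid(attr.metadata)) {
  outputs.exists(output => output(colOrdinals(index)).nullable)
} else {
  attr.nullable
}
```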

zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 14, 2025
…s in DML

Closes apache#49493 from aokolnychyi/spark-50820.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d27389e)
Signed-off-by: Wenchen Fan <[email protected]>