Skip to content

Conversation

@kbendick
Copy link
Owner

No description provided.

@hililiwei
Copy link

Yes, there is no dedicated Upsert type in flink. When the row is transferred for deletion, I think we should prune the row first and retain only the key field. But it's not doing that at the moment.

@hililiwei
Copy link

hililiwei commented Mar 16, 2022

this.structProjection = StructProjection.create(schema, deleteSchema);
   

 public void delete(T row) throws IOException {
      if (!internalPosDelete(structProjection.wrap(asStructLike(row)))) {
        eqDeleteWriter.write(row);
      }
  }

 public void deleteKey(T key) throws IOException {
     if (!internalPosDelete(asStructLike(key))) {
       eqDeleteWriter.write(key);
     }
  }

In our test case, only id province should be paased to deleteKey. It seems that we should use structProjection to trim row.

@hililiwei
Copy link

I found that it seemed that the eqDeleteRowSchema passed in was wrong.

@hililiwei
Copy link

By passing in the correct eqDeleteRowSchema, our test cases are now running properly. After you approve my PR, let's see if I can pass the CI.

@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch from d17483f to 93b8487 Compare March 16, 2022 16:05
@kbendick kbendick marked this pull request as ready for review March 16, 2022 16:06
@kbendick
Copy link
Owner Author

Closing and reopening for tests.

@kbendick kbendick closed this Mar 16, 2022
@kbendick kbendick reopened this Mar 16, 2022
Comment on lines 78 to 79
// TODO provide the ability to customize the equality-delete row schema.
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the present fix to the current issue.

@kbendick kbendick changed the title Fix flink upsert delta file writer - Draft to show Fix flink upsert delta file writer writing equality delete files Mar 16, 2022
@kbendick kbendick changed the title Fix flink upsert delta file writer writing equality delete files Fix flink upsert delta file writer writing equality delete files with incorrect row metrics Mar 16, 2022
@kbendick kbendick changed the title Fix flink upsert delta file writer writing equality delete files with incorrect row metrics Flink - Fix delta file writer writing equality delete files with incorrect row metrics for equality deletes when using upsert Mar 16, 2022
@kbendick
Copy link
Owner Author

kbendick commented Mar 16, 2022

This fixes the test cases presented, but causes a number of tests to fail. However, all of those tests are upsert or changelog based tests where the upsert happens on the data key, which is the table's partition key (when partitioned) and also the table's only equality field id.

The tests that fail are all testChangeLogOnDataKey and rtestUpsertOnDataKey. And in the test source, the data key is a special key that's part of the table's configuration.

Of note, tests where we upsert on data and some other field still pass.

So we might need to only use the update delete projection when the equality ids are not the same as the partition ids and update the delete schema accordingly. Or we possibly need to update the test harness source, given that it has special handling for this particular key.

I'd also like to add tests for partition evolution, but this should be addressed first.

> Task :iceberg-flink:iceberg-flink-1.14:test

org.apache.iceberg.flink.TestChangeLogTable > testChangeLogOnDataKey[PartitionedTable=true] FAILED
    java.lang.AssertionError: Should have the expected records for the checkpoint#1 expected:<[Record(1, aaa), Record(1, bbb), Record(1, ccc)]> but was:<[Record(1, aaa), Record(2, aaa, 1, false), Record(1, bbb, 1, false), Record(1, ccc)]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestChangeLogTable.testSqlChangeLog(TestChangeLogTable.java:285)
        at org.apache.iceberg.flink.TestChangeLogTable.testChangeLogOnDataKey(TestChangeLogTable.java:167)

org.apache.iceberg.flink.TestChangeLogTable > testChangeLogOnDataKey[PartitionedTable=false] FAILED
    java.lang.AssertionError: Should have the expected records for the checkpoint#1 expected:<[Record(1, aaa), Record(1, bbb), Record(1, ccc)]> but was:<[Record(1, aaa), Record(2, aaa, 3, false), Record(1, bbb, 2, false), Record(1, ccc)]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestChangeLogTable.testSqlChangeLog(TestChangeLogTable.java:285)
        at org.apache.iceberg.flink.TestChangeLogTable.testChangeLogOnDataKey(TestChangeLogTable.java:167)

@hililiwei
Copy link

apache#3834


/**
* Delete an eleemnt with an equality delete using the
* @param row
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description needed

case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
// Upserts come in modeled as INSERT. We need to be sure that
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's best to avoid personal pronouns in comments and docs. It isn't clear who "we" is and omitting it is typically much more direct. For example: "The columns that are not equality ID columns may have changed in the inserted row, so the delete file must be written using only the equality ID columns."

@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch from 91af385 to 735b811 Compare March 18, 2022 23:52
@kbendick kbendick changed the title Flink - Fix delta file writer writing equality delete files with incorrect row metrics for equality deletes when using upsert [WIP[ Flink - Fix incorrect row being written for delta files when using upsert mode Mar 18, 2022
@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch 7 times, most recently from f8b590f to 607ce7c Compare March 23, 2022 00:31
@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch 3 times, most recently from 5995ed6 to 3ce3286 Compare March 23, 2022 19:07
@kbendick kbendick force-pushed the fix-flink-upsert-delta-file-writer branch from f538167 to 089e7e0 Compare March 24, 2022 18:15
@github-actions github-actions bot added the ORC label Mar 24, 2022
@kbendick kbendick closed this Apr 13, 2022
@kbendick kbendick deleted the fix-flink-upsert-delta-file-writer branch April 13, 2022 16:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants