Skip to content

Conversation

@Reo-LEI
Copy link
Contributor

@Reo-LEI Reo-LEI commented Jul 25, 2021

fork from #1996

author: @openinx

case INSERT:
if (upsert) {
writer.delete(row);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could only delete row on INSERT. I don't think there will be only have UPDATE_AFTER row and lost UPDATE_BEFORE situation. @openinx please check this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For an update operation in flink, the UPDATE_AFTER event will must be emitted to the downstream, while the UPDATE_BEFORE is a best effort behavior or an configured behavior from the upstream flink source. You can take a look at this GroupAggFunction, if the flink source is configured to produce UPDATE_AFTER only, then it won't emit any UPDATE_BEFORE to the downstream.

For the downstream iceberg sink, we need to handle all the UPDATE_AFTER as UPSERT. That also means we need to do nothing for the UPDATE_BEFORE because we will remove the previous key in the next UPDATE_AFTER events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can only delete row on UPDATE_AFTER and keep UPDATE_BEFORE do nothing to prevent delete one row twice

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By name, I thought INSERT is "add a new row". Then we don't need to add a delete for it. But I guess it actually means "append a row (new or updated)".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @openinx mentioned in #1996 (comment), we need to transform INSERT/UPDATE_AFTER to be UPSERT(delete + insert). If we don't add a delete on INSERT row when upsert mode is enable, we will get duplicate rows for same primary key.

@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Jul 25, 2021

I pick #1996 up on this PR, could you take a look? @rdblue @openinx

@coolderli
Copy link
Contributor

@Reo-LEI any update?

@github-actions github-actions bot added the core label Jul 28, 2021
@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Jul 28, 2021

@Reo-LEI any update?

Yep, I will keep push this PR. But I only have time on night, so the progress will be slow.

@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Jul 31, 2021

ping @openinx @rdblue

# Conflicts:
#	flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
#	flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
#	flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
#	flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
#	flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Aug 23, 2021

This PR have been open for some time, I think this feature is very importent for user, and just waiting someone to review and merge that. Has anybody to take a look of this? @rdblue @openinx @aokolnychyi @stevenzwu @kbendick

"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
Preconditions.checkState(!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
if (!table.spec().isUnpartitioned()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my own learning, does partition field must be included in equality fields?

e.g., we can have an equality field (like user_id) and table can be partitioned by hour. would that be a valid scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* a subset of equality fields, otherwise the old row that located in partition-A could not be deleted by the

As @openinx comment as above, we shoule restrict the partition fields is a subset of equality fields to ensure we can delete the old data in same partition.

e.g., we can have an equality field (like user_id) and table can be partitioned by hour. would that be a valid scenario?

I think that is not a valid scenario, to keep user_id unique in all different hour parition is make no sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a table with user_id and hour, the business primary key is user_id, which mean the table should have at most one row for each given user_id. Now let's take about the partition strategy.

If we just partition the table by hour field, that means two different hour partitions may have the same user_id, because people may insert the user_id in hour=01 and hour=02. If we wanna to keep the primary key semantics, then we will need to delete the old user_id in the hour=01 first, then insert the new user_id in the hour=02. But when an INSERT come, we don't know which partition has the specific user_id, then we have to broadcast the DELETE to all the partitions, which is quite inefficient.

@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Sep 5, 2021

I have fixed the unittest, could you retry the ci? :) @openinx

@Reo-LEI Reo-LEI requested a review from openinx September 5, 2021 06:13
@openinx
Copy link
Member

openinx commented Sep 8, 2021

Retry to run CI ...

@openinx openinx closed this Sep 8, 2021
@openinx openinx reopened this Sep 8, 2021
@openinx openinx changed the title Flink: Transform INSERT as one DELETE following one INSERT Flink: Add streaming upsert write option. Sep 8, 2021
@openinx
Copy link
Member

openinx commented Sep 8, 2021

LGTM if the travis CI says OK ! Thanks for @Reo-LEI for picking this up !

Copy link
Contributor

@kbendick kbendick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple questions for my own understanding.

Overall this looks good to me, but I'll leave it for others until I have a chance to look more closely as Flink is an area I'm somewhat less specialized in. Thanks for working on this @Reo-LEI and @openinx!

Comment on lines +226 to +227
public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two questions, one that's somewhat unrelated:

  1. Is this only used in streaming mode now? Or does this work with Flink batch sink as well?
  2. (Somewhat unrelated / thinking out loud) If we have this new write.upsert.enabled flag, could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only used in streaming mode now? Or does this work with Flink batch sink as well?

Yes, it's only used for streaming mode right now. The batch upsert semantic has been implemented correctly by the MERGE INTO clause.

could we possibly use it to add our own support for CDC on top of Spark Structured Streaming?

In theory, it's possible to add the CDC support for spark sturctured streaming, though the spark structured streaming does not support CDC event natively (I mean flink support INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER events natively while Spark streaming doesn't unless we add extra field to indicate what's the operation type it is). I think @XuQianJin-Stars @chenjunjiedada 's team are working on this issue in their own repo.

Comment on lines +83 to +85
if (upsert) {
break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking question:

Are there possible concerns with events coming out of order for some reason? I guess since the table commits are serializable, this isn't a concern as the same row for these equality fields shouldn't be updated twice in the same commit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question, @kbendick ! Let's describe the out-of-order in two dimension:

  1. Is possible to produce disordered events in a single iceberg transaction ? First of all, if we want to maintain the correct data semantics between the source table and iceberg sink table, the records consumed from source table must be the correct order. Second, the streaming job will need to shuffle based on the equality fields so that the records with same key are dispatched to the specialized parallelism task, otherwise the out-of-order issue happen if different tasks write the records with same equality fields to the iceberg table. In this way, the order in a single transaction is guaranteed.

  2. The out-of-order issue between two continues transaction. In our flink stream integration, we have guaranteed the exact commit order even if a failover happen. For the spark streaming, I think we will need more consideration to this issue.

Hopefully, I've answered your question, @kbendick :-)

Comment on lines +348 to +350
if (upsertMode) {
Preconditions.checkState(!overwrite,
"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test that verifies the builder doesn't allow overwrite and upsert / upsertMode?

Maybe I missed it, but seems like an important thing to have a test for in case of future code refractors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed !

Copy link
Contributor Author

@Reo-LEI Reo-LEI Sep 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a unittest to cover this, thanks for your reminder!:) @kbendick

@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Sep 12, 2021

@openinx @stevenzwu @kbendick do you have another other concerns for this PR? I think this feature is very important for flink user, we should merge this PR as soon as possible. :)

@openinx
Copy link
Member

openinx commented Sep 13, 2021

@Reo-LEI , The PR looks good to me now, thanks for the patient contribution, thanks all for reviewing (@kbendick & @stevenzwu ) . I will get this merged once the travis CI says okay !

@openinx openinx closed this Sep 13, 2021
@openinx openinx reopened this Sep 13, 2021
@openinx
Copy link
Member

openinx commented Sep 13, 2021

Reopened PR to retry the CI.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants