[Core][Flink][Spark]: Refactor TaskWriter implementations #4132
Conversation
dungdm93 left a comment
It's a breaking change, but it only affects users who have a custom implementation.
Nevertheless, those 2 interfaces were only introduced in 0.13, so the number of affected users should be negligible.
```diff
+  * @return PathOffset of written row
   */
-  void write(T row);
+  PathOffset write(T row);
```
For a delete, there can be 2 rows in delete files: one EqualityDelete row to delete a record from a previous snapshot, and one PositionDelete row to delete a record from the current snapshot. So it's required to track the PathOffset of every record inserted in the current snapshot.
You mean each abstract FileWriter will get a PathOffset back when appending a new row? That doesn't make sense to me, because not every writer needs this PathOffset for its follow-up work.
I agree that not every writer needs the PathOffset, but it's required for writing delta data (like Flink's DeltaTaskWriter).
A writer that doesn't need it can just ignore the return value.
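To make the trade-off concrete, here is a minimal sketch of the bookkeeping dungdm93 describes; `Writer` and the `PathOffset` shape below are simplified stand-ins for the PR's types, not the real API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: because write(...) now returns a PathOffset, a delta writer can
// remember where each row inserted in the current snapshot landed, and later
// turn a delete of that row into a position delete instead of an equality delete.
class DeltaWriteSketch<T, K> {

  interface Writer<R> {
    PathOffset write(R row); // the return value this PR adds
  }

  static final class PathOffset {
    final String path;
    final long offset;

    PathOffset(String path, long offset) {
      this.path = path;
      this.offset = offset;
    }
  }

  private final Writer<T> dataWriter;
  private final Writer<PathOffset> positionDeleteWriter;
  private final Writer<T> equalityDeleteWriter;
  // equality key of each row inserted in the current snapshot -> its location
  private final Map<K, PathOffset> insertedRowMap = new HashMap<>();

  DeltaWriteSketch(Writer<T> data, Writer<PathOffset> posDelete, Writer<T> eqDelete) {
    this.dataWriter = data;
    this.positionDeleteWriter = posDelete;
    this.equalityDeleteWriter = eqDelete;
  }

  void insert(T row, K equalityKey) {
    insertedRowMap.put(equalityKey, dataWriter.write(row));
  }

  void delete(T row, K equalityKey) {
    PathOffset previous = insertedRowMap.remove(equalityKey);
    if (previous != null) {
      positionDeleteWriter.write(previous); // row was inserted in this snapshot
    } else {
      equalityDeleteWriter.write(row);      // row came from an earlier snapshot
    }
  }
}
```

A plain append-only writer, by contrast, can simply discard the returned PathOffset, which is the "just ignore the return" option above.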
@dungdm93, I don't think I quite understand what you're trying to do in this PR from the description. Can you add more detail about your motivation and what you're changing? It would probably also help to split this into several smaller PRs.
Force-pushed: 1b8b040 → f40f485
@rdblue sorry for my bad wording. Let me try to add more details.
Force-pushed: f40f485 → 0359202
Hello @rdblue, I just updated the PR description; I hope it's clear enough to understand.
Force-pushed: 2977511 → da95b4d
```diff
-  public void write(T row) {
+  public PathOffset write(T row) {
     appender.add(row);
+    return PathOffset.of(location, recordCount++);
```
In Iceberg, we don't use the return value of ++ operators because it is hard to read code that uses them. Can you move the increment to a separate line?
Changed to:

```java
long offset = recordCount++;
return PathOffset.of(location, offset);
```
The description makes sense to me here. It will take me some time, though, to get through this whole PR; I'll try to set aside time later this week.
Force-pushed: da95b4d → 18bc270
```diff
   */
   @SuppressWarnings("unchecked")
-  public void partition(StructLike row) {
+  public PartitionKey partition(StructLike row) {
```
I think this will produce an API compatibility issue. Why do we need to change this basic API?
This is just a side change. I made it to align with other StructLike wrappers: StructProjection.wrap, IndexedStructLike.wrap, InternalRowWrapper.wrap, to name a few.
@openinx Could you please explain how this can cause an API compatibility issue?
Downstream users may add this iceberg-api module to their application project. Since PartitionKey is a public API, their application artifact is compiled against void partition(StructLike row). When they upgrade iceberg-api to the next release version, it will fail to load the expected void partition(StructLike row). That breaks a user's normal upgrade process, and that's why we say it's an API compatibility issue.
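The failure is at the bytecode level: javac records the full method descriptor, including the return type, in the caller's class file, and the JVM links calls by that exact descriptor. A sketch of the failure mode (the class names are from iceberg-api; the bytecode and error text shown in comments are approximate):

```java
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.StructLike;

class UserCode {
  static void fillKey(PartitionKey key, StructLike row) {
    // Compiled against the 0.13 API, this call is stored as roughly:
    //   invokevirtual PartitionKey.partition:(Lorg/apache/iceberg/StructLike;)V
    key.partition(row);
  }
}
// If a later release changes the method to `PartitionKey partition(StructLike)`,
// the descriptor no longer matches, so running this (unrecompiled) class file
// against the new jar fails at link time with roughly:
//   java.lang.NoSuchMethodError:
//     'void org.apache.iceberg.PartitionKey.partition(org.apache.iceberg.StructLike)'
```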
Yeah, let me roll it back.
@openinx could you help me review the other changes?
Yes, I'm currently checking the whole write path. I think I will need one or two days to understand all the writers newly introduced since 0.13.x. Replacing the old writer API with the new one is a great thing. I think we can collaborate to move this forward. Thanks.
Force-pushed: 06f2708 → 3ef9ea5
Force-pushed: 3ef9ea5 → 86128e7
Force-pushed: 86128e7 → 197b233
```java
import org.apache.iceberg.io.DefaultPartitioningWriterFactory.Type;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public interface PartitioningWriterFactory<T> {
```
Why name it PartitioningWriterFactory? I don't see any partition info in the defined interface methods.
It's a factory class used to create PartitioningWriters.
I just added a little documentation for better understanding.
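For readers following along, a hypothetical reading of the interface based only on this thread; the method names below are assumptions, not the PR's actual API:

```java
// Guessed shape of a factory that creates PartitioningWriters: it hides which
// writer strategy (e.g. clustered vs. fanout) backs each of the three streams.
// All names here are assumptions based on the discussion above.
public interface PartitioningWriterFactorySketch<T> {

  // Illustrative stand-in for the PartitioningWriter type in this PR.
  interface PartitioningWriter<R> {
    void write(R row);
  }

  PartitioningWriter<T> newDataWriter();

  PartitioningWriter<T> newEqualityDeleteWriter();

  PartitioningWriter<T> newPositionDeleteWriter();
}
```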
Resolved review threads:
- core/src/main/java/org/apache/iceberg/io/PartitioningWriterFactory.java (outdated)
- flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkTaskWriter.java
- flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkTaskWriter.java (outdated)
Force-pushed: 197b233 → 40354bb
```java
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.Tasks;

public class DirectTaskWriter<T> implements TaskWriter<T> {
```
@openinx do you have any naming suggestions for this class? DirectTaskWriter, AppendTaskWriter, ...?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This PR aims to:
- Replace the `BaseTaskWriter` inner classes (`BaseRollingWriter`, `RollingFileWriter`, `RollingEqDeleteWriter`) by the respective implementations of the `RollingFileWriter` interface.
- Provide a `TaskWriter` that can handle both partitioned & unpartitioned data (by delegating to the `PartitioningWriter`).

Here is my approach, from the top down:

```
TaskWriter
    |
    V
PartitioningWriter
    |
    V
RollingFileWriter
    |
    V
FileWriter
    |
    V
FileAppender
```

- `TaskWriter` is used to handle different kinds of records.
  - In the direct case, `TaskWriter` basically calls `PartitioningWriter.write`; see `DirectTaskWriter` for more details (a sketch follows this list).
  - A `TaskWriter` can also have 3 `PartitioningWriter`s: `insertWriter`, `equalityDeleteWriter`, and `positionDeleteWriter`. For each incoming record, based on its type (insert, update, or delete), the `TaskWriter` calls the corresponding writer to write the data; see `FlinkTaskWriter` for more details.
- `PartitioningWriter` is used to write to multiple specs and partitions. Note that for unpartitioned tables, `partition = null` is passed to `PartitioningWriter.write`. `PartitioningWriter` already uses `RollingFileWriter` to roll to a new file when the current file gets large.
- `RollingFileWriter` is just a wrapper around another `FileWriter`.
- `FileWriter` is used to write to a single file.
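To make the delegation concrete, here is a minimal sketch of the direct path, assuming a simplified `PartitioningWriter` signature (the real interfaces in this PR may differ):

```java
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

// Sketch of the top layer in the diagram: derive the partition (null for
// unpartitioned tables) and hand every record to one PartitioningWriter.
class DirectTaskWriterSketch<T> {

  // Illustrative stand-in for the PartitioningWriter described above.
  interface PartitioningWriter<R> {
    void write(R row, PartitionSpec spec, StructLike partition);
  }

  private final PartitioningWriter<T> delegate;
  private final PartitionSpec spec;
  private final PartitionKey partitionKey;

  DirectTaskWriterSketch(PartitioningWriter<T> delegate, PartitionSpec spec, PartitionKey key) {
    this.delegate = delegate;
    this.spec = spec;
    this.partitionKey = key; // reusable key object, re-filled per row
  }

  void write(T row, StructLike rowAsStruct) {
    if (spec.isUnpartitioned()) {
      delegate.write(row, spec, null); // partition = null, as noted above
    } else {
      partitionKey.partition(rowAsStruct); // fill the key from this row
      delegate.write(row, spec, partitionKey);
    }
  }
}
```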
Why we need this
- The current writers duplicate logic inside each `BaseTaskWriter` instance, which makes the code more complex and hard to read and understand.
- `ClusteredPartitionWriter` is preferred because it takes fewer resources.

Result
With the 2 new TaskWriters, `DirectTaskWriter` and `FlinkTaskWriter`, all Flink and Spark cases are covered. Below is the equivalent of Flink's DeltaTaskWriter:
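A minimal sketch of that shape, assuming the three-writer layout described above and Flink's `RowKind` routing (the position-delete bookkeeping is elided; the PR's actual code may differ):

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

// Route each incoming RowData to the matching PartitioningWriter by RowKind,
// mirroring what Flink's delta writer does with insert/delete streams.
class FlinkTaskWriterSketch {

  // Illustrative stand-in; spec/partition arguments are elided for brevity.
  interface PartitioningWriter<R> {
    void write(R row);
  }

  private final PartitioningWriter<RowData> insertWriter;
  private final PartitioningWriter<RowData> equalityDeleteWriter;

  FlinkTaskWriterSketch(
      PartitioningWriter<RowData> insertWriter,
      PartitioningWriter<RowData> equalityDeleteWriter) {
    this.insertWriter = insertWriter;
    this.equalityDeleteWriter = equalityDeleteWriter;
  }

  void write(RowData row) {
    switch (row.getRowKind()) {
      case INSERT:
      case UPDATE_AFTER:
        insertWriter.write(row); // may also record the returned PathOffset
        break;
      case UPDATE_BEFORE:
      case DELETE:
        equalityDeleteWriter.write(row); // or a position delete, per the earlier discussion
        break;
    }
  }
}
```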
How I tested it
It passes all unit tests, and I ran it with some sample datasets on my local machine.