-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Flink: refactor DataIterator to use composition (instead of inheritance) #2905
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@openinx @JingsongLi @rdblue can you help take a look? |
flink/src/main/java/org/apache/iceberg/flink/source/DecryptedInputFiles.java
Outdated
Show resolved
Hide resolved
| /** | ||
| * Read a {@link FileScanTask} into a {@link CloseableIterator} | ||
| */ | ||
| public interface IteratorReader<T> extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name IteratorReader doesn't make sense to me because iterators aren't generally read. They are iterated through. It also seems strange to me to have a method that is essentially a way to create an iterator but is not a CloseableIterable. Is there a way to restructure this so that this is a CloseableIterable instead?
Last, why does this need to be Serializable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about FileReader as class name? Its purpose is to open/read a FileScanTask as a CloseableIterator or CloseableIterable.
It needs to be Serializable because it gets shipped/serialized from jobmanager to taksmanager during deployment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed it to FileReader
flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
Outdated
Show resolved
Hide resolved
9ddaa6b to
d96c737
Compare
core/src/main/java/org/apache/iceberg/encryption/InputFilesDecryptor.java
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/InputFilesDecryptor.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/InputFilesDecryptor.java
Show resolved
Hide resolved
flink/src/main/java/org/apache/iceberg/flink/source/FileReader.java
Outdated
Show resolved
Hide resolved
flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
Outdated
Show resolved
Hide resolved
flink/src/main/java/org/apache/iceberg/flink/source/FileReader.java
Outdated
Show resolved
Hide resolved
| this.io = io; | ||
| this.encryption = encryption; | ||
| this.context = context; | ||
| this.rowDataReader = new RowDataFileReader(tableSchema, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there necessary to expose the RowDataFileReader to the FlinkInputFormat ? The calling chain is actually: FlinkInputFormat ( whole scan) -> DataIterator (CombinedScanTask reader) -> RowDataFileReader ( FileScanTask reader ). I mean the FlinkInputFormat won't call the FileScanTask reader directly, so I'd prefer to hidden the internal details about how to construct the RowDataFileReader inside DataIterator. This is more in line with the interface design of software design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataIterator is a generic type. The actual FileScanTaskReader needs to be provided when constructing DataIterator. Previously, FlinkInputFormat constructs the extended class RowDataIterator, which is removed in the PR. Now FlinkInputFormat constructs the RowDataFileScanTaskReader and pass it into DataIterator constructor for composition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean there will be other data type applied to the generic DataIterator ( For example for flip-24 ) ? There seems no other FileScanTaskReader implementation except the RowDataFileScanTaskReader. I think we may don't want to the generic type for the DataIterator, then we also don't need the extra FileScanTaskReader (we could just rename the RowDataFileScanTaskReader to FileScanTaskReader directly), and finally we construct the FileScanTaskReader inside the DataIterator. How do you think ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is correct that in iceberg-flink, there is no other impl of FileScanTaskReader other than RowDataFileScanTaskReader. But at Netflix and Apple, we use Avro schema for the Kafka data and deserialize bytes into Avro Record. For Iceberg source (like for backfill or bootstrap), we also want to deserialize Iceberg data into Avro Record so that app code deals with the same data type no matter consuming from Kafka or Iceberg. Hence we will plug in AvroGenericRecordFileScanTaskReader. That is one of the motivation of this refactoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That make sense for providing an abstracted FIleScanTaskReader in the offical apache iceberg repo, what concerns me is: we may get this interface refactored or removed when other developers propose a new pull request , if we don't have other impl in apache repo. Will we have other impl for the flip-27 work in offical repo ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@openinx I extracted AbstractFileScanTaskReader as you suggested.
In flip-27 source in apache iceberg repo, we won't have other FileScanTaskReader impl.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I think I did not describe this more clear. I mean the newly introduced FileScanTaskReader interface is nice to have in apache repo, so that others could provides their own implementation, such as AvroGenericRecordFileScanTaskReader. I don't think the latest commit's AbstractFileScanTaskReader will help to share common code between the AvroGenericRecordFileScanTaskReader & RowDataFileScanTaskReader because its exposed classes such as DeleteFilter, newAvroIterator, InputFilesDecryptor are not stable enough, we may change/refactor them in the following PRs (The DeleteFilter will need a big refactor for supporting unified v2 compaction).
Let's just revert the commit 72348c7, all the other things look great to me now, I plan to merge this PR. Thanks for the great work @stevenzwu !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@openinx sorry for the misunderstanding. I have reverted the last commit by hard reset and force push
flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileReader.java
Outdated
Show resolved
Hide resolved
…core module also removed an unnecessary whitespace change
7531913 to
7db0ec9
Compare
core/src/main/java/org/apache/iceberg/encryption/InputFilesDecryptor.java
Show resolved
Hide resolved
flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
Show resolved
Hide resolved
|
|
||
| @Override | ||
| protected CloseableIterator<RowData> openTaskIterator(FileScanTask task) { | ||
| public CloseableIterator<RowData> open(FileScanTask task, InputFilesDecryptor inputFilesDecryptor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we construct the RowDataFileScanTaskReader inside the DataIterator, then we don't need to pass the InputFilesDecryptor for every open FileScanTask method, we also don't need to pass the InputFilesDecryptor to the newXXXIterable methods (I mean those changes could be reverted) because they could just use the class' private InputFilesDecryptor instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also tied to the other comment where we need DataIterator to be generic.
What you described above is similar to the current status, where we have RowDataIterator extends from DataIterator. Due to inheritance, RowDataIterator can call protected methods/variables from base class.
If we switch to composition model, we need to pass in InputFilesDecryptor to the RowDataFileScanTaskReader
openinx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @stevenzwu for the refactoring, this PR almost looks good to me. Just left several comments.
72348c7 to
7db0ec9
Compare
openinx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
With this composition mode,
DataIteratordeals withCombinedScanTaskandIteratorReaderdeals with individualFileScanTask.DataIteratoruse composition to referenceIteratorReaderHere are the motivations behind this change
FooDataIterator extends RowDataIteratoras the generic type is already fixed toRowData. With this new composition model, we can define aFooIteratorReader implements IteratorReaderandFooIteratorReadercan useRowDataIteratorReaderby composition.