-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: Refactor flink source tests for FLIP-27 unified source. #2047
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@openinx I haven't switched |
| import org.junit.runners.Parameterized; | ||
|
|
||
| @RunWith(Parameterized.class) | ||
| public abstract class TestFlinkReaderDeletesBase extends DeleteReadTests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new base class is extracted out of the TestFlinkInputFormatReaderDeletes. FLIP-27 source will add a test extending from this base class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, sounds good to me to make it to be a separate base unit test class.
|
|
||
| protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) | ||
| .identity("dt") | ||
| .bucket("id", 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FLIP-27 source will have a test extending from this base class. Logic for the current source is refactored into the TestFlinkSource class
9b33ace to
225ddfc
Compare
83017d0 to
cdd5580
Compare
0b34805 to
df8fe18
Compare
|
@stevenzwu , You mean this PR will be the next one for reviewing which was separated from this big one (#2105) ? For my understanding, this is just an unit tests refactor and we could pull request the next feature PR for reviewing, could just just improve the unit tests when iterating the unified flink source work. |
df8fe18 to
f9fda07
Compare
|
@openinx yes, please help review this unit test refactoring next. The main reason is to avoid constant needs of resolving merge conflicts after periodical rebasing. Other FLIP-27 source code and tests are more isolated and less likely to get merge conflicts during the probably long review process. |
f9fda07 to
af32922
Compare
| * because the from RowData may contains additional column for position deletes. | ||
| * Using {@link RowDataSerializer#copy(RowData, RowData)} will fail the arity check. | ||
| */ | ||
| public static RowData clone(RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How the FLIP-27 use this method ? How did they construct their TypeSerializer ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my understanding, this clone methods is really not friendly for developers to use. If we really need to introduce a copy with checking the length, then how about making this to be private and expose an more easy method to public :
public static RowData clone(RowData from, RowType rowType)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main reason for adding TypeSerializer to the RowDataUtil#clone() method is to avoid constructing it for each clone call. In the constructor of RowDataIteratorBulkFormat, we construct TypeSerializer once from RowType.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK , seems we're clone the rowData iterately, I saw the FieldGetter will be created for each row here, that should also not be the expected behavior. We may need to introduce an new RowDataCloner which will initialize all its TypeSerializer & FieldGetter once in instance constructor, when iterating the RowData we will just clone row by row don't need to create any extra instances.
Users don't have to interact with the internal TypeSerializer, they could just use the RowDataCloner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could do the refactor when we review the RowDataIteratorBulkFormat.
| helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L)); | ||
|
|
||
| TestHelpers.assertRecords( | ||
| runWithOptions(ImmutableMap.<String, String>builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: here we could just use:
ImmutableMap.of("snapshot-id", Long.toString(snapshotId);There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx. updated
| .build()), | ||
| expectedRecords, TestFixtures.SCHEMA); | ||
| TestHelpers.assertRecords( | ||
| runWithOptions(ImmutableMap.<String, String>builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: ditto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx. updated
|
|
||
| protected abstract List<Row> run(FlinkSource.Builder formatBuilder, Map<String, String> sqlOptions, String sqlFilter, | ||
| String... sqlSelectedFields) throws IOException; | ||
| protected abstract List<Row> runWithProjection(String... projected) throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the implementation for those four methods in FLIP-27 ? Looks like we are just filling options in TestFlinkSource, will the FLIP-27 have those different implementations ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main difference is how they call the private/protected run method.
Current source: uses FlinkSource#Builder for everything
https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java#L49
FLIP-27 source: just construct and pass long the ScanContext
https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java#L69
|
@stevenzwu PR looks good to me overall, just left few comments. Sorry for the delay ( a bit busy for our internal things), thanks for the great work ! |
af32922 to
b15a3ba
Compare
openinx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM , got this merged. Thanks @stevenzwu for contributing
…n extend from