-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: Fix flaky test TestFlinkTableSink #2989
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@stevenzwu @openinx can you guys help check my understanding? Thanks |
|
@szehon-ho I am not sure why this failed. maybe it is what you said. records were split across two checkpoint/commit cycles. However, this change will defeat the purpose of this unit test. with hash distribution, this unit test is to make sure all records for the same partition key is distributed to a single writer task and hence should generate one file per partition (per commit cycle). Here is probably what we should do to fix the flakiness. we should assert that there is only one file per partition for each commit/snapshot. |
|
Yea this makes sense, let me look into it. |
…errors (test_partition already exists)
|
@stevenzwu thanks for the review. I put back the one-file-per-partition, per commit/checkpoint. Also I added some test cleanup as well in the failure case. |
flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
Outdated
Show resolved
Hide resolved
flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
Outdated
Show resolved
Hide resolved
flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
Outdated
Show resolved
Hide resolved
| List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) { | ||
| Types.StructType spec = partitionSpec.partitionType(); | ||
| Record partitionRecord = GenericRecord.create(spec).copy(partitionValues); | ||
| StructLikeWrapper expected = StructLikeWrapper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PartitionData implements the equals method. we can construct PartitionData using this API from DataFiles class. not sure if it is better. but at least it is more specific.
public static PartitionData copy(PartitionSpec spec, StructLike partition) {
return copyPartitionData(spec, partition, null);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea, but when I tried I found PartitionData is package protected so can't access it here unfortunately.
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
Outdated
Show resolved
Hide resolved
|
@szehon-ho thx a lot for fixing the flaky tests. overall, it looks good to me. Left a few minor comments. |
dd93f7b to
3269024
Compare
|
@stevenzwu thanks again for taking a look, I updated based on the feedback |
stevenzwu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. @openinx can you take a look too and merge it when it is good from your side.
| public static Map<Long, List<DataFile>> snapshotToDataFiles( | ||
| Table table) | ||
| throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The method format looks quite strange for me, could we format it looks like the following?
public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| List<ManifestFile> manifestFiles = table.currentSnapshot().dataManifests(); | ||
| for (ManifestFile manifestFile : manifestFiles) { | ||
| try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) { | ||
| List<DataFile> dataFiles = IteratorUtils.toList(reader.iterator()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a warning in IDE here, using the Lists.newArrayList(reader) could fix the warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, done
openinx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me ! Thanks @szehon-ho for the contribution, and @stevenzwu for the reviewing. I just left few minor comments.
|
@openinx done, thanks for taking a look! |
|
LGTM, thanks for the contribution! |
Uh oh!
There was an error while loading. Please reload this page.