Flink: Watermark read options #9346
    ConfigOptions.key("table.exec.iceberg.split-assigner-type")
        .enumType(SplitAssignerType.class)
   -    .defaultValue(SplitAssignerType.SIMPLE)
   +    .defaultValue(null)
Is this intended? This would be a breaking change if a user did not provide a split assigner type before
Good catch.
I changed that to null because we previously had this assertion:

    if (splitAssignerFactory != null) {
      Preconditions.checkArgument(
          watermarkColumn == null,
          "Watermark column and SplitAssigner should not be set in the same source");
    }

However, after my work on this PR it is safe to remove that assertion, since we now override the factory with an OrderedSplitAssignerFactory when the watermarkColumn is specified.
Makes sense. It would be good to update the docs to explain how the default is determined.
If users have not opted into the new feature, I would expect their SQL queries to still work. Can you add tests to ensure that?
After some thinking, I'm not totally sure about the default change. Couldn't it be possible to use the watermark column without the ordered split assigner factory?
Not according to the code:

    if (watermarkColumn != null) {
      // Column statistics is needed for watermark generation
      context = context.copyWithColumnStat(watermarkColumn);
      SplitWatermarkExtractor watermarkExtractor =
          new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
      emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
      splitAssignerFactory =
          new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
    }
cc: @pvary
If the splits are not ordered, then we will have fluctuating watermarks. We do not emit those that are out of order, but that defeats the purpose of the whole watermark generation feature.
Imagine a situation where we are reading time series data and read the latest file first. Every other file will then contain late data, which might be dropped.
So while it is technically possible, I would rather not allow users to shoot themselves in the foot.
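To illustrate the concern above, here is a minimal, self-contained sketch (not the Iceberg API; `Split` and the watermark logic are simplified stand-ins) showing why reading the latest split first pushes the watermark ahead of every remaining split, while ordered assignment keeps the watermark advancing gradually:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WatermarkOrderingSketch {
  // Simplified stand-in for an Iceberg split: only the minimum timestamp stat matters here.
  record Split(String name, long minTimestampMillis) {}

  // Returns the watermark observed after each split is read, in the given order.
  static List<Long> watermarks(List<Split> readOrder) {
    List<Long> result = new ArrayList<>();
    long watermark = Long.MIN_VALUE;
    for (Split split : readOrder) {
      // A watermark only advances; a split whose data is older than the
      // current watermark would have all of its records flagged as late.
      watermark = Math.max(watermark, split.minTimestampMillis());
      result.add(watermark);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Split> splits = new ArrayList<>(List.of(
        new Split("file-3", 3000L), new Split("file-1", 1000L), new Split("file-2", 2000L)));

    // Unordered: reading file-3 first jumps the watermark to 3000, so every
    // record in file-1 and file-2 arrives behind it and may be dropped.
    System.out.println(watermarks(splits)); // [3000, 3000, 3000]

    // Ordered (what an ordered split assigner achieves): the watermark rises
    // gradually and no split falls behind it.
    splits.sort(Comparator.comparingLong(Split::minTimestampMillis));
    System.out.println(watermarks(splits)); // [1000, 2000, 3000]
  }
}
```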
If users have not opt-ed into the new feature, I would expect their SQL queries to still work. Can you add tests to ensure that?
Hi! I didn't get your suggestion about the unit test. Would you please rephrase? Thanks @mas-chen
    recordsDataFile1.add(file1Record1);
    recordsDataFile1.add(file1Record2);
    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
    // File 2 - old timestamps, old longs
This comment is not correct; it should be:

    // File 2 - late timestamps, old longs
    recordsDataFile2.add(file2Record1);
    recordsDataFile2.add(file2Record2);
    // early1 - early2 -- late1 late 2
I do not get this comment.
Maybe something like this - feel free to reword:

    // Expected records if the splits are ordered:
    // - ascending (watermark from t1): records from the split with early timestamps, then records from the split with late timestamps
    // - descending (watermark from t2): records from the split with old longs, then records from the split with new longs
pvary left a comment
Left some minor comments for comments 😄
Otherwise +1 LGTM
Thanks @rodmeneses for the PR, and @stevenzwu and @mas-chen for the review!
Now it is possible to pass the above parameters in the SQL statement:
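For example, a sketch of such a query using Flink SQL dynamic table options (hints); the option names here are assumed to be the read options added by this PR, and `sample`/`t1` are placeholder table and column names:

```sql
-- Enable watermark generation from column t1, interpreting its long values
-- as milliseconds (option names assumed from this PR).
SELECT * FROM sample
/*+ OPTIONS('watermark-column'='t1', 'watermark-column-time-unit'='MILLISECONDS') */;
```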