Skip to content

Conversation

@swapna267
Copy link
Contributor

Iceberg Source to support Source Watermark, so it can be used in Flink WINDOW functions. https://github.com/apache/flink/blob/release-1.18/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsSourceWatermark.java enables Flink to rely on the watermark strategy provided by the ScanTableSource itself.

CREATE TABLE table_wm (
      eventTS AS CAST(t1 AS TIMESTAMP(3)),
      WATERMARK FOR eventTS AS SOURCE_WATERMARK()
) WITH (
  'watermark-column'='t1'
) LIKE iceberg_catalog.db.table;

Reference:
#10219
#9346

Previous discussion in PR, #12116 . Split into separate PR for easy review.

@github-actions github-actions bot added the flink label Feb 7, 2025
}

protected static String toWithClause(Map<String, String> props) {
public static String toWithClause(Map<String, String> props) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change? Did this become a utility method which is shared between tests? Can we find a better place for it then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently being across Tests under org.apache.iceberg.flink. Wanted to use for test under org.apache.iceberg.flink.source. org.apache.iceberg.flink.source.SqlHelpers can be another option, where this can be done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

.isInstanceOf(NullPointerException.class)
.hasMessage("watermark-column needs to be configured to use source watermark.");
} finally {
SqlHelpers.sql(getStreamingTableEnv(), "DROP TABLE IF EXISTS %s", flinkTable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this try finally? Can we just do a cleanup in an after method? That would make the tests easier to read

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flink dynamic table is created only in specific tests in this class, so was handling here. But can do in after to improve readability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In TestSqlBase we have several tests where we create tables, and we expect that they are removed because of the @TempDir is removed. So we can simply just do the same here.

WDYT?

@pvary pvary changed the title support source watermark for flink sql windows Flink: Support source watermark for flink sql windows Feb 8, 2025
@pvary
Copy link
Contributor

pvary commented Feb 8, 2025

Nit: Renamed the PR to match the general patterns

@swapna267 swapna267 force-pushed the flink_sql_window_wm branch from 8255c66 to 4f9ef05 Compare March 10, 2025 17:18

@Override
public void applySourceWatermark() {
if (!readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this not using Preconditions for the check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought it would be better to throw UnsupportedOperation Exception instead of IllegalArgument or IllegalStateException. If either of them is fine with current error message , I can move to Preconditions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to move to Preconditions.checkArgument - we usually stick to that for this kind of checks

TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: newline

Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @swapna267! Changes look good to me.

@pvary pvary merged commit 2c746e6 into apache:main Mar 19, 2025
20 checks passed
@pvary
Copy link
Contributor

pvary commented Mar 19, 2025

Merged to main
Thanks for the PR @swapna267!
Thanks for the review @mxm and @stevenzwu!

@swapna267
Copy link
Contributor Author

Thanks all. Will submit backport PR soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants