diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java index 65adce77d9f9..662dc30e27ca 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java @@ -35,12 +35,14 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark; import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.FlinkFilters; +import org.apache.iceberg.flink.FlinkReadOptions; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.source.assigner.SplitAssignerType; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -53,7 +55,8 @@ public class IcebergTableSource implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, - SupportsLimitPushDown { + SupportsLimitPushDown, + SupportsSourceWatermark { private int[] projectedFields; private Long limit; @@ -175,6 +178,17 @@ public Result applyFilters(List flinkFilters) { return Result.of(acceptedFilters, flinkFilters); } + @Override + public void applySourceWatermark() { + Preconditions.checkArgument( + readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE), + "Source watermarks are supported only in flip-27 iceberg source implementation"); + + Preconditions.checkNotNull( + properties.get(FlinkReadOptions.WATERMARK_COLUMN), + "watermark-column needs to be configured to use source watermark."); + } + @Override public boolean supportsNestedProjection() { // TODO: support nested projection diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index c8f0b8172d45..0cdaf8371cbd 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.source; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import java.time.Instant; @@ -40,6 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase { @BeforeEach @Override public void before() throws IOException { - TableEnvironment tableEnvironment = getTableEnv(); + setUpTableEnv(getTableEnv()); + setUpTableEnv(getStreamingTableEnv()); + } + + private static void setUpTableEnv(TableEnvironment tableEnvironment) { Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); // Disable inferring parallelism to avoid interfering watermark tests @@ -72,6 +78,11 @@ public void before() throws IOException { tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); } + @AfterEach + public void after() throws IOException { + CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + } + private Record generateRecord(Instant t1, long t2) { Record record = GenericRecord.create(SCHEMA_TS); record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime()); @@ -178,4 +189,45 @@ public void testReadFlinkDynamicTable() throws Exception { expected, SCHEMA_TS); } + + @Test + public void testWatermarkInvalidConfig() { + CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS); + + String flinkTable = "`default_catalog`.`default_database`.flink_table"; + SqlHelpers.sql( + getStreamingTableEnv(), + "CREATE TABLE %s " + + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s", + flinkTable, + TestFixtures.TABLE); + + assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable)) + .isInstanceOf(NullPointerException.class) + .hasMessage("watermark-column needs to be configured to use source watermark."); + } + + @Test + public void testWatermarkValidConfig() throws Exception { + List expected = generateExpectedRecords(true); + + String flinkTable = "`default_catalog`.`default_database`.flink_table"; + + SqlHelpers.sql( + getStreamingTableEnv(), + "CREATE TABLE %s " + + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), " + + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s", + flinkTable, + TestFixtures.TABLE); + + TestHelpers.assertRecordsWithOrder( + SqlHelpers.sql( + getStreamingTableEnv(), + "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS), INTERVAL '1' SECOND))", + flinkTable), + expected, + SCHEMA_TS); + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java index f9b776397cfc..dd63154fe03b 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java @@ -63,6 +63,8 @@ public abstract class TestSqlBase { private volatile TableEnvironment tEnv; + private volatile TableEnvironment streamingTEnv; + protected TableEnvironment getTableEnv() { if (tEnv == null) { synchronized (this) { @@ -75,6 +77,19 @@ protected TableEnvironment getTableEnv() { return tEnv; } + protected TableEnvironment getStreamingTableEnv() { + if (streamingTEnv == null) { + synchronized (this) { + if (streamingTEnv == null) { + this.streamingTEnv = + TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build()); + } + } + } + + return streamingTEnv; + } + @BeforeEach public abstract void before() throws IOException;