Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkFilters;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -53,7 +55,8 @@ public class IcebergTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsFilterPushDown,
SupportsLimitPushDown {
SupportsLimitPushDown,
SupportsSourceWatermark {

private int[] projectedFields;
private Long limit;
Expand Down Expand Up @@ -175,6 +178,17 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
return Result.of(acceptedFilters, flinkFilters);
}

@Override
public void applySourceWatermark() {
  // Source watermarks are only emitted by the FLIP-27 based IcebergSource, so the
  // legacy source implementation is rejected up front.
  boolean flip27Enabled =
      readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE);
  Preconditions.checkArgument(
      flip27Enabled,
      "Source watermarks are supported only in flip-27 iceberg source implementation");

  // SOURCE_WATERMARK() needs a configured column to derive event time from; fail fast
  // (checkNotNull throws NullPointerException) when the read option is absent.
  Preconditions.checkNotNull(
      properties.get(FlinkReadOptions.WATERMARK_COLUMN),
      "watermark-column needs to be configured to use source watermark.");
}

@Override
public boolean supportsNestedProjection() {
// TODO: support nested projection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.flink.source;

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -40,6 +41,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
@BeforeEach
@Override
public void before() throws IOException {
  // Apply the shared test configuration to both execution environments: the batch
  // environment first (matching the lazy-init order), then the streaming one.
  TableEnvironment batchEnv = getTableEnv();
  TableEnvironment streamingEnv = getStreamingTableEnv();
  setUpTableEnv(batchEnv);
  setUpTableEnv(streamingEnv);
}

private static void setUpTableEnv(TableEnvironment tableEnvironment) {
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
// Disable inferring parallelism to avoid interfering watermark tests
Expand All @@ -72,6 +78,11 @@ public void before() throws IOException {
tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
}

@AfterEach
// Drop the test table between tests so state created by one test (e.g. the CREATE TABLE
// in testWatermarkInvalidConfig) does not leak into the next. NOTE(review): assumes the
// table exists when a test finishes -- confirm dropTable tolerates a missing table for
// tests that never create it.
public void after() throws IOException {
CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
}

private Record generateRecord(Instant t1, long t2) {
Record record = GenericRecord.create(SCHEMA_TS);
record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
Expand Down Expand Up @@ -178,4 +189,45 @@ public void testReadFlinkDynamicTable() throws Exception {
expected,
SCHEMA_TS);
}

@Test
public void testWatermarkInvalidConfig() {
  CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);

  String flinkTable = "`default_catalog`.`default_database`.flink_table";

  // Declare SOURCE_WATERMARK() but deliberately omit the required 'watermark-column'
  // read option.
  String createSql =
      "CREATE TABLE %s "
          + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
          + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE iceberg_catalog.`default`.%s";
  SqlHelpers.sql(getStreamingTableEnv(), createSql, flinkTable, TestFixtures.TABLE);

  // Reading the table must fail fast because no watermark column was configured.
  assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable))
      .isInstanceOf(NullPointerException.class)
      .hasMessage("watermark-column needs to be configured to use source watermark.");
}

@Test
public void testWatermarkValidConfig() throws Exception {
  List<Record> expected = generateExpectedRecords(true);

  String flinkTable = "`default_catalog`.`default_database`.flink_table";

  // 'watermark-column'='t1' points the source at the event-time column, enabling
  // SOURCE_WATERMARK() on the derived eventTS attribute.
  String createSql =
      "CREATE TABLE %s "
          + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
          + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH ('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s";
  SqlHelpers.sql(getStreamingTableEnv(), createSql, flinkTable, TestFixtures.TABLE);

  // A tumbling-window query only emits results once watermarks advance, so getting the
  // expected rows back shows the source actually produced watermarks.
  String windowQuery =
      "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS), INTERVAL '1' SECOND))";
  TestHelpers.assertRecordsWithOrder(
      SqlHelpers.sql(getStreamingTableEnv(), windowQuery, flinkTable), expected, SCHEMA_TS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public abstract class TestSqlBase {

private volatile TableEnvironment tEnv;

private volatile TableEnvironment streamingTEnv;

protected TableEnvironment getTableEnv() {
if (tEnv == null) {
synchronized (this) {
Expand All @@ -75,6 +77,19 @@ protected TableEnvironment getTableEnv() {
return tEnv;
}

protected TableEnvironment getStreamingTableEnv() {
if (streamingTEnv == null) {
synchronized (this) {
if (streamingTEnv == null) {
this.streamingTEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: newline


return streamingTEnv;
}

@BeforeEach
public abstract void before() throws IOException;

Expand Down