
Commit 4d34398

Flink: Watermark read options (#9346)

1 parent 9bd5dec commit 4d34398

7 files changed: +217 -21 lines changed


docs/flink-configuration.md

Lines changed: 2 additions & 2 deletions

@@ -75,7 +75,6 @@ The following properties can be set if using the REST catalog:
 | credential | | | A credential to exchange for a token in the OAuth2 client credentials flow. |
 | token | | | A token which will be used to interact with the server. |

-
 ## Runtime configuration

 ### Read options
@@ -133,7 +132,8 @@ env.getConfig()
 | max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
 | limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. |
 | max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 for never failing the job for scan planning failure. |
-
+| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the `splitAssignerFactory` will be overridden with `OrderedSplitAssignerFactory`. |
+| watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS. |

 ### Write options
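
For orientation, here is a minimal usage sketch of the two documented keys (not part of this commit). The table location, the `event_ts` column, and the chosen time unit are placeholders; the option keys are the ones from the table above, passed through the source builder's existing setAll(...) read-option map.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

// Placeholder table location; any TableLoader works here.
TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/events");

Map<String, String> readOptions = new HashMap<>();
readOptions.put("watermark-column", "event_ts");                // enables watermark generation
readOptions.put("watermark-column-time-unit", "MILLISECONDS");  // only relevant for long columns

IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(loader)
        .setAll(readOptions)
        .build();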

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java

Lines changed: 19 additions & 0 deletions

@@ -20,6 +20,7 @@

 import java.time.Duration;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.TimeUtils;
 import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public int maxAllowedPlanningFailures() {
         .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
         .parse();
   }
+
+  public String watermarkColumn() {
+    return confParser
+        .stringConf()
+        .option(FlinkReadOptions.WATERMARK_COLUMN)
+        .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
+        .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
+        .parseOptional();
+  }
+
+  public TimeUnit watermarkColumnTimeUnit() {
+    return confParser
+        .enumConfParser(TimeUnit.class)
+        .option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
+        .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
+        .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
+        .parse();
+  }
 }
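
Both getters follow the existing ConfParser precedence: a per-source read option wins over the Flink configuration, which wins over the default value, and watermarkColumn() uses parseOptional(), so it returns null when the option is not set anywhere. A minimal illustration, assuming an already loaded Iceberg `table`; the column name is a placeholder:

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;

Map<String, String> readOptions =
    Collections.singletonMap(FlinkReadOptions.WATERMARK_COLUMN, "event_ts");
Configuration flinkConf = new Configuration();
flinkConf.set(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION, TimeUnit.MILLISECONDS);

FlinkReadConf conf = new FlinkReadConf(table, readOptions, flinkConf);
conf.watermarkColumn();         // "event_ts"    (from the read option)
conf.watermarkColumnTimeUnit(); // MILLISECONDS  (from the Flink configuration)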

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java

Lines changed: 11 additions & 0 deletions

@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink;

+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ private FlinkReadOptions() {}
   public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
   public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
       ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
+
+  public static final String WATERMARK_COLUMN = "watermark-column";
+  public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();
+
+  public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
+  public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
+          .enumType(TimeUnit.class)
+          .defaultValue(TimeUnit.MICROSECONDS);
 }
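
Note that WATERMARK_COLUMN_OPTION is declared with noDefaultValue(), so its defaultValue() is null and FlinkReadConf resolves it with parseOptional(), while the time unit falls back to TimeUnit.MICROSECONDS. Because both options reuse the existing PREFIX, they can be addressed either through the typed ConfigOption or through the documented string key; a small sketch (the column name is a placeholder):

import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkReadOptions;

Configuration conf = new Configuration();
// Typed form, using the new ConfigOption constants:
conf.set(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION, TimeUnit.SECONDS);
// String form, matching the key documented in flink-configuration.md:
conf.setString("connector.iceberg.watermark-column", "event_ts");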

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java

Lines changed: 9 additions & 10 deletions

@@ -46,6 +46,7 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadConf;
 import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
@@ -219,8 +220,6 @@ public static class Builder<T> {
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
     private SerializableComparator<IcebergSourceSplit> splitComparator;
-    private String watermarkColumn;
-    private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
     private ReaderFunction<T> readerFunction;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -242,9 +241,6 @@ public Builder<T> table(Table newTable) {
     }

     public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
-      Preconditions.checkArgument(
-          watermarkColumn == null,
-          "Watermark column and SplitAssigner should not be set in the same source");
       this.splitAssignerFactory = assignerFactory;
       return this;
     }
@@ -441,7 +437,7 @@ public Builder<T> setAll(Map<String, String> properties) {
      * Emits watermarks once per split based on the min value of column statistics from files
      * metadata in the given split. The generated watermarks are also used for ordering the splits
      * for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
-     * setting {@link #watermarkTimeUnit(TimeUnit)}.
+     * setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
      *
      * <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
      * split when the watermark is used for watermark alignment.
@@ -450,7 +446,7 @@ public Builder<T> watermarkColumn(String columnName) {
       Preconditions.checkArgument(
           splitAssignerFactory == null,
           "Watermark column and SplitAssigner should not be set in the same source");
-      this.watermarkColumn = columnName;
+      readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
       return this;
     }

@@ -459,8 +455,8 @@ public Builder<T> watermarkColumn(String columnName) {
      * org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
      * value. The default value is {@link TimeUnit#MICROSECONDS}.
      */
-    public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
-      this.watermarkTimeUnit = timeUnit;
+    public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
+      readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
       return this;
     }

@@ -482,13 +478,16 @@ public IcebergSource<T> build() {
     }

     contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
     Schema icebergSchema = table.schema();
     if (projectedFlinkSchema != null) {
       contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
     }

     SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+    FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+    String watermarkColumn = flinkReadConf.watermarkColumn();
+    TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();
+
     if (watermarkColumn != null) {
       // Column statistics is needed for watermark generation
       contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
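
The builder keeps its public surface (watermarkColumn, plus watermarkColumnTimeUnit renamed from watermarkTimeUnit) but now only records read options; build() resolves them through FlinkReadConf before wiring the watermark-aware emitter and the ordered split assigner. A usage sketch with a placeholder table location and column name, not taken from this commit:

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/events"); // placeholder

IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(loader)
        .streaming(true)
        .watermarkColumn("event_ts")                    // timestamp/timestamptz/long column
        .watermarkColumnTimeUnit(TimeUnit.MILLISECONDS) // only consulted for long columns
        .build();

// The source emits the watermarks itself, so no extra strategy is attached here.
DataStream<RowData> stream =
    env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "iceberg-events", TypeInformation.of(RowData.class));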

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java

Lines changed: 37 additions & 1 deletion

@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Preconditions;
@@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
   private final Integer planParallelism;
   private final int maxPlanningSnapshotCount;
   private final int maxAllowedPlanningFailures;
+  private final String watermarkColumn;
+  private final TimeUnit watermarkColumnTimeUnit;

   private ScanContext(
       boolean caseSensitive,
@@ -91,6 +94,8 @@ private ScanContext(
       Integer planParallelism,
       int maxPlanningSnapshotCount,
       int maxAllowedPlanningFailures,
+      String watermarkColumn,
+      TimeUnit watermarkColumnTimeUnit,
       String branch,
       String tag,
       String startTag,
@@ -122,6 +127,8 @@ private ScanContext(
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
     this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
+    this.watermarkColumn = watermarkColumn;
+    this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;

     validate();
   }
@@ -272,6 +279,14 @@ public int maxAllowedPlanningFailures() {
     return maxAllowedPlanningFailures;
   }

+  public String watermarkColumn() {
+    return watermarkColumn;
+  }
+
+  public TimeUnit watermarkColumnTimeUnit() {
+    return watermarkColumnTimeUnit;
+  }
+
   public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -298,6 +313,8 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
         .build();
   }

@@ -327,6 +344,8 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkColumnTimeUnit(watermarkColumnTimeUnit)
         .build();
   }

@@ -367,6 +386,9 @@ public static class Builder {
         FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
     private int maxAllowedPlanningFailures =
         FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
+    private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
+    private TimeUnit watermarkColumnTimeUnit =
+        FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();

     private Builder() {}

@@ -500,6 +522,16 @@ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
       return this;
     }

+    public Builder watermarkColumn(String newWatermarkColumn) {
+      this.watermarkColumn = newWatermarkColumn;
+      return this;
+    }
+
+    public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
+      this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
+      return this;
+    }
+
     public Builder resolveConfig(
         Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +557,9 @@ public Builder resolveConfig(
           .planParallelism(flinkReadConf.workerPoolSize())
          .includeColumnStats(flinkReadConf.includeColumnStats())
          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
-          .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
+          .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+          .watermarkColumn(flinkReadConf.watermarkColumn())
+          .watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
     }

     public ScanContext build() {
@@ -552,6 +586,8 @@ public ScanContext build() {
           planParallelism,
           maxPlanningSnapshotCount,
           maxAllowedPlanningFailures,
+          watermarkColumn,
+          watermarkColumnTimeUnit,
           branch,
           tag,
           startTag,
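
ScanContext now carries the two values end to end: resolveConfig() pulls them from FlinkReadConf, the copy helpers (copyWithAppendsBetween / copyWithSnapshotId) forward them, and the new getters expose them to the source wiring. A rough sketch of the getters; ScanContext is @Internal, so this only illustrates the data flow, and the column name is a placeholder:

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.flink.source.ScanContext;

ScanContext ctx =
    ScanContext.builder()
        .watermarkColumn("event_ts")
        .watermarkColumnTimeUnit(TimeUnit.MILLISECONDS)
        .build();

ctx.watermarkColumn();         // "event_ts"
ctx.watermarkColumnTimeUnit(); // MILLISECONDS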

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java

Lines changed: 16 additions & 1 deletion

@@ -126,7 +126,7 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
         .collect(Collectors.toList());
   }

-  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+  private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
     List<Row> expected = Lists.newArrayList();
     @SuppressWarnings("unchecked")
     DataStructureConverter<RowData, Row> converter =
@@ -135,6 +135,17 @@ public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
             TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
     expectedRecords.forEach(
         r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
+    return expected;
+  }
+
+  public static void assertRecordsWithOrder(
+      List<Row> results, List<Record> expectedRecords, Schema schema) {
+    List<Row> expected = convertRecordToRow(expectedRecords, schema);
+    assertRowsWithOrder(results, expected);
+  }
+
+  public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
+    List<Row> expected = convertRecordToRow(expectedRecords, schema);
     assertRows(results, expected);
   }

@@ -146,6 +157,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
     Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
   }

+  public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
+    Assertions.assertThat(results).containsExactlyElementsOf(expected);
+  }
+
   public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
     assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
   }
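
The refactor splits the record-to-row conversion out of assertRecords so that an order-sensitive variant can share it. A hypothetical test fragment (actualRows, the expected lists, and SCHEMA are placeholders): when splits are assigned by the ordered assigner, results can be checked in exact order rather than as a set.

// Order-insensitive assertion (pre-existing):
TestHelpers.assertRecords(actualRows, expectedRecords, SCHEMA);

// Order-sensitive assertion (new); the expected records must already be in watermark order:
TestHelpers.assertRecordsWithOrder(actualRows, expectedRecordsInWatermarkOrder, SCHEMA);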
