docs/flink-configuration.md (4 changes: 2 additions & 2 deletions)
@@ -75,7 +75,6 @@ The following properties can be set if using the REST catalog:
| credential | | | A credential to exchange for a token in the OAuth2 client credentials flow. |
| token | | | A token which will be used to interact with the server. |


## Runtime configuration

### Read options
@@ -133,7 +132,8 @@ env.getConfig()
| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots to plan per split enumeration. Applicable only to streaming read. |
| limit | connector.iceberg.limit | N/A | -1 | Limits the number of output rows. |
| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 to never fail the job on scan planning failure. |
| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the `splitAssignerFactory` will be overridden with `OrderedSplitAssignerFactory`. |
| watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the time unit of a long watermark column, used for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS. |

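For example, watermark generation can be enabled from the DataStream API. A minimal sketch, assuming a long-typed `event_ts` column that stores epoch milliseconds (the table path and all names are hypothetical):

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .watermarkColumn("event_ts")                     // "watermark-column"
        .watermarkColumnTimeUnit(TimeUnit.MILLISECONDS)  // "watermark-column-time-unit"
        .streaming(true)
        .build();

// The source emits per-split watermarks itself, so no extra strategy is needed here.
DataStream<RowData> stream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "iceberg-source");
```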
### Write options

FlinkReadConf.java
@@ -20,6 +20,7 @@

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public int maxAllowedPlanningFailures() {
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}

public String watermarkColumn() {
return confParser
.stringConf()
.option(FlinkReadOptions.WATERMARK_COLUMN)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
.parseOptional();
}

public TimeUnit watermarkColumnTimeUnit() {
return confParser
.enumConfParser(TimeUnit.class)
.option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue())
.parse();
}
}
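For reference, a sketch of how the new getters resolve (hypothetical table and values; precedence assumed to follow the existing confParser chain: job read option, then Flink config, then table property, then default):

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// `table` is assumed to be loaded elsewhere.
Map<String, String> readOptions = ImmutableMap.of("watermark-column", "event_ts");
Configuration flinkConfig = new Configuration();

FlinkReadConf readConf = new FlinkReadConf(table, readOptions, flinkConfig);
String column = readConf.watermarkColumn();          // "event_ts" (job option wins)
TimeUnit unit = readConf.watermarkColumnTimeUnit();  // MICROSECONDS (the default)
```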
FlinkReadOptions.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink;

import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ private FlinkReadOptions() {}
public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);

public static final String WATERMARK_COLUMN = "watermark-column";
public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue();

public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit";
public static final ConfigOption<TimeUnit> WATERMARK_COLUMN_TIME_UNIT_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT)
.enumType(TimeUnit.class)
.defaultValue(TimeUnit.MICROSECONDS);
}
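The `PREFIX` yields the full keys (`connector.iceberg.watermark-column`, `connector.iceberg.watermark-column-time-unit`), so the same settings are reachable from a plain Flink `Configuration`; a small sketch with assumed values:

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkReadOptions;

Configuration conf = new Configuration();
// Typed access through the new ConfigOption constants...
conf.set(FlinkReadOptions.WATERMARK_COLUMN_OPTION, "event_ts");
conf.set(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION, TimeUnit.MILLISECONDS);
// ...or the equivalent raw prefixed keys.
conf.setString("connector.iceberg.watermark-column", "event_ts");
conf.setString("connector.iceberg.watermark-column-time-unit", "MILLISECONDS");
```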
IcebergSource.java
@@ -46,6 +46,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -219,8 +220,6 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
- private String watermarkColumn;
- private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -242,9 +241,6 @@ public Builder<T> table(Table newTable) {
}

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
- Preconditions.checkArgument(
-     watermarkColumn == null,
-     "Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -441,7 +437,7 @@ public Builder<T> setAll(Map<String, String> properties) {
* Emits watermarks once per split based on the min value of column statistics from files
* metadata in the given split. The generated watermarks are also used for ordering the splits
* for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
- * setting {@link #watermarkTimeUnit(TimeUnit)}.
* setting {@link #watermarkColumnTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
* split when the watermark is used for watermark alignment.
@@ -450,7 +446,7 @@ public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
- this.watermarkColumn = columnName;
readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
return this;
}

@@ -459,8 +455,8 @@ public Builder<T> watermarkColumn(String columnName) {
* org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
- public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
-   this.watermarkTimeUnit = timeUnit;
public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
return this;
}

@@ -482,13 +478,16 @@ public IcebergSource<T> build() {
}

contextBuilder.resolveConfig(table, readOptions, flinkConfig);

Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
String watermarkColumn = flinkReadConf.watermarkColumn();
TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();

if (watermarkColumn != null) {
// Column statistics is needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
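Since the builder methods now just populate `readOptions`, the string form is equivalent. A sketch under the same assumptions as above (`tableLoader` created elsewhere):

```java
import java.util.Map;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Equivalent to watermarkColumn("event_ts").watermarkColumnTimeUnit(MILLISECONDS):
// build() reads these entries back through FlinkReadConf before picking the
// split assigner and record emitter.
Map<String, String> options =
    ImmutableMap.of(
        FlinkReadOptions.WATERMARK_COLUMN, "event_ts",
        FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, "MILLISECONDS");

IcebergSource<RowData> source =
    IcebergSource.forRowData().tableLoader(tableLoader).setAll(options).build();
```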
ScanContext.java
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
@@ -67,6 +68,8 @@ public class ScanContext implements Serializable {
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
private final String watermarkColumn;
private final TimeUnit watermarkColumnTimeUnit;

private ScanContext(
boolean caseSensitive,
@@ -91,6 +94,8 @@ private ScanContext(
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
String watermarkColumn,
TimeUnit watermarkColumnTimeUnit,
String branch,
String tag,
String startTag,
@@ -122,6 +127,8 @@ private ScanContext(
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
this.watermarkColumn = watermarkColumn;
this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;

validate();
}
@@ -272,6 +279,14 @@ public int maxAllowedPlanningFailures() {
return maxAllowedPlanningFailures;
}

public String watermarkColumn() {
return watermarkColumn;
}

public TimeUnit watermarkColumnTimeUnit() {
return watermarkColumnTimeUnit;
}

public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -298,6 +313,8 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}

@@ -327,6 +344,8 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.build();
}

@@ -367,6 +386,9 @@ public static class Builder {
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
private TimeUnit watermarkColumnTimeUnit =
FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();

private Builder() {}

@@ -500,6 +522,16 @@ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
return this;
}

public Builder watermarkColumn(String newWatermarkColumn) {
this.watermarkColumn = newWatermarkColumn;
return this;
}

public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
this.watermarkColumnTimeUnit = newWatermarkTimeUnit;
return this;
}

public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +557,9 @@ public Builder resolveConfig(
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
- .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(flinkReadConf.watermarkColumn())
.watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit());
}

public ScanContext build() {
@@ -552,6 +586,8 @@ public ScanContext build() {
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
watermarkColumn,
watermarkColumnTimeUnit,
branch,
tag,
startTag,
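A sketch of the round trip the new fields enable (internal API; `table`, `readOptions`, `flinkConfig`, and the snapshot ids are assumed): `resolveConfig()` copies the values from `FlinkReadConf`, and both `copyWith*` helpers must carry them forward so incremental enumeration keeps the watermark settings:

```java
import org.apache.iceberg.flink.source.ScanContext;

ScanContext context =
    ScanContext.builder()
        .resolveConfig(table, readOptions, flinkConfig)
        .streaming(true)
        .build();

// Incremental planning derives new contexts from the current one; the
// watermark column and time unit ride along via copyWithAppendsBetween.
ScanContext next = context.copyWithAppendsBetween(startSnapshotId, endSnapshotId);
// next.watermarkColumn() equals context.watermarkColumn()
```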
TestHelpers.java
@@ -126,7 +126,7 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
.collect(Collectors.toList());
}

- public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
private static List<Row> convertRecordToRow(List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
DataStructureConverter<RowData, Row> converter =
@@ -135,6 +135,17 @@ public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
expectedRecords.forEach(
r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
return expected;
}

public static void assertRecordsWithOrder(
List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRowsWithOrder(results, expected);
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = convertRecordToRow(expectedRecords, schema);
assertRows(results, expected);
}

@@ -146,6 +157,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}

public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyElementsOf(expected);
}

public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
}
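A hypothetical test fragment for the new helper; unlike `assertRecords`, the ordered variant fails when rows come back in a different order, which is exactly what watermark-ordered reads need to check:

```java
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers;

// `results` are rows collected from the job, `expectedRecords` the records in
// the expected (watermark) order, `schema` the table schema - all assumed.
static void verifyOrderedRead(List<Row> results, List<Record> expectedRecords, Schema schema) {
  // Asserts content and order (containsExactlyElementsOf under the hood).
  TestHelpers.assertRecordsWithOrder(results, expectedRecords, schema);
}
```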