Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ public Long snapshotId() {
return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional();
}

public String tag() {
return confParser.stringConf().option(FlinkReadOptions.TAG.key()).parseOptional();
}

public String startTag() {
return confParser.stringConf().option(FlinkReadOptions.START_TAG.key()).parseOptional();
}

public String endTag() {
return confParser.stringConf().option(FlinkReadOptions.END_TAG.key()).parseOptional();
}

public String branch() {
return confParser.stringConf().option(FlinkReadOptions.BRANCH.key()).parseOptional();
}

public boolean caseSensitive() {
return confParser
.booleanConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ private FlinkReadOptions() {}
public static final ConfigOption<Long> SNAPSHOT_ID =
ConfigOptions.key("snapshot-id").longType().defaultValue(null);

public static final ConfigOption<String> TAG =
ConfigOptions.key("tag").stringType().defaultValue(null);

public static final ConfigOption<String> BRANCH =
ConfigOptions.key("branch").stringType().defaultValue(null);

public static final ConfigOption<String> START_TAG =
ConfigOptions.key("start-tag").stringType().defaultValue(null);

public static final ConfigOption<String> END_TAG =
ConfigOptions.key("end-tag").stringType().defaultValue(null);

public static final String CASE_SENSITIVE = "case-sensitive";
public static final ConfigOption<Boolean> CASE_SENSITIVE_OPTION =
ConfigOptions.key(PREFIX + CASE_SENSITIVE).booleanType().defaultValue(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ public Builder snapshotId(Long snapshotId) {
return this;
}

public Builder branch(String branch) {
readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
return this;
}

public Builder tag(String tag) {
readOptions.put(FlinkReadOptions.TAG.key(), tag);
return this;
}

public Builder startSnapshotId(Long startSnapshotId) {
readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId));
return this;
Expand All @@ -155,6 +165,16 @@ public Builder endSnapshotId(Long endSnapshotId) {
return this;
}

public Builder startTag(String startTag) {
readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
return this;
}

public Builder endTag(String endTag) {
readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
return this;
}

public Builder asOfTimestamp(Long asOfTimestamp) {
readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(asOfTimestamp));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;

Expand Down Expand Up @@ -86,11 +87,31 @@ static CloseableIterable<CombinedScanTask> planTasks(
IncrementalAppendScan scan = table.newIncrementalAppendScan();
scan = refineScanWithBaseConfigs(scan, context, workerPool);

if (context.startTag() != null) {
Preconditions.checkArgument(
table.snapshot(context.startTag()) != null,
"Cannot find snapshot with tag %s",
context.startTag());
scan = scan.fromSnapshotExclusive(table.snapshot(context.startTag()).snapshotId());
}

if (context.startSnapshotId() != null) {
Preconditions.checkArgument(
context.startTag() == null, "START_SNAPSHOT_ID and START_TAG cannot both be set");
scan = scan.fromSnapshotExclusive(context.startSnapshotId());
}

if (context.endTag() != null) {
Preconditions.checkArgument(
table.snapshot(context.endTag()) != null,
"Cannot find snapshot with tag %s",
context.endTag());
scan = scan.toSnapshot(table.snapshot(context.endTag()).snapshotId());
}

if (context.endSnapshotId() != null) {
Preconditions.checkArgument(
context.endTag() == null, "END_SNAPSHOT_ID and END_TAG cannot both be set");
scan = scan.toSnapshot(context.endSnapshotId());
}

Expand All @@ -101,6 +122,10 @@ static CloseableIterable<CombinedScanTask> planTasks(

if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
} else if (context.tag() != null) {
scan = scan.useRef(context.tag());
} else if (context.branch() != null) {
scan = scan.useRef(context.branch());
}

if (context.asOfTimestamp() != null) {
Expand All @@ -119,7 +144,9 @@ private enum ScanMode {
private static ScanMode checkScanMode(ScanContext context) {
if (context.isStreaming()
|| context.startSnapshotId() != null
|| context.endSnapshotId() != null) {
|| context.endSnapshotId() != null
|| context.startTag() != null
|| context.endTag() != null) {
return ScanMode.INCREMENTAL_APPEND_SCAN;
} else {
return ScanMode.BATCH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,26 @@ public Builder<T> startSnapshotId(Long newStartSnapshotId) {
return this;
}

public Builder<T> tag(String tag) {
readOptions.put(FlinkReadOptions.TAG.key(), tag);
return this;
}

public Builder<T> branch(String branch) {
readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
return this;
}

public Builder<T> startTag(String startTag) {
readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
return this;
}

public Builder<T> endTag(String endTag) {
readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
return this;
}

public Builder<T> endSnapshotId(Long newEndSnapshotId) {
if (newEndSnapshotId != null) {
readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ public class ScanContext implements Serializable {
private final boolean caseSensitive;
private final boolean exposeLocality;
private final Long snapshotId;
private final String branch;
private final String tag;
private final StreamingStartingStrategy startingStrategy;
private final Long startSnapshotId;
private final Long startSnapshotTimestamp;
private final Long endSnapshotId;
private final Long asOfTimestamp;
private final String startTag;
private final String endTag;
private final Long splitSize;
private final Integer splitLookback;
private final Long splitOpenFileCost;
Expand Down Expand Up @@ -81,14 +85,22 @@ private ScanContext(
boolean includeColumnStats,
boolean exposeLocality,
Integer planParallelism,
int maxPlanningSnapshotCount) {
int maxPlanningSnapshotCount,
String branch,
String tag,
String startTag,
String endTag) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.tag = tag;
this.branch = branch;
this.startingStrategy = startingStrategy;
this.startSnapshotTimestamp = startSnapshotTimestamp;
this.startSnapshotId = startSnapshotId;
this.endSnapshotId = endSnapshotId;
this.asOfTimestamp = asOfTimestamp;
this.startTag = startTag;
this.endTag = endTag;
this.splitSize = splitSize;
this.splitLookback = splitLookback;
this.splitOpenFileCost = splitOpenFileCost;
Expand Down Expand Up @@ -125,7 +137,24 @@ private void validate() {
startSnapshotId == null,
"Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
}

Preconditions.checkArgument(
branch == null,
String.format(
"Cannot scan table using ref %s configured for streaming reader yet", branch));

Preconditions.checkArgument(
tag == null,
String.format("Cannot scan table using ref %s configured for streaming reader", tag));
}

Preconditions.checkArgument(
!(startTag != null && startSnapshotId() != null),
"START_SNAPSHOT_ID and START_TAG cannot both be set.");

Preconditions.checkArgument(
!(endTag != null && endSnapshotId() != null),
"END_SNAPSHOT_ID and END_TAG cannot both be set.");
}

public boolean caseSensitive() {
Expand All @@ -136,6 +165,22 @@ public Long snapshotId() {
return snapshotId;
}

public String branch() {
return branch;
}

public String tag() {
return tag;
}

public String startTag() {
return startTag;
}

public String endTag() {
return endTag;
}

public StreamingStartingStrategy streamingStartingStrategy() {
return startingStrategy;
}
Expand Down Expand Up @@ -212,8 +257,12 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
return ScanContext.builder()
.caseSensitive(caseSensitive)
.useSnapshotId(null)
.useBranch(branch)
.useTag(null)
.startSnapshotId(newStartSnapshotId)
.endSnapshotId(newEndSnapshotId)
.startTag(null)
.endTag(null)
.asOfTimestamp(null)
.splitSize(splitSize)
.splitLookback(splitLookback)
Expand All @@ -235,8 +284,12 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
.useSnapshotId(newSnapshotId)
.useBranch(branch)
.useTag(tag)
.startSnapshotId(null)
.endSnapshotId(null)
.startTag(null)
.endTag(null)
.asOfTimestamp(null)
.splitSize(splitSize)
.splitLookback(splitLookback)
Expand All @@ -261,6 +314,10 @@ public static Builder builder() {
public static class Builder {
private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue();
private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue();
private String branch = FlinkReadOptions.BRANCH.defaultValue();
private String tag = FlinkReadOptions.TAG.defaultValue();
private String startTag = FlinkReadOptions.START_TAG.defaultValue();
private String endTag = FlinkReadOptions.END_TAG.defaultValue();
private StreamingStartingStrategy startingStrategy =
FlinkReadOptions.STARTING_STRATEGY_OPTION.defaultValue();
private Long startSnapshotTimestamp = FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.defaultValue();
Expand Down Expand Up @@ -297,6 +354,16 @@ public Builder useSnapshotId(Long newSnapshotId) {
return this;
}

public Builder useTag(String newTag) {
this.tag = newTag;
return this;
}

public Builder useBranch(String newBranch) {
this.branch = newBranch;
return this;
}

public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) {
this.startingStrategy = newStartingStrategy;
return this;
Expand All @@ -317,6 +384,16 @@ public Builder endSnapshotId(Long newEndSnapshotId) {
return this;
}

public Builder startTag(String newStartTag) {
this.startTag = newStartTag;
return this;
}

public Builder endTag(String newEndTag) {
this.endTag = newEndTag;
return this;
}

public Builder asOfTimestamp(Long newAsOfTimestamp) {
this.asOfTimestamp = newAsOfTimestamp;
return this;
Expand Down Expand Up @@ -392,6 +469,10 @@ public Builder resolveConfig(
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);

return this.useSnapshotId(flinkReadConf.snapshotId())
.useTag(flinkReadConf.tag())
.useBranch(flinkReadConf.branch())
.startTag(flinkReadConf.startTag())
.endTag(flinkReadConf.endTag())
.caseSensitive(flinkReadConf.caseSensitive())
.asOfTimestamp(flinkReadConf.asOfTimestamp())
.startingStrategy(flinkReadConf.startingStrategy())
Expand Down Expand Up @@ -431,7 +512,11 @@ public ScanContext build() {
includeColumnStats,
exposeLocality,
planParallelism,
maxPlanningSnapshotCount);
maxPlanningSnapshotCount,
branch,
tag,
startTag,
endTag);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext
Preconditions.checkArgument(
scanContext.endSnapshotId() == null,
"Cannot set end-snapshot-id option for streaming reader");
Preconditions.checkArgument(
scanContext.endTag() == null, "Cannot set end-tag option for streaming reader");
Preconditions.checkArgument(
scanContext.maxPlanningSnapshotCount() > 0,
"The max-planning-snapshot-count must be greater than zero");
Expand Down Expand Up @@ -124,17 +126,34 @@ public void initializeState(FunctionInitializationContext context) throws Except
if (context.isRestored()) {
LOG.info("Restoring state for the {}.", getClass().getSimpleName());
lastSnapshotId = lastSnapshotIdState.get().iterator().next();
} else if (scanContext.startSnapshotId() != null) {
} else if (scanContext.startTag() != null || scanContext.startSnapshotId() != null) {
Preconditions.checkArgument(
!(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
"START_SNAPSHOT_ID and START_TAG cannot both be set.");
Preconditions.checkArgument(
scanContext.branch() == null,
"Cannot scan table using ref %s configured for streaming reader yet.");
Preconditions.checkNotNull(
table.currentSnapshot(), "Don't have any available snapshot in table.");

long startSnapshotId;
if (scanContext.startTag() != null) {
Preconditions.checkArgument(
table.snapshot(scanContext.startTag()) != null,
"Cannot find snapshot with tag %s in table.",
scanContext.startTag());
startSnapshotId = table.snapshot(scanContext.startTag()).snapshotId();
} else {
startSnapshotId = scanContext.startSnapshotId();
}

long currentSnapshotId = table.currentSnapshot().snapshotId();
Preconditions.checkState(
SnapshotUtil.isAncestorOf(table, currentSnapshotId, scanContext.startSnapshotId()),
SnapshotUtil.isAncestorOf(table, currentSnapshotId, startSnapshotId),
"The option start-snapshot-id %s is not an ancestor of the current snapshot.",
scanContext.startSnapshotId());
startSnapshotId);

lastSnapshotId = scanContext.startSnapshotId();
lastSnapshotId = startSnapshotId;
}
}

Expand Down
Loading