@@ -27,6 +27,7 @@
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileAppender;
@@ -56,10 +57,11 @@ public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder
     this(table, fileFormat, tmp, null);
   }

-  public void appendToTable(DataFile... dataFiles) {
+  public void appendToTable(String branch, DataFile... dataFiles) {
     Preconditions.checkNotNull(table, "table not set");

-    AppendFiles append = table.newAppend();
+    AppendFiles append =
+        table.newAppend().toBranch(branch != null ? branch : SnapshotRef.MAIN_BRANCH);

     for (DataFile dataFile : dataFiles) {
       append = append.appendFile(dataFile);
@@ -68,8 +70,21 @@ public void appendToTable(DataFile... dataFiles) {
     append.commit();
   }

+  public void appendToTable(DataFile... dataFiles) {
+    appendToTable(null, dataFiles);
+  }
+
   public void appendToTable(List<Record> records) throws IOException {
-    appendToTable(null, records);
+    appendToTable(null, null, records);
Contributor: nit: this can be appendToTable(null, records), right?

Contributor Author: Currently it won't work, because we have two methods: appendToTable(String branch, List<Record> records) and appendToTable(StructLike partition, List<Record> records). It would only work if I rearranged the order and placed branch after records, but I prefer to leave it where it is, at the front.
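For illustration, a hypothetical call site showing the ambiguity the author describes (helper and records are placeholder names, not part of the PR):

    helper.appendToTable(null, records);
    // error: reference to appendToTable is ambiguous
    //   both appendToTable(String, List<Record>) and appendToTable(StructLike, List<Record>) match
    helper.appendToTable((String) null, records); // an explicit cast would disambiguate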

   }

+  public void appendToTable(String branch, List<Record> records) throws IOException {
+    appendToTable(null, branch, records);
+  }
+
+  public void appendToTable(StructLike partition, String branch, List<Record> records)
+      throws IOException {
+    appendToTable(branch, writeFile(partition, records));
+  }
+
   public void appendToTable(StructLike partition, List<Record> records) throws IOException {
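A minimal usage sketch of the new branch-aware overloads (table, tmp, and records are placeholder test fixtures, not part of this diff):

    GenericAppenderHelper appender = new GenericAppenderHelper(table, FileFormat.PARQUET, tmp);
    appender.appendToTable("test-branch", records); // writes a data file, commits it to test-branch
    appender.appendToTable(records);                // null branch resolves to SnapshotRef.MAIN_BRANCH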
@@ -39,6 +39,22 @@ public Long snapshotId() {
     return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional();
   }

+  public String tag() {
+    return confParser.stringConf().option(FlinkReadOptions.TAG.key()).parseOptional();
+  }
+
+  public String startTag() {
+    return confParser.stringConf().option(FlinkReadOptions.START_TAG.key()).parseOptional();
+  }
+
+  public String endTag() {
+    return confParser.stringConf().option(FlinkReadOptions.END_TAG.key()).parseOptional();
+  }
+
+  public String branch() {
+    return confParser.stringConf().option(FlinkReadOptions.BRANCH.key()).parseOptional();
+  }
+
   public boolean caseSensitive() {
     return confParser
         .booleanConf()
@@ -32,6 +32,18 @@ private FlinkReadOptions() {}
   public static final ConfigOption<Long> SNAPSHOT_ID =
       ConfigOptions.key("snapshot-id").longType().defaultValue(null);

+  public static final ConfigOption<String> TAG =
+      ConfigOptions.key("tag").stringType().defaultValue(null);
+
+  public static final ConfigOption<String> BRANCH =
+      ConfigOptions.key("branch").stringType().defaultValue(null);
+
+  public static final ConfigOption<String> START_TAG =
+      ConfigOptions.key("start-tag").stringType().defaultValue(null);
+
+  public static final ConfigOption<String> END_TAG =
+      ConfigOptions.key("end-tag").stringType().defaultValue(null);
+
   public static final String CASE_SENSITIVE = "case-sensitive";
   public static final ConfigOption<Boolean> CASE_SENSITIVE_OPTION =
       ConfigOptions.key(PREFIX + CASE_SENSITIVE).booleanType().defaultValue(false);
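Since the new keys are registered without the connector prefix, like snapshot-id above, they should also be usable as Flink SQL dynamic table options; a hypothetical query, with table and ref names purely illustrative:

    SELECT * FROM t /*+ OPTIONS('branch'='audit') */;
    SELECT * FROM t /*+ OPTIONS('start-tag'='v1', 'end-tag'='v2') */;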
@@ -145,6 +145,16 @@ public Builder snapshotId(Long snapshotId) {
       return this;
     }

+    public Builder branch(String branch) {
+      readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
+      return this;
+    }
+
+    public Builder tag(String tag) {
+      readOptions.put(FlinkReadOptions.TAG.key(), tag);
+      return this;
+    }
+
     public Builder startSnapshotId(Long startSnapshotId) {
       readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId));
       return this;
@@ -155,6 +165,16 @@ public Builder endSnapshotId(Long endSnapshotId) {
       return this;
     }

+    public Builder startTag(String startTag) {
+      readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
+      return this;
+    }
+
+    public Builder endTag(String endTag) {
+      readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
+      return this;
+    }
+
     public Builder asOfTimestamp(Long asOfTimestamp) {
       readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(asOfTimestamp));
       return this;
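Assuming this is the FlinkSource builder (the file name is not visible in this view), a sketch of a batch read pinned to a tag; env and tableLoader are placeholders:

    DataStream<RowData> stream =
        FlinkSource.forRowData()
            .env(env)
            .tableLoader(tableLoader)
            .tag("v1") // or .branch("audit") to read from a branch head
            .build();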
@@ -34,6 +34,7 @@
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.Tasks;

@@ -86,11 +87,31 @@ static CloseableIterable<CombinedScanTask> planTasks(
       IncrementalAppendScan scan = table.newIncrementalAppendScan();
       scan = refineScanWithBaseConfigs(scan, context, workerPool);

+      if (context.startTag() != null) {
+        Preconditions.checkArgument(
+            table.snapshot(context.startTag()) != null,
+            "Cannot find snapshot with tag %s",
+            context.startTag());
+        scan = scan.fromSnapshotExclusive(table.snapshot(context.startTag()).snapshotId());
+      }
+
       if (context.startSnapshotId() != null) {
+        Preconditions.checkArgument(
+            context.startTag() == null, "START_SNAPSHOT_ID and START_TAG cannot both be set");
         scan = scan.fromSnapshotExclusive(context.startSnapshotId());
       }

+      if (context.endTag() != null) {
+        Preconditions.checkArgument(
+            table.snapshot(context.endTag()) != null,
+            "Cannot find snapshot with tag %s",
+            context.endTag());
+        scan = scan.toSnapshot(table.snapshot(context.endTag()).snapshotId());
+      }
+
       if (context.endSnapshotId() != null) {
+        Preconditions.checkArgument(
+            context.endTag() == null, "END_SNAPSHOT_ID and END_TAG cannot both be set");
         scan = scan.toSnapshot(context.endSnapshotId());
       }

@@ -101,6 +122,10 @@ static CloseableIterable<CombinedScanTask> planTasks(

       if (context.snapshotId() != null) {
         scan = scan.useSnapshot(context.snapshotId());
+      } else if (context.tag() != null) {
+        scan = scan.useRef(context.tag());
+      } else if (context.branch() != null) {
+        scan = scan.useRef(context.branch());
       }

       if (context.asOfTimestamp() != null) {
@@ -119,7 +144,9 @@ private enum ScanMode {
   private static ScanMode checkScanMode(ScanContext context) {
     if (context.isStreaming()
         || context.startSnapshotId() != null
-        || context.endSnapshotId() != null) {
+        || context.endSnapshotId() != null
+        || context.startTag() != null
+        || context.endTag() != null) {
       return ScanMode.INCREMENTAL_APPEND_SCAN;
     } else {
       return ScanMode.BATCH;
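The tag bounds above are resolved to concrete snapshot IDs before the incremental scan is refined; condensed, the resulting call chain is roughly the following, assuming tags v1 and v2 exist on table:

    long fromId = table.snapshot("v1").snapshotId(); // start-tag, exclusive
    long toId = table.snapshot("v2").snapshotId();   // end-tag, inclusive
    IncrementalAppendScan scan =
        table.newIncrementalAppendScan().fromSnapshotExclusive(fromId).toSnapshot(toId);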
@@ -278,6 +278,26 @@ public Builder<T> startSnapshotId(Long newStartSnapshotId) {
       return this;
     }

+    public Builder<T> tag(String tag) {
+      readOptions.put(FlinkReadOptions.TAG.key(), tag);
+      return this;
+    }
+
+    public Builder<T> branch(String branch) {
+      readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
+      return this;
+    }
+
+    public Builder<T> startTag(String startTag) {
+      readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
+      return this;
+    }
+
+    public Builder<T> endTag(String endTag) {
+      readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
+      return this;
+    }
+
     public Builder<T> endSnapshotId(Long newEndSnapshotId) {
       if (newEndSnapshotId != null) {
         readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId));
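A sketch of the equivalent bounded incremental read through this FLIP-27 builder; tableLoader is a placeholder and other required settings are omitted:

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .startTag("v1")
            .endTag("v2")
            .build();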
@@ -42,11 +42,15 @@ public class ScanContext implements Serializable {
   private final boolean caseSensitive;
   private final boolean exposeLocality;
   private final Long snapshotId;
+  private final String branch;
+  private final String tag;
   private final StreamingStartingStrategy startingStrategy;
   private final Long startSnapshotId;
   private final Long startSnapshotTimestamp;
   private final Long endSnapshotId;
   private final Long asOfTimestamp;
+  private final String startTag;
+  private final String endTag;
   private final Long splitSize;
   private final Integer splitLookback;
   private final Long splitOpenFileCost;
@@ -81,14 +85,22 @@ private ScanContext(
       boolean includeColumnStats,
       boolean exposeLocality,
       Integer planParallelism,
-      int maxPlanningSnapshotCount) {
+      int maxPlanningSnapshotCount,
+      String branch,
+      String tag,
+      String startTag,
+      String endTag) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
+    this.tag = tag;
+    this.branch = branch;
     this.startingStrategy = startingStrategy;
     this.startSnapshotTimestamp = startSnapshotTimestamp;
     this.startSnapshotId = startSnapshotId;
     this.endSnapshotId = endSnapshotId;
     this.asOfTimestamp = asOfTimestamp;
+    this.startTag = startTag;
+    this.endTag = endTag;
     this.splitSize = splitSize;
     this.splitLookback = splitLookback;
     this.splitOpenFileCost = splitOpenFileCost;
@@ -125,7 +137,24 @@ private void validate() {
           startSnapshotId == null,
           "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
     }
+
+      Preconditions.checkArgument(
+          branch == null,
+          String.format(
+              "Cannot scan table using ref %s configured for streaming reader yet", branch));
Contributor Author (@hililiwei, Feb 7, 2023): #5984 seems to be the prerequisite for Flink to implement streaming incremental reads from a branch.
+
+      Preconditions.checkArgument(
+          tag == null,
+          String.format("Cannot scan table using ref %s configured for streaming reader", tag));
     }
+
+    Preconditions.checkArgument(
+        !(startTag != null && startSnapshotId() != null),
+        "START_SNAPSHOT_ID and START_TAG cannot both be set.");
+
+    Preconditions.checkArgument(
+        !(endTag != null && endSnapshotId() != null),
+        "END_SNAPSHOT_ID and END_TAG cannot both be set.");
   }

   public boolean caseSensitive() {
@@ -136,6 +165,22 @@ public Long snapshotId() {
     return snapshotId;
   }

+  public String branch() {
+    return branch;
+  }
+
+  public String tag() {
+    return tag;
+  }
+
+  public String startTag() {
+    return startTag;
+  }
+
+  public String endTag() {
+    return endTag;
+  }
+
   public StreamingStartingStrategy streamingStartingStrategy() {
     return startingStrategy;
   }
@@ -212,8 +257,12 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
         .useSnapshotId(null)
+        .useBranch(branch)
+        .useTag(null)
         .startSnapshotId(newStartSnapshotId)
         .endSnapshotId(newEndSnapshotId)
+        .startTag(null)
+        .endTag(null)
         .asOfTimestamp(null)
         .splitSize(splitSize)
         .splitLookback(splitLookback)
@@ -235,8 +284,12 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
         .useSnapshotId(newSnapshotId)
+        .useBranch(branch)
+        .useTag(tag)
         .startSnapshotId(null)
         .endSnapshotId(null)
+        .startTag(null)
+        .endTag(null)
         .asOfTimestamp(null)
         .splitSize(splitSize)
         .splitLookback(splitLookback)
@@ -261,6 +314,10 @@ public static Builder builder() {
   public static class Builder {
     private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue();
     private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue();
+    private String branch = FlinkReadOptions.BRANCH.defaultValue();
+    private String tag = FlinkReadOptions.TAG.defaultValue();
+    private String startTag = FlinkReadOptions.START_TAG.defaultValue();
+    private String endTag = FlinkReadOptions.END_TAG.defaultValue();
     private StreamingStartingStrategy startingStrategy =
         FlinkReadOptions.STARTING_STRATEGY_OPTION.defaultValue();
     private Long startSnapshotTimestamp = FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.defaultValue();
@@ -297,6 +354,16 @@ public Builder useSnapshotId(Long newSnapshotId) {
       return this;
     }

+    public Builder useTag(String newTag) {
+      this.tag = newTag;
+      return this;
+    }
+
+    public Builder useBranch(String newBranch) {
+      this.branch = newBranch;
+      return this;
+    }
+
     public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) {
       this.startingStrategy = newStartingStrategy;
       return this;
@@ -317,6 +384,16 @@ public Builder endSnapshotId(Long newEndSnapshotId) {
       return this;
     }

+    public Builder startTag(String newStartTag) {
+      this.startTag = newStartTag;
+      return this;
+    }
+
+    public Builder endTag(String newEndTag) {
+      this.endTag = newEndTag;
+      return this;
+    }
+
     public Builder asOfTimestamp(Long newAsOfTimestamp) {
       this.asOfTimestamp = newAsOfTimestamp;
       return this;
@@ -392,6 +469,10 @@ public Builder resolveConfig(
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);

       return this.useSnapshotId(flinkReadConf.snapshotId())
+          .useTag(flinkReadConf.tag())
+          .useBranch(flinkReadConf.branch())
+          .startTag(flinkReadConf.startTag())
+          .endTag(flinkReadConf.endTag())
           .caseSensitive(flinkReadConf.caseSensitive())
           .asOfTimestamp(flinkReadConf.asOfTimestamp())
           .startingStrategy(flinkReadConf.startingStrategy())
@@ -431,7 +512,11 @@ public ScanContext build() {
           includeColumnStats,
           exposeLocality,
           planParallelism,
-          maxPlanningSnapshotCount);
+          maxPlanningSnapshotCount,
+          branch,
+          tag,
+          startTag,
+          endTag);
     }
   }
 }
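Assuming validate() runs when a ScanContext is constructed, as the Preconditions above suggest, conflicting bounds now fail fast; a hypothetical misuse:

    // expected to throw IllegalArgumentException:
    // "START_SNAPSHOT_ID and START_TAG cannot both be set."
    ScanContext.builder().startSnapshotId(1L).startTag("v1").build();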