diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java index e2cacc2adf50..baef57a8e74b 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java @@ -39,6 +39,22 @@ public Long snapshotId() { return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional(); } + public String tag() { + return confParser.stringConf().option(FlinkReadOptions.TAG.key()).parseOptional(); + } + + public String startTag() { + return confParser.stringConf().option(FlinkReadOptions.START_TAG.key()).parseOptional(); + } + + public String endTag() { + return confParser.stringConf().option(FlinkReadOptions.END_TAG.key()).parseOptional(); + } + + public String branch() { + return confParser.stringConf().option(FlinkReadOptions.BRANCH.key()).parseOptional(); + } + public boolean caseSensitive() { return confParser .booleanConf() diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java index 54f64dbfa8c9..d75b2234d797 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java @@ -32,6 +32,18 @@ private FlinkReadOptions() {} public static final ConfigOption SNAPSHOT_ID = ConfigOptions.key("snapshot-id").longType().defaultValue(null); + public static final ConfigOption TAG = + ConfigOptions.key("tag").stringType().defaultValue(null); + + public static final ConfigOption BRANCH = + ConfigOptions.key("branch").stringType().defaultValue(null); + + public static final ConfigOption START_TAG = + ConfigOptions.key("start-tag").stringType().defaultValue(null); + + public static final ConfigOption END_TAG = + ConfigOptions.key("end-tag").stringType().defaultValue(null); + public static final String CASE_SENSITIVE = "case-sensitive"; public static final ConfigOption CASE_SENSITIVE_OPTION = ConfigOptions.key(PREFIX + CASE_SENSITIVE).booleanType().defaultValue(false); diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index 35004bad386f..fa1656c55278 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -145,6 +145,16 @@ public Builder snapshotId(Long snapshotId) { return this; } + public Builder branch(String branch) { + readOptions.put(FlinkReadOptions.BRANCH.key(), branch); + return this; + } + + public Builder tag(String tag) { + readOptions.put(FlinkReadOptions.TAG.key(), tag); + return this; + } + public Builder startSnapshotId(Long startSnapshotId) { readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId)); return this; @@ -155,6 +165,16 @@ public Builder endSnapshotId(Long endSnapshotId) { return this; } + public Builder startTag(String startTag) { + readOptions.put(FlinkReadOptions.START_TAG.key(), startTag); + return this; + } + + public Builder endTag(String endTag) { + readOptions.put(FlinkReadOptions.END_TAG.key(), endTag); + return this; + } + public Builder asOfTimestamp(Long asOfTimestamp) { readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), 
Long.toString(asOfTimestamp)); return this; diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index 3ff349dd8bd8..38a55e437d59 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java @@ -34,6 +34,7 @@ import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.Tasks; @@ -86,11 +87,31 @@ static CloseableIterable planTasks( IncrementalAppendScan scan = table.newIncrementalAppendScan(); scan = refineScanWithBaseConfigs(scan, context, workerPool); + if (context.startTag() != null) { + Preconditions.checkArgument( + table.snapshot(context.startTag()) != null, + "Cannot find snapshot with tag %s", + context.startTag()); + scan = scan.fromSnapshotExclusive(table.snapshot(context.startTag()).snapshotId()); + } + if (context.startSnapshotId() != null) { + Preconditions.checkArgument( + context.startTag() == null, "START_SNAPSHOT_ID and START_TAG cannot both be set"); scan = scan.fromSnapshotExclusive(context.startSnapshotId()); } + if (context.endTag() != null) { + Preconditions.checkArgument( + table.snapshot(context.endTag()) != null, + "Cannot find snapshot with tag %s", + context.endTag()); + scan = scan.toSnapshot(table.snapshot(context.endTag()).snapshotId()); + } + if (context.endSnapshotId() != null) { + Preconditions.checkArgument( + context.endTag() == null, "END_SNAPSHOT_ID and END_TAG cannot both be set"); scan = scan.toSnapshot(context.endSnapshotId()); } @@ -101,6 +122,10 @@ static CloseableIterable planTasks( if (context.snapshotId() != null) { scan = scan.useSnapshot(context.snapshotId()); + } else if (context.tag() != null) { + scan = scan.useRef(context.tag()); + } else if (context.branch() != null) { + scan = scan.useRef(context.branch()); } if (context.asOfTimestamp() != null) { @@ -119,7 +144,9 @@ private enum ScanMode { private static ScanMode checkScanMode(ScanContext context) { if (context.isStreaming() || context.startSnapshotId() != null - || context.endSnapshotId() != null) { + || context.endSnapshotId() != null + || context.startTag() != null + || context.endTag() != null) { return ScanMode.INCREMENTAL_APPEND_SCAN; } else { return ScanMode.BATCH; diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 4ed74676aafb..718460ae8c8e 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -278,6 +278,26 @@ public Builder startSnapshotId(Long newStartSnapshotId) { return this; } + public Builder tag(String tag) { + readOptions.put(FlinkReadOptions.TAG.key(), tag); + return this; + } + + public Builder branch(String branch) { + readOptions.put(FlinkReadOptions.BRANCH.key(), branch); + return this; + } + + public Builder startTag(String startTag) { + readOptions.put(FlinkReadOptions.START_TAG.key(), startTag); + return this; + } + + public Builder endTag(String endTag) { + 
readOptions.put(FlinkReadOptions.END_TAG.key(), endTag); + return this; + } + public Builder endSnapshotId(Long newEndSnapshotId) { if (newEndSnapshotId != null) { readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId)); diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 02c4943fe90f..23f33e6d2ee4 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -42,11 +42,15 @@ public class ScanContext implements Serializable { private final boolean caseSensitive; private final boolean exposeLocality; private final Long snapshotId; + private final String branch; + private final String tag; private final StreamingStartingStrategy startingStrategy; private final Long startSnapshotId; private final Long startSnapshotTimestamp; private final Long endSnapshotId; private final Long asOfTimestamp; + private final String startTag; + private final String endTag; private final Long splitSize; private final Integer splitLookback; private final Long splitOpenFileCost; @@ -81,14 +85,22 @@ private ScanContext( boolean includeColumnStats, boolean exposeLocality, Integer planParallelism, - int maxPlanningSnapshotCount) { + int maxPlanningSnapshotCount, + String branch, + String tag, + String startTag, + String endTag) { this.caseSensitive = caseSensitive; this.snapshotId = snapshotId; + this.tag = tag; + this.branch = branch; this.startingStrategy = startingStrategy; this.startSnapshotTimestamp = startSnapshotTimestamp; this.startSnapshotId = startSnapshotId; this.endSnapshotId = endSnapshotId; this.asOfTimestamp = asOfTimestamp; + this.startTag = startTag; + this.endTag = endTag; this.splitSize = splitSize; this.splitLookback = splitLookback; this.splitOpenFileCost = splitOpenFileCost; @@ -125,7 +137,24 @@ private void validate() { startSnapshotId == null, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); } + + Preconditions.checkArgument( + branch == null, + String.format( + "Cannot scan table using ref %s configured for streaming reader yet", branch)); + + Preconditions.checkArgument( + tag == null, + String.format("Cannot scan table using ref %s configured for streaming reader", tag)); } + + Preconditions.checkArgument( + !(startTag != null && startSnapshotId() != null), + "START_SNAPSHOT_ID and START_TAG cannot both be set."); + + Preconditions.checkArgument( + !(endTag != null && endSnapshotId() != null), + "END_SNAPSHOT_ID and END_TAG cannot both be set."); } public boolean caseSensitive() { @@ -136,6 +165,22 @@ public Long snapshotId() { return snapshotId; } + public String branch() { + return branch; + } + + public String tag() { + return tag; + } + + public String startTag() { + return startTag; + } + + public String endTag() { + return endTag; + } + public StreamingStartingStrategy streamingStartingStrategy() { return startingStrategy; } @@ -212,8 +257,12 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(null) + .useBranch(branch) + .useTag(null) .startSnapshotId(newStartSnapshotId) .endSnapshotId(newEndSnapshotId) + .startTag(null) + .endTag(null) .asOfTimestamp(null) .splitSize(splitSize) .splitLookback(splitLookback) @@ -235,8 +284,12 @@ public ScanContext 
copyWithSnapshotId(long newSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(newSnapshotId) + .useBranch(branch) + .useTag(tag) .startSnapshotId(null) .endSnapshotId(null) + .startTag(null) + .endTag(null) .asOfTimestamp(null) .splitSize(splitSize) .splitLookback(splitLookback) @@ -261,6 +314,10 @@ public static Builder builder() { public static class Builder { private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue(); private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue(); + private String branch = FlinkReadOptions.BRANCH.defaultValue(); + private String tag = FlinkReadOptions.TAG.defaultValue(); + private String startTag = FlinkReadOptions.START_TAG.defaultValue(); + private String endTag = FlinkReadOptions.END_TAG.defaultValue(); private StreamingStartingStrategy startingStrategy = FlinkReadOptions.STARTING_STRATEGY_OPTION.defaultValue(); private Long startSnapshotTimestamp = FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.defaultValue(); @@ -297,6 +354,16 @@ public Builder useSnapshotId(Long newSnapshotId) { return this; } + public Builder useTag(String newTag) { + this.tag = newTag; + return this; + } + + public Builder useBranch(String newBranch) { + this.branch = newBranch; + return this; + } + public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) { this.startingStrategy = newStartingStrategy; return this; @@ -317,6 +384,16 @@ public Builder endSnapshotId(Long newEndSnapshotId) { return this; } + public Builder startTag(String newStartTag) { + this.startTag = newStartTag; + return this; + } + + public Builder endTag(String newEndTag) { + this.endTag = newEndTag; + return this; + } + public Builder asOfTimestamp(Long newAsOfTimestamp) { this.asOfTimestamp = newAsOfTimestamp; return this; @@ -392,6 +469,10 @@ public Builder resolveConfig( FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig); return this.useSnapshotId(flinkReadConf.snapshotId()) + .useTag(flinkReadConf.tag()) + .useBranch(flinkReadConf.branch()) + .startTag(flinkReadConf.startTag()) + .endTag(flinkReadConf.endTag()) .caseSensitive(flinkReadConf.caseSensitive()) .asOfTimestamp(flinkReadConf.asOfTimestamp()) .startingStrategy(flinkReadConf.startingStrategy()) @@ -431,7 +512,11 @@ public ScanContext build() { includeColumnStats, exposeLocality, planParallelism, - maxPlanningSnapshotCount); + maxPlanningSnapshotCount, + branch, + tag, + startTag, + endTag); } } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java index 75791c95bd4a..c27e29613fed 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java @@ -87,6 +87,8 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext Preconditions.checkArgument( scanContext.endSnapshotId() == null, "Cannot set end-snapshot-id option for streaming reader"); + Preconditions.checkArgument( + scanContext.endTag() == null, "Cannot set end-tag option for streaming reader"); Preconditions.checkArgument( scanContext.maxPlanningSnapshotCount() > 0, "The max-planning-snapshot-count must be greater than zero"); @@ -124,17 +126,34 @@ public void initializeState(FunctionInitializationContext context) throws Except if (context.isRestored()) { 
LOG.info("Restoring state for the {}.", getClass().getSimpleName()); lastSnapshotId = lastSnapshotIdState.get().iterator().next(); - } else if (scanContext.startSnapshotId() != null) { + } else if (scanContext.startTag() != null || scanContext.startSnapshotId() != null) { + Preconditions.checkArgument( + !(scanContext.startTag() != null && scanContext.startSnapshotId() != null), + "START_SNAPSHOT_ID and START_TAG cannot both be set."); + Preconditions.checkArgument( + scanContext.branch() == null, + "Cannot scan table using ref %s configured for streaming reader yet."); Preconditions.checkNotNull( table.currentSnapshot(), "Don't have any available snapshot in table."); + long startSnapshotId; + if (scanContext.startTag() != null) { + Preconditions.checkArgument( + table.snapshot(scanContext.startTag()) != null, + "Cannot find snapshot with tag %s in table.", + scanContext.startTag()); + startSnapshotId = table.snapshot(scanContext.startTag()).snapshotId(); + } else { + startSnapshotId = scanContext.startSnapshotId(); + } + long currentSnapshotId = table.currentSnapshot().snapshotId(); Preconditions.checkState( - SnapshotUtil.isAncestorOf(table, currentSnapshotId, scanContext.startSnapshotId()), + SnapshotUtil.isAncestorOf(table, currentSnapshotId, startSnapshotId), "The option start-snapshot-id %s is not an ancestor of the current snapshot.", - scanContext.startSnapshotId()); + startSnapshotId); - lastSnapshotId = scanContext.startSnapshotId(); + lastSnapshotId = startSnapshotId; } } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index 5e4154490f71..a6cdc212b769 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -28,6 +28,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; @@ -229,6 +230,149 @@ public void testSnapshotReads() throws Exception { TestFixtures.SCHEMA); } + @Test + public void testTagReads() throws Exception { + Table table = + catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecords1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(expectedRecords1); + long snapshotId = table.currentSnapshot().snapshotId(); + + table.manageSnapshots().createTag("t1", snapshotId).commit(); + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords1, TestFixtures.SCHEMA); + + List expectedRecords2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(expectedRecords2); + snapshotId = table.currentSnapshot().snapshotId(); + + table.manageSnapshots().replaceTag("t1", snapshotId).commit(); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(expectedRecords1); + expectedRecords.addAll(expectedRecords2); + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords, TestFixtures.SCHEMA); + } + + @Test + public void testBranchReads() throws Exception { + Table table = + 
catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecordsBase = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(expectedRecordsBase); + long snapshotId = table.currentSnapshot().snapshotId(); + + String branchName = "b1"; + table.manageSnapshots().createBranch(branchName, snapshotId).commit(); + + List expectedRecordsForBranch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(branchName, expectedRecordsForBranch); + + List expectedRecordsForMain = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(expectedRecordsForMain); + + List branchExpectedRecords = Lists.newArrayList(); + branchExpectedRecords.addAll(expectedRecordsBase); + branchExpectedRecords.addAll(expectedRecordsForBranch); + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("branch", branchName)), + branchExpectedRecords, + TestFixtures.SCHEMA); + + List mainExpectedRecords = Lists.newArrayList(); + mainExpectedRecords.addAll(expectedRecordsBase); + mainExpectedRecords.addAll(expectedRecordsForMain); + + TestHelpers.assertRecords(run(), mainExpectedRecords, TestFixtures.SCHEMA); + } + + @Test + public void testIncrementalReadViaTag() throws Exception { + Table table = + catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(records1); + long snapshotId1 = table.currentSnapshot().snapshotId(); + String startTag = "t1"; + table.manageSnapshots().createTag(startTag, snapshotId1).commit(); + + List records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L); + helper.appendToTable(records2); + + List records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L); + helper.appendToTable(records3); + long snapshotId3 = table.currentSnapshot().snapshotId(); + String endTag = "t2"; + table.manageSnapshots().createTag(endTag, snapshotId3).commit(); + + helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L)); + + List expected = Lists.newArrayList(); + expected.addAll(records2); + expected.addAll(records3); + + TestHelpers.assertRecords( + runWithOptions( + ImmutableMap.builder() + .put("start-tag", startTag) + .put("end-tag", endTag) + .buildOrThrow()), + expected, + TestFixtures.SCHEMA); + + TestHelpers.assertRecords( + runWithOptions( + ImmutableMap.builder() + .put("start-snapshot-id", Long.toString(snapshotId1)) + .put("end-tag", endTag) + .buildOrThrow()), + expected, + TestFixtures.SCHEMA); + + TestHelpers.assertRecords( + runWithOptions( + ImmutableMap.builder() + .put("start-tag", startTag) + .put("end-snapshot-id", Long.toString(snapshotId3)) + .buildOrThrow()), + expected, + TestFixtures.SCHEMA); + + AssertHelpers.assertThrows( + "START_SNAPSHOT_ID and START_TAG cannot both be set.", + Exception.class, + () -> + runWithOptions( + ImmutableMap.builder() + .put("start-tag", startTag) + .put("end-tag", endTag) + .put("start-snapshot-id", Long.toString(snapshotId1)) + .buildOrThrow())); + + AssertHelpers.assertThrows( + "END_SNAPSHOT_ID and END_TAG cannot both be set.", + Exception.class, + () -> + runWithOptions( + ImmutableMap.builder() + .put("start-tag", startTag) + .put("end-tag", endTag) + 
.put("end-snapshot-id", Long.toString(snapshotId3)) + .buildOrThrow())); + } + @Test public void testIncrementalRead() throws Exception { Table table = diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java index 64c84bbf5ce1..cebce61c0803 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java @@ -62,6 +62,10 @@ protected List runWithOptions(Map options) throws Exception FlinkSource.Builder builder = FlinkSource.forRowData(); Optional.ofNullable(options.get("snapshot-id")) .ifPresent(value -> builder.snapshotId(Long.parseLong(value))); + Optional.ofNullable(options.get("tag")).ifPresent(value -> builder.tag(value)); + Optional.ofNullable(options.get("branch")).ifPresent(value -> builder.branch(value)); + Optional.ofNullable(options.get("start-tag")).ifPresent(value -> builder.startTag(value)); + Optional.ofNullable(options.get("end-tag")).ifPresent(value -> builder.endTag(value)); Optional.ofNullable(options.get("start-snapshot-id")) .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value))); Optional.ofNullable(options.get("end-snapshot-id")) diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java index 10fa4ecf1329..abcce11e3699 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java @@ -30,6 +30,7 @@ import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TestHelpers; @@ -211,6 +212,24 @@ public void testConsumeFromBeginning() throws Exception { result.getJobClient().ifPresent(JobClient::cancel); } + @Test + public void testConsumeFilesWithBranch() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + Row row1 = Row.of(1, "aaa", "2021-01-01"); + Row row2 = Row.of(2, "bbb", "2021-01-01"); + insertRows(table, row1, row2); + + AssertHelpers.assertThrows( + "Cannot scan table using ref for stream yet", + IllegalArgumentException.class, + "Cannot scan table using ref", + () -> + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/", + TABLE)); + } + @Test public void testConsumeFromStartSnapshotId() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); @@ -234,7 +253,47 @@ public void testConsumeFromStartSnapshotId() throws Exception { + "'start-snapshot-id'='%d')*/", TABLE, startSnapshotId); try (CloseableIterator iterator = result.collect()) { - // The row2 in start snapshot will be excluded. + // the start snapshot(row2) is exclusive. 
+ assertRows(ImmutableList.of(row3, row4), iterator); + + Row row5 = Row.of(5, "eee", "2021-01-01"); + Row row6 = Row.of(6, "fff", "2021-01-01"); + insertRows(table, row5, row6); + assertRows(ImmutableList.of(row5, row6), iterator); + + Row row7 = Row.of(7, "ggg", "2021-01-01"); + insertRows(table, row7); + assertRows(ImmutableList.of(row7), iterator); + } + result.getJobClient().ifPresent(JobClient::cancel); + } + + @Test + public void testConsumeFromStartTag() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + // Produce two snapshots. + Row row1 = Row.of(1, "aaa", "2021-01-01"); + Row row2 = Row.of(2, "bbb", "2021-01-01"); + insertRows(table, row1); + insertRows(table, row2); + + String tagName = "t1"; + long startSnapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag(tagName, startSnapshotId).commit(); + + Row row3 = Row.of(3, "ccc", "2021-01-01"); + Row row4 = Row.of(4, "ddd", "2021-01-01"); + insertRows(table, row3, row4); + + TableResult result = + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', " + + "'start-tag'='%s')*/", + TABLE, tagName); + try (CloseableIterator iterator = result.collect()) { + // the start snapshot(row2) is exclusive. assertRows(ImmutableList.of(row3, row4), iterator); Row row5 = Row.of(5, "eee", "2021-01-01"); @@ -247,5 +306,15 @@ public void testConsumeFromStartSnapshotId() throws Exception { assertRows(ImmutableList.of(row7), iterator); } result.getJobClient().ifPresent(JobClient::cancel); + + AssertHelpers.assertThrows( + "START_SNAPSHOT_ID and START_TAG cannot both be set.", + IllegalArgumentException.class, + "START_SNAPSHOT_ID and START_TAG cannot both be set.", + () -> + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', " + + "'start-snapshot-id'='%d' )*/", + TABLE, tagName, startSnapshotId)); } } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 6f8789c92bc5..a161645979b7 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -161,6 +161,42 @@ public void testConsumeFromStartSnapshotId() throws Exception { } } + @Test + public void testConsumeFromStartTag() throws Exception { + // Commit the first five transactions. + generateRecordsAndCommitTxn(5); + long startSnapshotId = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + table.manageSnapshots().createTag(tagName, startSnapshotId).commit(); + + // Commit the next five transactions. 
+ List<List<Record>> recordsList = generateRecordsAndCommitTxn(5); + + ScanContext scanContext = + ScanContext.builder().monitorInterval(Duration.ofMillis(100)).startTag(tagName).build(); + + StreamingMonitorFunction function = createFunction(scanContext); + try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(1); + TestSourceContext sourceContext = new TestSourceContext(latch); + runSourceFunctionInTask(sourceContext, function); + + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Thread.sleep(1000L); + + // Stop the stream task. + function.close(); + + Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); + TestHelpers.assertRecords( + sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); + } + } + @Test public void testCheckpointRestore() throws Exception { List<List<Record>> recordsList = generateRecordsAndCommitTxn(10); diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java index e2cacc2adf50..baef57a8e74b 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java @@ -39,6 +39,22 @@ public Long snapshotId() { return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional(); } + public String tag() { + return confParser.stringConf().option(FlinkReadOptions.TAG.key()).parseOptional(); + } + + public String startTag() { + return confParser.stringConf().option(FlinkReadOptions.START_TAG.key()).parseOptional(); + } + + public String endTag() { + return confParser.stringConf().option(FlinkReadOptions.END_TAG.key()).parseOptional(); + } + + public String branch() { + return confParser.stringConf().option(FlinkReadOptions.BRANCH.key()).parseOptional(); + } + public boolean caseSensitive() { return confParser .booleanConf() diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java index 54f64dbfa8c9..d75b2234d797 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java @@ -32,6 +32,18 @@ private FlinkReadOptions() {} public static final ConfigOption SNAPSHOT_ID = ConfigOptions.key("snapshot-id").longType().defaultValue(null); + public static final ConfigOption TAG = + ConfigOptions.key("tag").stringType().defaultValue(null); + + public static final ConfigOption BRANCH = + ConfigOptions.key("branch").stringType().defaultValue(null); + + public static final ConfigOption START_TAG = + ConfigOptions.key("start-tag").stringType().defaultValue(null); + + public static final ConfigOption END_TAG = + ConfigOptions.key("end-tag").stringType().defaultValue(null); + public static final String CASE_SENSITIVE = "case-sensitive"; public static final ConfigOption CASE_SENSITIVE_OPTION = ConfigOptions.key(PREFIX + CASE_SENSITIVE).booleanType().defaultValue(false); diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index 35004bad386f..fa1656c55278 100644 ---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -145,6 +145,16 @@ public Builder snapshotId(Long snapshotId) { return this; } + public Builder branch(String branch) { + readOptions.put(FlinkReadOptions.BRANCH.key(), branch); + return this; + } + + public Builder tag(String tag) { + readOptions.put(FlinkReadOptions.TAG.key(), tag); + return this; + } + public Builder startSnapshotId(Long startSnapshotId) { readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId)); return this; @@ -155,6 +165,16 @@ public Builder endSnapshotId(Long endSnapshotId) { return this; } + public Builder startTag(String startTag) { + readOptions.put(FlinkReadOptions.START_TAG.key(), startTag); + return this; + } + + public Builder endTag(String endTag) { + readOptions.put(FlinkReadOptions.END_TAG.key(), endTag); + return this; + } + public Builder asOfTimestamp(Long asOfTimestamp) { readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(asOfTimestamp)); return this; diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index 3ff349dd8bd8..38a55e437d59 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java @@ -34,6 +34,7 @@ import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.Tasks; @@ -86,11 +87,31 @@ static CloseableIterable planTasks( IncrementalAppendScan scan = table.newIncrementalAppendScan(); scan = refineScanWithBaseConfigs(scan, context, workerPool); + if (context.startTag() != null) { + Preconditions.checkArgument( + table.snapshot(context.startTag()) != null, + "Cannot find snapshot with tag %s", + context.startTag()); + scan = scan.fromSnapshotExclusive(table.snapshot(context.startTag()).snapshotId()); + } + if (context.startSnapshotId() != null) { + Preconditions.checkArgument( + context.startTag() == null, "START_SNAPSHOT_ID and START_TAG cannot both be set"); scan = scan.fromSnapshotExclusive(context.startSnapshotId()); } + if (context.endTag() != null) { + Preconditions.checkArgument( + table.snapshot(context.endTag()) != null, + "Cannot find snapshot with tag %s", + context.endTag()); + scan = scan.toSnapshot(table.snapshot(context.endTag()).snapshotId()); + } + if (context.endSnapshotId() != null) { + Preconditions.checkArgument( + context.endTag() == null, "END_SNAPSHOT_ID and END_TAG cannot both be set"); scan = scan.toSnapshot(context.endSnapshotId()); } @@ -101,6 +122,10 @@ static CloseableIterable planTasks( if (context.snapshotId() != null) { scan = scan.useSnapshot(context.snapshotId()); + } else if (context.tag() != null) { + scan = scan.useRef(context.tag()); + } else if (context.branch() != null) { + scan = scan.useRef(context.branch()); } if (context.asOfTimestamp() != null) { @@ -119,7 +144,9 @@ private enum ScanMode { private static ScanMode checkScanMode(ScanContext context) { if (context.isStreaming() || context.startSnapshotId() != null - || context.endSnapshotId() != null) { + 
|| context.endSnapshotId() != null + || context.startTag() != null + || context.endTag() != null) { return ScanMode.INCREMENTAL_APPEND_SCAN; } else { return ScanMode.BATCH; diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 4ed74676aafb..718460ae8c8e 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -278,6 +278,26 @@ public Builder startSnapshotId(Long newStartSnapshotId) { return this; } + public Builder tag(String tag) { + readOptions.put(FlinkReadOptions.TAG.key(), tag); + return this; + } + + public Builder branch(String branch) { + readOptions.put(FlinkReadOptions.BRANCH.key(), branch); + return this; + } + + public Builder startTag(String startTag) { + readOptions.put(FlinkReadOptions.START_TAG.key(), startTag); + return this; + } + + public Builder endTag(String endTag) { + readOptions.put(FlinkReadOptions.END_TAG.key(), endTag); + return this; + } + public Builder endSnapshotId(Long newEndSnapshotId) { if (newEndSnapshotId != null) { readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId)); diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 02c4943fe90f..23f33e6d2ee4 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -42,11 +42,15 @@ public class ScanContext implements Serializable { private final boolean caseSensitive; private final boolean exposeLocality; private final Long snapshotId; + private final String branch; + private final String tag; private final StreamingStartingStrategy startingStrategy; private final Long startSnapshotId; private final Long startSnapshotTimestamp; private final Long endSnapshotId; private final Long asOfTimestamp; + private final String startTag; + private final String endTag; private final Long splitSize; private final Integer splitLookback; private final Long splitOpenFileCost; @@ -81,14 +85,22 @@ private ScanContext( boolean includeColumnStats, boolean exposeLocality, Integer planParallelism, - int maxPlanningSnapshotCount) { + int maxPlanningSnapshotCount, + String branch, + String tag, + String startTag, + String endTag) { this.caseSensitive = caseSensitive; this.snapshotId = snapshotId; + this.tag = tag; + this.branch = branch; this.startingStrategy = startingStrategy; this.startSnapshotTimestamp = startSnapshotTimestamp; this.startSnapshotId = startSnapshotId; this.endSnapshotId = endSnapshotId; this.asOfTimestamp = asOfTimestamp; + this.startTag = startTag; + this.endTag = endTag; this.splitSize = splitSize; this.splitLookback = splitLookback; this.splitOpenFileCost = splitOpenFileCost; @@ -125,7 +137,24 @@ private void validate() { startSnapshotId == null, "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); } + + Preconditions.checkArgument( + branch == null, + String.format( + "Cannot scan table using ref %s configured for streaming reader yet", branch)); + + Preconditions.checkArgument( + tag == null, + String.format("Cannot scan table using ref %s configured for streaming reader", tag)); } + + Preconditions.checkArgument( + !(startTag != null && 
startSnapshotId() != null), + "START_SNAPSHOT_ID and START_TAG cannot both be set."); + + Preconditions.checkArgument( + !(endTag != null && endSnapshotId() != null), + "END_SNAPSHOT_ID and END_TAG cannot both be set."); } public boolean caseSensitive() { @@ -136,6 +165,22 @@ public Long snapshotId() { return snapshotId; } + public String branch() { + return branch; + } + + public String tag() { + return tag; + } + + public String startTag() { + return startTag; + } + + public String endTag() { + return endTag; + } + public StreamingStartingStrategy streamingStartingStrategy() { return startingStrategy; } @@ -212,8 +257,12 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(null) + .useBranch(branch) + .useTag(null) .startSnapshotId(newStartSnapshotId) .endSnapshotId(newEndSnapshotId) + .startTag(null) + .endTag(null) .asOfTimestamp(null) .splitSize(splitSize) .splitLookback(splitLookback) @@ -235,8 +284,12 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) .useSnapshotId(newSnapshotId) + .useBranch(branch) + .useTag(tag) .startSnapshotId(null) .endSnapshotId(null) + .startTag(null) + .endTag(null) .asOfTimestamp(null) .splitSize(splitSize) .splitLookback(splitLookback) @@ -261,6 +314,10 @@ public static Builder builder() { public static class Builder { private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue(); private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue(); + private String branch = FlinkReadOptions.BRANCH.defaultValue(); + private String tag = FlinkReadOptions.TAG.defaultValue(); + private String startTag = FlinkReadOptions.START_TAG.defaultValue(); + private String endTag = FlinkReadOptions.END_TAG.defaultValue(); private StreamingStartingStrategy startingStrategy = FlinkReadOptions.STARTING_STRATEGY_OPTION.defaultValue(); private Long startSnapshotTimestamp = FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.defaultValue(); @@ -297,6 +354,16 @@ public Builder useSnapshotId(Long newSnapshotId) { return this; } + public Builder useTag(String newTag) { + this.tag = newTag; + return this; + } + + public Builder useBranch(String newBranch) { + this.branch = newBranch; + return this; + } + public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) { this.startingStrategy = newStartingStrategy; return this; @@ -317,6 +384,16 @@ public Builder endSnapshotId(Long newEndSnapshotId) { return this; } + public Builder startTag(String newStartTag) { + this.startTag = newStartTag; + return this; + } + + public Builder endTag(String newEndTag) { + this.endTag = newEndTag; + return this; + } + public Builder asOfTimestamp(Long newAsOfTimestamp) { this.asOfTimestamp = newAsOfTimestamp; return this; @@ -392,6 +469,10 @@ public Builder resolveConfig( FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig); return this.useSnapshotId(flinkReadConf.snapshotId()) + .useTag(flinkReadConf.tag()) + .useBranch(flinkReadConf.branch()) + .startTag(flinkReadConf.startTag()) + .endTag(flinkReadConf.endTag()) .caseSensitive(flinkReadConf.caseSensitive()) .asOfTimestamp(flinkReadConf.asOfTimestamp()) .startingStrategy(flinkReadConf.startingStrategy()) @@ -431,7 +512,11 @@ public ScanContext build() { includeColumnStats, exposeLocality, planParallelism, - maxPlanningSnapshotCount); + maxPlanningSnapshotCount, + branch, + tag, + startTag, + endTag); } } } 
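Editorial note: the v1.15 changes above mirror the v1.14 path line for line. As a sketch of how the new read options compose, the Java below pins a batch read to a tag and runs an incremental append scan between two tags via the FlinkSource builder; the warehouse path and the tag names "t1"/"t2" are illustrative assumptions, not part of this diff.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class TagReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Assumption: a Hadoop-backed table at a placeholder location, with tags t1/t2.
    TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");

    // Batch read pinned to a tag; equivalent to the 'tag' read option.
    DataStream<RowData> tagged =
        FlinkSource.forRowData().env(env).tableLoader(loader).tag("t1").build();

    // Incremental append scan between two tags: per FlinkSplitPlanner above,
    // start-tag is resolved with fromSnapshotExclusive and end-tag with
    // toSnapshot, so t1 is exclusive and t2 inclusive.
    DataStream<RowData> incremental =
        FlinkSource.forRowData().env(env).tableLoader(loader).startTag("t1").endTag("t2").build();

    tagged.print();
    incremental.print();
    env.execute("iceberg-tag-read-sketch");
  }
}

Snapshot-id and tag endpoints can be mixed across the two ends, as testIncrementalReadViaTag exercises, but setting both for the same endpoint fails the new Preconditions checks.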
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java index 75791c95bd4a..c27e29613fed 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java @@ -87,6 +87,8 @@ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext Preconditions.checkArgument( scanContext.endSnapshotId() == null, "Cannot set end-snapshot-id option for streaming reader"); + Preconditions.checkArgument( + scanContext.endTag() == null, "Cannot set end-tag option for streaming reader"); Preconditions.checkArgument( scanContext.maxPlanningSnapshotCount() > 0, "The max-planning-snapshot-count must be greater than zero"); @@ -124,17 +126,34 @@ public void initializeState(FunctionInitializationContext context) throws Except if (context.isRestored()) { LOG.info("Restoring state for the {}.", getClass().getSimpleName()); lastSnapshotId = lastSnapshotIdState.get().iterator().next(); - } else if (scanContext.startSnapshotId() != null) { + } else if (scanContext.startTag() != null || scanContext.startSnapshotId() != null) { + Preconditions.checkArgument( + !(scanContext.startTag() != null && scanContext.startSnapshotId() != null), + "START_SNAPSHOT_ID and START_TAG cannot both be set."); + Preconditions.checkArgument( + scanContext.branch() == null, + "Cannot scan table using ref %s configured for streaming reader yet", + scanContext.branch()); Preconditions.checkNotNull( table.currentSnapshot(), "Don't have any available snapshot in table."); + long startSnapshotId; + if (scanContext.startTag() != null) { + Preconditions.checkArgument( + table.snapshot(scanContext.startTag()) != null, + "Cannot find snapshot with tag %s in table.", + scanContext.startTag()); + startSnapshotId = table.snapshot(scanContext.startTag()).snapshotId(); + } else { + startSnapshotId = scanContext.startSnapshotId(); + } + long currentSnapshotId = table.currentSnapshot().snapshotId(); Preconditions.checkState( - SnapshotUtil.isAncestorOf(table, currentSnapshotId, scanContext.startSnapshotId()), + SnapshotUtil.isAncestorOf(table, currentSnapshotId, startSnapshotId), "The option start-snapshot-id %s is not an ancestor of the current snapshot.", - scanContext.startSnapshotId()); + startSnapshotId); - lastSnapshotId = scanContext.startSnapshotId(); + lastSnapshotId = startSnapshotId; } } diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index 5e4154490f71..a6cdc212b769 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -28,6 +28,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; @@ -229,6 +230,149 @@ public void testSnapshotReads() throws Exception { TestFixtures.SCHEMA); } + @Test + public void testTagReads() throws Exception { + Table table = + catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + +
GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecords1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(expectedRecords1); + long snapshotId = table.currentSnapshot().snapshotId(); + + table.manageSnapshots().createTag("t1", snapshotId).commit(); + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords1, TestFixtures.SCHEMA); + + List expectedRecords2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(expectedRecords2); + snapshotId = table.currentSnapshot().snapshotId(); + + table.manageSnapshots().replaceTag("t1", snapshotId).commit(); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(expectedRecords1); + expectedRecords.addAll(expectedRecords2); + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords, TestFixtures.SCHEMA); + } + + @Test + public void testBranchReads() throws Exception { + Table table = + catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecordsBase = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(expectedRecordsBase); + long snapshotId = table.currentSnapshot().snapshotId(); + + String branchName = "b1"; + table.manageSnapshots().createBranch(branchName, snapshotId).commit(); + + List expectedRecordsForBranch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(branchName, expectedRecordsForBranch); + + List expectedRecordsForMain = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(expectedRecordsForMain); + + List branchExpectedRecords = Lists.newArrayList(); + branchExpectedRecords.addAll(expectedRecordsBase); + branchExpectedRecords.addAll(expectedRecordsForBranch); + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("branch", branchName)), + branchExpectedRecords, + TestFixtures.SCHEMA); + + List mainExpectedRecords = Lists.newArrayList(); + mainExpectedRecords.addAll(expectedRecordsBase); + mainExpectedRecords.addAll(expectedRecordsForMain); + + TestHelpers.assertRecords(run(), mainExpectedRecords, TestFixtures.SCHEMA); + } + + @Test + public void testIncrementalReadViaTag() throws Exception { + Table table = + catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L); + helper.appendToTable(records1); + long snapshotId1 = table.currentSnapshot().snapshotId(); + String startTag = "t1"; + table.manageSnapshots().createTag(startTag, snapshotId1).commit(); + + List records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L); + helper.appendToTable(records2); + + List records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L); + helper.appendToTable(records3); + long snapshotId3 = table.currentSnapshot().snapshotId(); + String endTag = "t2"; + table.manageSnapshots().createTag(endTag, snapshotId3).commit(); + + helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L)); + + List expected = Lists.newArrayList(); + expected.addAll(records2); + expected.addAll(records3); + + TestHelpers.assertRecords( + runWithOptions( + ImmutableMap.builder() + .put("start-tag", 
startTag) + .put("end-tag", endTag) + .buildOrThrow()), + expected, + TestFixtures.SCHEMA); + + TestHelpers.assertRecords( + runWithOptions( + ImmutableMap.builder() + .put("start-snapshot-id", Long.toString(snapshotId1)) + .put("end-tag", endTag) + .buildOrThrow()), + expected, + TestFixtures.SCHEMA); + + TestHelpers.assertRecords( + runWithOptions( + ImmutableMap.builder() + .put("start-tag", startTag) + .put("end-snapshot-id", Long.toString(snapshotId3)) + .buildOrThrow()), + expected, + TestFixtures.SCHEMA); + + AssertHelpers.assertThrows( + "START_SNAPSHOT_ID and START_TAG cannot both be set.", + Exception.class, + () -> + runWithOptions( + ImmutableMap.builder() + .put("start-tag", startTag) + .put("end-tag", endTag) + .put("start-snapshot-id", Long.toString(snapshotId1)) + .buildOrThrow())); + + AssertHelpers.assertThrows( + "END_SNAPSHOT_ID and END_TAG cannot both be set.", + Exception.class, + () -> + runWithOptions( + ImmutableMap.builder() + .put("start-tag", startTag) + .put("end-tag", endTag) + .put("end-snapshot-id", Long.toString(snapshotId3)) + .buildOrThrow())); + } + @Test public void testIncrementalRead() throws Exception { Table table = diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java index 64c84bbf5ce1..cebce61c0803 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java @@ -62,6 +62,10 @@ protected List runWithOptions(Map options) throws Exception FlinkSource.Builder builder = FlinkSource.forRowData(); Optional.ofNullable(options.get("snapshot-id")) .ifPresent(value -> builder.snapshotId(Long.parseLong(value))); + Optional.ofNullable(options.get("tag")).ifPresent(value -> builder.tag(value)); + Optional.ofNullable(options.get("branch")).ifPresent(value -> builder.branch(value)); + Optional.ofNullable(options.get("start-tag")).ifPresent(value -> builder.startTag(value)); + Optional.ofNullable(options.get("end-tag")).ifPresent(value -> builder.endTag(value)); Optional.ofNullable(options.get("start-snapshot-id")) .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value))); Optional.ofNullable(options.get("end-snapshot-id")) diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java index 10fa4ecf1329..abcce11e3699 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java @@ -30,6 +30,7 @@ import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TestHelpers; @@ -211,6 +212,24 @@ public void testConsumeFromBeginning() throws Exception { result.getJobClient().ifPresent(JobClient::cancel); } + @Test + public void testConsumeFilesWithBranch() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + Row row1 = Row.of(1, "aaa", "2021-01-01"); + Row row2 = Row.of(2, "bbb", "2021-01-01"); + 
insertRows(table, row1, row2); + + AssertHelpers.assertThrows( + "Cannot scan table using ref for stream yet", + IllegalArgumentException.class, + "Cannot scan table using ref", + () -> + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/", + TABLE)); + } + @Test public void testConsumeFromStartSnapshotId() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); @@ -234,7 +253,47 @@ public void testConsumeFromStartSnapshotId() throws Exception { + "'start-snapshot-id'='%d')*/", TABLE, startSnapshotId); try (CloseableIterator iterator = result.collect()) { - // The row2 in start snapshot will be excluded. + // the start snapshot(row2) is exclusive. + assertRows(ImmutableList.of(row3, row4), iterator); + + Row row5 = Row.of(5, "eee", "2021-01-01"); + Row row6 = Row.of(6, "fff", "2021-01-01"); + insertRows(table, row5, row6); + assertRows(ImmutableList.of(row5, row6), iterator); + + Row row7 = Row.of(7, "ggg", "2021-01-01"); + insertRows(table, row7); + assertRows(ImmutableList.of(row7), iterator); + } + result.getJobClient().ifPresent(JobClient::cancel); + } + + @Test + public void testConsumeFromStartTag() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + // Produce two snapshots. + Row row1 = Row.of(1, "aaa", "2021-01-01"); + Row row2 = Row.of(2, "bbb", "2021-01-01"); + insertRows(table, row1); + insertRows(table, row2); + + String tagName = "t1"; + long startSnapshotId = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag(tagName, startSnapshotId).commit(); + + Row row3 = Row.of(3, "ccc", "2021-01-01"); + Row row4 = Row.of(4, "ddd", "2021-01-01"); + insertRows(table, row3, row4); + + TableResult result = + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', " + + "'start-tag'='%s')*/", + TABLE, tagName); + try (CloseableIterator iterator = result.collect()) { + // the start snapshot(row2) is exclusive. assertRows(ImmutableList.of(row3, row4), iterator); Row row5 = Row.of(5, "eee", "2021-01-01"); @@ -247,5 +306,15 @@ public void testConsumeFromStartSnapshotId() throws Exception { assertRows(ImmutableList.of(row7), iterator); } result.getJobClient().ifPresent(JobClient::cancel); + + AssertHelpers.assertThrows( + "START_SNAPSHOT_ID and START_TAG cannot both be set.", + IllegalArgumentException.class, + "START_SNAPSHOT_ID and START_TAG cannot both be set.", + () -> + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', " + + "'start-snapshot-id'='%d' )*/", + TABLE, tagName, startSnapshotId)); } } diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java index 6f8789c92bc5..a161645979b7 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java @@ -161,6 +161,42 @@ public void testConsumeFromStartSnapshotId() throws Exception { } } + @Test + public void testConsumeFromStartTag() throws Exception { + // Commit the first five transactions. 
+ generateRecordsAndCommitTxn(5); + long startSnapshotId = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + table.manageSnapshots().createTag(tagName, startSnapshotId).commit(); + + // Commit the next five transactions. + List<List<Record>> recordsList = generateRecordsAndCommitTxn(5); + + ScanContext scanContext = + ScanContext.builder().monitorInterval(Duration.ofMillis(100)).startTag(tagName).build(); + + StreamingMonitorFunction function = createFunction(scanContext); + try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(1); + TestSourceContext sourceContext = new TestSourceContext(latch); + runSourceFunctionInTask(sourceContext, function); + + Assert.assertTrue( + "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); + Thread.sleep(1000L); + + // Stop the stream task. + function.close(); + + Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size()); + TestHelpers.assertRecords( + sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA); + } + } + @Test public void testCheckpointRestore() throws Exception { List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
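Editorial note: the streaming start-tag path covered by testConsumeFromStartTag can also be driven through SQL hints, which is how the TestStreamScanSql cases above exercise it. A minimal sketch, assuming an Iceberg catalog named my_catalog is already registered with the session (catalog, database, table, and tag names are placeholders, not part of this diff):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StartTagStreamSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Placeholder names; registering the Iceberg catalog is assumed to have
    // happened already. Combining 'start-tag' with 'start-snapshot-id', or
    // setting 'end-tag' at all, fails the streaming reader's new precondition
    // checks in StreamingMonitorFunction.
    tEnv.executeSql(
            "SELECT * FROM my_catalog.db.tbl /*+ OPTIONS('streaming'='true', "
                + "'monitor-interval'='1s', 'start-tag'='t1') */")
        .print(); // blocks, printing rows as new snapshots are committed
  }
}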