diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
index 923c54981199..2533b3bd75b5 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
@@ -20,9 +20,12 @@
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
@@ -46,6 +49,8 @@ public class SparkCachedTableCatalog implements TableCatalog {
   private static final Splitter COMMA = Splitter.on(",");
   private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
   private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
+  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
+  private static final Pattern TAG = Pattern.compile("tag_(.*)");
 
   private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
 
@@ -133,6 +138,8 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {
 
     Long asOfTimestamp = null;
     Long snapshotId = null;
+    String branch = null;
+    String tag = null;
     for (String meta : metadata) {
       Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta);
       if (timeBasedMatcher.matches()) {
@@ -143,13 +150,28 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {
       Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta);
       if (snapshotBasedMatcher.matches()) {
         snapshotId = Long.parseLong(snapshotBasedMatcher.group(1));
+        continue;
+      }
+
+      Matcher branchBasedMatcher = BRANCH.matcher(meta);
+      if (branchBasedMatcher.matches()) {
+        branch = branchBasedMatcher.group(1);
+        continue;
+      }
+
+      Matcher tagBasedMatcher = TAG.matcher(meta);
+      if (tagBasedMatcher.matches()) {
+        tag = tagBasedMatcher.group(1);
       }
     }
 
     Preconditions.checkArgument(
-        asOfTimestamp == null || snapshotId == null,
-        "Cannot specify both snapshot and timestamp for time travel: %s",
-        ident);
+        Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
+        "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)",
+        snapshotId,
+        asOfTimestamp,
+        branch,
+        tag);
 
     Table table = TABLE_CACHE.get(key);
 
@@ -161,6 +183,16 @@ private Pair<Table, Long> load(Identifier ident) throws NoSuchTableException {
       return Pair.of(table, snapshotId);
     } else if (asOfTimestamp != null) {
       return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+    } else if (branch != null) {
+      Snapshot branchSnapshot = table.snapshot(branch);
+      Preconditions.checkArgument(
+          branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch);
+      return Pair.of(table, branchSnapshot.snapshotId());
+    } else if (tag != null) {
+      Snapshot tagSnapshot = table.snapshot(tag);
+      Preconditions.checkArgument(
+          tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag);
+      return Pair.of(table, tagSnapshot.snapshotId());
     } else {
       return Pair.of(table, null);
     }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 9714ec3f4a88..258f8420f1e4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -24,11 +24,13 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.CatalogProperties;
@@ -37,6 +39,7 @@
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.Catalog;
@@ -105,6 +108,8 @@ public class SparkCatalog extends BaseCatalog {
   private static final Splitter COMMA = Splitter.on(",");
   private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
   private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
+  private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
+  private static final Pattern TAG = Pattern.compile("tag_(.*)");
 
   private String catalogName = null;
   private Catalog icebergCatalog = null;
@@ -654,6 +659,22 @@ private Table load(Identifier ident) {
         return new SparkTable(table, snapshotId, !cacheEnabled);
       }
 
+      Matcher branch = BRANCH.matcher(ident.name());
+      if (branch.matches()) {
+        Snapshot branchSnapshot = table.snapshot(branch.group(1));
+        if (branchSnapshot != null) {
+          return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled);
+        }
+      }
+
+      Matcher tag = TAG.matcher(ident.name());
+      if (tag.matches()) {
+        Snapshot tagSnapshot = table.snapshot(tag.group(1));
+        if (tagSnapshot != null) {
+          return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled);
+        }
+      }
+
       // the name wasn't a valid snapshot selector and did not point to the changelog
       // throw the original exception
       throw e;
@@ -678,6 +699,8 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
     String metadataTableName = null;
     Long asOfTimestamp = null;
     Long snapshotId = null;
+    String branch = null;
+    String tag = null;
     boolean isChangelog = false;
 
     for (String meta : parsed.second()) {
@@ -700,13 +723,28 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
       Matcher id = SNAPSHOT_ID.matcher(meta);
       if (id.matches()) {
         snapshotId = Long.parseLong(id.group(1));
+        continue;
+      }
+
+      Matcher branchRef = BRANCH.matcher(meta);
+      if (branchRef.matches()) {
+        branch = branchRef.group(1);
+        continue;
+      }
+
+      Matcher tagRef = TAG.matcher(meta);
+      if (tagRef.matches()) {
+        tag = tagRef.group(1);
       }
     }
 
     Preconditions.checkArgument(
-        asOfTimestamp == null || snapshotId == null,
-        "Cannot specify both snapshot-id and as-of-timestamp: %s",
-        ident.location());
+        Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
+        "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)",
+        snapshotId,
+        asOfTimestamp,
+        branch,
+        tag);
 
     Preconditions.checkArgument(
         !isChangelog || (snapshotId == null && asOfTimestamp == null),
@@ -722,6 +760,18 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
       long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
       return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled);
 
+    } else if (branch != null) {
+      Snapshot branchSnapshot = table.snapshot(branch);
+      Preconditions.checkArgument(
+          branchSnapshot != null, "Cannot find snapshot associated with branch name: %s", branch);
+      return new SparkTable(table, branchSnapshot.snapshotId(), !cacheEnabled);
+
+    } else if (tag != null) {
+      Snapshot tagSnapshot = table.snapshot(tag);
+      Preconditions.checkArgument(
+          tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", tag);
+      return new SparkTable(table, tagSnapshot.snapshotId(), !cacheEnabled);
+
     } else {
       return new SparkTable(table, snapshotId, !cacheEnabled);
     }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 4a03d3a5b5ab..8975c7f32db1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -20,7 +20,9 @@
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Stream;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.PathIdentifier;
@@ -67,6 +69,8 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions
       "spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME;
   private static final String AT_TIMESTAMP = "at_timestamp_";
   private static final String SNAPSHOT_ID = "snapshot_id_";
+  private static final String BRANCH_PREFIX = "branch_";
+  private static final String TAG_PREFIX = "tag_";
   private static final String[] EMPTY_NAMESPACE = new String[0];
 
   private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
@@ -124,11 +128,15 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
     Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
     Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    String branch = options.get(SparkReadOptions.BRANCH);
+    String tag = options.get(SparkReadOptions.TAG);
 
     Preconditions.checkArgument(
-        asOfTimestamp == null || snapshotId == null,
-        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)",
+        Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
+        "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)",
         snapshotId,
-        asOfTimestamp);
+        asOfTimestamp,
+        branch,
+        tag);
 
     String selector = null;
 
@@ -140,6 +148,14 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
       selector = AT_TIMESTAMP + asOfTimestamp;
     }
 
+    if (branch != null) {
+      selector = BRANCH_PREFIX + branch;
+    }
+
+    if (tag != null) {
+      selector = TAG_PREFIX + tag;
+    }
+
     CatalogManager catalogManager = spark.sessionState().catalogManager();
 
     if (TABLE_CACHE.contains(path)) {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 0208121f0027..da9160ffba7e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -368,6 +368,8 @@ private static CaseInsensitiveStringMap addSnapshotId(
     scanOptions.putAll(options.asCaseSensitiveMap());
     scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value);
     scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);
+    scanOptions.remove(SparkReadOptions.BRANCH);
+    scanOptions.remove(SparkReadOptions.TAG);
 
     return new CaseInsensitiveStringMap(scanOptions);
   }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 0b7348fa078a..276fbcd592ae 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -223,8 +223,10 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
                     .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
                     .load(tableLocation))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageContaining("Cannot specify both snapshot-id")
-        .hasMessageContaining("and as-of-timestamp");
+        .hasMessageContaining("Can specify only one of snapshot-id")
+        .hasMessageContaining("as-of-timestamp")
+        .hasMessageContaining("branch")
+        .hasMessageContaining("tag");
   }
 
   @Test
@@ -325,7 +327,7 @@ public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
                     .load(tableLocation)
                     .show())
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+        .hasMessageStartingWith("Can specify only one of snapshot-id");
   }
 
   @Test
@@ -356,7 +358,7 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
                     .load(tableLocation)
                     .show())
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+        .hasMessageStartingWith("Can specify only one of snapshot-id");
 
     Assertions.assertThatThrownBy(
             () ->
@@ -368,6 +370,88 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
                     .load(tableLocation)
                     .show())
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+        .hasMessageStartingWith("Can specify only one of snapshot-id");
+  }
+
+  @Test
+  public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    Dataset<Row> branchSnapshotResult =
+        spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+    List<SimpleRecord> branchSnapshotRecords =
+        branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, branchSnapshotRecords);
+
+    // Deleting a column to indicate schema change
+    table.updateSchema().deleteColumn("data").commit();
+
+    // The data should have the deleted column as it was captured in an earlier snapshot.
+    Dataset<Row> deletedColumnBranchSnapshotResult =
+        spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+    List<SimpleRecord> deletedColumnBranchSnapshotRecords =
+        deletedColumnBranchSnapshotResult
+            .orderBy("id")
+            .as(Encoders.bean(SimpleRecord.class))
+            .collectAsList();
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords);
+  }
+
+  @Test
+  public void testSnapshotSelectionByTagWithSchemaChange() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+
+    Dataset<Row> tagSnapshotResult =
+        spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
+    List<SimpleRecord> tagSnapshotRecords =
+        tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Current snapshot rows should match", expectedRecords, tagSnapshotRecords);
+
+    // Deleting a column to indicate schema change
+    table.updateSchema().deleteColumn("data").commit();
+
+    // The data should have the deleted column as it was captured in an earlier snapshot.
+    Dataset<Row> deletedColumnTagSnapshotResult =
+        spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
+    List<SimpleRecord> deletedColumnTagSnapshotRecords =
+        deletedColumnTagSnapshotResult
+            .orderBy("id")
+            .as(Encoders.bean(SimpleRecord.class))
+            .collectAsList();
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, deletedColumnTagSnapshotRecords);
   }
 }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 9f074c6f9b61..e08bc4574dbf 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -441,7 +441,8 @@ public void testSpecifySnapshotAndTimestamp() {
         "Should not be able to specify both snapshot id and timestamp",
         IllegalArgumentException.class,
         String.format(
-            "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, timestamp),
+            "Can specify only one of snapshot-id (%s), as-of-timestamp (%s)",
+            snapshotId, timestamp),
         () -> {
           spark
              .read()
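
Reviewer note (not part of the patch): a minimal Java sketch of how the selectors added above are exercised. The option keys `branch` and `tag` and the `branch_<name>`/`tag_<name>` identifier suffixes come from the patch itself; the table name `db.table` and the ref names `b1`/`t1` are illustrative placeholders.

```java
// Usage sketch for the branch/tag read paths added in this patch.
// "db.table", "b1", and "t1" are hypothetical names, not part of the change.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BranchTagReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("branch-tag-read").getOrCreate();

    // DataFrame reads via the new SparkReadOptions.BRANCH / SparkReadOptions.TAG options.
    // Exactly one of snapshot-id, as-of-timestamp, branch, or tag may be set; combining
    // any two now fails with "Can specify only one of snapshot-id (...), ...".
    Dataset<Row> fromBranch =
        spark.read().format("iceberg").option("branch", "b1").load("db.table");
    Dataset<Row> fromTag =
        spark.read().format("iceberg").option("tag", "t1").load("db.table");

    // Identifier-suffix form matched by the BRANCH/TAG patterns in SparkCatalog,
    // analogous to the existing snapshot_id_<id> and at_timestamp_<ms> selectors.
    Dataset<Row> viaSuffix = spark.sql("SELECT * FROM db.table.branch_b1");

    fromBranch.show();
    fromTag.show();
    viaSuffix.show();
  }
}
```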