From 294d489efa570832a85c780da4ce02c83749f496 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Tue, 29 Mar 2022 21:20:02 -0700 Subject: [PATCH 1/2] Core, API: Add refs and snapshot lookup by ref name to the Table API --- .../main/java/org/apache/iceberg/Table.java | 21 +++++++++++++++++++ .../org/apache/iceberg/BaseMetadataTable.java | 5 +++++ .../java/org/apache/iceberg/BaseTable.java | 5 +++++ .../org/apache/iceberg/BaseTransaction.java | 5 +++++ .../org/apache/iceberg/SerializableTable.java | 7 +++++++ 5 files changed, 43 insertions(+) diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java index 8278c99bfc2d..7964bd22c076 100644 --- a/api/src/main/java/org/apache/iceberg/Table.java +++ b/api/src/main/java/org/apache/iceberg/Table.java @@ -305,4 +305,25 @@ default AppendFiles newFastAppend() { /** Returns a {@link LocationProvider} to provide locations for new data files. */ LocationProvider locationProvider(); + + /** + * Returns the current refs for the table. + * + * @return the current refs for the table + */ + Map<String, SnapshotRef> refs(); + + /** + * Returns the snapshot referenced by the given name, or null if no such reference exists. + * + * @param name a reference (branch or tag) name + * @return the snapshot referenced by the given name, or null if no such reference exists + */ + default Snapshot snapshot(String name) { + SnapshotRef ref = refs().get(name); + if (ref != null) { + return snapshot(ref.snapshotId()); + } + + return null; + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index b065ade32bcb..c6615862de57 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -158,6 +158,11 @@ public List<HistoryEntry> history() { return table().history(); } + @Override + public Map<String, SnapshotRef> refs() { + return table().refs(); + } + @Override public UpdateSchema updateSchema() { throw new UnsupportedOperationException("Cannot update the schema of a metadata table"); diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 00842f6d77c6..4578b676055d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -228,6 +228,11 @@ public LocationProvider locationProvider() { return operations().locationProvider(); } + @Override + public Map<String, SnapshotRef> refs() { + return ops.current().refs(); + } + @Override public String toString() { return name(); } diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 38dfa0aaf3ee..b162201cf567 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -747,6 +747,11 @@ public LocationProvider locationProvider() { return transactionOps.locationProvider(); } + @Override + public Map<String, SnapshotRef> refs() { + return current.refs(); + } + @Override public String toString() { return name(); } diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index 37d7453c033b..ddffdae14edd 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -61,6 +61,7 @@ public class SerializableTable implements Table, Serializable { private final FileIO io; private final EncryptionManager
encryption; private final LocationProvider locationProvider; + private final Map refs; private transient volatile Table lazyTable = null; private transient volatile Schema lazySchema = null; @@ -81,6 +82,7 @@ protected SerializableTable(Table table) { this.io = fileIO(table); this.encryption = table.encryption(); this.locationProvider = table.locationProvider(); + this.refs = table.refs(); } /** @@ -235,6 +237,11 @@ public LocationProvider locationProvider() { return locationProvider; } + @Override + public Map refs() { + return refs; + } + @Override public void refresh() { throw new UnsupportedOperationException(errorMsg("refresh")); From e413a7ab4b39e88b1f6bdedbc5da94cea91f32a5 Mon Sep 17 00:00:00 2001 From: liliwei Date: Tue, 26 Jul 2022 19:14:57 +0800 Subject: [PATCH 2/2] Spark3.2: Spark SQL Read from branch\tag --- .../IcebergSqlExtensions.g4 | 7 +- .../IcebergSparkSqlExtensionsParser.scala | 4 +- .../IcebergSqlExtensionsAstBuilder.scala | 14 ++ .../sql/catalyst/plans/logical/UseRef.scala | 30 ++++ .../v2/ExtendedDataSourceV2Strategy.scala | 4 + .../execution/datasources/v2/UseRefExec.scala | 40 +++++ .../spark/extensions/TestSnapshotRefSQL.java | 154 ++++++++++++++++++ .../apache/iceberg/spark/SparkCatalog.java | 39 ++++- .../apache/iceberg/spark/SparkReadConf.java | 9 + .../iceberg/spark/SparkReadOptions.java | 2 + .../iceberg/spark/SparkSQLProperties.java | 6 + .../org/apache/iceberg/spark/SparkUtil.java | 14 ++ .../spark/source/SparkBatchQueryScan.java | 9 +- .../spark/source/SparkScanBuilder.java | 10 +- 14 files changed, 337 insertions(+), 5 deletions(-) create mode 100644 spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UseRef.scala create mode 100644 spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UseRefExec.scala create mode 100644 spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotRefSQL.java diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index 6a21e79c2803..5df685f70bc3 100644 --- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -73,6 +73,8 @@ statement | ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields + | USE BRANCH identifier #useBranch + | USE TAG identifier #useTag ; writeSpec @@ -170,7 +172,7 @@ fieldList nonReserved : ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE | DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET - | TRUE | FALSE + | TRUE | FALSE | USE | MAP ; @@ -178,6 +180,7 @@ ADD: 'ADD'; ALTER: 'ALTER'; AS: 'AS'; ASC: 'ASC'; +BRANCH: 'BRANCH'; BY: 'BY'; CALL: 'CALL'; DESC: 'DESC'; @@ -195,7 +198,9 @@ REPLACE: 'REPLACE'; IDENTIFIER_KW: 'IDENTIFIER'; SET: 'SET'; TABLE: 'TABLE'; +TAG: 'TAG'; UNORDERED: 'UNORDERED'; +USE: 'USE'; WITH: 'WITH'; WRITE: 'WRITE'; diff --git 
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 0af1caf43ae7..8dbd5967cf5f 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -195,7 +195,9 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI // comments that span multiple lines are caught. .replaceAll("/\\*.*?\\*/", " ") .trim() - normalized.startsWith("call") || ( + normalized.startsWith("call") || + normalized.startsWith("use branch") || + normalized.startsWith("use tag") || ( normalized.startsWith("alter table") && ( normalized.contains("add partition field") || normalized.contains("drop partition field") || diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 2e2e15ed66a6..525a02a0a29e 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering +import org.apache.spark.sql.catalyst.plans.logical.UseRef import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.connector.expressions @@ -90,6 +91,19 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS typedVisit[Transform](ctx.transform)) } + /** + * Create a USE BRANCH logical command. + */ + override def visitUseBranch(ctx: UseBranchContext): UseRef = withOrigin(ctx) { + UseRef(ctx.identifier().getText) + } + + /** + * Create a USE TAG logical command. + */ + override def visitUseTag(ctx: UseTagContext): UseRef = withOrigin(ctx) { + UseRef(ctx.identifier().getText) + } /** * Create an REPLACE PARTITION FIELD logical command. diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UseRef.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UseRef.scala new file mode 100644 index 000000000000..573d89d2a073 --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UseRef.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class UseRef(ident: String) extends LeafCommand { + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"UseRef $ident" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index b5eb3e47b7cc..c0f5f3698ac0 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering +import org.apache.spark.sql.catalyst.plans.logical.UseRef import org.apache.spark.sql.catalyst.plans.logical.WriteDelta import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableCatalog @@ -58,6 +59,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi val input = buildInternalRow(args) CallExec(c.output, procedure, input) :: Nil + case UseRef(ident) => + UseRefExec(ident, spark) :: Nil + case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform, name) => AddPartitionFieldExec(catalog, ident, transform, name) :: Nil diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UseRefExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UseRefExec.scala new file mode 100644 index 000000000000..59b88c38318f --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UseRefExec.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.SparkSQLProperties +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class UseRefExec(ident: String, spark: SparkSession) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + spark.conf.set(SparkSQLProperties.SNAPSHOT_REF, ident) + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"UseRef (${ident})"; + } +} diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotRefSQL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotRefSQL.java new file mode 100644 index 000000000000..f0b1eab04c0e --- /dev/null +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotRefSQL.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.junit.After; +import org.junit.Assume; +import org.junit.Test; + +public class TestSnapshotRefSQL extends SparkExtensionsTestBase { + public TestSnapshotRefSQL(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testUseBranch() throws NoSuchTableException { + Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive")); + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + + List<SimpleRecord> records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append(); + + Table table = validationCatalog.loadTable(tableIdent); + String branchName = "b1"; + table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit(); + + spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append(); + + String prefix = "at_branch_"; + + // read the table at the branch + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "a"), row(2, "b")), + sql("SELECT * FROM %s.%s order by id", tableName, prefix + branchName)); + + String mainBranch = "main"; + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "a"), row(1, "a"), row(2, "b"), row(2, "b")), + sql("SELECT * FROM %s.%s order by id", tableName, prefix + mainBranch)); + + String branchNotExist = "b2"; + AssertHelpers.assertThrows( + "Cannot use a ref that does not exist", + IllegalArgumentException.class, + "Snapshot ref does not exist: b2", + () -> sql("SELECT * FROM %s.%s order by id", tableName, prefix + branchNotExist)); + + sql("USE BRANCH %s", branchName); + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "a"), row(2, "b")), + sql("SELECT * FROM %s order by id", tableName)); + + sql("USE BRANCH %s", mainBranch); + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "a"), row(1, "a"), row(2, "b"), row(2, "b")), + sql("SELECT * FROM %s order by id", tableName)); + + sql("USE BRANCH %s", branchNotExist); + AssertHelpers.assertThrows( + "Cannot use a ref that does not exist", + IllegalArgumentException.class, + "Snapshot ref does not exist: b2", + () -> sql("SELECT * FROM %s order by id", tableName)); + } + + @Test + public void testUseTag() throws NoSuchTableException { + Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive")); + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + + List<SimpleRecord> records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append(); + Table table = validationCatalog.loadTable(tableIdent); + String tagName = "t1"; + table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit(); + + spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append(); + + String prefix = "at_tag_"; + + // read the table at the tag + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "a"), row(2, "b")), + sql("SELECT * FROM %s.%s", tableName, prefix + tagName)); + + String tagNotExist = "b2"; + AssertHelpers.assertThrows( + "Cannot use a ref that does not exist", + IllegalArgumentException.class, + "Snapshot ref does not exist: b2", + () -> sql("SELECT * FROM %s.%s", tableName, prefix + tagNotExist)); + + sql("USE TAG %s", tagName); + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "a"), row(2, "b")), + sql("SELECT * FROM %s ", tableName)); + + sql("USE TAG %s", tagNotExist); + AssertHelpers.assertThrows( + "Cannot use a ref that does not exist", + IllegalArgumentException.class, + "Snapshot ref does not exist: b2", + () -> sql("SELECT * FROM %s ", tableName)); + } + + private Table createDefaultTableAndInsert2Row() throws NoSuchTableException { + Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive")); + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + + List<SimpleRecord> records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + Table table = validationCatalog.loadTable(tableIdent); + return table; + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index bad2aca031c8..9052736feca0 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -36,6 +36,7 @@ import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; @@ -101,6 +102,7 @@ public class SparkCatalog extends BaseCatalog { private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER); private static final Splitter COMMA = Splitter.on(","); private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)"); + private static final Pattern AT_REF = Pattern.compile("at_(branch|tag|ref)_(\\w+)"); private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)"); private String catalogName = null; @@ -617,6 +619,15 @@ private Pair<Table, Long> load(Identifier ident) { return Pair.of(table, snapshotId); } + Matcher atRef = AT_REF.matcher(ident.name()); + if (atRef.matches()) { + String ref = atRef.group(2); + Long snapshotId = getSnapshotId(ref, table); + if (snapshotId != null) { + return Pair.of(table, snapshotId); + } + } + // the name wasn't a valid snapshot selector.
throw the original exception throw e; } @@ -639,6 +650,7 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) { String metadataTableName = null; Long asOfTimestamp = null; Long snapshotId = null; + String ref = null; for (String meta : parsed.second()) { if (MetadataTableType.from(meta) != null) { metadataTableName = meta; @@ -651,6 +663,12 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) { continue; } + Matcher atRef = AT_REF.matcher(meta); + if (atRef.matches()) { + ref = atRef.group(2); + continue; + } + Matcher id = SNAPSHOT_ID.matcher(meta); if (id.matches()) { snapshotId = Long.parseLong(id.group(1)); @@ -661,6 +679,10 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) { asOfTimestamp == null || snapshotId == null, "Cannot specify both snapshot-id and as-of-timestamp: %s", ident.location()); + Preconditions.checkArgument( + ref == null || snapshotId == null, + "Cannot specify both at_(branch|tag|ref) and snapshot-id: %s", + ident.location()); Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : "")); @@ -670,8 +692,23 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) { } else if (asOfTimestamp != null) { return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp)); } else { - return Pair.of(table, null); + return Pair.of(table, getSnapshotId(ref, table)); + } + } + + private Long getSnapshotId(String ref, Table table) { + if (ref != null) { + if (ref.equalsIgnoreCase(SnapshotRef.MAIN_BRANCH)) { + return table.currentSnapshot().snapshotId(); + } + SnapshotRef snapshotRef = table.refs().get(ref); + if (snapshotRef != null) { + return snapshotRef.snapshotId(); + } else { + throw new IllegalArgumentException("Snapshot ref does not exist: " + ref); + } } + return null; } private Identifier namespaceToIdentifier(String[] namespace) { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index ef262e11f02b..02dee98144b1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -220,4 +220,13 @@ public Long streamFromTimestamp() { .defaultValue(Long.MIN_VALUE) .parse(); } + + public String snapshotRef() { + return confParser + .stringConf() + .option(SparkReadOptions.SNAPSHOT_REF) + .sessionConf(SparkSQLProperties.SNAPSHOT_REF) + .defaultValue(SparkSQLProperties.SNAPSHOT_REF_DEFAULT) + .parse(); + } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index 9515a48bc297..c9f2f44eb7ac 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -78,4 +78,6 @@ private SparkReadOptions() {} public static final String VERSION_AS_OF = "versionAsOf"; public static final String TIMESTAMP_AS_OF = "timestampAsOf"; + + public static final String SNAPSHOT_REF = "snapshot-ref"; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index fa8bd719f391..25b7c835574f 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark; +import org.apache.iceberg.SnapshotRef; + public class SparkSQLProperties { private SparkSQLProperties() {} @@ -42,4 +44,8 @@ private SparkSQLProperties() {} // Controls whether to check the order of fields during writes public static final String CHECK_ORDERING = "spark.sql.iceberg.check-ordering"; public static final boolean CHECK_ORDERING_DEFAULT = true; + + // Controls which ref to use + public static final String SNAPSHOT_REF = "spark.sql.iceberg.snapshot-ref"; + public static final String SNAPSHOT_REF_DEFAULT = SnapshotRef.MAIN_BRANCH; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java index 950ed7bc87b8..62e1815d3395 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java @@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopConfigurable; import org.apache.iceberg.io.FileIO; @@ -212,6 +213,19 @@ private static String hadoopConfPrefixForCatalog(String catalogName) { return String.format(SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR, catalogName); } + public static Long getSnapshotIdFromRef(String snapshotRef, Table table) { + if (snapshotRef != null + && !snapshotRef.equalsIgnoreCase(SnapshotRef.MAIN_BRANCH)) { + SnapshotRef ref = table.refs().get(snapshotRef); + if (ref == null) { + throw new IllegalArgumentException("Snapshot ref does not exist: " + snapshotRef); + } + + return ref.snapshotId(); + } + return null; + } + /** * Get a List of Spark filter Expression. 
* diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index 59dd8759968f..68ac11464a91 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -48,6 +48,7 @@ import org.apache.iceberg.spark.SparkFilters; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.SparkSession; @@ -84,12 +85,18 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering super(spark, table, readConf, expectedSchema, filters); this.scan = scan; - this.snapshotId = readConf.snapshotId(); this.startSnapshotId = readConf.startSnapshotId(); this.endSnapshotId = readConf.endSnapshotId(); this.asOfTimestamp = readConf.asOfTimestamp(); this.runtimeFilterExpressions = Lists.newArrayList(); + String snapshotRef = readConf.snapshotRef(); + if (readConf.snapshotId() == null) { + this.snapshotId = SparkUtil.getSnapshotIdFromRef(snapshotRef, table); + } else { + this.snapshotId = readConf.snapshotId(); + } + if (scan == null) { this.specIds = Collections.emptySet(); this.files = Collections.emptyList(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 21c34ed6f628..024f966909d5 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -24,6 +24,7 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; @@ -38,6 +39,7 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.spark.sql.SparkSession; @@ -182,6 +184,7 @@ private Schema schemaWithMetadataColumns() { public Scan build() { Long snapshotId = readConf.snapshotId(); Long asOfTimestamp = readConf.asOfTimestamp(); + String snapshotRef = readConf.snapshotRef(); Preconditions.checkArgument( snapshotId == null || asOfTimestamp == null, @@ -192,6 +195,10 @@ public Scan build() { Long startSnapshotId = readConf.startSnapshotId(); Long endSnapshotId = readConf.endSnapshotId(); + if (snapshotId == null) { + snapshotId = SparkUtil.getSnapshotIdFromRef(snapshotRef, table); + } + if (snapshotId != null || asOfTimestamp != null) { Preconditions.checkArgument( startSnapshotId == null && endSnapshotId == null, @@ -240,7 +247,8 @@ public Scan build() { public Scan buildMergeOnReadScan() { Preconditions.checkArgument( - readConf.snapshotId() == null && readConf.asOfTimestamp() == null, + readConf.snapshotId() == null && readConf.snapshotRef().equalsIgnoreCase(SnapshotRef.MAIN_BRANCH) && + readConf.asOfTimestamp() == null, "Cannot set time travel options %s and %s for row-level 
command scans", SparkReadOptions.SNAPSHOT_ID, SparkReadOptions.AS_OF_TIMESTAMP);