21 changes: 21 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
@@ -305,4 +305,25 @@ default AppendFiles newFastAppend() {

/** Returns a {@link LocationProvider} to provide locations for new data files. */
LocationProvider locationProvider();

/**
* Returns the current refs for the table.
*
* @return the current refs for the table
*/
Map<String, SnapshotRef> refs();

/**
* Returns the snapshot referenced by the given name, or null if no such reference exists.
*
* @param name a reference name
* @return the snapshot referenced by the given name, or null if no such reference exists
*/
default Snapshot snapshot(String name) {
SnapshotRef ref = refs().get(name);
if (ref != null) {
return snapshot(ref.snapshotId());
}

return null;
}
}
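
Taken together, the new methods let callers enumerate refs and resolve one by name. A minimal usage sketch, assuming a loaded Table instance and that SnapshotRef exposes snapshotId() and isBranch(); the "main" ref name is illustrative:

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;

class RefsExample {
  // Print every named ref (branch or tag), then resolve one by name.
  static void describeRefs(Table table) {
    Map<String, SnapshotRef> refs = table.refs();
    refs.forEach((name, ref) ->
        System.out.printf("%s -> snapshot %d (%s)%n",
            name, ref.snapshotId(), ref.isBranch() ? "branch" : "tag"));

    // The new default method returns null when no ref has the given name.
    Snapshot main = table.snapshot("main");
    if (main != null) {
      System.out.println("main points at snapshot " + main.snapshotId());
    }
  }
}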
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -158,6 +158,11 @@ public List<HistoryEntry> history() {
return table().history();
}

@Override
public Map<String, SnapshotRef> refs() {
return table().refs();
}

@Override
public UpdateSchema updateSchema() {
throw new UnsupportedOperationException("Cannot update the schema of a metadata table");
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -228,6 +228,11 @@ public LocationProvider locationProvider() {
return operations().locationProvider();
}

@Override
public Map<String, SnapshotRef> refs() {
return ops.current().refs();
}

@Override
public String toString() {
return name();
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -747,6 +747,11 @@ public LocationProvider locationProvider() {
return transactionOps.locationProvider();
}

@Override
public Map<String, SnapshotRef> refs() {
return current.refs();
}

@Override
public String toString() {
return name();
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -61,6 +61,7 @@ public class SerializableTable implements Table, Serializable {
private final FileIO io;
private final EncryptionManager encryption;
private final LocationProvider locationProvider;
private final Map<String, SnapshotRef> refs;

private transient volatile Table lazyTable = null;
private transient volatile Schema lazySchema = null;
@@ -81,6 +82,7 @@ protected SerializableTable(Table table) {
this.io = fileIO(table);
this.encryption = table.encryption();
this.locationProvider = table.locationProvider();
this.refs = table.refs();
}

/**
@@ -235,6 +237,11 @@ public LocationProvider locationProvider() {
return locationProvider;
}

@Override
public Map<String, SnapshotRef> refs() {
return refs;
}

@Override
public void refresh() {
throw new UnsupportedOperationException(errorMsg("refresh"));
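
Because the refs map is captured eagerly in the constructor, a serialized copy can answer refs() without live table operations, for example on executors. A short sketch, assuming SerializableTable.copyOf(Table) and an already-loaded table:

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

class SerializableRefsExample {
  // The serialized copy carries the refs that existed when it was created.
  static void printCapturedRefs(Table table) {
    Table copy = SerializableTable.copyOf(table);
    copy.refs().forEach((name, ref) ->
        System.out.println(name + " -> " + ref.snapshotId()));
  }
}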
@@ -73,6 +73,8 @@ statement
| ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
| ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields
| ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields
| USE BRANCH identifier #useBranch
| USE TAG identifier #useTag
;

writeSpec
@@ -170,14 +172,15 @@ fieldList
nonReserved
: ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE
| DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET
| TRUE | FALSE
| TRUE | FALSE | USE
| MAP
;

ADD: 'ADD';
ALTER: 'ALTER';
AS: 'AS';
ASC: 'ASC';
BRANCH: 'BRANCH';
BY: 'BY';
CALL: 'CALL';
DESC: 'DESC';
@@ -195,7 +198,9 @@ REPLACE: 'REPLACE';
IDENTIFIER_KW: 'IDENTIFIER';
SET: 'SET';
TABLE: 'TABLE';
TAG: 'TAG';
UNORDERED: 'UNORDERED';
USE: 'USE';
WITH: 'WITH';
WRITE: 'WRITE';

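
With the grammar extended as above, the new statements can be issued through Spark SQL once the Iceberg extensions are registered. A sketch; the branch and tag names are illustrative:

import org.apache.spark.sql.SparkSession;

class UseRefSqlExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("use-ref-example")
        // USE BRANCH / USE TAG only parse when the Iceberg SQL extensions are enabled.
        .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .getOrCreate();

    spark.sql("USE BRANCH audit_branch"); // route subsequent reads to this branch
    spark.sql("USE TAG release_v1");      // or pin them to an immutable tag
  }
}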
@@ -195,7 +195,9 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
// comments that span multiple lines are caught.
.replaceAll("/\\*.*?\\*/", " ")
.trim()
normalized.startsWith("call") || (
normalized.startsWith("call") ||
normalized.startsWith("use branch") ||
normalized.startsWith("use tag") || (
normalized.startsWith("alter table") && (
normalized.contains("add partition field") ||
normalized.contains("drop partition field") ||
@@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.UseRef
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.connector.expressions
@@ -90,6 +91,19 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
typedVisit[Transform](ctx.transform))
}

/**
* Create a USE BRANCH logical command.
*/
override def visitUseBranch(ctx: UseBranchContext): UseRef = withOrigin(ctx) {
UseRef(ctx.identifier().getText)
}

/**
* Create a USE TAG logical command.
*/
override def visitUseTag(ctx: UseTagContext): UseRef = withOrigin(ctx) {
UseRef(ctx.identifier().getText)
}

/**
* Create a REPLACE PARTITION FIELD logical command.
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute

case class UseRef(ident: String) extends LeafCommand {
override lazy val output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"UseRef $indent"
}
}
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.UseRef
import org.apache.spark.sql.catalyst.plans.logical.WriteDelta
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -58,6 +59,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
val input = buildInternalRow(args)
CallExec(c.output, procedure, input) :: Nil

case UseRef(ident) =>
UseRefExec(ident, spark) :: Nil

case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform, name) =>
AddPartitionFieldExec(catalog, ident, transform, name) :: Nil

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.SparkSQLProperties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

case class UseRefExec(ident: String, spark: SparkSession) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
spark.conf.set(SparkSQLProperties.SNAPSHOT_REF, ident)

Nil
}

override def simpleString(maxFields: Int): String = {
s"UseRef (${ident})";
}
}
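
UseRefExec only records the ref name in the session conf under SparkSQLProperties.SNAPSHOT_REF; the read path is left to resolve that name against the table's refs. A hypothetical resolver sketch, not part of this change (the currentRefSnapshot helper and refKey parameter are illustrative):

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.spark.sql.SparkSession;

class SnapshotRefResolver {
  // Resolve the ref name stored in the session conf to a snapshot, or null
  // if no ref is set or the table has no ref with that name.
  static Snapshot currentRefSnapshot(SparkSession spark, Table table, String refKey) {
    String refName = spark.conf().get(refKey, null);
    return refName == null ? null : table.snapshot(refName);
  }
}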