Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,24 @@ statement
| ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields
| ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch
| ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier #dropBranch
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DROP TAG (IF EXISTS)? identifier #dropTag
;

createReplaceBranchClause
: (CREATE OR)? REPLACE BRANCH identifier branchOptions
| CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions
;

createReplaceTagClause
: (CREATE OR)? REPLACE TAG identifier tagOptions
| CREATE TAG (IF NOT EXISTS)? identifier tagOptions
;

tagOptions
: (AS OF VERSION snapshotId)? (refRetain)?
;

branchOptions
: (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?
;
Expand Down Expand Up @@ -200,7 +211,7 @@ fieldList
nonReserved
: ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR | ORDERED | PARTITION | TABLE | WRITE
| DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | RETENTION | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS
| TRUE | FALSE
| TAG | TRUE | FALSE
| MAP
;

Expand Down Expand Up @@ -250,6 +261,7 @@ SET: 'SET';
SNAPSHOT: 'SNAPSHOT';
SNAPSHOTS: 'SNAPSHOTS';
TABLE: 'TABLE';
TAG: 'TAG';
UNORDERED: 'UNORDERED';
VERSION: 'VERSION';
WITH: 'WITH';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
private def isSnapshotRefDdl(normalized: String): Boolean = {
normalized.contains("create branch") ||
normalized.contains("replace branch") ||
normalized.contains("drop branch")
normalized.contains("drop branch") ||
normalized.contains("create tag") ||
normalized.contains("replace tag") ||
normalized.contains("drop tag")
}

protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,18 @@ import org.apache.spark.sql.catalyst.plans.logical.BranchOptions
import org.apache.spark.sql.catalyst.plans.logical.CallArgument
import org.apache.spark.sql.catalyst.plans.logical.CallStatement
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
import org.apache.spark.sql.catalyst.plans.logical.DropBranch
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.DropTag
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.TagOptions
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.connector.expressions
Expand Down Expand Up @@ -137,6 +140,44 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
DropBranch(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null)
}

/**
* Create an CREATE OR REPLACE TAG logical command.
*/
override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTag = withOrigin(ctx) {
val createTagClause = ctx.createReplaceTagClause()

val tagName = createTagClause.identifier().getText

val tagOptionsContext = Option(createTagClause.tagOptions())
val snapshotId = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId()))
.map(_.getText.toLong)
val tagRetain = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.refRetain()))
val tagRefAgeMs = tagRetain.map(retain =>
TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong))
val tagOptions = TagOptions(
snapshotId,
tagRefAgeMs
)

val create = createTagClause.CREATE() != null
val replace = createTagClause.REPLACE() != null
val ifNotExists = createTagClause.EXISTS() != null

CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier),
tagName,
tagOptions,
create,
replace,
ifNotExists)
}

/**
* Create an DROP TAG logical command.
*/
override def visitDropTag(ctx: DropTagContext): DropTag = withOrigin(ctx) {
DropTag(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null)
}

/**
* Create an REPLACE PARTITION FIELD logical command.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute

case class CreateOrReplaceTag(
table: Seq[String],
tag: String,
tagOptions: TagOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean) extends Command {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"CreateOrReplaceTag tag: ${tag} for table: ${table.quoted}"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute

case class DropTag(table: Seq[String], tag: String, ifExists: Boolean) extends Command {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"DropTag tag: ${tag} for table: ${table.quoted}"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical

case class TagOptions(snapshotId: Option[Long], snapshotRefRetain: Option[Long])
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.relocated.com.google.common.base.Preconditions
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.TagOptions
import org.apache.spark.sql.connector.catalog._

case class CreateOrReplaceTagExec(
catalog: TableCatalog,
ident: Identifier,
tag: String,
tagOptions: TagOptions,
create: Boolean,
replace: Boolean,
ifNotExists: Boolean) extends V2CommandExec {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable =>
val snapshotId: java.lang.Long = tagOptions.snapshotId
.orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId()))
.map(java.lang.Long.valueOf)
.orNull

Preconditions.checkArgument(snapshotId != null,
"Cannot complete create or replace tag operation on %s, main has no snapshot", ident)

val manageSnapshot = iceberg.table.manageSnapshots()
val refExists = null != iceberg.table().refs().get(tag)

if (create && replace && !refExists) {
manageSnapshot.createTag(tag, snapshotId)
} else if (replace) {
manageSnapshot.replaceTag(tag, snapshotId)
} else {
if (refExists && ifNotExists) {
return Nil
}

manageSnapshot.createTag(tag, snapshotId)
}

if (tagOptions.snapshotRefRetain.nonEmpty) {
manageSnapshot.setMaxRefAgeMs(tag, tagOptions.snapshotRefRetain.get)
}

manageSnapshot.commit()

case table =>
throw new UnsupportedOperationException(s"Cannot create tag to non-Iceberg table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"Create tag: $tag for table: ${ident.quoted}"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog

case class DropTagExec(
catalog: TableCatalog,
ident: Identifier,
tag: String,
ifExists: Boolean) extends V2CommandExec {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable =>
val ref = iceberg.table().refs().get(tag)
if (ref != null || !ifExists) {
iceberg.table().manageSnapshots().removeTag(tag).commit()
}

case table =>
throw new UnsupportedOperationException(s"Cannot drop tag on non-Iceberg table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"DropTag tag: ${tag} for table: ${ident.quoted}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.Call
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
import org.apache.spark.sql.catalyst.plans.logical.DropBranch
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.DropTag
import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilter
import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinalityCheck
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -77,6 +79,13 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) =>
DropBranchExec(catalog, ident, branch, ifExists) :: Nil

case CreateOrReplaceTag(
IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, create, replace, ifNotExists) =>
CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace, ifNotExists) :: Nil

case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
DropTagExec(catalog, ident, tag, ifExists) :: Nil

case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
SetIdentifierFieldsExec(catalog, ident, fields) :: Nil

Expand Down
Loading