@@ -73,13 +73,33 @@ statement
| ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
| ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields
| ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields
| ALTER TABLE multipartIdentifier CREATE BRANCH identifier (AS OF VERSION snapshotId)? (RETAIN snapshotRefRetain snapshotRefRetainTimeUnit)? (snapshotRetentionClause)? #createBranch
| ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch
;

snapshotRetentionClause
: WITH SNAPSHOT RETENTION numSnapshots SNAPSHOTS
| WITH SNAPSHOT RETENTION snapshotRetain snapshotRetainTimeUnit
| WITH SNAPSHOT RETENTION numSnapshots SNAPSHOTS snapshotRetain snapshotRetainTimeUnit
createReplaceBranchClause
: (CREATE OR)? REPLACE BRANCH identifier branchOptions
| CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions
;

branchOptions
: (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?
;

snapshotRetention
: WITH SNAPSHOT RETENTION minSnapshotsToKeep
| WITH SNAPSHOT RETENTION maxSnapshotAge
| WITH SNAPSHOT RETENTION minSnapshotsToKeep maxSnapshotAge
;

refRetain
: RETAIN number timeUnit
;

maxSnapshotAge
: number timeUnit
;

minSnapshotsToKeep
: number SNAPSHOTS
;
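
For orientation, a few statements the revised rules accept, as they would be issued from Spark. A sketch only: it assumes a session with the Iceberg SQL extensions enabled, and db.tbl, b1, and the snapshot id 1234 are illustrative.

// Illustrative only: table, branch, and snapshot id are made up.
// CREATE BRANCH with every optional clause, in the order branchOptions fixes:
spark.sql("ALTER TABLE db.tbl CREATE BRANCH b1 AS OF VERSION 1234 " +
  "RETAIN 7 DAYS WITH SNAPSHOT RETENTION 5 SNAPSHOTS 30 DAYS")
// The new REPLACE and IF NOT EXISTS variants:
spark.sql("ALTER TABLE db.tbl CREATE OR REPLACE BRANCH b1")
spark.sql("ALTER TABLE db.tbl CREATE BRANCH IF NOT EXISTS b1")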

writeSpec
@@ -175,7 +195,7 @@ fieldList
;

nonReserved
: ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | FIELD | FIRST | HOURS | LAST | NULLS | OF | ORDERED | PARTITION | TABLE | WRITE
: ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR | ORDERED | PARTITION | TABLE | WRITE
| DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS
| TRUE | FALSE
| MAP
@@ -189,22 +209,6 @@ numSnapshots
: number
;

snapshotRetain
: number
;

snapshotRefRetain
: number
;

snapshotRefRetainTimeUnit
: timeUnit
;

snapshotRetainTimeUnit
: timeUnit
;

timeUnit
: DAYS
| HOURS
@@ -222,17 +226,21 @@ DAYS: 'DAYS';
DESC: 'DESC';
DISTRIBUTED: 'DISTRIBUTED';
DROP: 'DROP';
EXISTS: 'EXISTS';
FIELD: 'FIELD';
FIELDS: 'FIELDS';
FIRST: 'FIRST';
HOURS: 'HOURS';
IF: 'IF';
LAST: 'LAST';
LOCALLY: 'LOCALLY';
MINUTES: 'MINUTES';
MONTHS: 'MONTHS';
CREATE: 'CREATE';
NOT: 'NOT';
NULLS: 'NULLS';
OF: 'OF';
OR: 'OR';
ORDERED: 'ORDERED';
PARTITION: 'PARTITION';
REPLACE: 'REPLACE';
@@ -206,7 +206,8 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
normalized.contains("write unordered") ||
normalized.contains("set identifier fields") ||
normalized.contains("drop identifier fields") ||
normalized.contains("create branch")))
normalized.contains("create branch"))) ||
normalized.contains("replace branch")

}
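
For context, a minimal sketch of the predicate this hunk extends. The object name here is a stand-in and the body is abbreviated to the branch markers; the real method also routes CALL statements and the other ALTER TABLE extensions.

import java.util.Locale

object RoutingSketch {
  // Statements matching here go to the Iceberg extension parser;
  // everything else falls through to the delegate Spark parser.
  def isIcebergCommand(sqlText: String): Boolean = {
    val normalized = sqlText.toLowerCase(Locale.ROOT).trim()
    normalized.startsWith("alter table") && (
      normalized.contains("create branch") ||
        normalized.contains("replace branch"))
  }
}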

@@ -37,9 +37,10 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.parser.extensions.IcebergParserUtils.withOrigin
import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser._
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.BranchOptions
import org.apache.spark.sql.catalyst.plans.logical.CallArgument
import org.apache.spark.sql.catalyst.plans.logical.CallStatement
import org.apache.spark.sql.catalyst.plans.logical.CreateBranch
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -91,25 +92,40 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
typedVisit[Transform](ctx.transform))
}

/**
* Create an ADD BRANCH logical command.
*/
override def visitCreateBranch(ctx: CreateBranchContext): CreateBranch = withOrigin(ctx) {
val snapshotRetention = Option(ctx.snapshotRetentionClause())

CreateBranch(
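/**
 * Create a CREATE OR REPLACE BRANCH logical command.
 */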
override def visitCreateOrReplaceBranch(ctx: CreateOrReplaceBranchContext): CreateOrReplaceBranch = withOrigin(ctx) {
val createOrReplaceBranchClause = ctx.createReplaceBranchClause()

val branchName = createOrReplaceBranchClause.identifier()
val branchOptionsContext = Option(createOrReplaceBranchClause.branchOptions())
val snapshotId = branchOptionsContext.flatMap(branchOptions => Option(branchOptions.snapshotId()))
.map(_.getText.toLong)
val snapshotRetention = branchOptionsContext.flatMap(branchOptions => Option(branchOptions.snapshotRetention()))
val minSnapshotsToKeep = snapshotRetention.flatMap(retention => Option(retention.minSnapshotsToKeep()))
.map(minSnapshots => minSnapshots.number().getText.toLong)
val maxSnapshotAgeMs = snapshotRetention
.flatMap(retention => Option(retention.maxSnapshotAge()))
.map(retention => TimeUnit.valueOf(retention.timeUnit().getText.toUpperCase(Locale.ENGLISH))
.toMillis(retention.number().getText.toLong))
val branchRetention = branchOptionsContext.flatMap(branchOptions => Option(branchOptions.refRetain()))
val branchRefAgeMs = branchRetention.map(retain =>
TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong))
val replace = createOrReplaceBranchClause.REPLACE() != null
val ifNotExists = createOrReplaceBranchClause.EXISTS() != null

val branchOptions = BranchOptions(
snapshotId,
minSnapshotsToKeep,
maxSnapshotAgeMs,
branchRefAgeMs
)

CreateOrReplaceBranch(
typedVisit[Seq[String]](ctx.multipartIdentifier),
ctx.identifier().getText,
Option(ctx.snapshotId()).map(_.getText.toLong),
snapshotRetention.flatMap(s => Option(s.numSnapshots())).map(_.getText.toLong),
snapshotRetention.flatMap(s => Option(s.snapshotRetain())).map(retain => {
TimeUnit.valueOf(ctx.snapshotRetentionClause().snapshotRetainTimeUnit().getText.toUpperCase(Locale.ENGLISH))
.toMillis(retain.getText.toLong)
}),
Option(ctx.snapshotRefRetain()).map(retain => {
TimeUnit.valueOf(ctx.snapshotRefRetainTimeUnit().getText.toUpperCase(Locale.ENGLISH))
.toMillis(retain.getText.toLong)
}))
branchName.getText,
branchOptions,
replace,
ifNotExists)

}
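
The Option(...) wrappers above lean on ANTLR context accessors returning null when an optional clause is absent; flatMap then collapses the missing pieces. The unit conversion is plain java.util.concurrent.TimeUnit. A small sketch, with literals standing in for the parsed tokens:

import java.util.Locale
import java.util.concurrent.TimeUnit

// "30" and "DAYS" stand in for retain.number().getText and
// retain.timeUnit().getText in the visitor above.
val refAgeMs: Long =
  TimeUnit.valueOf("DAYS".toUpperCase(Locale.ENGLISH)).toMillis("30".toLong)
// refAgeMs == 2592000000L, i.e. 30 days in milliseconds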

/**
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

case class BranchOptions(snapshotId: Option[Long], numSnapshots: Option[Long],
snapshotRetain: Option[Long], snapshotRefRetain: Option[Long])
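
For illustration, the value the AST builder would construct for a hypothetical RETAIN 30 DAYS WITH SNAPSHOT RETENTION 5 SNAPSHOTS clause; the field-by-field mapping is the point, the clause itself is made up.

import java.util.concurrent.TimeUnit

val opts = BranchOptions(
  snapshotId = None,                                    // no AS OF VERSION clause
  numSnapshots = Some(5L),                              // min snapshots to keep
  snapshotRetain = None,                                // no max snapshot age given
  snapshotRefRetain = Some(TimeUnit.DAYS.toMillis(30))) // RETAIN 30 DAYS, in millis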
@@ -21,14 +21,15 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute

case class CreateBranch(table: Seq[String], branch: String, snapshotId: Option[Long], numSnapshots: Option[Long],
snapshotRetain: Option[Long], snapshotRefRetain: Option[Long]) extends LeafCommand {
case class CreateOrReplaceBranch(table: Seq[String], branch: String,
branchOptions: BranchOptions, replace: Boolean, ifNotExists: Boolean)
extends LeafCommand {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"Create branch: ${branch} for table: ${table.quoted} "
s"CreateOrReplaceBranch branch: ${branch} for table: ${table.quoted}"
}
}

This file was deleted.

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.BranchOptions
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog

case class CreateOrReplaceBranchExec(
catalog: TableCatalog,
ident: Identifier,
branch: String,
branchOptions: BranchOptions,
replace: Boolean,
ifNotExists: Boolean) extends LeafV2CommandExec {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable =>
val snapshotId = branchOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId())
val manageSnapshots = iceberg.table().manageSnapshots()
if (!replace) {
val ref = iceberg.table().refs().get(branch)
if (ref != null && ifNotExists) {
return Nil
}

manageSnapshots.createBranch(branch, snapshotId)
Contributor:
Looks like we are implicitly letting the API throw an exception when the branch already exists. It's fine to me, but I'd like to know what other people think.

Contributor (author):
That's right, it's implicit since the API provides those guarantees. We could also throw our own before calling createBranch, but I'll let @hililiwei @yyanyy @rdblue provide their thoughts.

Contributor:
Sounds good to me.

} else {
manageSnapshots.replaceBranch(branch, snapshotId)
}

if (branchOptions.numSnapshots.nonEmpty) {
manageSnapshots.setMinSnapshotsToKeep(branch, branchOptions.numSnapshots.get.toInt)
}

if (branchOptions.snapshotRetain.nonEmpty) {
manageSnapshots.setMaxSnapshotAgeMs(branch, branchOptions.snapshotRetain.get)
}

if (branchOptions.snapshotRefRetain.nonEmpty) {
manageSnapshots.setMaxRefAgeMs(branch, branchOptions.snapshotRefRetain.get)
}

manageSnapshots.commit()

case table =>
throw new UnsupportedOperationException(s"Cannot create or replace branch on non-Iceberg table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"CreateOrReplace branch: ${branch} for table: ${ident.quoted}"
}
}
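
Putting the pieces together, a hedged usage sketch. It assumes the Iceberg SQL extensions are enabled, that local.db.tbl is an existing Iceberg table, and that 1234 is a valid snapshot id; all names are illustrative.

// IF NOT EXISTS returns quietly when the branch already exists
// (the early `return Nil` above); plain CREATE lets the
// ManageSnapshots API fail on a duplicate branch.
spark.sql("ALTER TABLE local.db.tbl CREATE BRANCH IF NOT EXISTS audit")

// CREATE OR REPLACE repoints an existing branch via replaceBranch.
spark.sql("ALTER TABLE local.db.tbl CREATE OR REPLACE BRANCH audit AS OF VERSION 1234")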
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.Call
import org.apache.spark.sql.catalyst.plans.logical.CreateBranch
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
@@ -62,8 +62,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform, name) =>
AddPartitionFieldExec(catalog, ident, transform, name) :: Nil

case CreateBranch(IcebergCatalogAndIdentifier(catalog, ident), _, _, _, _, _) =>
CreateBranchExec(catalog, ident, plan.asInstanceOf[CreateBranch]) :: Nil
case CreateOrReplaceBranch(
IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) =>
CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil

case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) =>
DropPartitionFieldExec(catalog, ident, transform) :: Nil