IcebergSqlExtensions.g4
@@ -66,8 +66,7 @@ singleStatement
     ;

 statement
-    : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
-    | ALTER TABLE multipartIdentifier ADD PARTITION FIELD transform (AS name=identifier)? #addPartitionField
+    : ALTER TABLE multipartIdentifier ADD PARTITION FIELD transform (AS name=identifier)? #addPartitionField
     | ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform #dropPartitionField
     | ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH transform (AS name=identifier)? #replacePartitionField
     | ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
@@ -128,11 +127,6 @@ writeOrderingSpec
     | UNORDERED
     ;

-callArgument
-    : expression #positionalArgument
-    | identifier '=>' expression #namedArgument
-    ;
-
 singleOrder
     : order EOF
     ;
@@ -213,7 +207,7 @@ fieldList
     ;

 nonReserved
-    : ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR | ORDERED | PARTITION | TABLE | WRITE
+    : ADD | ALTER | AS | ASC | BRANCH | BY | CREATE | DAYS | DESC | DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR | ORDERED | PARTITION | TABLE | WRITE
     | DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS
     | TAG | TRUE | FALSE
     | MAP
@@ -239,7 +233,6 @@ AS: 'AS';
 ASC: 'ASC';
 BRANCH: 'BRANCH';
 BY: 'BY';
-CALL: 'CALL';
 DAYS: 'DAYS';
 DESC: 'DESC';
 DISTRIBUTED: 'DISTRIBUTED';
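
Note: the removed #call alternative, the callArgument rule, and the CALL token together parsed Iceberg's stored-procedure syntax with positional or name => value arguments. A minimal sketch of statements this grammar no longer matches, assuming a running SparkSession named spark and the builtin expire_snapshots procedure (catalog, table, and argument values are illustrative):

// Positional arguments (removed #positionalArgument alternative)
spark.sql("CALL catalog.system.expire_snapshots('db.tbl')")
// Named arguments via '=>' (removed #namedArgument alternative)
spark.sql("CALL catalog.system.expire_snapshots(table => 'db.tbl', retain_last => 5)")

Such statements now fall through to Spark's own parser rather than the extension grammar.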
IcebergSparkSessionExtensions.scala
@@ -21,8 +21,6 @@ package org.apache.iceberg.spark.extensions

 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.analysis.CheckViews
-import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
-import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
 import org.apache.spark.sql.catalyst.analysis.ResolveViews
 import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
@@ -35,9 +33,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     extensions.injectParser { case (_, parser) => new IcebergSparkSqlExtensionsParser(parser) }

     // analyzer extensions
-    extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
     extensions.injectResolutionRule { spark => ResolveViews(spark) }
-    extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
     extensions.injectCheckRule(_ => CheckViews)

     // optimizer extensions
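
With ResolveProcedures and ProcedureArgumentCoercion no longer injected, CALL resolution is handled entirely by Spark; the extension keeps only its parser, view rules, and optimizer rules. How the extension is attached to a session is unchanged. A minimal sketch, assuming the standard spark.sql.extensions config key (the app name is hypothetical):

import org.apache.spark.sql.SparkSession

// Register the Iceberg extensions when building the session.
val spark = SparkSession.builder()
  .appName("iceberg-demo")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()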

This file was deleted.

This file was deleted.

IcebergSparkSqlExtensionsParser.scala
@@ -136,24 +136,17 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface {
       .replaceAll("`", "")
       .trim()

-    isIcebergProcedure(normalized) || (
-      normalized.startsWith("alter table") && (
-        normalized.contains("add partition field") ||
-        normalized.contains("drop partition field") ||
-        normalized.contains("replace partition field") ||
-        normalized.contains("write ordered by") ||
-        normalized.contains("write locally ordered by") ||
-        normalized.contains("write distributed by") ||
-        normalized.contains("write unordered") ||
-        normalized.contains("set identifier fields") ||
-        normalized.contains("drop identifier fields") ||
-        isSnapshotRefDdl(normalized)))
-  }
-
-  // All builtin Iceberg procedures are under the 'system' namespace
-  private def isIcebergProcedure(normalized: String): Boolean = {
-    normalized.startsWith("call") &&
-      SparkProcedures.names().asScala.map("system." + _).exists(normalized.contains)
+    normalized.startsWith("alter table") && (
+      normalized.contains("add partition field") ||
+      normalized.contains("drop partition field") ||
+      normalized.contains("replace partition field") ||
+      normalized.contains("write ordered by") ||
+      normalized.contains("write locally ordered by") ||
+      normalized.contains("write distributed by") ||
+      normalized.contains("write unordered") ||
+      normalized.contains("set identifier fields") ||
+      normalized.contains("drop identifier fields") ||
+      isSnapshotRefDdl(normalized))
   }

   private def isSnapshotRefDdl(normalized: String): Boolean = {
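
After this hunk, isIcebergCommand accepts only the ALTER TABLE extension DDL, so a leading CALL no longer routes a statement to the Iceberg parser. A self-contained sketch of the resulting dispatch, simplified from the diff (the real method also strips comments and backticks first, and isSnapshotRefDdl covers branch/tag DDL):

import java.util.Locale

object IcebergDispatchSketch {
  private val clauses = Seq(
    "add partition field", "drop partition field", "replace partition field",
    "write ordered by", "write locally ordered by", "write distributed by",
    "write unordered", "set identifier fields", "drop identifier fields")

  // Mirrors the post-change check: only ALTER TABLE extension DDL matches.
  def isIcebergCommand(sqlText: String): Boolean = {
    val normalized = sqlText.toLowerCase(Locale.ROOT).trim
    normalized.startsWith("alter table") && clauses.exists(c => normalized.contains(c))
  }
}

// IcebergDispatchSketch.isIcebergCommand("ALTER TABLE db.t ADD PARTITION FIELD bucket(16, id)") == true
// IcebergDispatchSketch.isIcebergCommand("CALL system.expire_snapshots('db.t')") == false, delegated to Spark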
IcebergSqlExtensionsAstBuilder.scala
@@ -38,17 +38,13 @@ import org.apache.spark.sql.catalyst.parser.extensions.IcebergParserUtils.withOrigin
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser._
 import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.BranchOptions
-import org.apache.spark.sql.catalyst.plans.logical.CallArgument
-import org.apache.spark.sql.catalyst.plans.logical.CallStatement
 import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
 import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
 import org.apache.spark.sql.catalyst.plans.logical.DropBranch
 import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.DropTag
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
-import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
@@ -68,15 +64,6 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergSqlExtensionsBaseVisitor[AnyRef] {
   private def toBuffer[T](list: java.util.List[T]): scala.collection.mutable.Buffer[T] = list.asScala
   private def toSeq[T](list: java.util.List[T]): Seq[T] = toBuffer(list).toSeq

-  /**
-   * Create a [[CallStatement]] for a stored procedure call.
-   */
-  override def visitCall(ctx: CallContext): CallStatement = withOrigin(ctx) {
-    val name = toSeq(ctx.multipartIdentifier.parts).map(_.getText)
-    val args = toSeq(ctx.callArgument).map(typedVisit[CallArgument])
-    CallStatement(name, args)
-  }
-
   /**
    * Create an ADD PARTITION FIELD logical command.
    */
@@ -315,23 +302,6 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergSqlExtensionsBaseVisitor[AnyRef] {
     toSeq(ctx.order.fields).map(typedVisit[(Term, SortDirection, NullOrder)])
   }

-  /**
-   * Create a positional argument in a stored procedure call.
-   */
-  override def visitPositionalArgument(ctx: PositionalArgumentContext): CallArgument = withOrigin(ctx) {
-    val expr = typedVisit[Expression](ctx.expression)
-    PositionalArgument(expr)
-  }
-
-  /**
-   * Create a named argument in a stored procedure call.
-   */
-  override def visitNamedArgument(ctx: NamedArgumentContext): CallArgument = withOrigin(ctx) {
-    val name = ctx.identifier.getText
-    val expr = typedVisit[Expression](ctx.expression)
-    NamedArgument(name, expr)
-  }
-
   override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
     visit(ctx.statement).asInstanceOf[LogicalPlan]
   }
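
With the visitCall, visitPositionalArgument, and visitNamedArgument visitors removed, the AST builder no longer produces CallStatement/CallArgument nodes; Spark builds the call plan itself. For context, the removed visitors mapped the two callArgument grammar alternatives onto two node shapes, roughly as below (sketched from the visitor bodies above; the verbatim case-class definitions lived in the deleted logical-plan files and are not reproduced here):

import org.apache.spark.sql.catalyst.expressions.Expression

// A sketch, not the deleted source: one node per callArgument alternative.
sealed trait CallArgument { def expr: Expression }
case class PositionalArgument(expr: Expression) extends CallArgument
case class NamedArgument(name: String, expr: Expression) extends CallArgument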