@@ -71,6 +71,8 @@ statement
| ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform #dropPartitionField
| ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH transform (AS name=identifier)? #replacePartitionField
| ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
| ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields
| ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields
;

writeSpec
@@ -157,9 +159,13 @@ quotedIdentifier
: BACKQUOTED_IDENTIFIER
;

fieldList
: fields+=multipartIdentifier (',' fields+=multipartIdentifier)*
;

nonReserved
: ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE
| DISTRIBUTED | LOCALLY | UNORDERED
| DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET
| TRUE | FALSE
| MAP
;
@@ -174,13 +180,16 @@ DESC: 'DESC';
DISTRIBUTED: 'DISTRIBUTED';
DROP: 'DROP';
FIELD: 'FIELD';
FIELDS: 'FIELDS';
FIRST: 'FIRST';
LAST: 'LAST';
LOCALLY: 'LOCALLY';
NULLS: 'NULLS';
ORDERED: 'ORDERED';
PARTITION: 'PARTITION';
REPLACE: 'REPLACE';
IDENTIFIER_KW: 'IDENTIFIER';
SET: 'SET';
TABLE: 'TABLE';
UNORDERED: 'UNORDERED';
WITH: 'WITH';
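
For reference, a minimal usage sketch of the statements these new rules accept (the table name db.events and the columns id and ts are hypothetical; this assumes the Iceberg SQL extensions are registered on the Spark session):

// Hypothetical table and columns; requires the Iceberg SQL extensions to be enabled.
spark.sql("ALTER TABLE db.events SET IDENTIFIER FIELDS id, ts")
spark.sql("ALTER TABLE db.events DROP IDENTIFIER FIELDS ts")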
@@ -116,7 +116,9 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
normalized.contains("write ordered by") ||
normalized.contains("write locally ordered by") ||
normalized.contains("write distributed by") ||
normalized.contains("write unordered")))
normalized.contains("write unordered") ||
normalized.contains("set identifier fields") ||
normalized.contains("drop identifier fields")))
}

protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = {
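
As a standalone illustration of the routing change above, a sketch of the kind of check that decides whether a statement is handed to the extensions parser (illustrative only; the real isIcebergCommand also covers the CALL and partition/write clauses shown earlier):

// Illustrative sketch: lower-case and collapse whitespace, then look for the new markers.
def looksLikeIdentifierFieldsCommand(sqlText: String): Boolean = {
  val normalized = sqlText.toLowerCase(java.util.Locale.ROOT).trim.replaceAll("\\s+", " ")
  normalized.startsWith("alter table") &&
    (normalized.contains("set identifier fields") ||
      normalized.contains("drop identifier fields"))
}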
@@ -37,11 +37,13 @@ import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParse
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.CallArgument
import org.apache.spark.sql.catalyst.plans.logical.CallStatement
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
@@ -85,7 +87,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS


/**
* Create an CHANGE PARTITION FIELD logical command.
* Create a REPLACE PARTITION FIELD logical command.
*/
override def visitReplacePartitionField(ctx: ReplacePartitionFieldContext): ReplacePartitionField = withOrigin(ctx) {
ReplacePartitionField(
@@ -95,6 +97,24 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
Option(ctx.name).map(_.getText))
}

/**
* Create a SET IDENTIFIER FIELDS logical command.
*/
override def visitSetIdentifierFields(ctx: SetIdentifierFieldsContext): SetIdentifierFields = withOrigin(ctx) {
SetIdentifierFields(
typedVisit[Seq[String]](ctx.multipartIdentifier),
ctx.fieldList.fields.asScala.map(_.getText))
}

/**
* Create a DROP IDENTIFIER FIELDS logical command.
*/
override def visitDropIdentifierFields(ctx: DropIdentifierFieldsContext): DropIdentifierFields = withOrigin(ctx) {
DropIdentifierFields(
typedVisit[Seq[String]](ctx.multipartIdentifier),
ctx.fieldList.fields.asScala.map(_.getText))
}
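
For the two example statements above, these visitors are expected to produce plans equivalent to the following (a sketch; the table and field names are the hypothetical ones used earlier, passed through as plain string sequences):

// ALTER TABLE db.events SET IDENTIFIER FIELDS id, ts
SetIdentifierFields(table = Seq("db", "events"), fields = Seq("id", "ts"))
// ALTER TABLE db.events DROP IDENTIFIER FIELDS ts
DropIdentifierFields(table = Seq("db", "events"), fields = Seq("ts"))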

/**
* Create a [[SetWriteDistributionAndOrdering]] for changing the write distribution and ordering.
*/
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute

case class DropIdentifierFields(
table: Seq[String],
fields: Seq[String]) extends Command {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"DropIdentifierFields ${table.quoted} (${fields.quoted})"
}
}
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.expressions.Transform

case class SetIdentifierFields(
table: Seq[String],
fields: Seq[String]) extends Command {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"SetIdentifierFields ${table.quoted} (${fields.quoted})"
}
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.relocated.com.google.common.base.Preconditions
import org.apache.iceberg.relocated.com.google.common.collect.Sets
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog

case class DropIdentifierFieldsExec(
catalog: TableCatalog,
ident: Identifier,
fields: Seq[String]) extends V2CommandExec {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable =>
val schema = iceberg.table.schema
val identifierFieldNames = Sets.newHashSet(schema.identifierFieldNames)

for (name <- fields) {
Preconditions.checkArgument(schema.findField(name) != null,
"Cannot complete drop identifier fields operation: field %s not found", name)
Reviewer comment (Contributor): Nit: you can omit "complete" and "operation" to be more concise. "Cannot drop identifier field: %s (not found)" says basically the same thing.

Preconditions.checkArgument(identifierFieldNames.contains(name),
"Cannot complete drop identifier fields operation: %s is not an identifier field", name)
identifierFieldNames.remove(name)
}

iceberg.table.updateSchema()
.setIdentifierFields(identifierFieldNames)
.commit();
case table =>
throw new UnsupportedOperationException(s"Cannot drop identifier fields in non-Iceberg table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"DropIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})";
}
}
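
Outside of Spark, the same validation and commit can be sketched directly against Iceberg's UpdateSchema API (assuming icebergTable is an already-loaded org.apache.iceberg.Table and ts is the hypothetical field being dropped):

import java.util.{HashSet => JHashSet}

// Sketch of the exec's behaviour for a single dropped field.
val schema = icebergTable.schema()
val remaining = new JHashSet[String](schema.identifierFieldNames())
require(schema.findField("ts") != null, "field ts not found")
require(remaining.contains("ts"), "ts is not an identifier field")
remaining.remove("ts")
icebergTable.updateSchema()
  .setIdentifierFields(remaining)
  .commit()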
Expand Up @@ -33,13 +33,15 @@ import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.Call
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilter
import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinalityCheck
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.MergeInto
import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -66,6 +68,12 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
case ReplacePartitionField(IcebergCatalogAndIdentifier(catalog, ident), transformFrom, transformTo, name) =>
ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil

case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
SetIdentifierFieldsExec(catalog, ident, fields) :: Nil

case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
DropIdentifierFieldsExec(catalog, ident, fields) :: Nil

case SetWriteDistributionAndOrdering(
IcebergCatalogAndIdentifier(catalog, ident), distributionMode, ordering) =>
SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, ordering) :: Nil
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog

case class SetIdentifierFieldsExec(
catalog: TableCatalog,
ident: Identifier,
fields: Seq[String]) extends V2CommandExec {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable =>
iceberg.table.updateSchema()
.setIdentifierFields(scala.collection.JavaConverters.seqAsJavaList(fields))
.commit();
case table =>
throw new UnsupportedOperationException(s"Cannot set identifier fields in non-Iceberg table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"SetIdentifierFields ${catalog.name}.${ident.quoted} (${fields.quoted})";
}
}
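
And the SET path, again sketched against the Iceberg API with a read-back of the result (same hypothetical icebergTable handle; in Spark the exec reaches the table through catalog.loadTable and SparkTable instead):

import scala.collection.JavaConverters._

// Replace the identifier fields with exactly id and ts, then read them back.
icebergTable.updateSchema()
  .setIdentifierFields(Seq("id", "ts").asJava)
  .commit()
println(icebergTable.schema().identifierFieldNames()) // expected to contain: id, ts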