Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,18 @@ public String property() {
}
}

interface ColumnChange extends TableChange {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add docs for this please?

String[] fieldNames();
}

/**
* A TableChange to add a field.
* <p>
* If the field already exists, the change must result in an {@link IllegalArgumentException}.
* If the new field is nested and its parent does not exist or is not a struct, the change must
* result in an {@link IllegalArgumentException}.
*/
final class AddColumn implements TableChange {
final class AddColumn implements ColumnChange {
private final String[] fieldNames;
private final DataType dataType;
private final boolean isNullable;
Expand All @@ -247,6 +251,7 @@ private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, St
this.comment = comment;
}

@Override
public String[] fieldNames() {
return fieldNames;
}
Expand All @@ -272,7 +277,7 @@ public String comment() {
* <p>
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
final class RenameColumn implements TableChange {
final class RenameColumn implements ColumnChange {
private final String[] fieldNames;
private final String newName;

Expand All @@ -281,6 +286,7 @@ private RenameColumn(String[] fieldNames, String newName) {
this.newName = newName;
}

@Override
public String[] fieldNames() {
return fieldNames;
}
Expand All @@ -297,7 +303,7 @@ public String newName() {
* <p>
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
final class UpdateColumnType implements TableChange {
final class UpdateColumnType implements ColumnChange {
private final String[] fieldNames;
private final DataType newDataType;
private final boolean isNullable;
Expand All @@ -308,6 +314,7 @@ private UpdateColumnType(String[] fieldNames, DataType newDataType, boolean isNu
this.isNullable = isNullable;
}

@Override
public String[] fieldNames() {
return fieldNames;
}
Expand All @@ -328,7 +335,7 @@ public boolean isNullable() {
* <p>
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
final class UpdateColumnComment implements TableChange {
final class UpdateColumnComment implements ColumnChange {
private final String[] fieldNames;
private final String newComment;

Expand All @@ -337,6 +344,7 @@ private UpdateColumnComment(String[] fieldNames, String newComment) {
this.newComment = newComment;
}

@Override
public String[] fieldNames() {
return fieldNames;
}
Expand All @@ -351,13 +359,14 @@ public String newComment() {
* <p>
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
*/
final class DeleteColumn implements TableChange {
final class DeleteColumn implements ColumnChange {
private final String[] fieldNames;

private DeleteColumn(String[] fieldNames) {
this.fieldNames = fieldNames;
}

@Override
public String[] fieldNames() {
return fieldNames;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

object CatalogV2Util {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
Expand Down Expand Up @@ -132,16 +132,45 @@ object CatalogV2Util {
val pos = struct.getFieldIndex(fieldNames.head)
.getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}"))
val field = struct.fields(pos)
val replacement: Option[StructField] = if (fieldNames.tail.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to perform name resolution according to case sensitivity instead of strict equality when calling getFieldIndex?

update(field)
} else {
field.dataType match {
case nestedStruct: StructType =>
val updatedType: StructType = replace(nestedStruct, fieldNames.tail, update)
Some(StructField(field.name, updatedType, field.nullable, field.metadata))
case _ =>
throw new IllegalArgumentException(s"Not a struct: ${fieldNames.head}")
}
val replacement: Option[StructField] = (fieldNames.tail, field.dataType) match {
case (Seq(), _) =>
update(field)

case (names, struct: StructType) =>
val updatedType: StructType = replace(struct, names, update)
Some(StructField(field.name, updatedType, field.nullable, field.metadata))

case (Seq("key"), map @ MapType(keyType, _, _)) =>
val updated = update(StructField("key", keyType, nullable = false))
.getOrElse(throw new IllegalArgumentException(s"Cannot delete map key"))
Some(field.copy(dataType = map.copy(keyType = updated.dataType)))

case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, _)) =>
Some(field.copy(dataType = map.copy(keyType = replace(keyStruct, names, update))))

case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
val updated = update(StructField("value", mapValueType, nullable = isNullable))
.getOrElse(throw new IllegalArgumentException(s"Cannot delete map value"))
Some(field.copy(dataType = map.copy(
valueType = updated.dataType,
valueContainsNull = updated.nullable)))

case (Seq("value", names @ _*), map @ MapType(_, valueStruct: StructType, _)) =>
Some(field.copy(dataType = map.copy(valueType = replace(valueStruct, names, update))))

case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
val updated = update(StructField("element", elementType, nullable = isNullable))
.getOrElse(throw new IllegalArgumentException(s"Cannot delete array element"))
Some(field.copy(dataType = array.copy(
elementType = updated.dataType,
containsNull = updated.nullable)))

case (Seq("element", names @ _*), array @ ArrayType(elementStruct: StructType, _)) =>
Some(field.copy(dataType = array.copy(elementType = replace(elementStruct, names, update))))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a pretty cool feature to me. Can we document it in TableChange? Tell users that they can use a.b.mapField.key to change map type's key type, etc.

case (names, dataType) =>
throw new IllegalArgumentException(
s"Cannot find field: ${names.head} in ${dataType.simpleString}")
}

val newFields = struct.fields.zipWithIndex.flatMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog, TableChange}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
Expand All @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about

import org.apache.spark.sql.catalyst.plans.logical.sql._

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I've noted in several places that this is a bad practice because it causes namespace problems (duplicate names accidentally imported) and commit conflicts.

import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.toPrettySQL
Expand Down Expand Up @@ -165,6 +166,7 @@ class Analyzer(
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveAlterTable ::
ResolveTables ::
ResolveRelations ::
ResolveReferences ::
Expand Down Expand Up @@ -787,6 +789,86 @@ class Analyzer(
}
}

/**
* Resolve ALTER TABLE statements that use a DSv2 catalog.
*
* This rule converts unresolved ALTER TABLE statements to v2 when a v2 catalog is responsible
* for the table identifier. A v2 catalog is responsible for an identifier when the identifier
* has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and
* the table identifier does not include a catalog.
*/
object ResolveAlterTable extends Rule[LogicalPlan] {
Copy link
Contributor

@cloud-fan cloud-fan Jul 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remind me where we handle the v1 code path for ALTER TABLE?

import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case alter @ AlterTableAddColumnsStatement(
CatalogObjectIdentifier(Some(v2Catalog), ident), cols) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a JIRA to follow up on changing these to use the V2SessionCatalog if a catalog is not defined?

val changes = cols.map { col =>
TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
}

AlterTable(
v2Catalog.asTableCatalog, ident,
UnresolvedRelation(alter.tableName),
changes)

case alter @ AlterTableAlterColumnStatement(
CatalogObjectIdentifier(Some(v2Catalog), ident), colName, dataType, comment) =>
val typeChange = dataType.map { newDataType =>
TableChange.updateColumnType(colName.toArray, newDataType, true)
}

val commentChange = comment.map { newComment =>
TableChange.updateColumnComment(colName.toArray, newComment)
}

AlterTable(
v2Catalog.asTableCatalog, ident,
UnresolvedRelation(alter.tableName),
typeChange.toSeq ++ commentChange.toSeq)

case alter @ AlterTableRenameColumnStatement(
CatalogObjectIdentifier(Some(v2Catalog), ident), col, newName) =>
AlterTable(
v2Catalog.asTableCatalog, ident,
UnresolvedRelation(alter.tableName),
Seq(TableChange.renameColumn(col.toArray, newName)))

case alter @ AlterTableDropColumnsStatement(
CatalogObjectIdentifier(Some(v2Catalog), ident), cols) =>
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
AlterTable(
v2Catalog.asTableCatalog, ident,
UnresolvedRelation(alter.tableName),
changes)

case alter @ AlterTableSetPropertiesStatement(
CatalogObjectIdentifier(Some(v2Catalog), ident), props) =>
val changes = props.map {
case (key, value) =>
TableChange.setProperty(key, value)
}

AlterTable(
v2Catalog.asTableCatalog, ident,
UnresolvedRelation(alter.tableName),
changes.toSeq)

case alter @ AlterTableUnsetPropertiesStatement(
CatalogObjectIdentifier(Some(v2Catalog), ident), keys, _) =>
AlterTable(
v2Catalog.asTableCatalog, ident,
UnresolvedRelation(alter.tableName),
keys.map(key => TableChange.removeProperty(key)))

case alter @ AlterTableSetLocationStatement(
CatalogObjectIdentifier(Some(v2Catalog), ident), newLoc) =>
AlterTable(
v2Catalog.asTableCatalog, ident,
UnresolvedRelation(alter.tableName),
Seq(TableChange.setProperty("location", newLoc)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I feel changing the location of a table deserves its own special TableChange operator, like ChangeLocation rather than a simple setProperty. When I think property, I think TBLPROPERTIES, and the location is separate from that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all tables have locations and we're using the convention of passing optional metadata as table properties. I think that given that we don't want to build special support for everything, this is the right way to pass the change.

}
}

/**
* Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
* a logical plan node's children.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
Expand Down Expand Up @@ -353,6 +354,59 @@ trait CheckAnalysis extends PredicateHelper {
case _ =>
}

case alter: AlterTable if alter.childrenResolved =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason that some of these checks are here, and some in the resolved of AlterTable? It'd be nice to hide the complexity inside AlterTable if possible, and do a simpler check here on whether AlterTable is resolved.
However, I would much prefer the case where we can throw a better error message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The meaning of resolved is that references are satisfied and there are no unresolved placeholders -- that's why resolved checks that the referenced fields exist. This checks whether that resolved plan has valid and reasonable changes, like whether the change updates a column from a string to a boolean.

val table = alter.table
def findField(operation: String, fieldName: Array[String]): StructField = {
// include collections because structs nested in maps and arrays may be altered
val field = table.schema.findNestedField(fieldName, includeCollections = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is case sensitivity handled here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it uses the resolver, so it should be case sensitive if the analyzer is.

if (field.isEmpty) {
throw new AnalysisException(
s"Cannot $operation missing field in ${table.name} schema: ${fieldName.quoted}")
}
field.get
}

alter.changes.foreach {
case add: AddColumn =>
val parent = add.fieldNames.init
if (parent.nonEmpty) {
findField("add to", parent)
}
case update: UpdateColumnType =>
val field = findField("update", update.fieldNames)
val fieldName = update.fieldNames.quoted
update.newDataType match {
case _: StructType =>
throw new AnalysisException(
s"Cannot update ${table.name} field $fieldName type: " +
s"update a struct by adding, deleting, or updating its fields")
case _: MapType =>
throw new AnalysisException(
s"Cannot update ${table.name} field $fieldName type: " +
s"update a map by updating $fieldName.key or $fieldName.value")
case _: ArrayType =>
throw new AnalysisException(
s"Cannot update ${table.name} field $fieldName type: " +
s"update the element by updating $fieldName.element")
case _: AtomicType =>
// update is okay
}
if (!Cast.canUpCast(field.dataType, update.newDataType)) {
throw new AnalysisException(
s"Cannot update ${table.name} field $fieldName: " +
s"${field.dataType.simpleString} cannot be cast to " +
s"${update.newDataType.simpleString}")
}
case rename: RenameColumn =>
findField("rename", rename.fieldNames)
case update: UpdateColumnComment =>
findField("update", update.fieldNames)
case delete: DeleteColumn =>
findField("delete", delete.fieldNames)
case _ =>
// no validation needed for set and remove property
}

case _ => // Fallbacks to the following checks
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
*
* @param multipartIdentifier table name
*/
case class UnresolvedRelation(multipartIdentifier: Seq[String]) extends LeafNode {
case class UnresolvedRelation(
multipartIdentifier: Seq[String]) extends LeafNode with NamedRelation {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

/** Returns a `.` separated name for this relation. */
def tableName: String = multipartIdentifier.quoted

override def name: String = tableName

override def output: Seq[Attribute] = Nil

override lazy val resolved = false
Expand Down
Loading