@@ -376,28 +376,6 @@ trait CheckAnalysis extends PredicateHelper {
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)

case InsertIntoTable(t, _, _, _, _)
if !t.isInstanceOf[LeafNode] ||
t.isInstanceOf[Range] ||
t == OneRowRelation ||
t.isInstanceOf[LocalRelation] =>
failAnalysis(s"Inserting into an RDD-based table is not allowed.")

case i @ InsertIntoTable(table, partitions, query, _, _) =>
val numStaticPartitions = partitions.values.count(_.isDefined)
if (table.output.size != (query.output.size + numStaticPartitions)) {
failAnalysis(
s"$table requires that the data to be inserted have the same number of " +
s"columns as the target table: target table has ${table.output.size} " +
s"column(s) but the inserted data has " +
s"${query.output.size + numStaticPartitions} column(s), including " +
s"$numStaticPartitions partition column(s) having constant value(s).")
}

case o if !o.resolved =>
failAnalysis(
s"unresolved operator ${operator.simpleString}")

case o if o.expressions.exists(!_.deterministic) &&
!o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
!o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] =>
@@ -413,6 +391,10 @@ trait CheckAnalysis extends PredicateHelper {
}
}
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
Review comment (Member):
To other reviewers: This is moved to PreprocessTableInsertion.

After the above two moves, def checkAnalysis(plan: LogicalPlan) does not have any DDL/DML error handling. It becomes cleaner.
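
For reference, a minimal sketch of what the relocated check could look like inside PreprocessTableInsertion, assuming it keeps the shape of the case removed above; the object name, guard, and exception type are illustrative, not the code this PR actually adds:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: the real PreprocessTableInsertion also normalizes partition columns and
// casts the query output to the table schema; this shows just the relocated count check.
object InsertColumnCountCheckSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case i @ InsertIntoTable(table, partition, query, _, _)
        if table.resolved && query.resolved =>
      val staticPartCols = partition.values.count(_.isDefined)
      val expected = table.output.size
      val provided = query.output.size + staticPartCols
      if (expected != provided) {
        // The real rule raises an AnalysisException; a plain exception keeps the sketch
        // self-contained outside the org.apache.spark.sql package.
        throw new IllegalArgumentException(
          s"$table requires that the data to be inserted have the same number of " +
            s"columns as the target table: target table has $expected column(s) but the " +
            s"inserted data has $provided column(s), including $staticPartCols " +
            s"partition column(s) having constant value(s).")
      }
      i
  }
}
```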

case _ =>
}
Review comment (Member):
This movement is great, since extendedCheckRules could output a better error message.
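
To illustrate the point, a hypothetical extended check; the wording is borrowed from the case deleted at the top of this diff, while the rule Spark actually registers lives elsewhere and is more thorough:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LeafNode, LogicalPlan}

// Hypothetical extended check, not the rule Spark actually registers: because
// extendedCheckRules now run before the generic "unresolved operator" failure, a
// session-specific rule like this can surface a targeted message first.
object RejectRddBackedInsertSketch extends (LogicalPlan => Unit) {
  override def apply(plan: LogicalPlan): Unit = plan.foreach {
    case InsertIntoTable(table, _, _, _, _) if !table.isInstanceOf[LeafNode] =>
      // Same wording as the case removed from CheckAnalysis earlier in this diff;
      // the real rule would raise an AnalysisException.
      throw new IllegalStateException("Inserting into an RDD-based table is not allowed.")
    case _ => // everything else is left to the remaining checks
  }
}
```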


plan.foreach(_.setAnalyzed())
}
@@ -111,9 +111,6 @@ object UnsupportedOperationChecker {
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")

case _: InsertIntoTable =>
throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")

case Join(left, right, joinType, _) =>

joinType match {
@@ -363,7 +363,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
}

/**
* Insert some data into a table.
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
* concrete implementations during analysis.
@gatorsmile (Member), Feb 3, 2017:
To other reviewers: InsertIntoTable is a unified unresolved node for representing INSERT. After resolution completes, it will be replaced by InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, or InsertIntoHiveTable.
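
A rough sketch of that replacement pattern, with the relation-type dispatch left as a placeholder; the actual rules appear later in this diff (DataSourceAnalysis) and in the Hive module:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Shape of the resolution only: the dispatch to InsertIntoDataSourceCommand,
// InsertIntoHadoopFsRelationCommand, or InsertIntoHiveTable is spread across
// DataSourceAnalysis and the Hive module; the helper below is a placeholder,
// not a real Spark function.
object ResolveInsertIntoTableSketch extends Rule[LogicalPlan] {
  private def toConcreteCommand(insert: InsertIntoTable): LogicalPlan = {
    // Inspect insert.table (e.g. a LogicalRelation vs. a Hive metastore relation)
    // and build the matching command node.
    ???
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case insert: InsertIntoTable if insert.table.resolved && insert.query.resolved =>
      toConcreteCommand(insert)
  }
}
```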

*
* @param table the logical plan representing the table. In the future this should be a
* [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
@@ -374,25 +375,24 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
* Map('a' -> Some('1'), 'b' -> Some('2')),
* and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
* would have Map('a' -> Some('1'), 'b' -> None).
* @param child the logical plan representing data to write to.
* @param query the logical plan representing data to write to.
* @param overwrite overwrite existing table or partitions.
* @param ifNotExists If true, only write if the table or partition does not exist.
*/
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
query: LogicalPlan,
overwrite: Boolean,
ifNotExists: Boolean)
extends LogicalPlan {

override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = Seq.empty

assert(overwrite || !ifNotExists)
assert(partition.values.forall(_.nonEmpty) || !ifNotExists)

override lazy val resolved: Boolean = childrenResolved && table.resolved
// We don't want `table` in children as sometimes we don't want to transform it.
override def children: Seq[LogicalPlan] = query :: Nil
override def output: Seq[Attribute] = Seq.empty
override lazy val resolved: Boolean = false
@gatorsmile (Member), Feb 3, 2017:
@cloud-fan After this change, we are unable to reach the check in checkForStreaming. BTW, it sounds like we do not have any test cases covering these scenarios.

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L110-L115

Reply from the PR author (Contributor):
Now we will resolve CreateTable and InsertIntoTable to concrete commands, so the check can still work.
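
A sketch of that argument, assuming the resolved commands all extend the catalyst Command trait; the generic branch kept near the top of this diff then covers them, so the removed InsertIntoTable case is no longer needed:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Sketch, not the checker's real code: once the analyzer rewrites
// CreateTable/InsertIntoTable into concrete command nodes, those nodes fall under the
// generic Command branch in UnsupportedOperationChecker for streaming plans.
object StreamingCommandCheckSketch {
  def check(plan: LogicalPlan): Unit = plan.foreachUp {
    case _: Command =>
      throw new UnsupportedOperationException(
        "Commands like CreateTable*, AlterTable*, Show* are not supported with " +
          "streaming DataFrames/Datasets")
    case _ => // the remaining streaming checks are elided here
  }
}
```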

}

/**
@@ -265,7 +265,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
InsertIntoTable(
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
child = df.logicalPlan,
query = df.logicalPlan,
overwrite = mode == SaveMode.Overwrite,
ifNotExists = false)).toRdd
}
@@ -36,7 +36,6 @@ class SparkPlanner(
extraStrategies ++ (
FileSourceStrategy ::
DataSourceStrategy ::
DDLStrategy ::
SpecialLimits ::
Aggregation ::
JoinSelection ::
@@ -405,32 +405,4 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case _ => Nil
}
}

object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil

case CreateTable(tableDesc, mode, None) =>
val cmd =
CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil

// CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
// `CreateTables`

case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) =>
val cmd =
CreateDataSourceTableAsSelectCommand(
tableDesc,
mode,
query)
ExecutedCommandExec(cmd) :: Nil

case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil

case _ => Nil
}
}
}
@@ -111,10 +111,12 @@ case class CreateTableLikeCommand(
* [AS select_statement];
* }}}
*/
case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
case class CreateTableCommand(
table: CatalogTable,
ignoreIfExists: Boolean) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.catalog.createTable(table, ifNotExists)
sparkSession.sessionState.catalog.createTable(table, ignoreIfExists)
Seq.empty[Row]
}
}
@@ -45,7 +45,8 @@ import org.apache.spark.unsafe.types.UTF8String
* Replaces generic operations with specific variants that are designed to work with Spark
* SQL Data Sources.
*
* Note that, this rule must be run after [[PreprocessTableInsertion]].
* Note that, this rule must be run after `PreprocessTableCreation` and
* `PreprocessTableInsertion`.
Review comment (Member):
The same updates are needed here.

*/
case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {

@@ -130,6 +131,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
}

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)

case CreateTable(tableDesc, mode, Some(query))
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)

case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _),
parts, query, overwrite, false) if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)

case InsertIntoTable(
l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
@@ -273,10 +285,6 @@ object DataSourceStrategy extends Strategy with Logging {
Map.empty,
None) :: Nil

case InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
part, query, overwrite, false) if part.isEmpty =>
ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil

case _ => Nil
}

@@ -20,15 +20,23 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.types._

/**
* Create a table and optionally insert some data into it. Note that this plan is unresolved and
* has to be replaced by the concrete implementations during analysis.
*
* @param tableDesc the metadata of the table to be created.
* @param mode the data writing mode
* @param query an optional logical plan representing data to write into the created table.
*/
case class CreateTable(
tableDesc: CatalogTable,
mode: SaveMode,
query: Option[LogicalPlan]) extends Command {
query: Option[LogicalPlan]) extends LogicalPlan {
assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")

if (query.isEmpty) {
@@ -37,7 +45,9 @@ case class CreateTable(
"create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.")
}

override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
override def children: Seq[LogicalPlan] = query.toSeq
override def output: Seq[Attribute] = Seq.empty
override lazy val resolved: Boolean = false
}

/**