Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2429,7 +2429,7 @@ class Dataset[T] private[sql](
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = withPlan {
createViewCommand(viewName, replace = false)
createTempViewCommand(viewName, replace = false)
}

/**
Expand All @@ -2440,19 +2440,18 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = withPlan {
createViewCommand(viewName, replace = true)
createTempViewCommand(viewName, replace = true)
}

private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
private def createTempViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
CreateViewCommand(
name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
userSpecifiedColumns = Nil,
comment = None,
properties = Map.empty,
originalText = None,
child = logicalPlan,
allowExisting = false,
replace = replace,
if (replace) SaveMode.Overwrite else SaveMode.ErrorIfExists,
isTemporary = true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1261,9 +1261,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
userSpecifiedColumns,
ctx.query,
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty),
allowExisting = ctx.EXISTS != null,
ignoreIfExists = ctx.EXISTS != null,
replace = ctx.REPLACE != null,
isTemporary = ctx.TEMPORARY != null
isTemporary = ctx.TEMPORARY != null,
isAlterViewAsSelect = false
)
}
}
Expand All @@ -1279,9 +1280,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
userSpecifiedColumns = Seq.empty,
query = ctx.query,
properties = Map.empty,
allowExisting = false,
ignoreIfExists = false,
replace = true,
isTemporary = false)
isTemporary = false,
isAlterViewAsSelect = true)
}

/**
Expand All @@ -1294,20 +1296,29 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
userSpecifiedColumns: Seq[(String, Option[String])],
query: QueryContext,
properties: Map[String, String],
allowExisting: Boolean,
ignoreIfExists: Boolean,
replace: Boolean,
isTemporary: Boolean): LogicalPlan = {
isTemporary: Boolean,
isAlterViewAsSelect: Boolean): LogicalPlan = {
Copy link
Contributor

@cloud-fan cloud-fan Aug 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a viewType parameter which is an enum?

public enum ViewType {
  Temporary,
  Permanent,
  Any
}

We are going to add global temp view soon, so this enum will also be useful at that time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do it. Thanks!

val mode = (replace, ignoreIfExists) match {
case (true, false) => SaveMode.Overwrite
case (false, true) => SaveMode.Ignore
case (false, false) => SaveMode.ErrorIfExists
case _ => throw new ParseException(
"CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.", ctx)
}
val originalText = source(query)

CreateViewCommand(
visitTableIdentifier(name),
userSpecifiedColumns,
comment,
properties,
Some(originalText),
plan(query),
allowExisting = allowExisting,
replace = replace,
isTemporary = isTemporary)
mode,
isTemporary = isTemporary,
isAlterViewAsSelect = isAlterViewAsSelect)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
query)
ExecutedCommandExec(cmd) :: Nil

case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil

case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
Expand All @@ -41,14 +41,16 @@ import org.apache.spark.sql.types.StructType
* Dataset API.
* @param child the logical plan that represents the view; this is used to generate a canonicalized
* version of the SQL that can be saved in the catalog.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
* already exists, throws analysis exception.
* @param mode only three modes are applicable here: Ignore, Overwrite and ErrorIfExists. If the
* view already exists, CreateViewCommand behaves based on the mode:
* 1) Overwrite: update the view;
* 2) Ignore: noop;
* 3) ErrorIfExists throws analysis exception.
* @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
* at the end of current Spark session. Existing permanent relations with the same
* name are not visible to the current session while the temporary view exists,
* unless they are specified with full qualified table name with database prefix.
* @param isAlterViewAsSelect if true, this original DDL command is ALTER VIEW AS SELECT
*/
case class CreateViewCommand(
name: TableIdentifier,
Expand All @@ -57,9 +59,9 @@ case class CreateViewCommand(
properties: Map[String, String],
originalText: Option[String],
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
isTemporary: Boolean)
mode: SaveMode,
isTemporary: Boolean,
isAlterViewAsSelect: Boolean = false)
extends RunnableCommand {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
Expand All @@ -69,17 +71,16 @@ case class CreateViewCommand(

override def output: Seq[Attribute] = Seq.empty[Attribute]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this check?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

originalText is empty only when the view is created through Dataset APIs, as shown in the code comment

This checking looks useless. If you think we should keep it, I can add it back. Thanks!


assert(mode != SaveMode.Append,
"CREATE or ALTER VIEW can only use ErrorIfExists, Ignore or Overwrite.")

if (!isTemporary) {
require(originalText.isDefined,
"The table to created with CREATE VIEW must have 'originalText'.")
}

if (allowExisting && replace) {
throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
}

// Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
if (allowExisting && isTemporary) {
if (mode == SaveMode.Ignore && isTemporary) {
throw new AnalysisException(
"It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
}
Expand All @@ -105,7 +106,13 @@ case class CreateViewCommand(
}
val sessionState = sparkSession.sessionState

if (isTemporary) {
// 1) CREATE VIEW: create a temp view when users explicitly specify the keyword TEMPORARY;
// otherwise, create a permanent view no matter whether the temporary view
// with the same name exists or not.
// 2) ALTER VIEW: alter the temporary view if the temp view exists; otherwise, try to alter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: how can you tell whether it's CREATE VIEW or ALTER VIEW?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah! The only way is to pass a flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it already has 3 flags: allowExisting, replace, isTemporary. We should rethink about it and decide what flags we really need for this command.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, let me think about it. Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

      CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
        identifierCommentList? (COMMENT STRING)?
        (PARTITIONED ON identifierList)?
        (TBLPROPERTIES tablePropertyList)? AS query

Each flag corresponds to a keyword in the CREATE VIEW DDL command.

  • OR REPLACE -> replace
  • TEMPORARY -> isTemporary
  • IF NOT EXISTS -> allowExisting

If we want to use the same command CreateViewCommand to process both CREATE VIEW and ALTER VIEW, it sounds reasonable to add a new flag. So far, I have not found a way to combine them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like CreateMode, can we use SaveMode for replace and allowExisting?

Copy link
Member Author

@gatorsmile gatorsmile Aug 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh, I got it. : ) Copied what @liancheng wrote one month ago and did the update in the last case.

allowExisting replace SaveMode
true false Ignore
false false ErrorIfExists
false true Overwrite
true true AnalysisException

Let me try it. Thanks!

// the permanent view. Here, it follows the same resolution like DROP VIEW,
// since users are unable to specify the keyword TEMPORARY.
if (isTemporary || (isAlterViewAsSelect && sessionState.catalog.isTemporaryTable(name))) {
createTemporaryView(sparkSession, analyzedPlan)
} else {
// Adds default database for permanent table if it doesn't exist, so that tableExists()
Expand All @@ -115,14 +122,14 @@ case class CreateViewCommand(

if (sessionState.catalog.tableExists(qualifiedName)) {
val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
if (allowExisting) {
if (mode == SaveMode.Ignore) {
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
} else if (tableMetadata.tableType != CatalogTableType.VIEW) {
throw new AnalysisException(
"Existing table is not a view. The following is an existing table, " +
s"not a view: $qualifiedName")
} else if (replace) {
} else if (mode == SaveMode.Overwrite) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
} else {
Expand Down Expand Up @@ -154,7 +161,7 @@ case class CreateViewCommand(
sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}

catalog.createTempView(name.table, logicalPlan, replace)
catalog.createTempView(name.table, logicalPlan, mode == SaveMode.Overwrite)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,25 @@ class HiveDDLCommandSuite extends PlanTest {
test("create view -- basic") {
val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1"
val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
assert(!command.allowExisting)
assert(command.mode == SaveMode.ErrorIfExists)
assert(command.name.database.isEmpty)
assert(command.name.table == "view1")
assert(command.originalText == Some("SELECT * FROM tab1"))
assert(command.userSpecifiedColumns.isEmpty)
}

test("create view -- IF NOT EXISTS") {
val v1 = "CREATE VIEW IF NOT EXISTS view1 AS SELECT * FROM tab1"
val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
assert(command.mode == SaveMode.Ignore)

val v2 = "CREATE OR REPLACE VIEW IF NOT EXISTS view1 AS SELECT * FROM tab1"
val e = intercept[ParseException] {
parser.parsePlan(v2).asInstanceOf[CreateViewCommand]
}.getMessage
assert(e.contains("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed"))
}

test("create view - full") {
val v1 =
"""
Expand All @@ -485,6 +497,7 @@ class HiveDDLCommandSuite extends PlanTest {
|AS SELECT * FROM tab1
""".stripMargin
val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
assert(command.mode == SaveMode.Overwrite)
assert(command.name.database.isEmpty)
assert(command.name.table == "view1")
assert(command.userSpecifiedColumns == Seq("col1" -> None, "col3" -> Some("hello")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils

Expand Down Expand Up @@ -201,13 +203,89 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
withView("testView", "default.testView") {
sql("CREATE VIEW testView AS SELECT id FROM jt")
sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
checkAnswer(
sql("SHOW TABLES 'testView*'"),
Row("testview", true) :: Row("testview", false) :: Nil)
}

withView("testView", "default.testView") {
sql("CREATE VIEW testView AS SELECT id FROM jt")
sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id FROM jt")
checkAnswer(
sql("SHOW TABLES 'testView*'"),
Row("testview", true) :: Row("testview", false) :: Nil)
}

withView("testView", "default.testView") {
sql("CREATE VIEW testView AS SELECT id FROM jt")
sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
checkAnswer(
sql("SHOW TABLES 'testView*'"),
Row("testview", false) :: Nil)
}
}

test("should allow CREATE permanent VIEW when a TEMPORARY VIEW with same name exists") {
test("ALTER VIEW: alter a temporary view when a permanent VIEW with same name exists") {
alterTempView(isTempAlteredView = true)
}

test("ALTER VIEW: alter a persistent view when a temp VIEW with same name exists") {
alterTempView(isTempAlteredView = false)
}

private def alterTempView (isTempAlteredView: Boolean) = {
withView("testView", "default.testView") {
val catalog = spark.sessionState.catalog
val oldViewQuery = "SELECT id FROM jt"
val newViewQuery = "SELECT id, id1 FROM jt"
sql(s"CREATE VIEW default.testView AS $oldViewQuery")
sql(s"CREATE TEMPORARY VIEW testView AS $oldViewQuery")
if (isTempAlteredView) {
// When the database is not specified, we will first try to alter the temporary view
sql(s"ALTER VIEW testView AS $newViewQuery")
} else {
// When the database is specified, we will try to alter the permanent view, no matter
// whether the temporary view with the same name exists or not.
sql(s"ALTER VIEW default.testView AS $newViewQuery")
}

val persistentView = catalog.getTableMetadata(
TableIdentifier(table = "testView", database = Some("default")))
assert(persistentView.tableType == CatalogTableType.VIEW)
val tempView = catalog.getTableMetadata(TableIdentifier("testView"))
assert(tempView.tableType == CatalogTableType.VIEW)
assert(tempView.viewOriginalText.isEmpty)

if (isTempAlteredView) {
// View Text of the persistent view default.testView is not changed
assert(persistentView.viewOriginalText == Option(oldViewQuery))
// temp view testView is changed
checkAnswer(
sql(newViewQuery),
sql("select * from testView"))
} else {
// View Text of the persistent view default.testView is changed
assert(persistentView.viewOriginalText == Option(newViewQuery))
// temp view testView is not changed
checkAnswer(
sql(oldViewQuery),
sql("select * from testView"))
}
}
}

test("CREATE VIEW: should allow CREATE permanent VIEW when a temp VIEW with same name exists") {
withView("testView", "default.testView") {
sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
sql("CREATE VIEW testView AS SELECT id FROM jt")

// Both temporary and permanent view have been successfully created.
val catalog = spark.sessionState.catalog
val persistentView = catalog.getTableMetadata(
TableIdentifier(table = "testView", database = Some("default")))
assert(persistentView.tableType == CatalogTableType.VIEW)
val tempView = catalog.getTableMetadata(TableIdentifier("testView"))
assert(tempView.tableType == CatalogTableType.VIEW)
}
}

Expand Down