Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,14 @@ private[spark] object SQLConf {
"possible, or you may get wrong result.",
isPublic = false)

// Internal (isPublic = false) flag gating the "canonical" CREATE VIEW path: only
// consulted when `spark.sql.nativeView` is also enabled (see `conf.canonicalView`
// reader and its use in CreateViewAsSelect). Defaults to true.
val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
defaultValue = Some(true),
doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
"CREATE VIEW statement using SQL query string generated from view definition logical " +
"plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
"original native view implementation.",
isPublic = false)

val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
defaultValue = Some("_corrupt_record"),
doc = "The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
Expand Down Expand Up @@ -570,6 +578,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with ParserCon

private[spark] def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)

private[spark] def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)

def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

private[spark] def subexpressionEliminationEnabled: Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,22 @@ private[sql] trait SQLTestUtils
}
}

/**
 * Drops each of the views named in `viewNames` after calling `f`. The views are dropped with
 * `DROP VIEW IF EXISTS`, so names that no longer resolve to a view are ignored silently; the
 * cleanup runs even when `f` throws.
 */
protected def withView(viewNames: String*)(f: => Unit): Unit = {
  try f finally {
    viewNames.foreach { name =>
      sqlContext.sql(s"DROP VIEW IF EXISTS $name")
    }
  }
}

/**
 * Creates a temporary database, passes its name to `f`, and drops the database after `f`
 * returns.
 *
 * Note that this method doesn't switch current database before executing `f`.
 */
protected def withTempDatabase(f: String => Unit): Unit = {
val dbName = s"db_${UUID.randomUUID().toString.replace('-', '_')}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,25 +572,24 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p

case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
if (conf.nativeView) {
if (allowExisting && replace) {
throw new AnalysisException(
"It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
}
case CreateViewAsSelect(table, child, allowExisting, replace, sql) if conf.nativeView =>
if (allowExisting && replace) {
throw new AnalysisException(
"It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
}

val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)

execution.CreateViewAsSelect(
table.copy(
specifiedDatabase = Some(dbName),
name = tblName),
child.output,
allowExisting,
replace)
} else {
HiveNativeCommand(sql)
}
execution.CreateViewAsSelect(
table.copy(
specifiedDatabase = Some(dbName),
name = tblName),
child,
allowExisting,
replace)

case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
HiveNativeCommand(sql)

case p @ CreateTableAsSelect(table, child, allowExisting) =>
val schema = if (table.schema.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}

/**
Expand All @@ -32,10 +33,12 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
// from Hive and may not work for some cases like create view on self join.
private[hive] case class CreateViewAsSelect(
tableDesc: HiveTable,
childSchema: Seq[Attribute],
child: LogicalPlan,
allowExisting: Boolean,
orReplace: Boolean) extends RunnableCommand {

private val childSchema = child.output

assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
assert(tableDesc.viewText.isDefined)

Expand All @@ -44,55 +47,83 @@ private[hive] case class CreateViewAsSelect(
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]

if (hiveContext.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// view already exists, will do nothing, to keep consistent with Hive
} else if (orReplace) {
hiveContext.catalog.client.alertView(prepareTable())
} else {
hiveContext.catalog.tableExists(tableIdentifier) match {
case true if allowExisting =>
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.

case true if orReplace =>
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
hiveContext.catalog.client.alertView(prepareTable(sqlContext))

case true =>
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
throw new AnalysisException(s"View $tableIdentifier already exists. " +
"If you want to update the view definition, please use ALTER VIEW AS or " +
"CREATE OR REPLACE VIEW AS")
}
} else {
hiveContext.catalog.client.createView(prepareTable())

case false =>
hiveContext.catalog.client.createView(prepareTable(sqlContext))
}

Seq.empty[Row]
}

private def prepareTable(): HiveTable = {
// setup column types according to the schema of child.
val schema = if (tableDesc.schema == Nil) {
childSchema.map { attr =>
HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
}
private def prepareTable(sqlContext: SQLContext): HiveTable = {
val expandedText = if (sqlContext.conf.canonicalView) {
rebuildViewQueryString(sqlContext).getOrElse(wrapViewTextWithSelect)
} else {
childSchema.zip(tableDesc.schema).map { case (attr, col) =>
HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
wrapViewTextWithSelect
}

val viewSchema = {
if (tableDesc.schema.isEmpty) {
childSchema.map { attr =>
HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
}
} else {
childSchema.zip(tableDesc.schema).map { case (attr, col) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. I missed this. How about we also have a check for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that we do the check at https://github.com/apache/spark/pull/10733/files#diff-074b1d8480e0d0d7c212bc4461f3d4acR43 (assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length))

HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
}
}
}

val columnNames = childSchema.map(f => verbose(f.name))
tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
}

private def wrapViewTextWithSelect: String = {
// When user specified column names for view, we should create a project to do the renaming.
// When no column name specified, we still need to create a project to declare the columns
// we need, to make us more robust to top level `*`s.
val projectList = if (tableDesc.schema == Nil) {
columnNames.mkString(", ")
} else {
columnNames.zip(tableDesc.schema.map(f => verbose(f.name))).map {
case (name, alias) => s"$name AS $alias"
}.mkString(", ")
val viewOutput = {
val columnNames = childSchema.map(f => quote(f.name))
if (tableDesc.schema.isEmpty) {
columnNames.mkString(", ")
} else {
columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
case (name, alias) => s"$name AS $alias"
}.mkString(", ")
}
}

val viewName = verbose(tableDesc.name)

val expandedText = s"SELECT $projectList FROM (${tableDesc.viewText.get}) $viewName"
val viewText = tableDesc.viewText.get
val viewName = quote(tableDesc.name)
s"SELECT $viewOutput FROM ($viewText) $viewName"
}

tableDesc.copy(schema = schema, viewText = Some(expandedText))
private def rebuildViewQueryString(sqlContext: SQLContext): Option[String] = {
val logicalPlan = if (tableDesc.schema.isEmpty) {
child
} else {
val projectList = childSchema.zip(tableDesc.schema).map {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to call zip? We need to check the number of fields, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also have a test for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an invariant condition of CreateViewAsSelect that these two have the same size. Users can't construct a case that violates this condition, thus adding test for this might not be necessary. I'm for adding a check here though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case (attr, col) => Alias(attr, col.name)()
}
sqlContext.executePlan(Project(projectList, child)).analyzed
}
new SQLBuilder(logicalPlan, sqlContext).toSQL
}

// escape backtick with double-backtick in column name and wrap it with backtick.
private def verbose(name: String) = s"`${name.replaceAll("`", "``")}`"
private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {

// TODO Enable this
// Query plans transformed by DistinctAggregationRewriter are not recognized yet
ignore("distinct and non-distinct aggregation") {
ignore("multi-distinct columns") {
checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
}

Expand Down
Loading