1 change: 1 addition & 0 deletions docs/sql-programming-guide.md
@@ -1809,6 +1809,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, a column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
- Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file format (parquet, orc, json, text, csv, etc.) is not allowed. An exception is thrown when attempting to write dataframes with an empty schema.
- Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promoting both sides to TIMESTAMP. Setting `spark.sql.hive.compareDateTimestampInTimestamp` to `false` restores the previous behavior. This option will be removed in Spark 3.0.
- Since Spark 2.4, creating a managed table with a nonempty location is not allowed. An exception is thrown when attempting to create a managed table whose location already exists and is not empty. Setting `spark.sql.allowNonemptyManagedTableLocation` to `true` restores the previous behavior, as sketched below. This option will be removed in Spark 3.0.
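
For illustration, a minimal sketch of the behavior described above (the table name, data source, and warehouse contents are hypothetical; `spark` is assumed to be an active `SparkSession`):

```scala
// Assume the default warehouse directory for `tab1` already exists and is non-empty.
// Since Spark 2.4 this CTAS fails with an AnalysisException:
spark.sql("CREATE TABLE tab1 USING parquet AS SELECT 1 AS id")

// Restore the pre-2.4 behavior (this escape hatch is slated for removal in Spark 3.0):
spark.conf.set("spark.sql.allowNonemptyManagedTableLocation", "true")
spark.sql("CREATE TABLE tab1 USING parquet AS SELECT 1 AS id")  // now succeeds as before
```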

## Upgrading From Spark SQL 2.2 to 2.3

@@ -289,6 +289,7 @@ class SessionCatalog(
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
val tableIdentifier = TableIdentifier(table, Some(db))
validateName(table)

val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
@@ -298,15 +299,32 @@ class SessionCatalog(
makeQualifiedPath(tableDefinition.storage.locationUri.get)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
identifier = TableIdentifier(table, Some(db)))
identifier = tableIdentifier)
} else {
tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
tableDefinition.copy(identifier = tableIdentifier)
}

requireDbExists(db)
if (!ignoreIfExists) {
validateTableLocation(newTableDefinition)
}
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}

def validateTableLocation(table: CatalogTable): Unit = {
// SPARK-19724: the default location of a managed table should be non-existent or empty.
if (table.tableType == CatalogTableType.MANAGED && !conf.allowNonemptyManagedTableLocation) {
val tableLocation =
new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
val fs = tableLocation.getFileSystem(hadoopConf)

if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
s". The associated location('${tableLocation.toString}') already exists.")

Member commented: Can not -> Not allowed to
}
}
}
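
For context, the condition enforced by `validateTableLocation` boils down to an existence-and-emptiness check on the table's location. A minimal standalone sketch of the same logic follows; the helper name and example path are hypothetical, not part of this patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Mirrors the condition validateTableLocation enforces for managed tables:
// the location must either not exist yet or be an empty directory.
def isLocationUsable(location: String, hadoopConf: Configuration = new Configuration()): Boolean = {
  val path = new Path(location)
  val fs = path.getFileSystem(hadoopConf)
  !fs.exists(path) || fs.listStatus(path).isEmpty
}

// Hypothetical usage:
// isLocationUsable("/user/hive/warehouse/tab1")  // false if the directory exists and is non-empty
```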

/**
* Alter the metadata of an existing metastore table identified by `tableDefinition`.
*
@@ -1152,6 +1152,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val ALLOW_NONEMPTY_MANAGED_TABLE_LOCATION =
buildConf("spark.sql.allowNonemptyManagedTableLocation")
.doc("When this option is set to true, creating managed tables with nonempty location " +
"is allowed. Otherwise, an analysis exception is thrown. ")
.booleanConf
.createWithDefault(false)

Member commented: spark.sql.allowCreateManagedTableUsingNonemptyLocation. Also this should be an internal conf.

val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.executorQueueSize")
.internal()
@@ -1572,6 +1579,8 @@ class SQLConf extends Serializable with Logging {

def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)

def allowNonemptyManagedTableLocation: Boolean = getConf(ALLOW_NONEMPTY_MANAGED_TABLE_LOCATION)

def partitionOverwriteMode: PartitionOverwriteMode.Value =
PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))

@@ -167,7 +167,7 @@ case class CreateDataSourceTableAsSelectCommand(
sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
} else {
assert(table.schema.isEmpty)

sparkSession.sessionState.catalog.validateTableLocation(table)
val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
Some(sessionState.catalog.defaultTablePath(table.identifier))
} else {
@@ -181,7 +181,8 @@
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
schema = result.schema)
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
// Table location is already validated. No need to check it again during table creation.
sessionState.catalog.createTable(newTable, ignoreIfExists = true)

result match {
case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import java.io.File

import scala.collection.mutable

import org.apache.spark.sql.catalyst.TableIdentifier
Expand All @@ -26,6 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData.ArrayData
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils


/**
@@ -242,6 +245,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared

test("change stats after set location command") {
val table = "change_stats_set_location_table"
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier(table)))
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
withTable(table) {
@@ -269,6 +273,9 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes)
} else {
checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
// SPARK-19724: clean up the previous table location.
waitForTasksToFinish()
Utils.deleteRecursively(tableLoc)
}
}
}
@@ -180,6 +180,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {

private val escapedIdentifier = "`(.+)`".r

private def dataSource: String = {
if (isUsingHiveMetastore) {
"HIVE"
} else {
"PARQUET"
}
}
protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table

private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
@@ -365,6 +372,66 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}

test("CTAS a managed table with the existing empty directory") {
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
try {
tableLoc.mkdir()
withTable("tab1") {
sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
checkAnswer(spark.table("tab1"), Row(1, "a"))
}
} finally {
waitForTasksToFinish()
Utils.deleteRecursively(tableLoc)
}
}

test("create a managed table with the existing empty directory") {
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
try {
tableLoc.mkdir()
withTable("tab1") {
sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
sql("INSERT INTO tab1 VALUES (1, 'a')")
checkAnswer(spark.table("tab1"), Row(1, "a"))
}
} finally {
waitForTasksToFinish()
Utils.deleteRecursively(tableLoc)
}
}

test("create a managed table with the existing non-empty directory") {
withTable("tab1") {
val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
try {
// create an empty hidden file
tableLoc.mkdir()
val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
hiddenGarbageFile.createNewFile()
val exMsg = "Can not create the managed table('`tab1`'). The associated location"
val exMsgWithDefaultDB =
"Can not create the managed table('`default`.`tab1`'). The associated location"
var ex = intercept[AnalysisException] {
sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
}.getMessage
if (isUsingHiveMetastore) {
assert(ex.contains(exMsgWithDefaultDB))
} else {
assert(ex.contains(exMsg))
}

ex = intercept[AnalysisException] {
sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
}.getMessage
assert(ex.contains(exMsgWithDefaultDB))
} finally {
waitForTasksToFinish()
Utils.deleteRecursively(tableLoc)
}
}
}

private def checkSchemaInCreatedDataSourceTable(
path: File,
userSpecifiedSchema: Option[String],