@@ -367,7 +367,7 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
analysis.UnresolvedRelation(TableIdentifier(tableName)),
Map.empty, logicalPlan, overwrite, false)
Map.empty, logicalPlan, OverwriteOptions(overwrite), false)

def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
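For reference, a minimal spark-shell sketch (assuming a Spark build containing this patch; "src" and "dst" are made-up table names) of what the updated dsl helper builds — the boolean is simply wrapped in OverwriteOptions:

import org.apache.spark.sql.catalyst.dsl.plans._

// Builds InsertIntoTable(UnresolvedRelation(TableIdentifier("dst")), Map.empty,
// <plan for "src">, OverwriteOptions(true), ifNotExists = false).
val overwritePlan = table("src").insertInto("dst", overwrite = true)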
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
"partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
}
val overwrite = ctx.OVERWRITE != null
val overwritePartition =
if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
Some(partitionKeys.map(t => (t._1, t._2.get)))
} else {
None
}

InsertIntoTable(
UnresolvedRelation(tableIdent, None),
partitionKeys,
query,
ctx.OVERWRITE != null,
OverwriteOptions(overwrite, overwritePartition),
ctx.EXISTS != null)
}

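The branch added above only records a partition to overwrite when the statement is an INSERT OVERWRITE with a fully static partition spec; any dynamic partition column makes specificPartition fall back to None. A standalone sketch of that decision (the object and method names below are made up for illustration, not part of the patch):

object OverwritePartitionRule {
  // Returns (OverwriteOptions.enabled, OverwriteOptions.specificPartition).
  def resolve(
      overwrite: Boolean,
      partitionKeys: Map[String, Option[String]]): (Boolean, Option[Map[String, String]]) = {
    val dynamicPartitionKeys = partitionKeys.filter(_._2.isEmpty)
    val specificPartition =
      if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
        Some(partitionKeys.map { case (k, v) => k -> v.get })
      } else {
        None
      }
    (overwrite, specificPartition)
  }

  def main(args: Array[String]): Unit = {
    // INSERT OVERWRITE TABLE t PARTITION (p=1) SELECT ...  -> (true, Some(Map(p -> 1)))
    println(resolve(overwrite = true, Map("p" -> Some("1"))))
    // INSERT OVERWRITE TABLE t PARTITION (p) SELECT ...    -> (true, None), dynamic partition
    println(resolve(overwrite = true, Map("p" -> None)))
    // INSERT INTO TABLE t PARTITION (p=1) SELECT ...       -> (false, None)
    println(resolve(overwrite = false, Map("p" -> Some("1"))))
  }
}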
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTypes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -345,18 +346,32 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
}

/**
* Options for writing new data into a table.
*
* @param enabled whether to overwrite existing data in the table.
* @param specificPartition only data in the specified partition will be overwritten.
*/
case class OverwriteOptions(
enabled: Boolean,
specificPartition: Option[CatalogTypes.TablePartitionSpec] = None) {
if (specificPartition.isDefined) {
assert(enabled, "Overwrite must be enabled when specifying a partition to overwrite.")
}
}

case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
overwrite: Boolean,
overwrite: OverwriteOptions,
ifNotExists: Boolean)
extends LogicalPlan {

override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = Seq.empty

assert(overwrite || !ifNotExists)
assert(overwrite.enabled || !ifNotExists)
assert(partition.values.forall(_.nonEmpty) || !ifNotExists)

override lazy val resolved: Boolean = childrenResolved && table.resolved
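The assertion in OverwriteOptions ties the two fields together: a specific partition may only be given when overwrite is enabled. Intended usage, e.g. in a spark-shell against a build containing this patch (the partition column name "partCol" is illustrative):

import org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions

val append        = OverwriteOptions(enabled = false)   // plain INSERT INTO
val fullOverwrite = OverwriteOptions(enabled = true)    // INSERT OVERWRITE of the whole table
val onePartition  = OverwriteOptions(
  enabled = true, specificPartition = Some(Map("partCol" -> "1")))  // overwrite one static partition
// OverwriteOptions(enabled = false, specificPartition = Some(Map("partCol" -> "1")))
// would trip the assertion above.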
@@ -180,7 +180,16 @@ class PlanParserSuite extends PlanTest {
partition: Map[String, Option[String]],
overwrite: Boolean = false,
ifNotExists: Boolean = false): LogicalPlan =
InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
InsertIntoTable(
table("s"), partition, plan,
OverwriteOptions(
overwrite,
if (overwrite && partition.nonEmpty) {
Some(partition.map(kv => (kv._1, kv._2.get)))
Contributor: do we need to consider dynamic partition here?

} else {
None
}),
ifNotExists)

// Single inserts
assertEqual(s"insert overwrite table s $sql",
@@ -196,9 +205,9 @@
val plan2 = table("t").where('x > 5).select(star())
assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
InsertIntoTable(
table("s"), Map.empty, plan.limit(1), overwrite = false, ifNotExists = false).union(
table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
InsertIntoTable(
table("u"), Map.empty, plan2, overwrite = false, ifNotExists = false)))
table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
}

test ("insert with if not exists") {
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union}
import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.types.StructType
@@ -259,7 +259,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
child = df.logicalPlan,
overwrite = mode == SaveMode.Overwrite,
overwrite = OverwriteOptions(mode == SaveMode.Overwrite),
ifNotExists = false)).toRdd
}

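On the DataFrameWriter side nothing user-visible changes; the SaveMode is simply wrapped. A small self-contained sketch (local session, table name "t" made up) showing which OverwriteOptions value each mode ends up producing:

import org.apache.spark.sql.{SaveMode, SparkSession}

object InsertIntoModes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("insertInto-modes").getOrCreate()

    val df = spark.range(5).toDF("id")
    df.write.saveAsTable("t")                          // create the target table

    df.write.mode(SaveMode.Append).insertInto("t")     // plans InsertIntoTable(..., OverwriteOptions(false), ...)
    df.write.mode(SaveMode.Overwrite).insertInto("t")  // plans InsertIntoTable(..., OverwriteOptions(true), ...)

    // The overwrite replaced both the original and the appended rows with df's five rows.
    assert(spark.table("t").count() == 5)
    spark.stop()
  }
}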
@@ -67,7 +67,10 @@ class CatalogFileIndex(
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
val path = new Path(p.storage.locationUri.get)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
Contributor: why this change? Doesn't new Path qualify the path string?

Author (@ericl, Nov 1, 2016): Apparently not. The unit test actually fails if you do that, since the path seems to be missing the file: prefix and we fail to find the files in the partition.

Contributor: But we will qualify it before writing to it here, doesn't it work?

Author (@ericl): The issue is that the user can store arbitrary string paths with ALTER TABLE ... PARTITION ... SET LOCATION. Therefore, we must manually qualify the locations that come from the catalog, or else they might not match up with the paths read from the filesystem.

}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
new PrunedInMemoryFileIndex(
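To see why the explicit qualification matters: new Path(str) keeps exactly what the user stored, possibly with no scheme at all, while makeQualified fills in the scheme and authority from the filesystem, so catalog locations compare equal to paths listed from the filesystem. A minimal standalone illustration (local filesystem, default Hadoop configuration, made-up directory):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyExample {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // As a user might store it via ALTER TABLE ... PARTITION ... SET LOCATION:
    val raw = new Path("/tmp/custom/partition/dir")
    val fs = raw.getFileSystem(hadoopConf)
    val qualified = raw.makeQualified(fs.getUri, fs.getWorkingDirectory)

    println(raw)        // /tmp/custom/partition/dir       (no scheme)
    println(qualified)  // file:/tmp/custom/partition/dir  (scheme filled in)
    // File listings come back qualified, so matching against the raw catalog
    // value would silently miss the partition's files.
  }
}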
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -174,14 +176,32 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
}.flatten

val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
if (overwrite && inputPaths.contains(outputPath)) {
val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
if (overwrite.enabled && inputPaths.contains(outputPath)) {
throw new AnalysisException(
"Cannot overwrite a path that is also being read from.")
}

val overwritingSinglePartition = (overwrite.specificPartition.isDefined &&
t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
l.catalogTable.get.partitionProviderIsHive)

val effectiveOutputPath = if (overwritingSinglePartition) {
val partition = t.sparkSession.sessionState.catalog.getPartition(
l.catalogTable.get.identifier, overwrite.specificPartition.get)
new Path(partition.storage.locationUri.get)
} else {
outputPath
}

val effectivePartitionSchema = if (overwritingSinglePartition) {
Nil
} else {
query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
}

def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
if (l.catalogTable.isDefined &&
if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
l.catalogTable.get.partitionColumnNames.nonEmpty &&
l.catalogTable.get.partitionProviderIsHive) {
val metastoreUpdater = AlterTableAddPartitionCommand(
@@ -194,8 +214,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
}

val insertCmd = InsertIntoHadoopFsRelationCommand(
outputPath,
query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
effectiveOutputPath,
effectivePartitionSchema,
t.bucketSpec,
t.fileFormat,
refreshPartitionsCallback,
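Put together: when overwrite.specificPartition is set and the table's partitions are tracked in the Hive metastore, the command writes straight into that partition's directory (wherever the catalog says it is, including a custom location) and skips dynamic partitioning, so sibling partitions survive. A hedged end-to-end sketch, assuming a Spark build containing this patch, Hive support on the classpath, and filesource partition management left enabled (table and column names are made up):

import org.apache.spark.sql.SparkSession

object SinglePartitionOverwrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.hive.manageFilesourcePartitions", "true")
      .enableHiveSupport()
      .getOrCreate()

    // Two partitions: p=0 and p=1, five rows each.
    spark.range(10).selectExpr("id", "id % 2 as p")
      .write.partitionBy("p").saveAsTable("t")

    // Static INSERT OVERWRITE of one partition rewrites only p=1's directory.
    spark.sql("INSERT OVERWRITE TABLE t PARTITION (p = 1) SELECT id FROM range(3)")

    assert(spark.table("t").where("p = 0").count() == 5)  // untouched
    assert(spark.table("t").where("p = 1").count() == 3)  // overwritten
    spark.stop()
  }
}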
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OverwriteOptions}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.sources.InsertableRelation

Expand All @@ -30,7 +30,7 @@ import org.apache.spark.sql.sources.InsertableRelation
case class InsertIntoDataSourceCommand(
logicalRelation: LogicalRelation,
query: LogicalPlan,
overwrite: Boolean)
overwrite: OverwriteOptions)
extends RunnableCommand {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
Expand All @@ -40,7 +40,7 @@ case class InsertIntoDataSourceCommand(
val data = Dataset.ofRows(sparkSession, query)
// Apply the schema of the existing table to the new data.
val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)
relation.insert(df, overwrite.enabled)

// Invalidate the cache.
sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
@@ -46,7 +46,8 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(
table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
InsertIntoHiveTable(
table, partition, planLater(child), overwrite.enabled, ifNotExists) :: Nil

case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, OverwriteOptions}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.MetastoreRelation

@@ -88,7 +88,8 @@ case class CreateHiveTableAsSelectCommand(
} else {
try {
sparkSession.sessionState.executePlan(InsertIntoTable(
metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
metastoreRelation, Map(), query, overwrite = OverwriteOptions(true),
ifNotExists = false)).toRdd
} catch {
case NonFatal(e) =>
// drop the created table.
@@ -134,4 +134,56 @@ class PartitionProviderCompatibilitySuite
}
}
}

test("insert overwrite partition of legacy datasource table overwrites entire table") {
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
withTable("test") {
withTempDir { dir =>
setupPartitionedDatasourceTable("test", dir)
spark.sql(
"""insert overwrite table test
|partition (partCol=1)
|select * from range(100)""".stripMargin)
assert(spark.sql("select * from test").count() == 100)

// Dynamic partitions case
spark.sql("insert overwrite table test select id, id from range(10)".stripMargin)
assert(spark.sql("select * from test").count() == 10)
}
}
}
}

test("insert overwrite partition of new datasource table overwrites just partition") {
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
withTable("test") {
withTempDir { dir =>
setupPartitionedDatasourceTable("test", dir)
sql("msck repair table test")
spark.sql(
"""insert overwrite table test
|partition (partCol=1)
|select * from range(100)""".stripMargin)
assert(spark.sql("select * from test").count() == 104)

// Test overwriting a partition that has a custom location
withTempDir { dir2 =>
sql(
s"""alter table test partition (partCol=1)
|set location '${dir2.getAbsolutePath}'""".stripMargin)
assert(sql("select * from test").count() == 4)
sql(
"""insert overwrite table test
|partition (partCol=1)
|select * from range(30)""".stripMargin)
sql(
"""insert overwrite table test
|partition (partCol=1)
|select * from range(20)""".stripMargin)
assert(sql("select * from test").count() == 24)
}
}
}
}
}
}