From dee4b9e107cc84f67640523fa277b31947661042 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Thu, 27 Oct 2016 14:44:04 -0700
Subject: [PATCH 01/10] Thu Oct 27 14:44:04 PDT 2016

---
 .../org/apache/spark/sql/catalyst/dsl/package.scala  |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala       |  2 +-
 .../plans/logical/basicLogicalOperators.scala        | 12 ++++++++++--
 .../scala/org/apache/spark/sql/DataFrameWriter.scala |  5 +++--
 .../execution/command/createDataSourceTables.scala   |  1 +
 .../execution/datasources/DataSourceStrategy.scala   |  5 +++--
 .../datasources/InsertIntoDataSourceCommand.scala    |  6 +++---
 .../org/apache/spark/sql/hive/HiveStrategies.scala   |  3 ++-
 .../execution/CreateHiveTableAsSelectCommand.scala   |  5 +++--
 9 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca68af19..e901683be6854 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
     def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
       InsertIntoTable(
         analysis.UnresolvedRelation(TableIdentifier(tableName)),
-        Map.empty, logicalPlan, overwrite, false)
+        Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
 
     def as(alias: String): LogicalPlan = logicalPlan match {
       case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6c162ad..987aaf3e1493b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -182,7 +182,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       UnresolvedRelation(tableIdent, None),
       partitionKeys,
       query,
-      ctx.OVERWRITE != null,
+      OverwriteOptions(ctx.OVERWRITE != null),
       ctx.EXISTS != null)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a48974c6322ad..51f898c9fd36a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -345,18 +345,26 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
   override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
 }
 
+case class OverwriteOptions(
+    enabled: Boolean,
+    specificPartition: Option[Map[String, String]] = None) {
+  if (specificPartition.isDefined) {
+    assert(enabled)
+  }
+}
+
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
-    overwrite: Boolean,
+    overwrite: OverwriteOptions,
     ifNotExists: Boolean)
   extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = Seq.empty
 
-  assert(overwrite || !ifNotExists)
+  assert(overwrite.enabled || !ifNotExists)
   assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
 
   override lazy val resolved: Boolean = childrenResolved && table.resolved
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7ff3522f547d3..e333d54633780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union}
 import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
@@ -259,7 +259,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       table = UnresolvedRelation(tableIdent),
       partition = Map.empty[String, Option[String]],
       child = df.logicalPlan,
-      overwrite = mode == SaveMode.Overwrite,
+      overwrite = OverwriteOptions(mode == SaveMode.Overwrite),
       ifNotExists = false)).toRdd
   }
 
@@ -391,6 +391,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val createCmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
     val cmd = if (tableDesc.partitionColumnNames.nonEmpty &&
         df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
+      println("alter table recover partitions")
       // Need to recover partitions into the metastore so our saved data is visible.
       val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
       Union(createCmd, recoverPartitionCmd)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2a9743130d4c4..7609be4a13ed6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -241,6 +241,7 @@ case class CreateDataSourceTableAsSelectCommand(
     result match {
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
           sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+        println("alter table recover partitions from createDataSourceTables")
         // Need to recover partitions into the metastore so our saved data is visible.
         sparkSession.sessionState.executePlan(
           AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f0bcf94eadc96..56b47ac0e4abb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -173,8 +173,9 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
       }.flatten
 
-      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
-      if (overwrite && inputPaths.contains(outputPath)) {
+      // TODO(ekl) overwrite partition?
+      val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
+      if (overwrite.enabled && inputPaths.contains(outputPath)) {
         throw new AnalysisException(
           "Cannot overwrite a path that is also being read from.")
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index b2ff68a833fea..2eba1e9986acd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OverwriteOptions}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
 
@@ -30,7 +30,7 @@ import org.apache.spark.sql.sources.InsertableRelation
 case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
-    overwrite: Boolean)
+    overwrite: OverwriteOptions)
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -40,7 +40,7 @@ case class InsertIntoDataSourceCommand(
     val data = Dataset.ofRows(sparkSession, query)
     // Apply the schema of the existing table to the new data.
     val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
+    relation.insert(df, overwrite.enabled)
 
     // Invalidate the cache.
     sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9d2930948d6ba..ce1e3eb1a5bc9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -46,7 +46,8 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(
           table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
-        InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+        InsertIntoHiveTable(
+          table, partition, planLater(child), overwrite.enabled, ifNotExists) :: Nil
 
       case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
         val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ef5a5a001fb6f..cac43597aef21 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, OverwriteOptions}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -88,7 +88,8 @@ case class CreateHiveTableAsSelectCommand(
     } else {
       try {
         sparkSession.sessionState.executePlan(InsertIntoTable(
-          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+          metastoreRelation, Map(), query, overwrite = OverwriteOptions(true),
+          ifNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.

From d99afacfc80e731d17af4ae25c548e45d948cd11 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Fri, 28 Oct 2016 14:20:28 -0700
Subject: [PATCH 02/10] Fri Oct 28 14:20:28 PDT 2016

---
 .../sql/catalyst/parser/AstBuilder.scala      | 12 ++++++++++-
 .../execution/datasources/DataSource.scala    |  3 ++-
 .../datasources/DataSourceStrategy.scala      | 13 +++++++++++-
 .../InsertIntoHadoopFsRelationCommand.scala   | 21 ++++++++++++++-----
 4 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 987aaf3e1493b..4f331fbb07c71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,22 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
+    val overwrite = ctx.OVERWRITE != null
+    val overwritePartition = if (overwrite) {
+      if (partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
+        Some(partitionKeys.map(t => (t._1, t._2.get)))
+      } else {
+        None
+      }
+    } else {
+      None
+    }
 
     InsertIntoTable(
       UnresolvedRelation(tableIdent, None),
       partitionKeys,
       query,
-      OverwriteOptions(ctx.OVERWRITE != null),
+      OverwriteOptions(overwrite, overwritePartition),
       ctx.EXISTS != null)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5b8f05a396241..41f2650247ec5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -531,7 +531,8 @@ case class DataSource(
           () => Unit, // No existing table needs to be refreshed.
           options,
           data.logicalPlan,
-          mode)
+          mode,
+          None)
         sparkSession.sessionState.executePlan(plan).toRdd
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
         copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 56b47ac0e4abb..8a799c26245a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -180,6 +182,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
           "Cannot overwrite a path that is also being read from.")
       }
 
+      val overwritePartitionPath = if (overwrite.specificPartition.isDefined) {
+        val partition = t.sparkSession.sessionState.catalog.getPartition(
+          l.catalogTable.get.identifier, overwrite.specificPartition.get)
+        Some(new Path(partition.storage.locationUri.get))
+      } else {
+        None
+      }
+
       val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
         query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
@@ -188,7 +198,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         () => t.location.refresh(),
         t.options,
         query,
-        mode)
+        mode,
+        overwritePartitionPath)
 
       if (l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
           l.catalogTable.get.partitionProviderIsHive) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 22dbe7149531c..0d1e3234ef5ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -43,9 +43,14 @@ case class InsertIntoHadoopFsRelationCommand(
     refreshFunction: () => Unit,
     options: Map[String, String],
     @transient query: LogicalPlan,
-    mode: SaveMode)
+    mode: SaveMode,
+    overwritePartitionPath: Option[Path])
   extends RunnableCommand {
 
+  if (overwritePartitionPath.isDefined) {
+    assert(mode == SaveMode.Overwrite)
+  }
+
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -66,13 +71,19 @@ case class InsertIntoHadoopFsRelationCommand(
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
-      case (SaveMode.Overwrite, true) =>
-        if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
+      case (SaveMode.Overwrite, _) =>
+        val pathToDelete = if (overwritePartitionPath.isDefined) {
+          overwritePartitionPath.get.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        } else {
+          qualifiedOutputPath
+        }
+        println("Delete: " + pathToDelete)
+        if (fs.exists(pathToDelete) && !fs.delete(pathToDelete, true /* recursively */)) {
           throw new IOException(s"Unable to clear output " +
-            s"directory $qualifiedOutputPath prior to writing to it")
+            s"directory $pathToDelete prior to writing to it")
         }
         true
-      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
+      case (SaveMode.Append, _) | (SaveMode.ErrorIfExists, false) =>
         true
       case (SaveMode.Ignore, exists) =>
         !exists

From 8da8b0f7f70291cd86bd0a103758c5f88f714a27 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 31 Oct 2016 14:29:58 -0700
Subject: [PATCH 03/10] wip

---
 .../plans/logical/basicLogicalOperators.scala |  8 +++-
 .../sql/catalyst/parser/PlanParserSuite.scala |  6 +--
 .../apache/spark/sql/DataFrameWriter.scala    |  1 -
 .../command/createDataSourceTables.scala      |  1 -
 .../datasources/DataSourceStrategy.scala      |  4 +-
 .../InsertIntoHadoopFsRelationCommand.scala   |  1 -
 .../PartitionProviderCompatibilitySuite.scala | 43 +++++++++++++++++++
 7 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 51f898c9fd36a..1db3a33257799 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -345,11 +345,17 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
   override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
 }
 
+/**
+ * Options for writing new data into a table.
+ *
+ * @param enabled whether to overwrite existing data in the table.
+ * @param specificPartition only data in the specified partition will be overwritten.
+ */
 case class OverwriteOptions(
     enabled: Boolean,
     specificPartition: Option[Map[String, String]] = None) {
   if (specificPartition.isDefined) {
-    assert(enabled)
+    assert(enabled, "Overwrite must be enabled when specifying a partition to overwrite.")
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index ca86304d4d400..74b4e8077b3d2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,7 +180,7 @@ class PlanParserSuite extends PlanTest {
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
         ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+      InsertIntoTable(table("s"), partition, plan, OverwriteOptions(true), ifNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
@@ -196,9 +196,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), overwrite = false, ifNotExists = false).union(
+        table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, overwrite = false, ifNotExists = false)))
+          table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
   }
 
   test ("insert with if not exists") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 622a261e6fd90..700f4835ac89a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -392,7 +392,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
     if (tableDesc.partitionColumnNames.nonEmpty &&
         df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-      println("alter table recover partitions")
       // Need to recover partitions into the metastore so our saved data is visible.
       df.sparkSession.sessionState.executePlan(
         AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7609be4a13ed6..2a9743130d4c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -241,7 +241,6 @@ case class CreateDataSourceTableAsSelectCommand(
     result match {
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
           sparkSession.sqlContext.conf.manageFilesourcePartitions =>
-        println("alter table recover partitions from createDataSourceTables")
         // Need to recover partitions into the metastore so our saved data is visible.
         sparkSession.sessionState.executePlan(
           AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8a799c26245a3..fa6a29433bdc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -175,14 +175,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
       }.flatten
 
-      // TODO(ekl) overwrite partition?
       val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
       if (overwrite.enabled && inputPaths.contains(outputPath)) {
         throw new AnalysisException(
           "Cannot overwrite a path that is also being read from.")
       }
 
-      val overwritePartitionPath = if (overwrite.specificPartition.isDefined) {
+      val overwritePartitionPath = if (overwrite.specificPartition.isDefined &&
+          l.catalogTable.get.partitionProviderIsHive) {
         val partition = t.sparkSession.sessionState.catalog.getPartition(
           l.catalogTable.get.identifier, overwrite.specificPartition.get)
         Some(new Path(partition.storage.locationUri.get))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 0d1e3234ef5ea..4aa8db350a7b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -77,7 +77,6 @@ case class InsertIntoHadoopFsRelationCommand(
         } else {
           qualifiedOutputPath
         }
-        println("Delete: " + pathToDelete)
         if (fs.exists(pathToDelete) && !fs.delete(pathToDelete, true /* recursively */)) {
           throw new IOException(s"Unable to clear output " +
             s"directory $pathToDelete prior to writing to it")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 5f16960fb1496..e0c74dbac20ca 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -134,4 +134,47 @@ class PartitionProviderCompatibilitySuite
       }
     }
   }
+
+  test("insert overwrite partition of legacy datasource table overwrites entire table") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable("test", dir)
+          spark.sql(
+            """insert overwrite table test
+              |partition (partCol=1)
+              |select * from range(100)""".stripMargin)
+          assert(spark.sql("select * from test").count() == 100)
+        }
+      }
+    }
+  }
+
+  test("insert overwrite partition of new datasource table overwrites just partition") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable("test", dir)
+          sql("msck repair table test")
+          spark.sql(
+            """insert overwrite table test
+              |partition (partCol=1)
+              |select * from range(100)""".stripMargin)
+          assert(spark.sql("select * from test").count() == 104)
+// TODO(ekl) enable this test for SPARK-18184
+//          withTempDir { dir2 =>
+//            spark.sql(
+//              s"""alter table test partition (partCol=1)
+//                 |set location '${dir2.getAbsolutePath}'""".stripMargin)
+//            assert(spark.sql("select * from test").count() == 4)
+//            spark.sql(
+//              """insert overwrite table test
+//                |partition (partCol=1)
+//                |select * from range(50)""".stripMargin)
+//            assert(spark.sql("select * from test").count() == 54)
+//          }
+        }
+      }
+    }
+  }
 }

From d5a7bd3931dc6f3f1673d9edff0f0aa77ad9d138 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 31 Oct 2016 16:14:56 -0700
Subject: [PATCH 04/10] implement custom locations

---
 .../datasources/CatalogFileIndex.scala        |  5 +++-
 .../InsertIntoHadoopFsRelationCommand.scala   | 25 +++++++++--------
 .../PartitionProviderCompatibilitySuite.scala | 27 ++++++++++---------
 3 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 092aabc89a36c..443a2ec033a98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -67,7 +67,10 @@ class CatalogFileIndex(
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
       val partitions = selectedPartitions.map { p =>
-        PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+        val path = new Path(p.storage.locationUri.get)
+        val fs = path.getFileSystem(hadoopConf)
+        PartitionPath(
+          p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       new PrunedInMemoryFileIndex(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 4aa8db350a7b9..98f28e0e04921 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -65,24 +65,23 @@ case class InsertIntoHadoopFsRelationCommand(
 
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     val fs = outputPath.getFileSystem(hadoopConf)
-    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    val qualifiedOutputPath = if (overwritePartitionPath.isDefined) {
+      overwritePartitionPath.get.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    } else {
+      outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    }
     val pathExists = fs.exists(qualifiedOutputPath)
 
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
-      case (SaveMode.Overwrite, _) =>
-        val pathToDelete = if (overwritePartitionPath.isDefined) {
-          overwritePartitionPath.get.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        } else {
-          qualifiedOutputPath
-        }
-        if (fs.exists(pathToDelete) && !fs.delete(pathToDelete, true /* recursively */)) {
+      case (SaveMode.Overwrite, true) =>
+        if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
           throw new IOException(s"Unable to clear output " +
-            s"directory $pathToDelete prior to writing to it")
+            s"directory $qualifiedOutputPath prior to writing to it")
         }
         true
-      case (SaveMode.Append, _) | (SaveMode.ErrorIfExists, false) =>
+      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
       case (SaveMode.Ignore, exists) =>
         !exists
@@ -99,7 +98,11 @@ case class InsertIntoHadoopFsRelationCommand(
         fileFormat,
         qualifiedOutputPath,
         hadoopConf,
-        partitionColumns,
+        if (overwritePartitionPath.isDefined) {
+          Nil // write directly to only this partition directory
+        } else {
+          partitionColumns
+        },
         bucketSpec,
         refreshFunction,
         options,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index e0c74dbac20ca..2aa9c0a5cf066 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -161,18 +161,21 @@ class PartitionProviderCompatibilitySuite
             |partition (partCol=1)
             |select * from range(100)""".stripMargin)
           assert(spark.sql("select * from test").count() == 104)
-// TODO(ekl) enable this test for SPARK-18184
-//          withTempDir { dir2 =>
-//            spark.sql(
-//              s"""alter table test partition (partCol=1)
-//                 |set location '${dir2.getAbsolutePath}'""".stripMargin)
-//            assert(spark.sql("select * from test").count() == 4)
-//            spark.sql(
-//              """insert overwrite table test
-//                |partition (partCol=1)
-//                |select * from range(50)""".stripMargin)
-//            assert(spark.sql("select * from test").count() == 54)
-//          }
+          withTempDir { dir2 =>
+            sql(
+              s"""alter table test partition (partCol=1)
+                 |set location '${dir2.getAbsolutePath}'""".stripMargin)
+            assert(sql("select * from test").count() == 4)
+            sql(
+              """insert overwrite table test
+                |partition (partCol=1)
+                |select * from range(30)""".stripMargin)
+            sql(
+              """insert overwrite table test
+                |partition (partCol=1)
+                |select * from range(20)""".stripMargin)
+            assert(sql("select * from test").count() == 24)
+          }
         }
       }
     }

From a833a19d20f6b891cf42cf4a0f544c72abb982cd Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 31 Oct 2016 16:32:16 -0700
Subject: [PATCH 05/10] Mon Oct 31 16:32:15 PDT 2016

---
 .../sql/hive/PartitionProviderCompatibilitySuite.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 2aa9c0a5cf066..ac435bf6195b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -145,6 +145,10 @@ class PartitionProviderCompatibilitySuite
             |partition (partCol=1)
             |select * from range(100)""".stripMargin)
          assert(spark.sql("select * from test").count() == 100)
+
+          // Dynamic partitions case
+          spark.sql("insert overwrite table test select id, id from range(10)".stripMargin)
+          assert(spark.sql("select * from test").count() == 10)
         }
       }
     }
@@ -161,6 +165,8 @@ class PartitionProviderCompatibilitySuite
             |partition (partCol=1)
             |select * from range(100)""".stripMargin)
           assert(spark.sql("select * from test").count() == 104)
+
+          // Test overwriting a partition that has a custom location
           withTempDir { dir2 =>
             sql(
               s"""alter table test partition (partCol=1)

From 587b85e83f569ad8acae3de82c695b0fad4041db Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 31 Oct 2016 16:34:47 -0700
Subject: [PATCH 06/10] Mon Oct 31 16:34:47 PDT 2016

---
 .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 4f331fbb07c71..ac1577b3abb4d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -178,15 +178,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
     val overwrite = ctx.OVERWRITE != null
-    val overwritePartition = if (overwrite) {
-      if (partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
+    val overwritePartition =
+      if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
         Some(partitionKeys.map(t => (t._1, t._2.get)))
       } else {
         None
       }
-    } else {
-      None
-    }
 
     InsertIntoTable(
       UnresolvedRelation(tableIdent, None),

From 669e6cce48bfe903f473f14f20e1857ababc27be Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 31 Oct 2016 16:41:40 -0700
Subject: [PATCH 07/10] Mon Oct 31 16:41:40 PDT 2016

---
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index fa6a29433bdc0..fb45157dfb8ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -182,6 +182,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       }
 
       val overwritePartitionPath = if (overwrite.specificPartition.isDefined &&
+          sparkSession.sessionState.conf.manageFilesourcePartitions &&
           l.catalogTable.get.partitionProviderIsHive) {
         val partition = t.sparkSession.sessionState.catalog.getPartition(
           l.catalogTable.get.identifier, overwrite.specificPartition.get)

From fec7c9e9df5fc7ceb1231fa71303fbf5a1a6b3d9 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 31 Oct 2016 17:09:36 -0700
Subject: [PATCH 08/10] Mon Oct 31 17:09:36 PDT 2016

---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala   | 3 ++-
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 1db3a33257799..6ff89992961ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -349,7 +349,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
  * Options for writing new data into a table.
  *
  * @param enabled whether to overwrite existing data in the table.
- * @param specificPartition only data in the specified partition will be overwritten.
+ * @param specificPartition only data in the specified partition will be overwritten. The map is
+ *                          from partition keys to values (cast to string).
  */
 case class OverwriteOptions(
     enabled: Boolean,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index fb45157dfb8ba..12ad63236321d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -182,7 +182,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       }
 
       val overwritePartitionPath = if (overwrite.specificPartition.isDefined &&
-          sparkSession.sessionState.conf.manageFilesourcePartitions &&
+          t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
           l.catalogTable.get.partitionProviderIsHive) {
         val partition = t.sparkSession.sessionState.catalog.getPartition(
           l.catalogTable.get.identifier, overwrite.specificPartition.get)

From 07c67876c372369def5128ce919cbb74e4f0d30d Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 31 Oct 2016 17:13:37 -0700
Subject: [PATCH 09/10] Mon Oct 31 17:13:37 PDT 2016

---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6ff89992961ce..7a15c2285d584 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -349,12 +349,11 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
  * Options for writing new data into a table.
  *
  * @param enabled whether to overwrite existing data in the table.
- * @param specificPartition only data in the specified partition will be overwritten. The map is
- *                          from partition keys to values (cast to string).
+ * @param specificPartition only data in the specified partition will be overwritten.
*/ case class OverwriteOptions( enabled: Boolean, - specificPartition: Option[Map[String, String]] = None) { + specificPartition: Option[CatalogTypes.TablePartitionSpec] = None) { if (specificPartition.isDefined) { assert(enabled, "Overwrite must be enabled when specifying a partition to overwrite.") } From 0daff7475e456754538e65b9f324773218f4f943 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 31 Oct 2016 19:28:16 -0700 Subject: [PATCH 10/10] Mon Oct 31 19:28:16 PDT 2016 --- .../spark/sql/catalyst/parser/PlanParserSuite.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 74b4e8077b3d2..7400f3430e99c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -180,7 +180,16 @@ class PlanParserSuite extends PlanTest { partition: Map[String, Option[String]], overwrite: Boolean = false, ifNotExists: Boolean = false): LogicalPlan = - InsertIntoTable(table("s"), partition, plan, OverwriteOptions(true), ifNotExists) + InsertIntoTable( + table("s"), partition, plan, + OverwriteOptions( + overwrite, + if (overwrite && partition.nonEmpty) { + Some(partition.map(kv => (kv._1, kv._2.get))) + } else { + None + }), + ifNotExists) // Single inserts assertEqual(s"insert overwrite table s $sql",