From c02e8ba1bf639a84c1d23ecb57d47fc646e186da Mon Sep 17 00:00:00 2001 From: liuxian Date: Tue, 23 Apr 2019 17:28:30 +0800 Subject: [PATCH] [SPARK-27552][SQL] Fix hive.exec.stagingdir handling on Windows --- .../sql/hive/execution/SaveAsHiveFile.scala | 7 ++-- .../apache/spark/sql/hive/InsertSuite.scala | 32 +++++++++++++++++-- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 73b3f20c2cbe..22b1117df98e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -210,12 +210,11 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { stagingDir) } - private def getStagingDir( + private[hive] def getStagingDir( inputPath: Path, hadoopConf: Configuration, stagingDir: String): Path = { - val inputPathUri: URI = inputPath.toUri - val inputPathName: String = inputPathUri.getPath + val inputPathName: String = inputPath.toString val fs: FileSystem = inputPath.getFileSystem(hadoopConf) var stagingPathName: String = if (inputPathName.indexOf(stagingDir) == -1) { @@ -228,7 +227,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { // staging directory needs to avoid being deleted when users set hive.exec.stagingdir // under the table directory. if (isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) { + !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + "with '.' 
to avoid being deleted if we set hive.exec.stagingdir under the table " + "directory.") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 510de3a7eab5..48d969c5ee9a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -19,12 +19,14 @@ package org.apache.spark.sql.hive import java.io.File -import org.scalatest.BeforeAndAfter +import org.apache.hadoop.fs.Path +import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkException import org.apache.spark.sql.{QueryTest, _} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable +import org.apache.spark.sql.hive.execution.InsertIntoHiveTable import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -36,7 +38,7 @@ case class TestData(key: Int, value: String) case class ThreeCloumntable(key: Int, value: String, key1: String) class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter - with SQLTestUtils { + with SQLTestUtils with PrivateMethodTester { import spark.implicits._ override lazy val testData = spark.sparkContext.parallelize( @@ -550,6 +552,32 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter } } + test("SPARK-27552: hive.exec.stagingdir is invalid on Windows OS") { + val conf = spark.sessionState.newHadoopConf() + val inputPath = new Path("/tmp/b/c") + var stagingDir = "tmp/b" + val saveHiveFile = InsertIntoHiveTable(null, Map.empty, null, false, false, null) + val getStagingDir = PrivateMethod[Path]('getStagingDir) + var path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + assert(path.toString.indexOf("/tmp/b_hive_") != -1) + + stagingDir = 
"tmp/b/c" + path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) + + stagingDir = "d/e" + path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) + + stagingDir = ".d/e" + path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + assert(path.toString.indexOf("/tmp/b/c/.d/e_hive_") != -1) + + stagingDir = "/tmp/c/" + path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + assert(path.toString.indexOf("/tmp/c_hive_") != -1) + } + test("insert overwrite to dir from hive metastore table") { withTempDir { dir => val path = dir.toURI.getPath