diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 63e0dc7e7e4b..b114d6c50437 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.internal
 
+import java.io.File
 import java.util.Properties
 
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql._
@@ -65,9 +67,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     hadoopConf
   }
 
-  // Automatically extract `spark.sql.*` entries and put it in our SQLConf
-  setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))
-
   lazy val experimentalMethods = new ExperimentalMethods
 
   /**
@@ -150,6 +149,12 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     new ContinuousQueryManager(sparkSession)
   }
 
+  private val jarClassLoader: NonClosableMutableURLClassLoader =
+    sparkSession.sharedState.jarClassLoader
+
+  // Automatically extract `spark.sql.*` entries and put it in our SQLConf
+  // We need to call it after all of vals have been initialized.
+  setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))
 
   // ------------------------------------------------------
   //  Helper methods, partially leftover from pre-2.0 days
@@ -180,6 +185,17 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 
   def addJar(path: String): Unit = {
     sparkSession.sparkContext.addJar(path)
+
+    val uri = new Path(path).toUri
+    val jarURL = if (uri.getScheme == null) {
+      // `path` is a local file path without a URL scheme
+      new File(path).toURI.toURL
+    } else {
+      // `path` is a URL with a scheme
+      uri.toURL
+    }
+    jarClassLoader.addURL(jarURL)
+    Thread.currentThread().setContextClassLoader(jarClassLoader)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index cba10caf9861..73ccec2ee0ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,11 +17,19 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
 import java.util
+import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
 import org.apache.spark.rdd.RDD
@@ -46,6 +54,61 @@ case class InsertIntoHiveTable(
 
   def output: Seq[Attribute] = Seq.empty
 
+  val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+    return executionId
+  }
+
+  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    val stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+      }
+      fs.deleteOnExit(dir)
+    }
+    catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+
+    }
+    return dir
+  }
+
+  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  }
+
+  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+    }
+  }
+
+  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  }
+
   private def saveAsHiveFile(
       rdd: RDD[InternalRow],
       valueClass: Class[_],
@@ -81,7 +144,7 @@ case class InsertIntoHiveTable(
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
     val hadoopConf = sessionState.newHadoopConf()
-    val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
+    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed =
       sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
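
For reference, here is a minimal standalone sketch (not part of the patch; the object name, parameter names, and defaults are made up for illustration) of the staging-path naming that the new getStagingDir/getExternalTmpPath helpers produce for a non-viewfs table location, assuming the default value of hive.exec.stagingdir. Reimplementing this naming inside InsertIntoHiveTable is what allows the import of Hive's ql.Context to be dropped above.

import java.text.SimpleDateFormat
import java.util.{Date, Random}

object StagingPathSketch {
  // Mirrors the naming scheme of the helpers added in the patch:
  //   <table location>/<stagingDir>_hive_<timestamp>_<random>-<task runner id>/-ext-10000
  def externalTmpPath(
      tableLocation: String,
      stagingDir: String = ".hive-staging",  // default of "hive.exec.stagingdir"
      taskRunnerId: Long = 1L): String = {   // stand-in for TaskRunner.getTaskRunnerID
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
    val executionId = "hive_" + format.format(new Date) + "_" + Math.abs(new Random().nextLong)
    tableLocation + "/" + stagingDir + "_" + executionId + "-" + taskRunnerId + "/-ext-10000"
  }

  def main(args: Array[String]): Unit = {
    // Prints something like:
    //   /user/hive/warehouse/t/.hive-staging_hive_2016-05-10_12-00-00_000_1234567890-1/-ext-10000
    println(externalTmpPath("/user/hive/warehouse/t"))
  }
}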