@@ -17,11 +17,13 @@

package org.apache.spark.sql.internal

import java.io.File
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
@@ -65,9 +67,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
hadoopConf
}

// Automatically extract `spark.sql.*` entries and put it in our SQLConf
setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))

lazy val experimentalMethods = new ExperimentalMethods

/**
@@ -150,6 +149,12 @@ private[sql] class SessionState(sparkSession: SparkSession) {
new ContinuousQueryManager(sparkSession)
}

private val jarClassLoader: NonClosableMutableURLClassLoader =
sparkSession.sharedState.jarClassLoader

// Automatically extract `spark.sql.*` entries and put them in our SQLConf.
// We need to call it after all of the vals above have been initialized.
setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))

// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
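
A minimal sketch of what the relocated `setConf` call does, assuming the 2.0 `SparkSession` builder API: any `spark.sql.*` entry supplied on the underlying `SparkConf` is copied into the session's SQLConf when the `SessionState` is constructed, so it is readable through the session's runtime configuration afterwards.

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sql-conf-extraction-sketch")
  .config("spark.sql.shuffle.partitions", "4") // a `spark.sql.*` entry, extracted into SQLConf
  .config("spark.executor.memory", "1g")       // not a `spark.sql.*` entry, stays on the SparkConf
  .getOrCreate()

// The SQL entry is visible through the session's runtime conf.
assert(spark.conf.get("spark.sql.shuffle.partitions") == "4")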
@@ -180,6 +185,17 @@ private[sql] class SessionState(sparkSession: SparkSession) {

def addJar(path: String): Unit = {
sparkSession.sparkContext.addJar(path)

val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
// `path` is a local file path without a URL scheme
new File(path).toURI.toURL
} else {
// `path` is a URL with a scheme
uri.toURL
}
jarClassLoader.addURL(jarURL)
Thread.currentThread().setContextClassLoader(jarClassLoader)
}

/**
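The interesting part of the new `addJar` is the path-to-URL conversion before the jar is handed to the shared `jarClassLoader`: a scheme-less string is treated as a local file, while anything that already carries a scheme is used as-is. A standalone sketch of just that step (the helper name `toJarURL` is made up for illustration):

import java.io.File
import java.net.URL
import org.apache.hadoop.fs.Path

def toJarURL(path: String): URL = {
  val uri = new Path(path).toUri
  if (uri.getScheme == null) {
    // bare local path such as "/tmp/my-udfs.jar"
    new File(path).toURI.toURL
  } else {
    // already a URL, e.g. "file:///tmp/my-udfs.jar"
    uri.toURL
  }
}

toJarURL("/tmp/my-udfs.jar")        // -> a file: URL for the local jar
toJarURL("file:///tmp/my-udfs.jar") // -> the same jar, scheme kept as given

Setting the thread's context class loader to `jarClassLoader` right after the URL is added means that code which resolves classes through the context class loader can see the newly added jar.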
@@ -17,11 +17,19 @@

package org.apache.spark.sql.hive.execution

import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util
import java.util.{Date, Random}

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

import org.apache.spark.rdd.RDD
@@ -46,6 +54,61 @@ case class InsertIntoHiveTable(

def output: Seq[Attribute] = Seq.empty

val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

private def executionId: String = {
val rand: Random = new Random
val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
"hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
}

private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
val stagingPathName: String =
if (inputPathName.indexOf(stagingDir) == -1) {
new Path(inputPathName, stagingDir).toString
} else {
inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
}
val dir: Path =
fs.makeQualified(
new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
logDebug("Created staging dir = " + dir + " for path = " + inputPath)
try {
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
fs.deleteOnExit(dir)
} catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
}
dir
}

private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
}

def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
val extURI: URI = path.toUri
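// ViewFS cannot rename across mount points, so the temporary output directory has to
// live next to the destination path rather than in a separate scratch location.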
if (extURI.getScheme == "viewfs") {
getExtTmpPathRelTo(path.getParent, hadoopConf)
} else {
new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
}
}

def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
}

private def saveAsHiveFile(
rdd: RDD[InternalRow],
valueClass: Class[_],
@@ -81,7 +144,7 @@ case class InsertIntoHiveTable(
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val hadoopConf = sessionState.newHadoopConf()
val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed =
sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
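
To see what the new, Hive-free `getExternalTmpPath` produces, here is a standalone sketch (table locations are made up) of the `stagingPathName` computation inside `getStagingDir`. The real method goes on to append `_<executionId>-<taskRunnerId>`, qualifies the result against the table's file system, and `getExternalTmpPath` then appends the `-ext-10000` leaf, mirroring what Hive's own `Context.getExternalTmpPath` did.

import org.apache.hadoop.fs.Path

val stagingDir = ".hive-staging" // default value of hive.exec.stagingdir

def stagingPathName(inputPathName: String): String =
  if (inputPathName.indexOf(stagingDir) == -1) {
    // common case: nest the staging directory under the table (or partition) location
    new Path(inputPathName, stagingDir).toString
  } else {
    // the location is already inside a staging directory: reuse that prefix instead of nesting
    inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
  }

stagingPathName("/warehouse/db/sales")
// -> /warehouse/db/sales/.hive-staging
stagingPathName("/warehouse/db/sales/.hive-staging_hive_2016-05-10_1/-ext-10000")
// -> /warehouse/db/sales/.hive-staging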