27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -283,6 +283,12 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala

// Maintains a mapping from a jar's file name to its original path. If the scheme is `file`,
// addJar rewrites the key to a `spark` URL as shown below; for details see addLocalJarFile and
// checkRemoteJarFile in the addJar API.
// e.g. add jar /opt/somepath/some.jar => spark://11.242.157.133:36723/jars/some.jar
private[spark] val jarsToPath = new ConcurrentHashMap[String, String]().asScala

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
@@ -1905,6 +1911,7 @@ class SparkContext(config: SparkConf) extends Logging {
}
}
if (key != null) {
jarsToPath.putIfAbsent(new Path(path).getName, path)
val timestamp = System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
@@ -1917,11 +1924,31 @@
}
}

/**
* Removes the jar from addedJars so that the next TaskSet will pick up the updated set of jars.
*/
def deleteJar(path: String): Unit = {
val uri = new URI(path)
val key = uri.getScheme match {
// This mirrors addJar: the key is generated based on the URI scheme;
// see the checkRemoteJarFile and addLocalJarFile implementations.
case null | "file" =>
val fName = new File(uri.getRawPath).getName
env.rpcEnv.fileServer.deleteJar(fName)
s"${env.rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(fName)}"
case _ => uri.toString
}
addedJars.remove(key)
jarsToPath.remove(new Path(path).getName)
}

/**
* Returns a list of jar files that are added to resources.
*/
def listJars(): Seq[String] = addedJars.keySet.toSeq

private[spark] def getPath(jarName: String): Option[String] = jarsToPath.get(jarName)

/**
* When stopping SparkContext inside Spark components, it's easy to cause dead-lock since Spark
* may wait for some internal threads to finish. It's better to use this method to stop
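For reviewers, a minimal usage sketch of the new SparkContext bookkeeping, assuming a build that contains this patch; the jar path is illustrative and the `spark://` key shape follows addLocalJarFile:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local run against a build that includes this patch.
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("delete-jar-sketch"))

sc.addJar("/opt/somepath/some.jar")    // key becomes spark://<driver-host>:<port>/jars/some.jar
sc.listJars().foreach(println)         // shows the spark:// key registered above

sc.deleteJar("/opt/somepath/some.jar") // drops the key from addedJars and the RPC file server
sc.listJars().foreach(println)         // the jar no longer appears for the next TaskSet

sc.stop()
```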
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -175,6 +175,11 @@ private[spark] trait RpcEnvFileServer {
*/
def addJar(file: File): String

/**
* Removes a previously added jar from this file server.
*/
def deleteJar(jarName: String): Unit

/**
* Adds a local directory to be served via this file server.
*
@@ -88,4 +88,8 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
s"${rpcEnv.address.toSparkURL}$fixedBaseUri"
}

override def deleteJar(jarName: String): Unit = {
jars.remove(jarName)
}

}
@@ -222,7 +222,7 @@ statement
multipartIdentifier partitionSpec? #loadData
| TRUNCATE TABLE multipartIdentifier partitionSpec? #truncateTable
| MSCK REPAIR TABLE multipartIdentifier #repairTable
| op=(ADD | LIST) identifier (STRING | .*?) #manageResource
| op=(ADD | LIST | DELETE) identifier (STRING | .*?) #manageResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
| RESET #resetConfiguration
@@ -307,6 +307,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* {{{
* ADD (FILE[S] <filepath ...> | JAR[S] <jarpath ...>)
* LIST (FILE[S] [filepath ...] | JAR[S] [jarpath ...])
* DELETE JAR[S] <jarpath ...>
* }}}
*
* Note that filepath/jarpath can be given as follows;
@@ -339,6 +340,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
case other => operationNotAllowed(s"LIST with resource type '$other'", ctx)
}
case SqlBaseParser.DELETE =>
ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
case "jar" => DeleteJarCommand(mayebePaths)
case other => operationNotAllowed(s"DELETE with resource type '$other'", ctx)
}
case _ => operationNotAllowed(s"Other types of operation on resources", ctx)
}
}
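As a sanity check on the new grammar branch, a small sketch (the jar path is illustrative, not part of this diff) of how the statement is expected to resolve to the new command through the session's SQL parser:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.DeleteJarCommand

// Assumes a SparkSession built against this patch.
val spark = SparkSession.builder().master("local[*]").appName("parse-sketch").getOrCreate()

val plan = spark.sessionState.sqlParser.parsePlan("DELETE JAR /opt/somepath/some.jar")
assert(plan.isInstanceOf[DeleteJarCommand])  // op=DELETE with identifier `jar` maps here

// Any other resource type keeps failing fast via the operationNotAllowed branch, e.g.
// spark.sessionState.sqlParser.parsePlan("DELETE FILE /tmp/x")  // would throw ParseException
```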
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.command

import java.io.File
import java.io.{File, FileNotFoundException}
import java.net.URI

import org.apache.hadoop.fs.Path
@@ -99,3 +99,20 @@ case class ListJarsCommand(jars: Seq[String] = Seq.empty[String]) extends RunnableCommand {
}
}
}

/**
* Deletes a jar from the current session so that it is removed from the classpath.
*/
case class DeleteJarCommand(path: String) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val jarName = new Path(path).getName
sparkSession.sparkContext.getPath(jarName) match {
case Some(jarPath) =>
sparkSession.sessionState.resourceLoader.deleteJar(jarPath)
Seq.empty[Row]
case None => throw new FileNotFoundException(s"$jarName does not exist")
}
}
}
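Taken together with the grammar and parser changes above, the intended end-to-end flow looks roughly like the following sketch (the jar path is illustrative, not part of this diff):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("delete-jar-e2e").getOrCreate()

spark.sql("ADD JAR /opt/somepath/udfs.jar")      // registers the jar and extends jarClassLoader
spark.sql("LIST JARS").show(truncate = false)    // the spark:// key for udfs.jar is listed

spark.sql("DELETE JAR /opt/somepath/udfs.jar")   // DeleteJarCommand -> resourceLoader.deleteJar
spark.sql("LIST JARS").show(truncate = false)    // udfs.jar is gone; repeating the DELETE JAR
                                                 // now throws FileNotFoundException
```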

@@ -18,6 +18,7 @@
package org.apache.spark.sql.internal

import java.io.File
import java.net.URL

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -164,6 +165,12 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
*/
def addJar(path: String): Unit = {
session.sparkContext.addJar(path)
val jarURL: URL = getJarURL(path)
session.sharedState.addJar(jarURL)
Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
}

private def getJarURL(path: String) = {
val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
// `path` is a local file path without a URL scheme
@@ -172,7 +179,16 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
// `path` is a URL with a scheme
uri.toURL
}
session.sharedState.jarClassLoader.addURL(jarURL)
jarURL
}

/**
* Deletes a jar from [[SparkContext]] and the classloader
*/
def deleteJar(path: String): Unit = {
session.sparkContext.deleteJar(path)
val jarURL: URL = getJarURL(path)
session.sharedState.deleteJar(jarURL)
Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
}
}
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.Utils
import org.apache.spark.util.{MutableURLClassLoader, Utils}


/**
@@ -183,11 +183,24 @@ private[sql] class SharedState(
new GlobalTempViewManager(globalTempDB)
}

private val parentClassLoader = Utils.getContextOrSparkClassLoader
/**
* A classloader used to load all user-added jars.
*/
val jarClassLoader = new NonClosableMutableURLClassLoader(
org.apache.spark.util.Utils.getContextOrSparkClassLoader)
private var closeableJarClassLoader = new MutableURLClassLoader(Array.empty, parentClassLoader)

def jarClassLoader: MutableURLClassLoader = closeableJarClassLoader

def addJar(jarURL: URL): Unit = synchronized {
jarClassLoader.addURL(jarURL)
}

def deleteJar(jarURL: URL): Unit = synchronized {
// URLClassLoader cannot drop a URL once added, so rebuild the loader without the deleted jar.
val newJars = closeableJarClassLoader.getURLs.filterNot(_ == jarURL)
closeableJarClassLoader.close()
closeableJarClassLoader = new MutableURLClassLoader(newJars, parentClassLoader)
}

}
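The delete path above rebuilds the loader rather than mutating it because `URLClassLoader` has no API for removing a URL once added. A standalone sketch of that pattern (names simplified, not the exact SharedState code):

```scala
import java.net.{URL, URLClassLoader}

// Rebuild a loader without one jar: keep the other URLs, close the old loader to release its
// open jar handles, and hand back a fresh instance for callers to re-point threads at
// (SessionResourceLoader resets the thread context classloader after calling deleteJar).
def rebuildWithout(old: URLClassLoader, deleted: URL, parent: ClassLoader): URLClassLoader = {
  val kept = old.getURLs.filterNot(_ == deleted)
  old.close()
  new URLClassLoader(kept, parent)
}
```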

@@ -363,7 +363,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
// scalastyle:off println
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ListResourceProcessor] ||
proc.isInstanceOf[ResetProcessor] ) {
proc.isInstanceOf[ResetProcessor] || proc.isInstanceOf[DeleteResourceProcessor]) {
val driver = new SparkSQLDriver

driver.init()
@@ -117,4 +117,9 @@ class HiveSessionResourceLoader(
client.addJar(path)
super.addJar(path)
}

override def deleteJar(path: String): Unit = {
client.deleteJar(path)
super.deleteJar(path)
}
}
@@ -283,6 +283,9 @@ private[hive] trait HiveClient {
/** Add a jar into class loader */
def addJar(path: String): Unit

/** Deletes a jar from class loader */
def deleteJar(path: String): Unit

/** Return a [[HiveClient]] as new session, that will share the class loader and Hive client */
def newSession(): HiveClient

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client

import java.io.{File, PrintStream}
import java.lang.{Iterable => JIterable}
import java.net.URL
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{Locale, Map => JMap}
import java.util.concurrent.TimeUnit._
@@ -930,6 +931,12 @@ private[hive] class HiveClientImpl(
}

def addJar(path: String): Unit = {
val jarURL: URL = getJarURL(path)
clientLoader.addJar(jarURL)
runSqlHive(s"ADD JAR $path")
}

private def getJarURL(path: String) = {
val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
// `path` is a local file path without a URL scheme
@@ -938,8 +945,13 @@
// `path` is a URL with a scheme
uri.toURL
}
clientLoader.addJar(jarURL)
runSqlHive(s"ADD JAR $path")
jarURL
}

override def deleteJar(path: String): Unit = {
val jarURL: URL = getJarURL(path)
clientLoader.deleteJar(jarURL)
runSqlHive(s"DELETE JAR $path")
}

def newSession(): HiveClientImpl = {