
Commit 68e7330

Delete Jar command

1 parent cada5be commit 68e7330

File tree: 14 files changed, +228 −65 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 27 additions & 0 deletions
@@ -283,6 +283,12 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
   private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
 
+  // Maintains the mapping from a jar's file name to its path. If the scheme is `file`, then
+  // addedJars will replace the scheme with `spark` as shown below; for more details see
+  // addLocalJarFile and checkRemoteJarFile in the addJar API.
+  // e.g. add jar /opt/somepath/some.jar => spark://11.242.157.133:36723/jars/some.jar
+  private[spark] val jarsToPath = new ConcurrentHashMap[String, String]().asScala
+
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = {
     val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
@@ -1905,6 +1911,7 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
     if (key != null) {
+      jarsToPath.putIfAbsent(new Path(path).getName, path)
       val timestamp = System.currentTimeMillis
       if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
         logInfo(s"Added JAR $path at $key with timestamp $timestamp")
@@ -1917,11 +1924,31 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }
 
+  /**
+   * Removes the jar from addedJars, so that the next batch of task sets will get the updated jars.
+   */
+  def deleteJar(path: String): Unit = {
+    val uri = new URI(path)
+    val key = uri.getScheme match {
+      // This code is in line with addJar: the key is generated based on the file scheme;
+      // refer to the checkRemoteJarFile and addLocalJarFile implementations.
+      case null | "file" =>
+        val fName = new File(uri.getRawPath).getName
+        env.rpcEnv.fileServer.deleteJar(fName)
+        s"${env.rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(fName)}"
+      case _ => uri.toString
+    }
+    addedJars.remove(key)
+    jarsToPath.remove(new Path(path).getName)
+  }
+
   /**
    * Returns a list of jar files that are added to resources.
    */
   def listJars(): Seq[String] = addedJars.keySet.toSeq
 
+  private[spark] def getPath(jarName: String): Option[String] = jarsToPath.get(jarName)
+
   /**
    * When stopping SparkContext inside Spark components, it's easy to cause dead-lock since Spark
    * may wait for some internal threads to finish. It's better to use this method to stop
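
Taken together, addJar now records the original path under the jar's bare file name, and deleteJar undoes both entries. A minimal round-trip sketch, assuming an active SparkContext `sc` and an illustrative local jar (getPath is private[spark], so the asserts only compile inside the org.apache.spark package):

  sc.addJar("/opt/somepath/some.jar")
  // addedJars keys on the rewritten file-server URL
  // (e.g. spark://11.242.157.133:36723/jars/some.jar), while jarsToPath
  // keeps the original path under the bare name "some.jar".
  assert(sc.getPath("some.jar") == Some("/opt/somepath/some.jar"))

  sc.deleteJar("/opt/somepath/some.jar")
  // Both maps are cleaned up; listJars() no longer reports the jar.
  assert(sc.getPath("some.jar").isEmpty)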

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 5 additions & 0 deletions
@@ -175,6 +175,11 @@ private[spark] trait RpcEnvFileServer {
    */
   def addJar(file: File): String
 
+  /**
+   * Removes a jar from the RpcEnv.
+   */
+  def deleteJar(jarName: String): Unit
+
   /**
    * Adds a local directory to be served via this file server.
    *

core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala

Lines changed: 4 additions & 0 deletions
@@ -88,4 +88,8 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
     s"${rpcEnv.address.toSparkURL}$fixedBaseUri"
   }
 
+  override def deleteJar(jarName: String): Unit = {
+    jars.remove(jarName)
+  }
+
 }

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
@@ -222,7 +222,7 @@ statement
         multipartIdentifier partitionSpec?                             #loadData
     | TRUNCATE TABLE multipartIdentifier partitionSpec?                #truncateTable
     | MSCK REPAIR TABLE multipartIdentifier                            #repairTable
-    | op=(ADD | LIST) identifier (STRING | .*?)                        #manageResource
+    | op=(ADD | LIST | DELETE) identifier (STRING | .*?)               #manageResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
     | RESET                                                            #resetConfiguration
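
With DELETE added to the #manageResource alternative, the command can be exercised end to end from SQL. A hedged usage sketch, assuming an active SparkSession `spark` and an illustrative jar path:

  spark.sql("ADD JAR /opt/somepath/some.jar")
  spark.sql("LIST JARS").show(truncate = false)
  // The new alternative parses DELETE JAR <path> into a DeleteJarCommand
  // (see the SparkSqlParser change below).
  spark.sql("DELETE JAR /opt/somepath/some.jar")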

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 6 additions & 0 deletions
@@ -307,6 +307,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * {{{
    *   ADD (FILE[S] <filepath ...> | JAR[S] <jarpath ...>)
    *   LIST (FILE[S] [filepath ...] | JAR[S] [jarpath ...])
+   *   DELETE (FILE[S] <filepath ...> | JAR[S] <jarpath ...>)
    * }}}
    *
    * Note that filepath/jarpath can be given as follows;
@@ -339,6 +340,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
           }
         case other => operationNotAllowed(s"LIST with resource type '$other'", ctx)
       }
+      case SqlBaseParser.DELETE =>
+        ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
+          case "jar" => DeleteJarCommand(mayebePaths)
+          case other => operationNotAllowed(s"DELETE with resource type '$other'", ctx)
+        }
       case _ => operationNotAllowed(s"Other types of operation on resources", ctx)
     }
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala

Lines changed: 18 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.net.URI
 
 import org.apache.hadoop.fs.Path
@@ -99,3 +99,20 @@ case class ListJarsCommand(jars: Seq[String] = Seq.empty[String]) extends Runnab
     }
   }
 }
+
+/**
+ * Deletes a jar from the current session, so it can be removed from the classpath.
+ */
+case class DeleteJarCommand(path: String) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val jarName = new Path(path).getName
+    sparkSession.sparkContext.getPath(jarName) match {
+      case Some(jarPath) =>
+        sparkSession.sessionState.resourceLoader.deleteJar(jarPath)
+        Seq.empty[Row]
+      case None => throw new FileNotFoundException(s"$jarName does not exist")
+    }
+  }
+}
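
Because DeleteJarCommand resolves the bare file name through SparkContext.getPath, either the original path or just the jar name works, and deleting a jar that was never added fails fast. A sketch, assuming an active SparkSession `spark` and illustrative paths:

  import java.io.FileNotFoundException

  spark.sql("ADD JAR /opt/somepath/some.jar")
  // Either form resolves to the registered path via getPath("some.jar").
  spark.sql("DELETE JAR some.jar")

  // A jar that was never added surfaces as FileNotFoundException.
  try spark.sql("DELETE JAR missing.jar")
  catch { case e: FileNotFoundException => println(e.getMessage) }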

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 17 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.internal
 
 import java.io.File
+import java.net.URL
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -164,6 +165,12 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
    */
   def addJar(path: String): Unit = {
     session.sparkContext.addJar(path)
+    val jarURL: URL = getJarURL(path)
+    session.sharedState.addJar(jarURL)
+    Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
+  }
+
+  private def getJarURL(path: String) = {
     val uri = new Path(path).toUri
     val jarURL = if (uri.getScheme == null) {
       // `path` is a local file path without a URL scheme
@@ -172,7 +179,16 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
       // `path` is a URL with a scheme
       uri.toURL
     }
-    session.sharedState.jarClassLoader.addURL(jarURL)
+    jarURL
+  }
+
+  /**
+   * Deletes a jar from [[SparkContext]] and the classloader.
+   */
+  def deleteJar(path: String): Unit = {
+    session.sparkContext.deleteJar(path)
+    val jarURL: URL = getJarURL(path)
+    session.sharedState.deleteJar(jarURL)
     Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
   }
 }
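
The extracted getJarURL helper gives addJar and deleteJar a single normalization point: scheme-less paths are treated as local files, anything else as a full URL. A standalone sketch of that logic (an assumption based on the surrounding context, since the hunk elides the local-file branch):

  import java.io.File
  import java.net.URL
  import org.apache.hadoop.fs.Path

  // Illustrative mirror of getJarURL; not the committed implementation.
  def toJarURL(path: String): URL = {
    val uri = new Path(path).toUri
    if (uri.getScheme == null) {
      new File(path).toURI.toURL  // local file path without a URL scheme
    } else {
      uri.toURL                   // already a URL with a scheme
    }
  }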

sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala

Lines changed: 16 additions & 3 deletions
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.status.ElementTrackingStore
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 
 /**
@@ -183,11 +183,24 @@ private[sql] class SharedState(
     new GlobalTempViewManager(globalTempDB)
   }
 
+  private val parentClassLoader = Utils.getContextOrSparkClassLoader
+
   /**
    * A classloader used to load all user-added jar.
    */
-  val jarClassLoader = new NonClosableMutableURLClassLoader(
-    org.apache.spark.util.Utils.getContextOrSparkClassLoader)
+  private var closeableJarClassLoader = new MutableURLClassLoader(Array.empty, parentClassLoader)
+
+  def jarClassLoader: MutableURLClassLoader = closeableJarClassLoader
+
+  def addJar(jarURL: URL): Unit = synchronized {
+    jarClassLoader.addURL(jarURL)
+  }
+
+  def deleteJar(jarURL: URL): Unit = synchronized {
+    val newJars = closeableJarClassLoader.getURLs.filter(!_.equals(jarURL))
+    closeableJarClassLoader.close()
+    closeableJarClassLoader = null
+    closeableJarClassLoader = new MutableURLClassLoader(newJars, parentClassLoader)
+  }
 
 }
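
Since a URLClassLoader cannot drop a single URL in place, deleteJar swaps the whole loader: filter the deleted URL out of getURLs, close the old loader, and rebuild over the survivors. A minimal standalone sketch of that filter-and-rebuild pattern using the JDK's URLClassLoader (names here are illustrative, not Spark API):

  import java.net.{URL, URLClassLoader}

  // Illustrative stand-in for the MutableURLClassLoader swap above.
  def rebuildWithout(old: URLClassLoader, removed: URL, parent: ClassLoader): URLClassLoader = {
    val survivors = old.getURLs.filterNot(_.equals(removed))
    old.close()  // release the old loader's open jar handles
    new URLClassLoader(survivors, parent)
  }

Callers still holding the old loader must pick up the new one; this is why SessionResourceLoader resets the thread context classloader after every add or delete.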

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala

Lines changed: 1 addition & 1 deletion
@@ -363,7 +363,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
     // scalastyle:off println
     if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
       proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ListResourceProcessor] ||
-      proc.isInstanceOf[ResetProcessor] ) {
+      proc.isInstanceOf[ResetProcessor] || proc.isInstanceOf[DeleteResourceProcessor]) {
       val driver = new SparkSQLDriver
 
       driver.init()

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala

Lines changed: 5 additions & 0 deletions
@@ -117,4 +117,9 @@ class HiveSessionResourceLoader(
     client.addJar(path)
     super.addJar(path)
   }
+
+  override def deleteJar(path: String): Unit = {
+    client.deleteJar(path)
+    super.deleteJar(path)
+  }
 }
