diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index 5ee325008a5c..28b5a2ff1010 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -18,12 +18,12 @@ package org.apache.spark.repl import java.io.{ByteArrayOutputStream, InputStream} -import java.net.{URI, URL, URLEncoder} +import java.net.{HttpURLConnection, URI, URL, URLEncoder} import java.util.concurrent.{Executors, ExecutorService} import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils import org.apache.spark.util.ParentClassLoader @@ -37,7 +37,8 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ * Allows the user to specify if user class path should be first */ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader, - userClassPathFirst: Boolean) extends ClassLoader { + userClassPathFirst: Boolean) extends ClassLoader with Logging { + val uri = new URI(classUri) val directory = uri.getPath @@ -71,27 +72,64 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader } } + private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = { + val url: URL = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) { + val uri = new URI(classUri + "/" + urlEncode(pathInDirectory)) + Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager).toURL + } else { + new URL(classUri + "/" + urlEncode(pathInDirectory)) + } + val connection = url.openConnection().asInstanceOf[HttpURLConnection] + if (connection.asInstanceOf[HttpURLConnection].getResponseCode != 200) { + connection.disconnect() + throw new ClassNotFoundException(s"Class file not found at URL $url") + } else { + connection.getInputStream + } + } + + private def getClassFileInputStreamFromFileSystem(pathInDirectory: String): InputStream = { + val path = new Path(directory, pathInDirectory) + if (fileSystem.exists(path)) { + fileSystem.open(path) + } else { + throw new ClassNotFoundException(s"Class file not found at path $path") + } + } + def findClassLocally(name: String): Option[Class[_]] = { + val pathInDirectory = name.replace('.', '/') + ".class" + var inputStream: InputStream = null try { - val pathInDirectory = name.replace('.', '/') + ".class" - val inputStream = { + inputStream = { if (fileSystem != null) { - fileSystem.open(new Path(directory, pathInDirectory)) + getClassFileInputStreamFromFileSystem(pathInDirectory) } else { - if (SparkEnv.get.securityManager.isAuthenticationEnabled()) { - val uri = new URI(classUri + "/" + urlEncode(pathInDirectory)) - val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager) - newuri.toURL().openStream() - } else { - new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream() - } + getClassFileInputStreamFromHttpServer(pathInDirectory) } } val bytes = readAndTransformClass(name, inputStream) - inputStream.close() Some(defineClass(name, bytes, 0, bytes.length)) } catch { - case e: Exception => None + case cnfe: ClassNotFoundException => + // This is an expected exception due to not being able to find a class. + // Therefore, we'll log it at debug level to avoid polluting the logs with noise + logDebug(s"Class $name not found", cnfe) + None + case e: Exception => + // This is an unexpected exception; maybe something went wrong while processing the + // class file. Therefore, log an error: + logError(s"Exception while loading class $name", e) + None + } finally { + if (inputStream != null) { + try { + inputStream.close() + } catch { + case e: Exception => + logError("Exception while closing inputStream", e) + } + } } }