From a037917676a4eeb904276a32ff0b9c73bb7b6938 Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Tue, 13 Jun 2017 14:40:57 +0800 Subject: [PATCH 1/2] [SPARK-20338] Spaces in spark.eventLog.dir are not correctly handled --- .../org/apache/spark/scheduler/EventLoggingListener.scala | 4 ++-- .../apache/spark/scheduler/EventLoggingListenerSuite.scala | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index f481436332249..35690b2783ad3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -96,8 +96,8 @@ private[spark] class EventLoggingListener( } val workingPath = logPath + IN_PROGRESS - val uri = new URI(workingPath) val path = new Path(workingPath) + val uri = path.toUri val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme val isDefaultLocal = defaultFs == null || defaultFs == "file" @@ -320,7 +320,7 @@ private[spark] object EventLoggingListener extends Logging { appId: String, appAttemptId: Option[String], compressionCodecName: Option[String] = None): String = { - val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId) + val base = new Path(logBaseDir).toString.stripSuffix("/") + "/" + sanitize(appId) val codec = compressionCodecName.map("." 
+ _).getOrElse("") if (appAttemptId.isDefined) { base + "_" + sanitize(appAttemptId.get) + codec diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 4c3d0b102152c..619895a373d92 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -50,7 +50,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit private var testDirPath: Path = _ before { - testDir = Utils.createTempDir() + testDir = Utils.createTempDir(namePrefix = s"history log") testDir.deleteOnExit() testDirPath = new Path(testDir.getAbsolutePath()) } @@ -109,7 +109,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit test("Log overwriting") { val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None) - val logPath = new URI(logUri).getPath + val logPath = new Path(logUri).toUri.getPath // Create file before writing the event log new FileOutputStream(new File(logPath)).close() // Expected IOException, since we haven't enabled log overwrite. 
@@ -290,7 +290,7 @@ object EventLoggingListenerSuite { val conf = new SparkConf conf.set("spark.eventLog.enabled", "true") conf.set("spark.eventLog.testing", "true") - conf.set("spark.eventLog.dir", logDir.toUri.toString) + conf.set("spark.eventLog.dir", logDir.toString) compressionCodec.foreach { codec => conf.set("spark.eventLog.compress", "true") conf.set("spark.io.compression.codec", codec) From 536a4456637cc3b1db0445c61f4192520d27a9ef Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Fri, 16 Jun 2017 10:00:08 +0800 Subject: [PATCH 2/2] fix test suite --- .../apache/spark/deploy/history/FsHistoryProviderSuite.scala | 5 ++--- .../apache/spark/scheduler/EventLoggingListenerSuite.scala | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 9b3e4ec793825..7109146ece371 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.history import java.io._ -import java.net.URI import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import java.util.zip.{ZipInputStream, ZipOutputStream} @@ -27,7 +26,7 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hdfs.DistributedFileSystem import org.json4s.jackson.JsonMethods._ import org.mockito.Matchers.any @@ -63,7 +62,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc codec: Option[String] = None): File = { val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else "" val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, 
appAttemptId) - val logPath = new URI(logUri).getPath + ip + val logPath = new Path(logUri).toUri.getPath + ip new File(logPath) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 619895a373d92..dca4f0ef25300 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler import java.io.{File, FileOutputStream, InputStream, IOException} -import java.net.URI import scala.collection.mutable import scala.io.Source