Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ private[spark] class EventLoggingListener(
}

val workingPath = logPath + IN_PROGRESS
val uri = new URI(workingPath)
val path = new Path(workingPath)
val uri = path.toUri
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"

Expand Down Expand Up @@ -320,7 +320,7 @@ private[spark] object EventLoggingListener extends Logging {
appId: String,
appAttemptId: Option[String],
compressionCodecName: Option[String] = None): String = {
val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
val base = new Path(logBaseDir).toString.stripSuffix("/") + "/" + sanitize(appId)
val codec = compressionCodecName.map("." + _).getOrElse("")
if (appAttemptId.isDefined) {
base + "_" + sanitize(appAttemptId.get) + codec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.deploy.history

import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}
Expand All @@ -27,7 +26,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
import org.mockito.Matchers.any
Expand Down Expand Up @@ -63,7 +62,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
val logPath = new URI(logUri).getPath + ip
val logPath = new Path(logUri).toUri.getPath + ip
new File(logPath)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.scheduler

import java.io.{File, FileOutputStream, InputStream, IOException}
import java.net.URI

import scala.collection.mutable
import scala.io.Source
Expand Down Expand Up @@ -50,7 +49,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
private var testDirPath: Path = _

before {
testDir = Utils.createTempDir()
testDir = Utils.createTempDir(namePrefix = s"history log")
testDir.deleteOnExit()
testDirPath = new Path(testDir.getAbsolutePath())
}
Expand Down Expand Up @@ -109,7 +108,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit

test("Log overwriting") {
val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
val logPath = new URI(logUri).getPath
val logPath = new Path(logUri).toUri.getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
// Expected IOException, since we haven't enabled log overwrite.
Expand Down Expand Up @@ -290,7 +289,7 @@ object EventLoggingListenerSuite {
val conf = new SparkConf
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.testing", "true")
conf.set("spark.eventLog.dir", logDir.toUri.toString)
conf.set("spark.eventLog.dir", logDir.toString)
compressionCodec.foreach { codec =>
conf.set("spark.eventLog.compress", "true")
conf.set("spark.io.compression.codec", codec)
Expand Down