Commit 1100b40

Ensure that Path does not appear in interfaces, by refactoring interfaces.
1 parent 5a5f3e2 commit 1100b40

File tree: 9 files changed (+178, -161 lines)


core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 7 additions & 6 deletions

@@ -17,8 +17,9 @@
 
 package org.apache.spark.deploy.history
 
-import org.apache.hadoop.fs.Path
+import java.io.OutputStream
 
+import org.apache.spark.SparkException
 import org.apache.spark.ui.SparkUI
 
 private[spark] case class ApplicationAttemptInfo(
@@ -65,11 +66,11 @@ private[history] abstract class ApplicationHistoryProvider {
   def getConfig(): Map[String, String] = Map()
 
   /**
-   * Get the [[Path]]s to the Event log files. For legacy event log directories, directory path
-   * itself is returned. The caller is responsible for listing the files and using them as needed.
-   * If the attemptId is [[None]], event logs corresponding to all attempts for the given
-   * application are downloaded as a single zip file.
+   * Writes out the event logs to the output stream provided. The logs will be compressed into a
+   * single zip file and written out.
+   * @throws SparkException if the logs for the app id cannot be found.
    */
-  def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = Seq.empty
+  @throws(classOf[SparkException])
+  def writeEventLogs(appId: String, attemptId: Option[String], outputStream: OutputStream): Unit
 
 }
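
The refactored interface hands the provider a plain java.io.OutputStream instead of leaking Hadoop's Path to callers. A minimal sketch of how a caller might drive the new contract (the `provider` value and the application/attempt ids are illustrative, not part of this commit):

import java.io.{BufferedOutputStream, FileOutputStream}

import org.apache.spark.SparkException

// Hypothetical caller: stream one attempt's logs into a local zip file.
// `provider` stands in for any concrete ApplicationHistoryProvider.
val out = new BufferedOutputStream(new FileOutputStream("eventLogs-app-1234-1.zip"))
try {
  provider.writeEventLogs("app-1234", Some("1"), out)
} catch {
  case e: SparkException => println(s"No logs found: ${e.getMessage}")
} finally {
  out.close()
}

Since the provider writes the zip directly, the caller only manages the stream's lifecycle.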

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 47 additions & 12 deletions

@@ -17,17 +17,18 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{OutputStream, FileOutputStream, File, BufferedInputStream,
+  FileNotFoundException, IOException, InputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
@@ -60,7 +61,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     .map { d => Utils.resolveURI(d).toString }
     .getOrElse(DEFAULT_LOG_DIR)
 
-  private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fs = Utils.getHadoopFileSystem(logDir, hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -220,16 +222,49 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = {
+  override def writeEventLogs(
+      appId: String,
+      attemptId: Option[String],
+      outputStream: OutputStream): Unit = {
 
-    val filePaths = new ArrayBuffer[Path]()
-    applications.get(appId).foreach { appInfo =>
-      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
-      appInfo.attempts.filter { attempt =>
-        attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
-      }.foreach { attempt => filePaths += new Path(logDir, attempt.logPath) }
+    applications.get(appId) match {
+      case Some(appInfo) =>
+        val dirsToClear = new mutable.ArrayBuffer[File]()
+        try {
+          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+          val pathsToZip = appInfo.attempts.filter { attempt =>
+            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
+          }.map { attempt =>
+            val logPath = new Path(logDir, attempt.logPath)
+            if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
+              val localDir = Utils.createTempDir()
+              Utils.chmod700(localDir)
+              dirsToClear += localDir
+              val outputFile = new File(localDir, logPath.getName)
+              val outputStream = new FileOutputStream(outputFile)
+              val files = fs.listFiles(logPath, false)
+              val paths = new mutable.ArrayBuffer[Path]()
+              while (files.hasNext) {
+                paths += files.next().getPath
+              }
+              Utils.zipFilesToStream(paths, hadoopConf, outputStream)
+              new Path(outputFile.toURI)
+            } else {
+              new Path(logDir, attempt.logPath)
+            }
+          }
+          Utils.zipFilesToStream(pathsToZip, hadoopConf, outputStream)
+        } finally {
+          dirsToClear.foreach { dir =>
+            try {
+              Utils.deleteRecursively(dir)
+            } catch {
+              case NonFatal(e) => logWarning(s"Error while attempting to delete $dir.")
+            }
+          }
+        }
+      case None => throw new SparkException(s"Logs for $appId not found.")
     }
-    filePaths
   }
 
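
One consequence of the legacy branch above: each legacy (directory-based) attempt is zipped into a temp file first, and that file is then fed to the outer Utils.zipFilesToStream call, so the downloaded archive contains a nested zip per legacy attempt. A hedged sketch of how a client might inspect the result (file and entry names are illustrative):

import java.io.FileInputStream
import java.util.zip.ZipInputStream

// List the top-level entries of a downloaded archive. For a legacy
// application, each entry is itself a zip of that attempt's directory.
val zin = new ZipInputStream(new FileInputStream("eventLogs-app-1234.zip"))
try {
  Iterator.continually(zin.getNextEntry)
    .takeWhile(_ != null)
    .foreach(entry => println(entry.getName))
} finally {
  zin.close()
}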

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 6 additions & 3 deletions

@@ -17,11 +17,11 @@
 
 package org.apache.spark.deploy.history
 
+import java.io.OutputStream
 import java.util.NoSuchElementException
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 import com.google.common.cache._
-import org.apache.hadoop.fs.Path
 import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -174,8 +174,11 @@ class HistoryServer(
     getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
-  override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = {
-    provider.getEventLogPaths(appId, attemptId)
+  override def writeEventLogs(
+      appId: String,
+      attemptId: Option[String],
+      outputStream: OutputStream): Unit = {
+    provider.writeEventLogs(appId, attemptId, outputStream)
   }
 
   /**

core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

Lines changed: 7 additions & 4 deletions

@@ -16,13 +16,13 @@
  */
 package org.apache.spark.status.api.v1
 
+import java.io.OutputStream
 import javax.servlet.ServletContext
 import javax.ws.rs._
 import javax.ws.rs.core.{Context, Response}
 
 import com.sun.jersey.api.core.ResourceConfig
 import com.sun.jersey.spi.container.servlet.ServletContainer
-import org.apache.hadoop.fs
 import org.eclipse.jetty.server.handler.ContextHandler
 import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
@@ -165,13 +165,13 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
     }
   }
 
-  @Path("applications/{appId}/download")
+  @Path("applications/{appId}/logs")
   def getEventLogs(
       @PathParam("appId") appId: String): EventLogDownloadResource = {
     new EventLogDownloadResource(uiRoot, appId, None)
   }
 
-  @Path("applications/{appId}/{attemptId}/download")
+  @Path("applications/{appId}/{attemptId}/logs")
   def getEventLogs(
       @PathParam("appId") appId: String,
       @PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
@@ -205,7 +205,10 @@ private[spark] object ApiRootResource {
 private[spark] trait UIRoot {
   def getSparkUI(appKey: String): Option[SparkUI]
   def getApplicationInfoList: Iterator[ApplicationInfo]
-  def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[fs.Path] = Seq.empty
+  def writeEventLogs(
+      appId: String,
+      attemptId: Option[String],
+      outputStream: OutputStream): Unit = { }
 
   /**
    * Get the spark UI with the given appID, and apply a function
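
With the routes renamed from `download` to `logs`, a client fetches the archive through the REST API root. A sketch of a download against a locally running history server (host, port, and ids are placeholders; `/api/v1` is where ApiRootResource is mounted):

import java.io.FileOutputStream
import java.net.URL

// Stream the zipped event logs for attempt 1 of an application to disk.
val url = new URL("http://localhost:18080/api/v1/applications/app-1234/1/logs")
val in = url.openStream()
val out = new FileOutputStream("eventLogs-app-1234-1.zip")
try {
  val buffer = new Array[Byte](64 * 1024)
  Iterator.continually(in.read(buffer))
    .takeWhile(_ != -1)
    .foreach(n => out.write(buffer, 0, n))
} finally {
  in.close()
  out.close()
}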

core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala

Lines changed: 19 additions & 58 deletions

@@ -16,18 +16,15 @@
  */
 package org.apache.spark.status.api.v1
 
-import java.io.{File, FileOutputStream, OutputStream}
+import java.io.OutputStream
 import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.fs.Path
+import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.util.Utils
 
 @Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
 private[v1] class EventLogDownloadResource(
@@ -40,30 +37,29 @@ private[v1] class EventLogDownloadResource(
   def getEventLogs(): Response = {
     uIRoot match {
       case hs: HistoryServer =>
-        var logsNotFound = false
-        val fileName = {
-          attemptId match {
-            case Some(id) => s"eventLogs-$appId-$id.zip"
-            case None => s"eventLogs-$appId.zip"
+        try {
+          val fileName = {
+            attemptId match {
+              case Some(id) => s"eventLogs-$appId-$id.zip"
+              case None => s"eventLogs-$appId.zip"
+            }
           }
-        }
-        val stream = new StreamingOutput {
-          override def write(output: OutputStream): Unit = {
-            val eventLogs = hs.getEventLogPaths(appId, attemptId)
-            if (eventLogs.isEmpty) logsNotFound = true
-            else zipLogFiles(eventLogs, output)
+
+          val stream = new StreamingOutput {
+            override def write(output: OutputStream) = hs.writeEventLogs(appId, attemptId, output)
           }
-        }
-        if (logsNotFound) {
-          Response.serverError()
-            .entity(s"Event logs are not available for app: $appId.")
-            .status(Response.Status.SERVICE_UNAVAILABLE)
-            .build()
-        } else {
+
           Response.ok(stream)
             .header("Content-Disposition", s"attachment; filename=$fileName")
            .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
            .build()
+
+        } catch {
+          case NonFatal(e) =>
+            Response.serverError()
+              .entity(s"Event logs are not available for app: $appId.")
+              .status(Response.Status.SERVICE_UNAVAILABLE)
+              .build()
        }
      case _ =>
        Response.serverError()
@@ -72,39 +68,4 @@ private[v1] class EventLogDownloadResource(
           .build()
     }
   }
-
-  private def zipLogFiles(eventLogs: Seq[Path], output: OutputStream): Unit = {
-    val areLegacyLogs = eventLogs.headOption.exists { path =>
-      path.getFileSystem(conf).isDirectory(path)
-    }
-    val pathsToZip = if (areLegacyLogs) {
-      new ArrayBuffer[Path]()
-    } else {
-      eventLogs
-    }
-    var tempDir: File = null
-    try {
-      if (areLegacyLogs) {
-        tempDir = Utils.createTempDir()
-        Utils.chmod700(tempDir)
-        eventLogs.foreach { logPath =>
-          // If the event logs are directories (legacy), then create a zip file for each
-          // one and write each of these files to the eventual output.
-          val fs = logPath.getFileSystem(conf)
-          val logFiles = fs.listFiles(logPath, true)
-          val zipFile = new File(tempDir, logPath.getName + ".zip")
-          pathsToZip.asInstanceOf[ArrayBuffer[Path]] += new Path(zipFile.toURI)
-          val outputStream = new FileOutputStream(zipFile)
-          val paths = new ArrayBuffer[Path]()
-          while (logFiles.hasNext) {
-            paths += logFiles.next().getPath
-          }
-          Utils.zipFilesToStream(paths, conf, outputStream)
-        }
-      }
-      Utils.zipFilesToStream(pathsToZip, conf, output)
-    } finally {
-      if (tempDir != null) Utils.deleteRecursively(tempDir)
-    }
-  }
 }
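
The resource now simply wires writeEventLogs into a JAX-RS StreamingOutput, so the zip bytes flow straight from the provider into the HTTP response without an intermediate file. A stripped-down sketch of that pattern in isolation (the helper name is illustrative):

import java.io.OutputStream
import javax.ws.rs.core.{Response, StreamingOutput}

// Build an attachment response whose body is produced lazily: Jersey
// calls write(...) with the servlet's response stream when the entity
// is actually sent, so nothing is buffered to disk in between.
def zipAttachment(fileName: String)(writeTo: OutputStream => Unit): Response = {
  val stream = new StreamingOutput {
    override def write(output: OutputStream): Unit = writeTo(output)
  }
  Response.ok(stream)
    .header("Content-Disposition", s"attachment; filename=$fileName")
    .build()
}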

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 6 deletions

@@ -786,12 +786,6 @@
       files: Seq[Path],
       hadoopConf: Configuration,
       outputStream: OutputStream): Unit = {
-
-    // Passing in an output stream actually makes this more efficient since we don't have to
-    // create an additional file to which the compressed data is written which has to be read
-    // again by the reader, especially if the data needs to be sent over the wire via an
-    // OutputStream - in which case the destination output stream can be directly passed in here.
-
     val fs = FileSystem.get(hadoopConf)
     val buffer = new Array[Byte](64 * 1024)
     val zipStream = Some(new ZipOutputStream(outputStream))
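
Only the explanatory comment is deleted here; the helper's behavior is unchanged. For orientation, a sketch of what a zip-to-stream helper with this signature can look like (an illustration consistent with the visible lines, not Spark's exact implementation):

import java.io.OutputStream
import java.util.zip.{ZipEntry, ZipOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def zipFilesToStream(
    files: Seq[Path],
    hadoopConf: Configuration,
    outputStream: OutputStream): Unit = {
  val fs = FileSystem.get(hadoopConf)
  val buffer = new Array[Byte](64 * 1024)
  val zipStream = new ZipOutputStream(outputStream)
  try {
    files.foreach { file =>
      val in = fs.open(file)
      try {
        zipStream.putNextEntry(new ZipEntry(file.getName))
        Iterator.continually(in.read(buffer))
          .takeWhile(_ != -1)
          .foreach(n => zipStream.write(buffer, 0, n))
        zipStream.closeEntry()
      } finally {
        in.close()
      }
    }
  } finally {
    // finish() writes the zip trailer without closing the caller's stream.
    zipStream.finish()
  }
}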
