Skip to content

Commit 32b7662

Browse files
Use UIRoot directly in ApiRootResource. Also, use Response class to set headers.
1 parent 7b362b2 commit 32b7662

File tree

4 files changed

+37
-77
lines changed

4 files changed

+37
-77
lines changed

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,9 @@ class HistoryServer(
173173
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
174174
}
175175

176-
def copyEventLogsToDirectory(appId: String, destDir: File): Unit = {
176+
def copyEventLogsToDirectory(appId: String, destDir: File): Boolean = {
177177
provider.copyApplicationEventLogs(appId, destDir)
178+
true
178179
}
179180

180181

core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.spark.status.api.v1
1818

19+
import java.io.File
1920
import javax.servlet.ServletContext
2021
import javax.ws.rs._
2122
import javax.ws.rs.core.{Context, Response}
@@ -168,11 +169,8 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
168169
def getEventLogs(
169170
@PathParam("appId") appId: String,
170171
@PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
171-
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
172-
new EventLogDownloadResource(ui, appId)
173-
}
172+
new EventLogDownloadResource(uiRoot, appId)
174173
}
175-
176174
}
177175

178176
private[spark] object ApiRootResource {

core/src/main/scala/org/apache/spark/status/api/v1/DownloadMessageWriter.scala

Lines changed: 0 additions & 44 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,59 @@
1616
*/
1717
package org.apache.spark.status.api.v1
1818

19-
import java.io.{BufferedInputStream, FileInputStream, OutputStream, File, InputStream}
20-
import javax.ws.rs.ext.Provider
19+
import java.io.{BufferedInputStream, FileInputStream, OutputStream}
2120
import javax.ws.rs.{GET, Produces}
22-
import javax.ws.rs.core.{StreamingOutput, MultivaluedMap, MediaType}
21+
import javax.ws.rs.core.{Response, StreamingOutput, MediaType}
2322

2423
import org.apache.spark.deploy.history.HistoryServer
2524
import org.apache.spark.util.Utils
2625

27-
26+
@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
2827
private[v1] class EventLogDownloadResource(val uIRoot: UIRoot, val appId: String) {
2928

30-
private def getErrorOutput(err: String): StreamingOutput = {
31-
new StreamingOutput {
32-
override def write(outputStream: OutputStream): Unit = {
33-
outputStream.write(
34-
s"File download not available for application : $appId due to $err".getBytes("utf-8"))
35-
}
36-
}
37-
}
38-
3929
@GET
40-
@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
41-
def getEventLogs(): StreamingOutput = {
30+
def getEventLogs(): Response = {
4231
uIRoot match {
4332
case hs: HistoryServer =>
4433
val dir = Utils.createTempDir()
4534
Utils.chmod700(dir)
4635
hs.copyEventLogsToDirectory(appId, dir)
4736
dir.listFiles().headOption.foreach { file =>
48-
return new StreamingOutput {
37+
val stream = new StreamingOutput {
4938
override def write(output: OutputStream): Unit = {
50-
val inStream = new BufferedInputStream(new FileInputStream(file))
51-
val buffer = new Array[Byte](1024 * 1024)
52-
var dataRemains = true
53-
while (dataRemains) {
54-
val read = inStream.read(buffer)
55-
if (read > 0) {
56-
output.write(buffer, 0, read)
57-
} else {
58-
dataRemains = false
39+
try {
40+
val inStream = new BufferedInputStream(new FileInputStream(file))
41+
val buffer = new Array[Byte](1024 * 1024)
42+
var dataRemains = true
43+
while (dataRemains) {
44+
val read = inStream.read(buffer)
45+
if (read > 0) {
46+
output.write(buffer, 0, read)
47+
} else {
48+
dataRemains = false
49+
}
5950
}
51+
output.flush()
52+
} finally {
53+
Utils.deleteRecursively(dir)
6054
}
61-
output.flush()
6255
}
6356
}
57+
return Response.ok(stream)
58+
.header("Content-Length", file.length().toString)
59+
.header("Content-Disposition", s"attachment; filename=${file.getName}")
60+
.header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
61+
.build()
6462
}
65-
getErrorOutput("No files in dir.")
66-
case _ => getErrorOutput("hs not history server")
63+
Response.serverError()
64+
.entity(s"Event logs for $appId not found.")
65+
.status(Response.Status.NOT_FOUND)
66+
.build()
67+
case _ =>
68+
Response.serverError()
69+
.entity("History Server is not running - cannot return event logs.")
70+
.status(Response.Status.SERVICE_UNAVAILABLE)
71+
.build()
6772
}
6873
}
6974
}

0 commit comments

Comments
 (0)