Commit d2a86eb

harishreedharan authored and squito committed
[SPARK-7161] [HISTORY SERVER] Provide REST api to download event logs from History Server

This PR adds a new API that allows the user to download event logs for an application as a zip file. APIs have been added to download all logs for a given application or just for a specific attempt. This also adds a method to the ApplicationHistoryProvider to get the raw files, zipped.

Author: Hari Shreedharan <[email protected]>

Closes #5792 from harishreedharan/eventlog-download and squashes the following commits:

221cc26 [Hari Shreedharan] Update docs with new API information.
a131be6 [Hari Shreedharan] Fix style issues.
5528bd8 [Hari Shreedharan] Merge branch 'master' into eventlog-download
6e8156e [Hari Shreedharan] Simplify tests, use Guava stream copy methods.
d8ddede [Hari Shreedharan] Remove unnecessary case in EventLogDownloadResource.
ffffb53 [Hari Shreedharan] Changed interface to use zip stream. Added more tests.
1100b40 [Hari Shreedharan] Ensure that `Path` does not appear in interfaces, by refactoring interfaces.
5a5f3e2 [Hari Shreedharan] Fix test ordering issue.
0b66948 [Hari Shreedharan] Minor formatting/import fixes.
4fc518c [Hari Shreedharan] Fix rat failures.
a48b91f [Hari Shreedharan] Refactor to make attemptId optional in the API. Also added tests.
0fc1424 [Hari Shreedharan] File download now works for individual attempts and the entire application.
350d7e8 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into eventlog-download
fd6ab00 [Hari Shreedharan] Fix style issues
32b7662 [Hari Shreedharan] Use UIRoot directly in ApiRootResource. Also, use `Response` class to set headers.
7b362b2 [Hari Shreedharan] Almost working.
3d18ebc [Hari Shreedharan] [WIP] Try getting the event log download to work.
1 parent d053a31 · commit d2a86eb
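
A minimal client sketch of the new endpoints, assuming a history server on its default port 18080 and using an application id from the test data in this commit; the paths mirror the routes added in ApiRootResource below.

import java.io.FileOutputStream
import java.net.URL

import com.google.common.io.ByteStreams

object EventLogDownloadExample {
  def main(args: Array[String]): Unit = {
    // GET /api/v1/applications/{appId}/logs             -> zip of all attempts
    // GET /api/v1/applications/{appId}/{attemptId}/logs -> zip of one attempt
    val appId = "local-1430917381535"
    val url = new URL(s"http://localhost:18080/api/v1/applications/$appId/logs")
    val in = url.openStream()
    val out = new FileOutputStream(s"eventLogs-$appId.zip")
    try {
      ByteStreams.copy(in, out) // stream the zip straight to disk
    } finally {
      in.close()
      out.close()
    }
  }
}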

File tree

14 files changed: +367 -19 lines

.rat-excludes

Lines changed: 2 additions & 0 deletions

@@ -80,6 +80,8 @@ local-1425081759269/*
 local-1426533911241/*
 local-1426633911242/*
 local-1430917381534/*
+local-1430917381535_1
+local-1430917381535_2
 DESCRIPTION
 NAMESPACE
 test_support/*

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 11 additions & 0 deletions

@@ -17,6 +17,9 @@
 
 package org.apache.spark.deploy.history
 
+import java.util.zip.ZipOutputStream
+
+import org.apache.spark.SparkException
 import org.apache.spark.ui.SparkUI
 
 private[spark] case class ApplicationAttemptInfo(
@@ -62,4 +65,12 @@ private[history] abstract class ApplicationHistoryProvider {
    */
   def getConfig(): Map[String, String] = Map()
 
+  /**
+   * Writes out the event logs to the output stream provided. The logs will be compressed into a
+   * single zip file and written out.
+   * @throws SparkException if the logs for the app id cannot be found.
+   */
+  @throws(classOf[SparkException])
+  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit
+
 }
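
To make the new contract concrete, a REPL-style sketch of a hypothetical provider helper that serves each application's log from a single local file (FsHistoryProvider's real implementation follows in the next file): implementations write entries into the caller's ZipOutputStream and throw SparkException for an unknown app.

import java.io.{File, FileInputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

import com.google.common.io.ByteStreams
import org.apache.spark.SparkException

def writeSingleEventLog(logFile: File, zipStream: ZipOutputStream): Unit = {
  if (!logFile.isFile) {
    throw new SparkException(s"Logs for ${logFile.getName} not found.")
  }
  val in = new FileInputStream(logFile)
  try {
    zipStream.putNextEntry(new ZipEntry(logFile.getName)) // one entry per log file
    ByteStreams.copy(in, zipStream)                       // ZipOutputStream compresses as it writes
    zipStream.closeEntry()
  } finally {
    in.close()
  }
}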

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 59 additions & 4 deletions

@@ -17,16 +17,18 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
 
+import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
@@ -59,7 +61,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     .map { d => Utils.resolveURI(d).toString }
     .getOrElse(DEFAULT_LOG_DIR)
 
-  private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fs = Utils.getHadoopFileSystem(logDir, hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -219,6 +222,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  override def writeEventLogs(
+      appId: String,
+      attemptId: Option[String],
+      zipStream: ZipOutputStream): Unit = {
+
+    /**
+     * This method compresses the files passed in, and writes the compressed data out into the
+     * [[OutputStream]] passed in. Each file is written as a new [[ZipEntry]] with its name being
+     * the name of the file being compressed.
+     */
+    def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
+      val fs = FileSystem.get(hadoopConf)
+      val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
+      try {
+        outputStream.putNextEntry(new ZipEntry(entryName))
+        ByteStreams.copy(inputStream, outputStream)
+        outputStream.closeEntry()
+      } finally {
+        inputStream.close()
+      }
+    }
+
+    applications.get(appId) match {
+      case Some(appInfo) =>
+        try {
+          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+          appInfo.attempts.filter { attempt =>
+            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
+          }.foreach { attempt =>
+            val logPath = new Path(logDir, attempt.logPath)
+            // If this is a legacy directory, then add the directory to the zipStream and add
+            // each file to that directory.
+            if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
+              val files = fs.listFiles(logPath, false)
+              zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
+              zipStream.closeEntry()
+              while (files.hasNext) {
+                val file = files.next().getPath
+                zipFileToStream(file, attempt.logPath + Path.SEPARATOR + file.getName, zipStream)
+              }
+            } else {
+              zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
+            }
+          }
+        } finally {
+          zipStream.close()
+        }
+      case None => throw new SparkException(s"Logs for $appId not found.")
+    }
+  }
+
+
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
   */
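
A reader-side sketch of the layout writeEventLogs produces, assuming a zip downloaded for the multi-attempt test application (the local file name is an assumption): one top-level entry per attempt log, with legacy directory-based logs appearing as a directory entry followed by the files inside it.

import java.io.FileInputStream
import java.util.zip.ZipInputStream

val zin = new ZipInputStream(new FileInputStream("eventLogs-local-1430917381535.zip"))
try {
  var entry = zin.getNextEntry
  while (entry != null) {
    // e.g. "local-1430917381535_1"; legacy logs show up as "<dir>/" then "<dir>/<file>"
    println(entry.getName)
    entry = zin.getNextEntry
  }
} finally {
  zin.close()
}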

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 8 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.history
 
 import java.util.NoSuchElementException
+import java.util.zip.ZipOutputStream
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
 import com.google.common.cache._
@@ -173,6 +174,13 @@ class HistoryServer(
     getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
+  override def writeEventLogs(
+      appId: String,
+      attemptId: Option[String],
+      zipStream: ZipOutputStream): Unit = {
+    provider.writeEventLogs(appId, attemptId, zipStream)
+  }
+
   /**
   * Returns the provider configuration to show in the listing page.
   *

core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

Lines changed: 20 additions & 0 deletions

@@ -16,6 +16,7 @@
  */
 package org.apache.spark.status.api.v1
 
+import java.util.zip.ZipOutputStream
 import javax.servlet.ServletContext
 import javax.ws.rs._
 import javax.ws.rs.core.{Context, Response}
@@ -164,6 +165,18 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
    }
  }
 
+  @Path("applications/{appId}/logs")
+  def getEventLogs(
+      @PathParam("appId") appId: String): EventLogDownloadResource = {
+    new EventLogDownloadResource(uiRoot, appId, None)
+  }
+
+  @Path("applications/{appId}/{attemptId}/logs")
+  def getEventLogs(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
+    new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
+  }
 }
 
 private[spark] object ApiRootResource {
@@ -193,6 +206,13 @@ private[spark] trait UIRoot {
   def getSparkUI(appKey: String): Option[SparkUI]
   def getApplicationInfoList: Iterator[ApplicationInfo]
 
+  def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit = {
+    Response.serverError()
+      .entity("Event logs are only available through the history server.")
+      .status(Response.Status.SERVICE_UNAVAILABLE)
+      .build()
+  }
+
   /**
   * Get the spark UI with the given appID, and apply a function
   * to it. If there is no such app, throw an appropriate exception
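
The two getEventLogs methods above are JAX-RS sub-resource locators: a @Path method with no HTTP verb annotation returns an object, and the request is then matched against that object's annotated methods (here, EventLogDownloadResource's @GET). A self-contained sketch of the pattern, with hypothetical names:

import javax.ws.rs.{GET, Path, PathParam}

@Path("/demo")
class DemoRoot {
  // Locator: no verb annotation here; JAX-RS recurses into the returned object.
  @Path("items/{id}")
  def item(@PathParam("id") id: String): DemoItem = new DemoItem(id)
}

class DemoItem(id: String) {
  @GET
  def get(): String = s"item $id" // handles GET /demo/items/{id}
}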
core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala

Lines changed: 70 additions & 0 deletions

@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.io.OutputStream
+import java.util.zip.ZipOutputStream
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+
+@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
+private[v1] class EventLogDownloadResource(
+    val uIRoot: UIRoot,
+    val appId: String,
+    val attemptId: Option[String]) extends Logging {
+  val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf)
+
+  @GET
+  def getEventLogs(): Response = {
+    try {
+      val fileName = {
+        attemptId match {
+          case Some(id) => s"eventLogs-$appId-$id.zip"
+          case None => s"eventLogs-$appId.zip"
+        }
+      }
+
+      val stream = new StreamingOutput {
+        override def write(output: OutputStream) = {
+          val zipStream = new ZipOutputStream(output)
+          try {
+            uIRoot.writeEventLogs(appId, attemptId, zipStream)
+          } finally {
+            zipStream.close()
+          }
+
+        }
+      }
+
+      Response.ok(stream)
+        .header("Content-Disposition", s"attachment; filename=$fileName")
+        .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
+        .build()
+    } catch {
+      case NonFatal(e) =>
+        Response.serverError()
+          .entity(s"Event logs are not available for app: $appId.")
+          .status(Response.Status.SERVICE_UNAVAILABLE)
+          .build()
+    }
+  }
+}
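
A design note on the resource above: wrapping the provider call in a StreamingOutput defers all work until the JAX-RS container invokes write() while sending the response, so the zip is generated per request and streamed rather than buffered whole. A REPL-style sketch of that deferred-write behavior (entry name and payload are illustrative):

import java.io.{ByteArrayOutputStream, OutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}
import javax.ws.rs.core.StreamingOutput

val stream = new StreamingOutput {
  override def write(output: OutputStream): Unit = {
    val zipStream = new ZipOutputStream(output)
    try {
      zipStream.putNextEntry(new ZipEntry("eventLog")) // illustrative entry name
      zipStream.write("event data".getBytes("UTF-8"))
      zipStream.closeEntry()
    } finally {
      zipStream.close()
    }
  }
}

val bytes = new ByteArrayOutputStream()
stream.write(bytes) // the zip bytes are produced only now, as the container would during send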

core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json

Lines changed: 16 additions & 0 deletions

@@ -7,6 +7,22 @@
     "sparkUser" : "irashid",
     "completed" : true
   } ]
+}, {
+  "id" : "local-1430917381535",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-05-06T13:03:00.950GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.880GMT",
+    "endTime" : "2015-05-06T13:03:00.890GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  } ]
 }, {
   "id" : "local-1426533911241",
   "name" : "Spark shell",

core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json

Lines changed: 16 additions & 0 deletions

@@ -7,6 +7,22 @@
     "sparkUser" : "irashid",
     "completed" : true
   } ]
+}, {
+  "id" : "local-1430917381535",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-05-06T13:03:00.950GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.880GMT",
+    "endTime" : "2015-05-06T13:03:00.890GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  } ]
 }, {
   "id" : "local-1426533911241",
   "name" : "Spark shell",

core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json

Lines changed: 26 additions & 8 deletions

@@ -7,6 +7,22 @@
     "sparkUser" : "irashid",
     "completed" : true
   } ]
+}, {
+  "id" : "local-1430917381535",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-05-06T13:03:00.950GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.880GMT",
+    "endTime" : "2015-05-06T13:03:00.890GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  } ]
 }, {
   "id" : "local-1426533911241",
   "name" : "Spark shell",
@@ -24,12 +40,14 @@
     "completed" : true
   } ]
 }, {
-  "id" : "local-1425081759269",
-  "name" : "Spark shell",
-  "attempts" : [ {
-    "startTime" : "2015-02-28T00:02:38.277GMT",
-    "endTime" : "2015-02-28T00:02:46.912GMT",
-    "sparkUser" : "irashid",
-    "completed" : true
-  } ]
+  "id": "local-1425081759269",
+  "name": "Spark shell",
+  "attempts": [
+    {
+      "startTime": "2015-02-28T00:02:38.277GMT",
+      "endTime": "2015-02-28T00:02:46.912GMT",
+      "sparkUser": "irashid",
+      "completed": true
+    }
+  ]
 } ]
core/src/test/resources/spark-events/local-1430917381535_1

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917380880}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380880,"User":"irashid","App Attempt ID":"1"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1430917380890}
