
Commit a48b91f (parent 0fc1424)

Refactor to make attemptId optional in the API. Also added tests.

13 files changed: +188 −86 lines


core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 5 additions & 4 deletions
@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{OutputStream, File}
-
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.ui.SparkUI
@@ -67,8 +65,11 @@ private[history] abstract class ApplicationHistoryProvider {
   def getConfig(): Map[String, String] = Map()
 
   /**
-   * Get the [[Path]]s to the Event log directories.
+   * Get the [[Path]]s to the event log files. For legacy event log directories, the directory
+   * path itself is returned; the caller is responsible for listing the files and using them as
+   * needed. If attemptId is [[None]], the event logs for all attempts of the given application
+   * are downloaded as a single zip file.
    */
-  def getEventLogPaths(appId: String, attemptId: String): Seq[Path] = Seq.empty
+  def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = Seq.empty
 
 }
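A minimal sketch of the new call sites, assuming code living in the same org.apache.spark.deploy.history package (the trait is private[history]); the provider wiring and the application/attempt ids here are illustrative, not part of this commit:

package org.apache.spark.deploy.history

import org.apache.hadoop.fs.Path

object GetEventLogPathsDemo {
  // `provider` can be any concrete ApplicationHistoryProvider, e.g. FsHistoryProvider.
  def demo(provider: ApplicationHistoryProvider): (Seq[Path], Seq[Path]) = {
    // Some(id): only the event logs of that one attempt.
    val oneAttempt = provider.getEventLogPaths("local-1430917381535", Some("1"))
    // None: the event logs of every attempt of the application.
    val allAttempts = provider.getEventLogPaths("local-1430917381535", None)
    (oneAttempt, allAttempts)
  }
}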

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 8 additions & 22 deletions
@@ -17,15 +17,14 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{OutputStream, FileOutputStream, File, BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{File, FileOutputStream, BufferedInputStream, FileNotFoundException, IOException, InputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -221,27 +220,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getEventLogPaths(
-      appId: String,
-      attemptId: String): Seq[Path] = {
-
-    var filePaths = new ArrayBuffer[Path]()
+  override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = {
 
+    val filePaths = new ArrayBuffer[Path]()
     applications.get(appId).foreach { appInfo =>
-      appInfo.attempts.find { attempt =>
-        if (attempt.attemptId.isDefined && attempt.attemptId.get == attemptId) true
-        else false
-      }.foreach { attempt =>
-        val remotePath = new Path(logDir, attempt.logPath)
-        if (isLegacyLogDirectory(fs.getFileStatus(remotePath))) {
-          val filesIter = fs.listFiles(remotePath, true)
-          while (filesIter.hasNext) {
-            filePaths += filesIter.next().getPath
-          }
-        } else {
-          filePaths += remotePath
-        }
-      }
+      // If no attempt is specified, or the attempts have no attemptId, return all attempts.
+      appInfo.attempts.filter { attempt =>
+        attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
+      }.foreach { attempt => filePaths += new Path(logDir, attempt.logPath) }
     }
     filePaths
   }
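The rewritten filter keeps an attempt when either side carries no attempt id, or when the two ids are equal. A self-contained sketch of just that predicate and its edge cases (the object and values are illustrative, not part of this commit):

object AttemptMatchDemo {
  // Mirrors the predicate inside getEventLogPaths above.
  def matches(stored: Option[String], requested: Option[String]): Boolean =
    stored.isEmpty || requested.isEmpty || stored.get == requested.get

  def main(args: Array[String]): Unit = {
    assert(matches(Some("1"), Some("1")))   // ids agree: attempt is returned
    assert(!matches(Some("1"), Some("2")))  // ids differ: attempt is skipped
    assert(matches(Some("1"), None))        // no attempt requested: all attempts match
    assert(matches(None, Some("2")))        // attempt has no id: always returned
  }
}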

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 1 addition & 4 deletions
@@ -174,13 +174,10 @@ class HistoryServer(
     getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
-  def getEventLogPaths(
-      appId: String,
-      attemptId: String): Seq[Path] = {
+  override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[Path] = {
     provider.getEventLogPaths(appId, attemptId)
   }
 
-
   /**
    * Returns the provider configuration to show in the listing page.
    *

core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

Lines changed: 2 additions & 0 deletions
@@ -22,6 +22,7 @@ import javax.ws.rs.core.{Context, Response}
 
 import com.sun.jersey.api.core.ResourceConfig
 import com.sun.jersey.spi.container.servlet.ServletContainer
+import org.apache.hadoop.fs
 import org.eclipse.jetty.server.handler.ContextHandler
 import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
@@ -204,6 +205,7 @@ private[spark] object ApiRootResource {
 private[spark] trait UIRoot {
   def getSparkUI(appKey: String): Option[SparkUI]
   def getApplicationInfoList: Iterator[ApplicationInfo]
+  def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[fs.Path] = Seq.empty
 
   /**
    * Get the spark UI with the given appID, and apply a function
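Giving the trait method a Seq.empty default means existing UIRoot implementations that do not serve event logs keep compiling unchanged; only HistoryServer overrides it. A standalone sketch of that design choice, using hypothetical stand-in types rather than the real UIRoot:

object DefaultTraitMethodDemo {
  trait Root {
    // Default body, as UIRoot now provides for getEventLogPaths.
    def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[String] = Seq.empty
  }

  class LiveRoot extends Root // serves no logs; no override needed

  class HistoryRoot extends Root { // history-server-like implementer
    override def getEventLogPaths(appId: String, attemptId: Option[String]): Seq[String] =
      Seq(s"/logs/$appId-${attemptId.getOrElse("all")}")
  }

  def main(args: Array[String]): Unit = {
    assert(new LiveRoot().getEventLogPaths("app-1", None).isEmpty)
    assert(new HistoryRoot().getEventLogPaths("app-1", Some("2")).nonEmpty)
  }
}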

core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala

Lines changed: 39 additions & 30 deletions
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.status.api.v1
 
-import java.io.{BufferedInputStream, FileInputStream, OutputStream}
+import java.io.{File, FileOutputStream, BufferedInputStream, FileInputStream, OutputStream}
 import javax.ws.rs.{GET, Produces}
 import javax.ws.rs.core.{Response, StreamingOutput, MediaType}
 
@@ -41,36 +41,17 @@ private[v1] class EventLogDownloadResource(
     uIRoot match {
       case hs: HistoryServer =>
         var logsNotFound = false
-        val fileName: String = {
+        val fileName = {
           attemptId match {
             case Some(id) => s"eventLogs-$appId-$id.zip"
             case None => s"eventLogs-$appId.zip"
           }
         }
         val stream = new StreamingOutput {
           override def write(output: OutputStream): Unit = {
-            attemptId match {
-              case Some(id) =>
-                Utils.zipFilesToStream(hs.getEventLogPaths(appId, id), conf, output)
-              case None =>
-                val appInfo = hs.getApplicationInfoList.find(_.id == appId)
-                appInfo match {
-                  case Some(info) =>
-                    val attempts = info.attempts
-                    val files = new ArrayBuffer[Path]
-                    attempts.foreach { attempt =>
-                      attempt.attemptId.foreach { attemptId =>
-                        logInfo(s"Attempt found: ${attemptId}")
-                        files ++= hs.getEventLogPaths(appId, attemptId)
-                      }
-                    }
-                    if (files.nonEmpty) {
-                      Utils.zipFilesToStream(files, conf, output)
-                    }
-                  case None => logsNotFound = true
-                }
-            }
-            output.flush()
+            val eventLogs = hs.getEventLogPaths(appId, attemptId)
+            if (eventLogs.isEmpty) logsNotFound = true
+            else zipLogFiles(eventLogs, output)
           }
         }
         if (logsNotFound) {
@@ -86,16 +67,44 @@ private[v1] class EventLogDownloadResource(
         }
       case _ =>
         Response.serverError()
-          .entity("History Server is not running - cannot return event logs.")
+          .entity("Event logs are only available through the history server.")
           .status(Response.Status.SERVICE_UNAVAILABLE)
           .build()
     }
   }
-}
-
-private[v1] object EventLogDownloadResource {
 
-  def unapply(resource: EventLogDownloadResource): Option[(UIRoot, String)] = {
-    Some((resource.uIRoot, resource.appId))
+  private def zipLogFiles(eventLogs: Seq[Path], output: OutputStream): Unit = {
+    val areLegacyLogs = eventLogs.headOption.exists { path =>
+      path.getFileSystem(conf).isDirectory(path)
+    }
+    val pathsToZip = if (areLegacyLogs) {
+      new ArrayBuffer[Path]()
+    } else {
+      eventLogs
+    }
+    var tempDir: File = null
+    try {
+      if (areLegacyLogs) {
+        tempDir = Utils.createTempDir()
+        Utils.chmod700(tempDir)
+        eventLogs.foreach { logPath =>
+          // If the event logs are directories (legacy), then create a zip file for each
+          // one and write each of these files to the eventual output.
+          val fs = logPath.getFileSystem(conf)
+          val logFiles = fs.listFiles(logPath, true)
+          val zipFile = new File(tempDir, logPath.getName + ".zip")
+          pathsToZip.asInstanceOf[ArrayBuffer[Path]] += new Path(zipFile.toURI)
+          val outputStream = new FileOutputStream(zipFile)
+          val paths = new ArrayBuffer[Path]()
+          while (logFiles.hasNext) {
+            paths += logFiles.next().getPath
+          }
+          Utils.zipFilesToStream(paths, conf, outputStream)
+        }
+      }
+      Utils.zipFilesToStream(pathsToZip, conf, output)
+    } finally {
+      if (tempDir != null) Utils.deleteRecursively(tempDir)
+    }
   }
 }
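For legacy logs, zipLogFiles builds one inner zip per log directory in a temp dir and then zips those inner zips into the response, i.e. a zip of zips. A standalone sketch of that shape against plain local files; java.io and java.util.zip stand in here for the Hadoop filesystem and the Spark Utils helpers used above:

import java.io.{File, FileInputStream, FileOutputStream, OutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

object ZipOfZipsDemo {
  // Write `files` to `out` as one zip, one entry per file; closes `out`.
  def zipFiles(files: Seq[File], out: OutputStream): Unit = {
    val zip = new ZipOutputStream(out)
    try {
      val buffer = new Array[Byte](64 * 1024)
      files.foreach { f =>
        zip.putNextEntry(new ZipEntry(f.getName))
        val in = new FileInputStream(f)
        try {
          var n = in.read(buffer)
          while (n != -1) { zip.write(buffer, 0, n); n = in.read(buffer) }
        } finally in.close()
        zip.closeEntry()
      }
    } finally zip.close()
  }

  // Legacy layout: each attempt is a directory. Zip each directory into an
  // inner zip under a temp dir, then zip the inner zips into `out`.
  def zipLegacyDirs(dirs: Seq[File], out: OutputStream): Unit = {
    val tempDir = java.nio.file.Files.createTempDirectory("eventLogs").toFile
    try {
      val innerZips = dirs.map { dir =>
        val inner = new File(tempDir, dir.getName + ".zip")
        val fos = new FileOutputStream(inner)
        try zipFiles(Option(dir.listFiles()).getOrElse(Array.empty).filter(_.isFile).toSeq, fos)
        finally fos.close()
        inner
      }
      zipFiles(innerZips, out)
    } finally {
      deleteRecursively(tempDir)
    }
  }

  // Minimal recursive delete for the temp dir.
  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).getOrElse(Array.empty).foreach(deleteRecursively)
    f.delete()
  }
}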

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 21 additions & 15 deletions
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.zip.{ZipOutputStream, ZipEntry}
+import java.util.zip.{ZipEntry, ZipOutputStream}
 import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
@@ -794,23 +794,29 @@ private[spark] object Utils extends Logging {
 
     val fs = FileSystem.get(hadoopConf)
     val buffer = new Array[Byte](64 * 1024)
-    val zipStream = new ZipOutputStream(outputStream)
-    files.foreach { remotePath =>
-      val inputStream = fs.open(remotePath, 1 * 1024 * 1024) // 1MB Buffer
-      zipStream.putNextEntry(new ZipEntry(remotePath.getName))
-      var dataRemaining = true
-      while (dataRemaining) {
-        val length = inputStream.read(buffer)
-        if (length != -1) {
-          zipStream.write(buffer, 0, length)
-        } else {
-          dataRemaining = false
+    val zipStream = Some(new ZipOutputStream(outputStream))
+    try {
+      files.foreach { remotePath =>
+        val inputStream = Some(fs.open(remotePath, 1 * 1024 * 1024)) // 1MB Buffer
+        try {
+          zipStream.get.putNextEntry(new ZipEntry(remotePath.getName))
+          var dataRemaining = true
+          while (dataRemaining) {
+            val length = inputStream.get.read(buffer)
+            if (length != -1) {
+              zipStream.get.write(buffer, 0, length)
+            } else {
+              dataRemaining = false
+            }
+          }
+          zipStream.get.closeEntry()
+        } finally {
+          inputStream.foreach(_.close())
         }
       }
-      zipStream.closeEntry()
-      inputStream.close()
+    } finally {
+      zipStream.foreach(_.close())
     }
-    zipStream.close()
   }
 
   /**
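The Utils change wraps both streams so that closing happens on every exit path, including an exception mid-copy. The same guarantee can be written once as a small loan-pattern helper; this helper is illustrative only and is not part of the commit or of Utils:

import java.io.Closeable

object WithResourceDemo {
  // Run `use`, then close the resource whether or not `use` threw.
  def withResource[C <: Closeable, T](resource: C)(use: C => T): T =
    try use(resource) finally resource.close()

  def main(args: Array[String]): Unit = {
    // java.io.StringWriter is Closeable, so it works as a stand-in stream.
    val text = withResource(new java.io.StringWriter()) { w =>
      w.write("zipped event log bytes")
      w.toString
    }
    assert(text == "zipped event log bytes")
  }
}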

core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json

Lines changed: 16 additions & 0 deletions
@@ -41,6 +41,22 @@
     "sparkUser" : "irashid",
     "completed" : true
   } ]
+}, {
+  "id" : "local-1430917381535",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-02-03T16:42:46.912GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-02-03T16:42:46.912GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  } ]
 }, {
   "id" : "local-1422981759269",
   "name" : "Spark shell",

core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json

Lines changed: 16 additions & 0 deletions
@@ -41,6 +41,22 @@
     "sparkUser" : "irashid",
     "completed" : true
   } ]
+}, {
+  "id" : "local-1430917381535",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-02-03T16:42:46.912GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-02-03T16:42:46.912GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  } ]
 }, {
   "id" : "local-1422981759269",
   "name" : "Spark shell",

core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json

Lines changed: 21 additions & 3 deletions
@@ -24,11 +24,29 @@
     "completed" : true
   } ]
 }, {
-  "id" : "local-1425081759269",
+  "id": "local-1425081759269",
+  "name": "Spark shell",
+  "attempts": [
+    {
+      "startTime": "2015-02-28T00:02:38.277GMT",
+      "endTime": "2015-02-28T00:02:46.912GMT",
+      "sparkUser": "irashid",
+      "completed": true
+    }
+  ]
+}, {
+  "id" : "local-1430917381535",
   "name" : "Spark shell",
   "attempts" : [ {
-    "startTime" : "2015-02-28T00:02:38.277GMT",
-    "endTime" : "2015-02-28T00:02:46.912GMT",
+    "attemptId" : "1",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-02-03T16:42:46.912GMT",
+    "sparkUser" : "irashid",
+    "completed" : true
+  }, {
+    "attemptId" : "2",
+    "startTime" : "2015-05-06T13:03:00.893GMT",
+    "endTime" : "2015-02-03T16:42:46.912GMT",
     "sparkUser" : "irashid",
     "completed" : true
   } ]
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917381651}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380893,"User":"irashid","App Attempt ID":"1"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1422981766912}
