From a8fb1e0ad6573aa290b179cd9c1883abc62552be Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 19 Apr 2017 16:22:05 +0800 Subject: [PATCH 1/3] Change to only honor ACLs in event log download API Change-Id: I378c5c4082a7a2567eb108847e9eb637f62fe748 --- .../history/ApplicationHistoryProvider.scala | 4 +- .../spark/deploy/history/HistoryServer.scala | 8 ++++ .../spark/status/api/v1/ApiRootResource.scala | 18 +++++-- .../deploy/history/HistoryServerSuite.scala | 48 ++++++++++++++++--- 4 files changed, 67 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index d7d82800b8b55..6d8758a3d3b1d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -86,7 +86,7 @@ private[history] abstract class ApplicationHistoryProvider { * @return Count of application event logs that are currently under process */ def getEventLogsUnderProcess(): Int = { - return 0; + 0 } /** @@ -95,7 +95,7 @@ private[history] abstract class ApplicationHistoryProvider { * @return 0 if this is undefined or unsupported, otherwise the last updated time in millis */ def getLastUpdatedTime(): Long = { - return 0; + 0 } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 54f39f7620e5d..36ab9277cffe0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -301,6 +301,14 @@ object HistoryServer extends Logging { logDebug(s"Clearing ${SecurityManager.SPARK_AUTH_CONF}") config.set(SecurityManager.SPARK_AUTH_CONF, "false") } + + if (config.getBoolean("spark.acls.enable", config.getBoolean("spark.ui.acls.enable", false))) { + logInfo(s"Either spark.acls.enable or spark.ui.acles.enable is configured, clearing it and " + + s"only honor spark.history.ui.acl.enable") + config.set("spark.acls.enable", "false") + config.set("spark.ui.acls.enable", "false") + } + new SecurityManager(config) } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index 00f918c09c66b..f52f8198d52f3 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -184,14 +184,27 @@ private[v1] class ApiRootResource extends ApiRequestContext { @Path("applications/{appId}/logs") def getEventLogs( @PathParam("appId") appId: String): EventLogDownloadResource = { - new EventLogDownloadResource(uiRoot, appId, None) + try { + // withSparkUI will throw NotFoundException if attemptId is existed for this application. + // So we need to try again with attempt id "1". + withSparkUI(appId, None) { _ => + new EventLogDownloadResource(uiRoot, appId, None) + } + } catch { + case _: NotFoundException => + withSparkUI(appId, Some("1")) { _ => + new EventLogDownloadResource(uiRoot, appId, None) + } + } } @Path("applications/{appId}/{attemptId}/logs") def getEventLogs( @PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): EventLogDownloadResource = { - new EventLogDownloadResource(uiRoot, appId, Some(attemptId)) + withSparkUI(appId, Some(attemptId)) { _ => + new EventLogDownloadResource(uiRoot, appId, Some(attemptId)) + } } @Path("version") @@ -291,7 +304,6 @@ private[v1] trait ApiRequestContext { case None => throw new NotFoundException("no such app: " + appId) } } - } private[v1] class ForbiddenException(msg: String) extends WebApplicationException( diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 764156c3edc41..d8d1b155fdbe4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -209,7 +209,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } // Test that the files are downloaded correctly, and validate them. - def doDownloadTest(appId: String, attemptId: Option[Int]): Unit = { + def doDownloadTest(appId: String, attemptId: Option[Int], user: String = null): Unit = { val url = attemptId match { case Some(id) => @@ -218,8 +218,13 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers new URL(s"${generateURL(s"applications/$appId")}/logs") } - val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url) - code should be (HttpServletResponse.SC_OK) + val headers = if (user != null) Seq(FakeAuthFilter.FAKE_HTTP_USER -> user) else Nil + val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url, headers) + if (code != HttpServletResponse.SC_OK) { + throw new IllegalStateException( + s"Return code $code is not equal to ${HttpServletResponse.SC_OK}") + } + inputStream should not be None error should be (None) @@ -565,8 +570,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers assert(jobcount === getNumJobs("/jobs")) // no need to retain the test dir now the tests complete - logDir.deleteOnExit(); - + logDir.deleteOnExit() } test("ui and api authorization checks") { @@ -602,6 +606,36 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } + test("acls with downloading files") { + val admin = "root" + val owner = "irashid" + val other = "alice" + val appId = "local-1430917381535" + + stop() + init("spark.ui.filters" -> classOf[FakeAuthFilter].getName(), + "spark.history.ui.acls.enable" -> "true", + "spark.history.ui.admin.acls" -> admin) + + doDownloadTest(appId, None, admin) + doDownloadTest(appId, None, owner) + intercept[IllegalStateException](doDownloadTest(appId, None, other)).getMessage should be ( + s"Return code ${HttpServletResponse.SC_FORBIDDEN} is not " + + s"equal to ${HttpServletResponse.SC_OK}") + + (1 to 2).foreach { attemptId => doDownloadTest(appId, Some(attemptId), admin) } + (1 to 2).foreach { attemptId => doDownloadTest(appId, Some(attemptId), owner) } + // Should throw exception, since user "alice" has no permission to access file, so it will + // return as an empty file. + (1 to 2).foreach { attemptId => + val exception = intercept[IllegalStateException]( + doDownloadTest(appId, Some(attemptId), other)) + exception.getMessage should be ( + s"Return code ${HttpServletResponse.SC_FORBIDDEN} is not " + + s"equal to ${HttpServletResponse.SC_OK}") + } + } + def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = { HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path")) } @@ -649,9 +683,11 @@ object HistoryServerSuite { (code, inString, errString) } - def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = { + def connectAndGetInputStream(url: URL, + headers: Seq[(String, String)] = Nil): (Int, Option[InputStream], Option[String]) = { val connection = url.openConnection().asInstanceOf[HttpURLConnection] connection.setRequestMethod("GET") + headers.foreach { case (k, v) => connection.setRequestProperty(k, v) } connection.connect() val code = connection.getResponseCode() val inStream = try { From 68c9d83a48751e57988f09a46c8e61a073c7d582 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 20 Apr 2017 13:12:58 +0800 Subject: [PATCH 2/3] Address the comments Change-Id: I037137cd3125547fc408dfc69c73ca9b5d3b1ce5 --- .../spark/deploy/history/HistoryServer.scala | 4 +- .../spark/status/api/v1/ApiRootResource.scala | 2 +- .../deploy/history/HistoryServerSuite.scala | 56 ++++--------------- 3 files changed, 14 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 36ab9277cffe0..e2c2c621c658e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -303,8 +303,8 @@ object HistoryServer extends Logging { } if (config.getBoolean("spark.acls.enable", config.getBoolean("spark.ui.acls.enable", false))) { - logInfo(s"Either spark.acls.enable or spark.ui.acles.enable is configured, clearing it and " + - s"only honor spark.history.ui.acl.enable") + logInfo(s"Either spark.acls.enable or spark.ui.acls.enable is configured, clearing it and " + + s"only using spark.history.ui.acl.enable") config.set("spark.acls.enable", "false") config.set("spark.ui.acls.enable", "false") } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index f52f8198d52f3..f17b637754826 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -185,7 +185,7 @@ private[v1] class ApiRootResource extends ApiRequestContext { def getEventLogs( @PathParam("appId") appId: String): EventLogDownloadResource = { try { - // withSparkUI will throw NotFoundException if attemptId is existed for this application. + // withSparkUI will throw NotFoundException if attemptId exists for this application. // So we need to try again with attempt id "1". withSparkUI(appId, None) { _ => new EventLogDownloadResource(uiRoot, appId, None) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index d8d1b155fdbe4..95acb9a54440f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -209,7 +209,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } // Test that the files are downloaded correctly, and validate them. - def doDownloadTest(appId: String, attemptId: Option[Int], user: String = null): Unit = { + def doDownloadTest(appId: String, attemptId: Option[Int]): Unit = { val url = attemptId match { case Some(id) => @@ -218,13 +218,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers new URL(s"${generateURL(s"applications/$appId")}/logs") } - val headers = if (user != null) Seq(FakeAuthFilter.FAKE_HTTP_USER -> user) else Nil - val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url, headers) - if (code != HttpServletResponse.SC_OK) { - throw new IllegalStateException( - s"Return code $code is not equal to ${HttpServletResponse.SC_OK}") - } - + val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url) + code should be (HttpServletResponse.SC_OK) inputStream should not be None error should be (None) @@ -574,8 +569,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("ui and api authorization checks") { - val appId = "app-20161115172038-0000" - val owner = "jose" + val appId = "local-1430917381535" + val owner = "irashid" val admin = "root" val other = "alice" @@ -594,8 +589,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val port = server.boundPort val testUrls = Seq( - s"http://localhost:$port/api/v1/applications/$appId/jobs", - s"http://localhost:$port/history/$appId/jobs/") + s"http://localhost:$port/api/v1/applications/$appId/1/jobs", + s"http://localhost:$port/history/$appId/1/jobs/", + s"http://localhost:$port/api/v1/applications/$appId/logs", + s"http://localhost:$port/api/v1/applications/$appId/1/logs", + s"http://localhost:$port/api/v1/applications/$appId/2/logs") tests.foreach { case (user, expectedCode) => testUrls.foreach { url => @@ -606,36 +604,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - test("acls with downloading files") { - val admin = "root" - val owner = "irashid" - val other = "alice" - val appId = "local-1430917381535" - - stop() - init("spark.ui.filters" -> classOf[FakeAuthFilter].getName(), - "spark.history.ui.acls.enable" -> "true", - "spark.history.ui.admin.acls" -> admin) - - doDownloadTest(appId, None, admin) - doDownloadTest(appId, None, owner) - intercept[IllegalStateException](doDownloadTest(appId, None, other)).getMessage should be ( - s"Return code ${HttpServletResponse.SC_FORBIDDEN} is not " + - s"equal to ${HttpServletResponse.SC_OK}") - - (1 to 2).foreach { attemptId => doDownloadTest(appId, Some(attemptId), admin) } - (1 to 2).foreach { attemptId => doDownloadTest(appId, Some(attemptId), owner) } - // Should throw exception, since user "alice" has no permission to access file, so it will - // return as an empty file. - (1 to 2).foreach { attemptId => - val exception = intercept[IllegalStateException]( - doDownloadTest(appId, Some(attemptId), other)) - exception.getMessage should be ( - s"Return code ${HttpServletResponse.SC_FORBIDDEN} is not " + - s"equal to ${HttpServletResponse.SC_OK}") - } - } - def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = { HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path")) } @@ -683,11 +651,9 @@ object HistoryServerSuite { (code, inString, errString) } - def connectAndGetInputStream(url: URL, - headers: Seq[(String, String)] = Nil): (Int, Option[InputStream], Option[String]) = { + def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = { val connection = url.openConnection().asInstanceOf[HttpURLConnection] connection.setRequestMethod("GET") - headers.foreach { case (k, v) => connection.setRequestProperty(k, v) } connection.connect() val code = connection.getResponseCode() val inStream = try { From 4b3781ff6dce571130538a3f29a7e386f3e3fb9b Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 20 Apr 2017 13:25:12 +0800 Subject: [PATCH 3/3] Minor style changes Change-Id: Ia45d5aedab8c2a7bd65ccd842d29c5159de53604 --- .../scala/org/apache/spark/deploy/history/HistoryServer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index e2c2c621c658e..d9c8fda99ef97 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -303,8 +303,8 @@ object HistoryServer extends Logging { } if (config.getBoolean("spark.acls.enable", config.getBoolean("spark.ui.acls.enable", false))) { - logInfo(s"Either spark.acls.enable or spark.ui.acls.enable is configured, clearing it and " + - s"only using spark.history.ui.acl.enable") + logInfo("Either spark.acls.enable or spark.ui.acls.enable is configured, clearing it and " + + "only using spark.history.ui.acl.enable") config.set("spark.acls.enable", "false") config.set("spark.ui.acls.enable", "false") }