Skip to content

Commit f5a5196

Browse files
committed
undo removal of renderJson from MasterPage, since there is no substitute yet
1 parent db61211 commit f5a5196

File tree

3 files changed

+94
-33
lines changed

3 files changed

+94
-33
lines changed

core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateR
2424
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
2525
import org.apache.spark.deploy.worker.ExecutorRunner
2626

27-
private[spark] object JsonProtocol {
27+
private[deploy] object JsonProtocol {
2828
def writeWorkerInfo(obj: WorkerInfo): JObject = {
2929
("id" -> obj.id) ~
3030
("host" -> obj.host) ~
@@ -67,6 +67,27 @@ private[spark] object JsonProtocol {
6767
("appdesc" -> writeApplicationDescription(obj.appDesc))
6868
}
6969

70+
def writeDriverInfo(obj: DriverInfo): JObject = {
71+
("id" -> obj.id) ~
72+
("starttime" -> obj.startTime.toString) ~
73+
("state" -> obj.state.toString) ~
74+
("cores" -> obj.desc.cores) ~
75+
("memory" -> obj.desc.mem)
76+
}
77+
78+
def writeMasterState(obj: MasterStateResponse): JObject = {
79+
("url" -> obj.uri) ~
80+
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
81+
("cores" -> obj.workers.map(_.cores).sum) ~
82+
("coresused" -> obj.workers.map(_.coresUsed).sum) ~
83+
("memory" -> obj.workers.map(_.memory).sum) ~
84+
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
85+
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
86+
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
87+
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
88+
("status" -> obj.status.toString)
89+
}
90+
7091
def writeWorkerState(obj: WorkerStateResponse): JObject = {
7192
("id" -> obj.workerId) ~
7293
("masterurl" -> obj.masterUrl) ~
@@ -78,5 +99,4 @@ private[spark] object JsonProtocol {
7899
("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
79100
("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
80101
}
81-
82102
}

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,15 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
3535
private val master = parent.masterActorRef
3636
private val timeout = parent.timeout
3737

38-
3938
def getMasterState: MasterStateResponse = {
4039
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
4140
Await.result(stateFuture, timeout)
4241
}
4342

43+
override def renderJson(request: HttpServletRequest): JValue = {
44+
JsonProtocol.writeMasterState(getMasterState)
45+
}
46+
4447
def handleAppKillRequest(request: HttpServletRequest): Unit = {
4548
handleKillRequest(request, id => {
4649
parent.master.idToApp.get(id).foreach { app =>
@@ -69,21 +72,17 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
6972
/** Index view listing applications and executors */
7073
def render(request: HttpServletRequest): Seq[Node] = {
7174
val state = getMasterState
72-
val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")
7375

76+
val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")
7477
val workers = state.workers.sortBy(_.id)
7578
val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
7679

77-
val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
78-
"Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
80+
val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
81+
"User", "State", "Duration")
7982
val activeApps = state.activeApps.sortBy(_.startTime).reverse
80-
val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)
81-
82-
val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
83-
"Submitted Time", "User", "State", "Duration")
83+
val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
8484
val completedApps = state.completedApps.sortBy(_.endTime).reverse
85-
val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
86-
completedApps)
85+
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
8786

8887
val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
8988
"Memory", "Main Class")
@@ -190,7 +189,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
190189
</tr>
191190
}
192191

193-
private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
192+
private def appRow(app: ApplicationInfo): Seq[Node] = {
194193
val killLink = if (parent.killEnabled &&
195194
(app.state == ApplicationState.RUNNING || app.state == ApplicationState.WAITING)) {
196195
val killLinkUri = s"app/kill?id=${app.id}&terminate=true"
@@ -200,7 +199,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
200199
(<a href={killLinkUri} onclick={confirm}>kill</a>)
201200
</span>
202201
}
203-
204202
<tr>
205203
<td>
206204
<a href={"app?appId=" + app.id}>{app.id}</a>
@@ -209,15 +207,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
209207
<td>
210208
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
211209
</td>
212-
{
213-
if (active) {
214-
<td>
215-
{app.coresGranted}
216-
</td>
217-
}
218-
}
219210
<td>
220-
{if (app.requestedCores == Int.MaxValue) "*" else app.requestedCores}
211+
{app.coresGranted}
221212
</td>
222213
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
223214
{Utils.megabytesToString(app.desc.memoryPerSlave)}
@@ -229,14 +220,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
229220
</tr>
230221
}
231222

232-
private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
233-
appRow(app, active = true)
234-
}
235-
236-
private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
237-
appRow(app, active = false)
238-
}
239-
240223
private def driverRow(driver: DriverInfo): Seq[Node] = {
241224
val killLink = if (parent.killEnabled &&
242225
(driver.state == DriverState.RUNNING ||

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,24 @@ import org.json4s._
2525
import org.json4s.jackson.JsonMethods
2626
import org.scalatest.FunSuite
2727

28-
import org.apache.spark.{JsonTestUtils, SparkConf}
29-
import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
28+
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
29+
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
3030
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
31+
import org.apache.spark.SparkConf
3132

32-
class JsonProtocolSuite extends FunSuite with JsonTestUtils {
33+
class JsonProtocolSuite extends FunSuite {
34+
35+
test("writeApplicationInfo") {
36+
val output = JsonProtocol.writeApplicationInfo(createAppInfo())
37+
assertValidJson(output)
38+
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appInfoJsonStr))
39+
}
40+
41+
test("writeWorkerInfo") {
42+
val output = JsonProtocol.writeWorkerInfo(createWorkerInfo())
43+
assertValidJson(output)
44+
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerInfoJsonStr))
45+
}
3346

3447
test("writeApplicationDescription") {
3548
val output = JsonProtocol.writeApplicationDescription(createAppDesc())
@@ -43,6 +56,26 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
4356
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr))
4457
}
4558

59+
test("writeDriverInfo") {
60+
val output = JsonProtocol.writeDriverInfo(createDriverInfo())
61+
assertValidJson(output)
62+
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.driverInfoJsonStr))
63+
}
64+
65+
test("writeMasterState") {
66+
val workers = Array(createWorkerInfo(), createWorkerInfo())
67+
val activeApps = Array(createAppInfo())
68+
val completedApps = Array[ApplicationInfo]()
69+
val activeDrivers = Array(createDriverInfo())
70+
val completedDrivers = Array(createDriverInfo())
71+
val stateResponse = new MasterStateResponse(
72+
"host", 8080, None, workers, activeApps, completedApps,
73+
activeDrivers, completedDrivers, RecoveryState.ALIVE)
74+
val output = JsonProtocol.writeMasterState(stateResponse)
75+
assertValidJson(output)
76+
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr))
77+
}
78+
4679
test("writeWorkerState") {
4780
val executors = List[ExecutorRunner]()
4881
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
@@ -60,6 +93,13 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
6093
new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
6194
}
6295

96+
def createAppInfo() : ApplicationInfo = {
97+
val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
98+
"id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
99+
appInfo.endTime = JsonConstants.currTimeInMillis
100+
appInfo
101+
}
102+
63103
def createDriverCommand() = new Command(
64104
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
65105
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
@@ -68,6 +108,15 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
68108
def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
69109
false, createDriverCommand())
70110

111+
def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
112+
createDriverDesc(), new Date())
113+
114+
def createWorkerInfo(): WorkerInfo = {
115+
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
116+
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
117+
workerInfo
118+
}
119+
71120
def createExecutorRunner(): ExecutorRunner = {
72121
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
73122
"publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
@@ -87,6 +136,15 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
87136
}
88137
}
89138

139+
def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
140+
val Diff(c, a, d) = validateJson diff expectedJson
141+
val validatePretty = JsonMethods.pretty(validateJson)
142+
val expectedPretty = JsonMethods.pretty(expectedJson)
143+
val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
144+
assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}")
145+
assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}")
146+
assert(d === JNothing, s"$errorMessage\nDelected:\n${JsonMethods.pretty(d)}")
147+
}
90148
}
91149

92150
object JsonConstants {

0 commit comments

Comments
 (0)