Commit 753c963

Author: Brennon York (committed)

fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method

2 parents: 2c678c6 + 9f603fc
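For context, the diff(other: VertexRDD[VD]) method named in the commit message is GraphX's value-level set difference on vertex sets. A minimal sketch of its behavior, assuming a running SparkContext named `sc` and hypothetical data:

    import org.apache.spark.graphx.VertexRDD

    // Two vertex sets that agree everywhere except at vertex 2L.
    val setA: VertexRDD[Int] = VertexRDD(sc.parallelize((0L until 3L).map(id => (id, id.toInt))))
    val setB: VertexRDD[Int] = setA.mapValues(v => if (v == 2) 99 else v)

    // diff keeps only the vertices whose values differ, taking the values from setB.
    val changed = setA.diff(setB)
    changed.collect().foreach(println)   // prints (2,99)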

File tree: 66 files changed, +1441 -954 lines


conf/metrics.properties.template

Lines changed: 9 additions & 0 deletions

@@ -122,6 +122,15 @@
 
 #worker.sink.csv.unit=minutes
 
+# Enable Slf4jSink for all instances by class name
+#*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
+
+# Polling period for Slf4jSink
+#*.sink.slf4j.period=1
+
+#*.sink.slf4j.unit=minutes
+
+
 # Enable jvm source for instance master, worker, driver and executor
 #master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
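For reference, a hedged sketch of how an application would load a metrics file containing the lines above. `spark.metrics.conf` is Spark's standard key for pointing at a metrics properties file; the path below is a placeholder:

    import org.apache.spark.{SparkConf, SparkContext}

    // Point Spark at a metrics.properties that uncomments the slf4j sink lines above.
    // By default Spark looks for conf/metrics.properties; the path here is hypothetical.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("slf4j-metrics-demo")
      .set("spark.metrics.conf", "/path/to/metrics.properties")
    val sc = new SparkContext(conf)
    sc.stop()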

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 2 additions & 2 deletions

@@ -90,9 +90,9 @@ private[spark] class ApplicationInfo(
     }
   }
 
-  private val myMaxCores = desc.maxCores.getOrElse(defaultCores)
+  val requestedCores = desc.maxCores.getOrElse(defaultCores)
 
-  def coresLeft: Int = myMaxCores - coresGranted
+  def coresLeft: Int = requestedCores - coresGranted
 
   private var _retryCount = 0

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 25 additions & 6 deletions

@@ -50,12 +50,16 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     val workers = state.workers.sortBy(_.id)
     val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
 
-    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
-      "User", "State", "Duration")
+    val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
+      "Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
     val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
+    val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)
+
+    val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
+      "Submitted Time", "User", "State", "Duration")
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+    val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
+      completedApps)
 
     val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
       "Memory", "Main Class")
@@ -162,16 +166,23 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
-  private def appRow(app: ApplicationInfo): Seq[Node] = {
+  private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
     <tr>
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>
       </td>
       <td>
         <a href={app.desc.appUiUrl}>{app.desc.name}</a>
       </td>
+      {
+        if (active) {
+          <td>
+            {app.coresGranted}
+          </td>
+        }
+      }
       <td>
-        {app.coresGranted}
+        {app.requestedCores}
       </td>
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
@@ -183,6 +194,14 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
+  private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
+    appRow(app, active = true)
+  }
+
+  private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
+    appRow(app, active = false)
+  }
+
   private def driverRow(driver: DriverInfo): Seq[Node] = {
     <tr>
       <td>{driver.id} </td>
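A standalone sketch of the conditional-cell pattern appRow now uses: the "Cores in Use" cell is emitted only for active applications. The snippet uses an explicit `else NodeSeq.Empty` so it type-checks on its own; the names are illustrative, not the patch's code:

    import scala.xml.{Node, NodeSeq}

    // Render the "Cores in Use" cell only for active applications; completed
    // applications get just the "Cores Requested" cell.
    def row(coresGranted: Int, requestedCores: Int, active: Boolean): Seq[Node] = {
      <tr>
        {if (active) <td>{coresGranted}</td> else NodeSeq.Empty}
        <td>{requestedCores}</td>
      </tr>
    }

    println(row(coresGranted = 4, requestedCores = 8, active = true))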

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala

Lines changed: 1 addition & 1 deletion

@@ -134,7 +134,7 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
   def driverRow(driver: DriverRunner): Seq[Node] = {
     <tr>
       <td>{driver.driverId}</td>
-      <td>{driver.driverDesc.command.arguments(1)}</td>
+      <td>{driver.driverDesc.command.arguments(2)}</td>
       <td>{driver.finalState.getOrElse(DriverState.RUNNING)}</td>
       <td sorttable_customkey={driver.driverDesc.cores.toString}>
         {driver.driverDesc.cores.toString}
core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala

Lines changed: 68 additions & 0 deletions

@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.{Slf4jReporter, MetricRegistry}
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.metrics.MetricsSystem
+
+private[spark] class Slf4jSink(
+    val property: Properties,
+    val registry: MetricRegistry,
+    securityMgr: SecurityManager)
+  extends Sink {
+  val SLF4J_DEFAULT_PERIOD = 10
+  val SLF4J_DEFAULT_UNIT = "SECONDS"
+
+  val SLF4J_KEY_PERIOD = "period"
+  val SLF4J_KEY_UNIT = "unit"
+
+  val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
+    case Some(s) => s.toInt
+    case None => SLF4J_DEFAULT_PERIOD
+  }
+
+  val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
+  }
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
+    .convertDurationsTo(TimeUnit.MILLISECONDS)
+    .convertRatesTo(TimeUnit.SECONDS)
+    .build()
+
+  override def start() {
+    reporter.start(pollPeriod, pollUnit)
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+
+  override def report() {
+    reporter.report()
+  }
+}
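A hedged sketch of exercising the new sink directly. Because Slf4jSink is private[spark], this only compiles from inside the org.apache.spark package tree (for instance in a test); the demo object and property values are assumptions for illustration:

    package org.apache.spark.metrics.sink

    import java.util.Properties

    import com.codahale.metrics.MetricRegistry

    import org.apache.spark.{SecurityManager, SparkConf}

    object Slf4jSinkDemo {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.setProperty("period", "1")      // matches SLF4J_KEY_PERIOD above
        props.setProperty("unit", "seconds")  // parsed via TimeUnit.valueOf("SECONDS")

        val registry = new MetricRegistry()
        registry.counter("demo.counter").inc()

        val sink = new Slf4jSink(props, registry, new SecurityManager(new SparkConf()))
        sink.report()  // logs the counter once through the SLF4J reporter
      }
    }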

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 3 additions & 0 deletions

@@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       for (stageId <- jobData.stageIds) {
         stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
           jobsUsingStage.remove(jobEnd.jobId)
+          if (jobsUsingStage.isEmpty) {
+            stageIdToActiveJobIds.remove(stageId)
+          }
           stageIdToInfo.get(stageId).foreach { stageInfo =>
             if (stageInfo.submissionTime.isEmpty) {
               // if this stage is pending, it won't complete, so mark it as "skipped":
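The change above prunes a stage's entry from stageIdToActiveJobIds once its last active job is removed, so the map cannot grow without bound as jobs finish. A minimal standalone illustration of the same pattern (the names mirror the patch, but this sketch is not the listener itself):

    import scala.collection.mutable

    val stageIdToActiveJobIds = mutable.HashMap[Int, mutable.HashSet[Int]]()
    stageIdToActiveJobIds.getOrElseUpdate(7, mutable.HashSet()) += 0  // job 0 uses stage 7

    def onJobEnd(jobId: Int, stageIds: Seq[Int]): Unit = {
      for (stageId <- stageIds) {
        stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
          jobsUsingStage.remove(jobId)
          if (jobsUsingStage.isEmpty) {
            stageIdToActiveJobIds.remove(stageId)  // the fix: drop the now-empty entry
          }
        }
      }
    }

    onJobEnd(jobId = 0, stageIds = Seq(7))
    assert(stageIdToActiveJobIds.isEmpty)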

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 1 deletion

@@ -696,7 +696,7 @@ private[spark] object Utils extends Logging {
       try {
         val rootDir = new File(root)
         if (rootDir.exists || rootDir.mkdirs()) {
-          val dir = createDirectory(root)
+          val dir = createTempDir(root)
           chmod700(dir)
           Some(dir.getAbsolutePath)
         } else {
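For intuition, a rough standalone sketch of the distinction the fix relies on, assuming (as Spark's Utils.createTempDir did at the time) that temp-dir helpers register the directory for deletion on JVM exit while createDirectory alone does not; this is an illustration, not Spark's code:

    import java.io.File
    import java.nio.file.Files

    // Create a unique directory under `root` and arrange for it to be deleted
    // when the JVM exits, a property a plain createDirectory-style helper lacks.
    def createTempDirLike(root: String): File = {
      val dir = Files.createTempDirectory(new File(root).toPath, "spark-").toFile
      sys.addShutdownHook {
        Option(dir.listFiles()).foreach(_.foreach(_.delete()))  // shallow cleanup
        dir.delete()
      }
      dir
    }

    val d = createTempDirLike(System.getProperty("java.io.tmpdir"))
    println(d.getAbsolutePath)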

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 22 additions & 0 deletions

@@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
   }
 
+  test("test clearing of stageIdToActiveJobs") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
+    val jobId = 0
+    val stageIds = 1 to 50
+    // Start a job with 50 stages
+    listener.onJobStart(createJobStartEvent(jobId, stageIds))
+    for (stageId <- stageIds) {
+      listener.onStageSubmitted(createStageStartEvent(stageId))
+    }
+    listener.stageIdToActiveJobIds.size should be > 0
+
+    // Complete the stages and job
+    for (stageId <- stageIds) {
+      listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
+    }
+    listener.onJobEnd(createJobEndEvent(jobId, false))
+    assertActiveJobsStateIsEmpty(listener)
+    listener.stageIdToActiveJobIds.size should be (0)
+  }
+
   test("test LRU eviction of jobs") {
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)

docs/_config.yml

Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@ kramdown:
 
 include:
   - _static
+  - _modules
 
 # These allow the documentation to be updated with newer releases
 # of Spark, Scala, and Mesos.

docs/configuration.md

Lines changed: 3 additions & 1 deletion

@@ -955,7 +955,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>5</td>
   <td>
     (Netty only) Seconds to wait between retries of fetches. The maximum delay caused by retrying
-    is simply <code>maxRetries * retryWait</code>, by default 15 seconds.
+    is simply <code>maxRetries * retryWait</code>. The default maximum delay is therefore
+    15 seconds, because the default value of <code>maxRetries</code> is 3, and the default
+    <code>retryWait</code> here is 5 seconds.
   </td>
 </tr>
 </table>
