
Commit 27984d0

Merge remote-tracking branch 'origin/master' into countDistinctPartial
2 parents 57ae3b1 + 66ade00

50 files changed: +946 −314 lines


core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
@@ -697,7 +697,7 @@ private[spark] class Master(
       appIdToUI(app.id) = ui
       webUi.attachSparkUI(ui)
       // Application UI is successfully rebuilt, so link the Master UI to it
-      app.desc.appUiUrl = ui.basePath
+      app.desc.appUiUrl = ui.getBasePath
       true
     } catch {
       case e: Exception =>

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 14 additions & 27 deletions
@@ -72,7 +72,6 @@ private[spark] class Worker(
   val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
   val testing: Boolean = sys.props.contains("spark.testing")
-  val masterLock: Object = new Object()
   var master: ActorSelection = null
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@ private[spark] class Worker(
   }
 
   def changeMaster(url: String, uiUrl: String) {
-    masterLock.synchronized {
-      activeMasterUrl = url
-      activeMasterWebUiUrl = uiUrl
-      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = activeMasterUrl match {
-        case Master.sparkUrlRegex(_host, _port) =>
-          Address("akka.tcp", Master.systemName, _host, _port.toInt)
-        case x =>
-          throw new SparkException("Invalid spark URL: " + x)
-      }
-      connected = true
+    activeMasterUrl = url
+    activeMasterWebUiUrl = uiUrl
+    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+    masterAddress = activeMasterUrl match {
+      case Master.sparkUrlRegex(_host, _port) =>
+        Address("akka.tcp", Master.systemName, _host, _port.toInt)
+      case x =>
+        throw new SparkException("Invalid spark URL: " + x)
     }
+    connected = true
   }
 
   def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@
     }
 
     case SendHeartbeat =>
-      masterLock.synchronized {
-        if (connected) { master ! Heartbeat(workerId) }
-      }
+      if (connected) { master ! Heartbeat(workerId) }
 
     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,27 +239,21 @@
           manager.start()
           coresUsed += cores_
           memoryUsed += memory_
-          masterLock.synchronized {
-            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
-          }
+          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
         } catch {
           case e: Exception => {
             logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
            if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
             }
-            masterLock.synchronized {
-              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
-            }
+            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
           }
         }
       }
 
     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      masterLock.synchronized {
-        master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
-      }
+      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
       val fullId = appId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         executors.get(fullId) match {
@@ -330,9 +319,7 @@
         case _ =>
           logDebug(s"Driver $driverId changed state to $state")
       }
-      masterLock.synchronized {
-        master ! DriverStateChanged(driverId, state, exception)
-      }
+      master ! DriverStateChanged(driverId, state, exception)
      val driver = drivers.remove(driverId).get
      finishedDrivers(driverId) = driver
      memoryUsed -= driver.driverDesc.mem
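
Why dropping masterLock is safe: every one of these sends happens inside the Worker actor's receive (or in changeMaster, which is only called from receive), and Akka delivers an actor's messages one at a time, so the actor's mutable state is never touched concurrently. A minimal sketch of that guarantee, using a hypothetical counter actor rather than Spark code, assuming classic Akka actors of the kind Spark used at the time:

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical actor: `count` is plain mutable state with no lock.
// This is safe because Akka processes an actor's mailbox one message
// at a time, so the two cases below never run concurrently.
class Counter extends Actor {
  private var count = 0

  def receive = {
    case "inc" => count += 1
    case "get" => sender() ! count
  }
}

object CounterDemo extends App {
  val system = ActorSystem("demo")
  val counter = system.actorOf(Props[Counter], "counter")
  (1 to 100).foreach(_ => counter ! "inc") // enqueued in the mailbox, not racing
}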

core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala

Lines changed: 11 additions & 4 deletions
@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
   def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
-    // Compute the minimum and the maxium
+    // Scala's built-in range has issues. See #SI-8782
+    def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
+      val span = max - min
+      Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
+    }
+    // Compute the minimum and the maximum
     val (max: Double, min: Double) = self.mapPartitions { items =>
       Iterator(items.foldRight(Double.NegativeInfinity,
         Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
@@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       throw new UnsupportedOperationException(
         "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
     }
-    val increment = (max-min)/bucketCount.toDouble
-    val range = if (increment != 0) {
-      Range.Double.inclusive(min, max, increment)
+    val range = if (min != max) {
+      // Range.Double.inclusive(min, max, increment)
+      // The above code doesn't always work. See Scala bug #SI-8782.
+      // https://issues.scala-lang.org/browse/SI-8782
+      customRange(min, max, bucketCount)
     } else {
       List(min, min)
     }
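
The customRange workaround exists because Range.Double.inclusive builds its sequence by repeatedly adding a floating-point increment, and the accumulated rounding error can yield one bucket boundary too many or too few (Scala bug SI-8782). Driving the loop with integer indices instead guarantees exactly steps + 1 boundaries, with the last one exactly equal to max. A standalone sketch of the same idea, using illustrative values not taken from the commit:

object BucketBoundsDemo extends App {
  // Integer-indexed boundaries: no accumulated floating-point step error.
  def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
    val span = max - min
    (0 until steps).map(s => min + (s * span) / steps) :+ max
  }

  val bounds = customRange(0.0, 1.0, 10)
  assert(bounds.length == 11) // always exactly steps + 1 boundaries
  assert(bounds.last == 1.0)  // last boundary is exactly max
  println(bounds.mkString(", "))
}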

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 9 additions & 0 deletions
@@ -76,6 +76,8 @@ private[spark] class SparkUI(
     }
   }
 
+  def getAppName = appName
+
   /** Set the app name for this UI. */
   def setAppName(name: String) {
     appName = name
@@ -100,6 +102,13 @@
   private[spark] def appUIAddress = s"http://$appUIHostPort"
 }
 
+private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
+  extends WebUITab(parent, prefix) {
+
+  def appName: String = parent.getAppName
+
+}
+
 private[spark] object SparkUI {
   val DEFAULT_PORT = 4040
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
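
SparkUITab is what lets the tab classes later in this diff drop their copied appName and basePath fields: a tab now resolves both through its parent on demand, so a UI whose app name is set after construction stays consistent. A hypothetical subclass (DemoTab is not part of the commit), sketched against the classes in this diff:

import org.apache.spark.ui.{SparkUI, SparkUITab}

// Hypothetical tab: it inherits appName via SparkUITab -> parent.getAppName,
// and basePath via WebUITab -> parent.getBasePath (added in WebUI.scala below),
// instead of caching its own copies at construction time.
private[ui] class DemoTab(parent: SparkUI) extends SparkUITab(parent, "demo")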

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 5 additions & 7 deletions
@@ -163,17 +163,15 @@ private[spark] object UIUtils extends Logging {
 
   /** Returns a spark page with correctly formatted headers */
   def headerSparkPage(
-      content: => Seq[Node],
-      basePath: String,
-      appName: String,
       title: String,
-      tabs: Seq[WebUITab],
-      activeTab: WebUITab,
+      content: => Seq[Node],
+      activeTab: SparkUITab,
       refreshInterval: Option[Int] = None): Seq[Node] = {
 
-    val header = tabs.map { tab =>
+    val appName = activeTab.appName
+    val header = activeTab.headerTabs.map { tab =>
       <li class={if (tab == activeTab) "active" else ""}>
-        <a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
+        <a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix)}>{tab.name}</a>
       </li>
     }
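
The payoff of the new signature shows at call sites: pages no longer thread basePath, appName, and headerTabs through by hand, because the SparkUITab argument carries all three. A before/after sketch, taken from the EnvironmentPage change shown below:

// Before: every page forwarded UI plumbing explicitly.
UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)

// After: the active tab supplies appName, basePath, and headerTabs itself.
UIUtils.headerSparkPage("Environment", content, parent)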

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 3 additions & 0 deletions
@@ -50,6 +50,7 @@ private[spark] abstract class WebUI(
   protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
   private val className = Utils.getFormattedClassName(this)
 
+  def getBasePath: String = basePath
   def getTabs: Seq[WebUITab] = tabs.toSeq
   def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
   def getSecurityManager: SecurityManager = securityManager
@@ -135,6 +136,8 @@ private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
 
   /** Get a list of header tabs from the parent UI. */
   def headerTabs: Seq[WebUITab] = parent.getTabs
+
+  def basePath: String = parent.getBasePath
 }
 
 
core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala

Lines changed: 1 addition & 3 deletions
@@ -24,8 +24,6 @@ import scala.xml.Node
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -45,7 +43,7 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("")
       <h4>Classpath Entries</h4> {classpathEntriesTable}
     </span>
 
-    UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Environment", content, parent)
   }
 
   private def propertyHeader = Seq("Name", "Value")

core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala

Lines changed: 1 addition & 3 deletions
@@ -21,9 +21,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.ui._
 
-private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") {
-  val appName = parent.appName
-  val basePath = parent.basePath
+private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
   val listener = new EnvironmentListener
 
   attachPage(new EnvironmentPage(this))

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 1 addition & 4 deletions
@@ -43,8 +43,6 @@ private case class ExecutorSummaryInfo(
   maxMemory: Long)
 
 private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
-  private val appName = parent.appName
-  private val basePath = parent.basePath
   private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -101,8 +99,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
       </div>
     </div>;
 
-    UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")",
-      parent.headerTabs, parent)
+    UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
   }
 
   /** Render an HTML row representing an executor */

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 2 additions & 4 deletions
@@ -23,11 +23,9 @@ import org.apache.spark.ExceptionFailure
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.{SparkUI, WebUITab}
+import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") {
-  val appName = parent.appName
-  val basePath = parent.basePath
+private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
   val listener = new ExecutorsListener(parent.storageStatusListener)
 
   attachPage(new ExecutorsPage(this))
