Skip to content

Commit 4df5be6

Browse files
Merge remote-tracking branch 'asf/master' into spark-sink-test
2 parents c9190d1 + cbfc26b commit 4df5be6

File tree

82 files changed

+1514
-296
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+1514
-296
lines changed

.travis.yml

Lines changed: 0 additions & 32 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
package org.apache.spark.broadcast
1919

20-
import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
21-
ObjectInputStream, ObjectOutputStream, OutputStream}
20+
import java.io._
2221

2322
import scala.reflect.ClassTag
2423
import scala.util.Random
@@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
5352

5453
private val broadcastId = BroadcastBlockId(id)
5554

56-
TorrentBroadcast.synchronized {
57-
SparkEnv.get.blockManager.putSingle(
58-
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
59-
}
55+
SparkEnv.get.blockManager.putSingle(
56+
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
6057

6158
@transient private var arrayOfBlocks: Array[TorrentBlock] = null
6259
@transient private var totalBlocks = -1
@@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
9188
// Store meta-info
9289
val metaId = BroadcastBlockId(id, "meta")
9390
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
94-
TorrentBroadcast.synchronized {
95-
SparkEnv.get.blockManager.putSingle(
96-
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
97-
}
91+
SparkEnv.get.blockManager.putSingle(
92+
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
9893

9994
// Store individual pieces
10095
for (i <- 0 until totalBlocks) {
10196
val pieceId = BroadcastBlockId(id, "piece" + i)
102-
TorrentBroadcast.synchronized {
103-
SparkEnv.get.blockManager.putSingle(
104-
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
105-
}
97+
SparkEnv.get.blockManager.putSingle(
98+
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
10699
}
107100
}
108101

@@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
165158
val metaId = BroadcastBlockId(id, "meta")
166159
var attemptId = 10
167160
while (attemptId > 0 && totalBlocks == -1) {
168-
TorrentBroadcast.synchronized {
169-
SparkEnv.get.blockManager.getSingle(metaId) match {
170-
case Some(x) =>
171-
val tInfo = x.asInstanceOf[TorrentInfo]
172-
totalBlocks = tInfo.totalBlocks
173-
totalBytes = tInfo.totalBytes
174-
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
175-
hasBlocks = 0
176-
177-
case None =>
178-
Thread.sleep(500)
179-
}
161+
SparkEnv.get.blockManager.getSingle(metaId) match {
162+
case Some(x) =>
163+
val tInfo = x.asInstanceOf[TorrentInfo]
164+
totalBlocks = tInfo.totalBlocks
165+
totalBytes = tInfo.totalBytes
166+
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
167+
hasBlocks = 0
168+
169+
case None =>
170+
Thread.sleep(500)
180171
}
181172
attemptId -= 1
182173
}
174+
183175
if (totalBlocks == -1) {
184176
return false
185177
}
@@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
192184
val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
193185
for (pid <- recvOrder) {
194186
val pieceId = BroadcastBlockId(id, "piece" + pid)
195-
TorrentBroadcast.synchronized {
196-
SparkEnv.get.blockManager.getSingle(pieceId) match {
197-
case Some(x) =>
198-
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
199-
hasBlocks += 1
200-
SparkEnv.get.blockManager.putSingle(
201-
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
187+
SparkEnv.get.blockManager.getSingle(pieceId) match {
188+
case Some(x) =>
189+
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
190+
hasBlocks += 1
191+
SparkEnv.get.blockManager.putSingle(
192+
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
202193

203-
case None =>
204-
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
205-
}
194+
case None =>
195+
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
206196
}
207197
}
208198

@@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
291281
* If removeFromDriver is true, also remove these persisted blocks on the driver.
292282
*/
293283
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
294-
synchronized {
295-
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
296-
}
284+
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
297285
}
298286
}
299287

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,7 @@ private[spark] class Master(
697697
appIdToUI(app.id) = ui
698698
webUi.attachSparkUI(ui)
699699
// Application UI is successfully rebuilt, so link the Master UI to it
700-
app.desc.appUiUrl = ui.basePath
700+
app.desc.appUiUrl = ui.getBasePath
701701
true
702702
} catch {
703703
case e: Exception =>

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ private[spark] class ConnectionManager(
467467

468468
val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
469469
if (!sendingConnectionOpt.isDefined) {
470-
logError("Corresponding SendingConnectionManagerId not found")
470+
logError(s"Corresponding SendingConnection to ${remoteConnectionManagerId} not found")
471471
return
472472
}
473473

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ private[spark] class EventLoggingListener(
5454
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
5555
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
5656
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
57-
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
57+
private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
58+
.toLowerCase + "-" + System.currentTimeMillis
5859
val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
5960

6061
protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ private[spark] class SparkUI(
7676
}
7777
}
7878

79+
def getAppName = appName
80+
7981
/** Set the app name for this UI. */
8082
def setAppName(name: String) {
8183
appName = name
@@ -100,6 +102,13 @@ private[spark] class SparkUI(
100102
private[spark] def appUIAddress = s"http://$appUIHostPort"
101103
}
102104

105+
private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
106+
extends WebUITab(parent, prefix) {
107+
108+
def appName: String = parent.getAppName
109+
110+
}
111+
103112
private[spark] object SparkUI {
104113
val DEFAULT_PORT = 4040
105114
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -163,17 +163,15 @@ private[spark] object UIUtils extends Logging {
163163

164164
/** Returns a spark page with correctly formatted headers */
165165
def headerSparkPage(
166-
content: => Seq[Node],
167-
basePath: String,
168-
appName: String,
169166
title: String,
170-
tabs: Seq[WebUITab],
171-
activeTab: WebUITab,
167+
content: => Seq[Node],
168+
activeTab: SparkUITab,
172169
refreshInterval: Option[Int] = None): Seq[Node] = {
173170

174-
val header = tabs.map { tab =>
171+
val appName = activeTab.appName
172+
val header = activeTab.headerTabs.map { tab =>
175173
<li class={if (tab == activeTab) "active" else ""}>
176-
<a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
174+
<a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix)}>{tab.name}</a>
177175
</li>
178176
}
179177

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ private[spark] abstract class WebUI(
5050
protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
5151
private val className = Utils.getFormattedClassName(this)
5252

53+
def getBasePath: String = basePath
5354
def getTabs: Seq[WebUITab] = tabs.toSeq
5455
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
5556
def getSecurityManager: SecurityManager = securityManager
@@ -135,6 +136,8 @@ private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
135136

136137
/** Get a list of header tabs from the parent UI. */
137138
def headerTabs: Seq[WebUITab] = parent.getTabs
139+
140+
def basePath: String = parent.getBasePath
138141
}
139142

140143

core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ import scala.xml.Node
2424
import org.apache.spark.ui.{UIUtils, WebUIPage}
2525

2626
private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
27-
private val appName = parent.appName
28-
private val basePath = parent.basePath
2927
private val listener = parent.listener
3028

3129
def render(request: HttpServletRequest): Seq[Node] = {
@@ -45,7 +43,7 @@ private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("")
4543
<h4>Classpath Entries</h4> {classpathEntriesTable}
4644
</span>
4745

48-
UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)
46+
UIUtils.headerSparkPage("Environment", content, parent)
4947
}
5048

5149
private def propertyHeader = Seq("Name", "Value")

core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ import org.apache.spark.annotation.DeveloperApi
2121
import org.apache.spark.scheduler._
2222
import org.apache.spark.ui._
2323

24-
private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") {
25-
val appName = parent.appName
26-
val basePath = parent.basePath
24+
private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
2725
val listener = new EnvironmentListener
2826

2927
attachPage(new EnvironmentPage(this))

0 commit comments

Comments
 (0)