Commit e0f38cb

Merge branch 'master' of github.com:apache/spark into simple-streaming-fix

2 parents: bad921c + fbc4694

80 files changed: +2336 -1623 lines


core/src/main/scala/org/apache/spark/SparkConf.scala (7 additions, 1 deletion)

@@ -362,7 +362,13 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
         "1.3"),
       DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
-        "Use spark.{driver,executor}.userClassPathFirst instead."))
+        "Use spark.{driver,executor}.userClassPathFirst instead."),
+      DeprecatedConfig("spark.history.fs.updateInterval",
+        "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead"),
+      DeprecatedConfig("spark.history.updateInterval",
+        "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead"))
     configs.map { x => (x.oldName, x) }.toMap
   }
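
The two new entries route the old history-server keys to spark.history.fs.update.interval.seconds. As a rough illustration of how such a table can be consulted (a hedged sketch; names and behaviour are assumptions, not the actual SparkConf.translateConfKey implementation), a lookup helper might resolve an old key to its replacement and warn the user:

// Hypothetical, simplified stand-in for a deprecated-key translator; only the
// table entries themselves are taken from the diff above.
object DeprecatedKeySketch {
  case class DeprecatedConfig(oldName: String, newName: String,
      version: String, deprecationMessage: String = "")

  private val deprecated: Map[String, DeprecatedConfig] = Seq(
    DeprecatedConfig("spark.history.fs.updateInterval",
      "spark.history.fs.update.interval.seconds", "1.3",
      "Use spark.history.fs.update.interval.seconds instead"),
    DeprecatedConfig("spark.history.updateInterval",
      "spark.history.fs.update.interval.seconds", "1.3",
      "Use spark.history.fs.update.interval.seconds instead")
  ).map(c => c.oldName -> c).toMap

  /** Returns the key callers should read, warning when the given key is deprecated. */
  def translate(key: String, warn: Boolean = true): String =
    deprecated.get(key) match {
      case Some(cfg) if cfg.newName != null =>
        if (warn) {
          Console.err.println(s"${cfg.oldName} is deprecated since ${cfg.version}; " +
            s"${cfg.deprecationMessage}")
        }
        cfg.newName
      case Some(cfg) =>
        if (warn) Console.err.println(s"${cfg.oldName} is deprecated since ${cfg.version}.")
        key
      case None => key
    }
}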

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala (6 additions, 7 deletions)

@@ -219,14 +219,13 @@ private[spark] class PythonRDD(
         val oldBids = PythonRDD.getWorkerBroadcasts(worker)
         val newBids = broadcastVars.map(_.id).toSet
         // number of different broadcasts
-        val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
+        val toRemove = oldBids.diff(newBids)
+        val cnt = toRemove.size + newBids.diff(oldBids).size
         dataOut.writeInt(cnt)
-        for (bid <- oldBids) {
-          if (!newBids.contains(bid)) {
-            // remove the broadcast from worker
-            dataOut.writeLong(- bid - 1) // bid >= 0
-            oldBids.remove(bid)
-          }
+        for (bid <- toRemove) {
+          // remove the broadcast from worker
+          dataOut.writeLong(- bid - 1) // bid >= 0
+          oldBids.remove(bid)
         }
         for (broadcast <- broadcastVars) {
           if (!oldBids.contains(broadcast.id)) {
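
The rewrite computes the set difference toRemove once and reuses it for both the count and the removal loop, instead of re-checking every old id with contains. A self-contained sketch of the same bookkeeping (the syncBroadcasts helper and its wire format are illustrative assumptions; only the diff/negative-id idea comes from the change itself):

// Standalone sketch: ids held by the worker but no longer needed are retracted
// by writing a negative value; new ids are announced with a non-negative value.
import java.io.{ByteArrayOutputStream, DataOutputStream}
import scala.collection.mutable

object BroadcastSyncSketch {
  def syncBroadcasts(oldBids: mutable.Set[Long], newBids: Set[Long],
      dataOut: DataOutputStream): Unit = {
    val toRemove = oldBids.diff(newBids)       // held by the worker but no longer needed
    val toAdd = newBids.diff(oldBids)          // needed but not yet on the worker
    dataOut.writeInt(toRemove.size + toAdd.size)
    for (bid <- toRemove) {
      dataOut.writeLong(-bid - 1)              // negative value signals "drop this broadcast"
      oldBids.remove(bid)
    }
    for (bid <- toAdd) {
      dataOut.writeLong(bid)                   // non-negative value announces a new broadcast
      oldBids.add(bid)
    }
  }

  def main(args: Array[String]): Unit = {
    val held = mutable.Set(1L, 2L)
    val out = new DataOutputStream(new ByteArrayOutputStream())
    syncBroadcasts(held, Set(2L, 3L), out)
    println(held)                              // now holds 2 and 3: 1 was dropped, 3 was added
  }
}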

core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala (2 additions, 2 deletions)

@@ -17,8 +17,6 @@
 
 package org.apache.spark.deploy
 
-import java.io.File
-
 import scala.collection.JavaConversions._
 
 import org.apache.spark.util.{RedirectThread, Utils}
@@ -164,6 +162,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
       }
     }
     val returnCode = process.waitFor()
+    stdoutThread.join()
+    stderrThread.join()
    sys.exit(returnCode)
  }
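
Joining the redirect threads before calling sys.exit lets them finish copying the child's remaining stdout/stderr before the JVM terminates. A minimal sketch of that pattern (the names and the "echo" child are placeholders, and the example assumes a Unix-like shell):

// Minimal "wait for the child, then wait for the output pump" sketch.
object DrainThenExitSketch {
  def main(args: Array[String]): Unit = {
    val process = new ProcessBuilder("echo", "hello from the child").start()

    val stdoutThread = new Thread(new Runnable {
      override def run(): Unit =
        scala.io.Source.fromInputStream(process.getInputStream).getLines().foreach(println)
    }, "redirect stdout")
    stdoutThread.setDaemon(true)
    stdoutThread.start()

    val returnCode = process.waitFor()
    stdoutThread.join()        // without this, buffered child output can be lost on exit
    sys.exit(returnCode)
  }
}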

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala (79 additions, 33 deletions)

@@ -17,9 +17,13 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
+import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
@@ -44,17 +48,27 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
-    conf.getInt("spark.history.updateInterval", 10)) * 1000
+  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true)))
+    .map(_.toInt)
+    .getOrElse(10) * 1000
+
+  // Interval between each cleaner checks for event logs to delete
+  private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
+    DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
 
   private val logDir = conf.getOption("spark.history.fs.logDirectory")
     .map { d => Utils.resolveURI(d).toString }
     .getOrElse(DEFAULT_LOG_DIR)
 
   private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
 
-  // A timestamp of when the disk was last accessed to check for log updates
-  private var lastLogCheckTimeMs = -1L
+  // Used by check event thread and clean log thread.
+  // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
+  // and applications between check task and clean task.
+  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+    .setNameFormat("spark-history-task-%d").setDaemon(true).build())
 
   // The modification time of the newest log detected during the last scan. This is used
   // to ignore logs that are older during subsequent scans, to avoid processing data that
@@ -73,25 +87,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * Return a runnable that performs the given operation on the event logs.
+   * This operation is expected to be executed periodically.
    */
-  private val logCheckingThread = new Thread("LogCheckingThread") {
-    override def run() = Utils.logUncaughtExceptions {
-      while (true) {
-        val now = getMonotonicTimeMs()
-        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
-          Thread.sleep(UPDATE_INTERVAL_MS)
-        } else {
-          // If the user has manually checked for logs recently, wait until
-          // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
-        }
-        checkForLogs()
+  private def getRunner(operateFun: () => Unit): Runnable = {
+    new Runnable() {
+      override def run() = Utils.logUncaughtExceptions {
+        operateFun()
       }
     }
   }
@@ -113,12 +115,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         "Logging directory specified is not a directory: %s".format(logDir))
     }
 
-    checkForLogs()
-
     // Disable the background thread during tests.
     if (!conf.contains("spark.testing")) {
-      logCheckingThread.setDaemon(true)
-      logCheckingThread.start()
+      // A task that periodically checks for event log updates on disk.
+      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
+        TimeUnit.MILLISECONDS)
+
+      if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
+        // A task that periodically cleans event logs on disk.
+        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
+          TimeUnit.MILLISECONDS)
+      }
     }
   }
 
@@ -163,9 +170,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   * applications that haven't been updated since last time the logs were checked.
   */
   private[history] def checkForLogs(): Unit = {
-    lastLogCheckTimeMs = getMonotonicTimeMs()
-    logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
-
     try {
       var newLastModifiedTime = lastModifiedTime
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
@@ -230,6 +234,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
   }
 
+  /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs(): Unit = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+
+      applications.values.foreach { info =>
+        if (now - info.lastUpdated <= maxAge) {
+          appsToRetain += (info.id -> info)
+        }
+      }
+
+      applications = appsToRetain
+
+      // Scan all logs from the log directory.
+      // Only directories older than the specified max age will be deleted
+      statusList.foreach { dir =>
+        try {
+          if (now - dir.getModificationTime() > maxAge) {
+            // if path is a directory and set to true,
+            // the directory is deleted else throws an exception
+            fs.delete(dir.getPath, true)
+          }
+        } catch {
+          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+        }
+      }
+    } catch {
+      case t: Exception => logError("Exception in cleaning logs", t)
    }
  }
 
  /**
   * Comparison function that defines the sort order for the application listing.
   *
@@ -336,9 +379,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    }
  }
 
-  /** Returns the system's mononotically increasing time. */
-  private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
-
  /**
   * Return true when the application has completed.
   */
@@ -354,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
 private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // One day
+  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
 }
 
 private class FsApplicationHistoryInfo(
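
Net effect of this file's changes: the hand-rolled polling thread is replaced by a single-threaded scheduled pool that runs both the log-update check and, when spark.history.fs.cleaner.enabled is set, the new cleanLogs task, so the two never run concurrently. A reduced sketch of that scheduling pattern (placeholder task bodies and intervals; Guava on the classpath is assumed, as in the diff):

// Reduced sketch of the single-threaded periodic scheduling used above; only the
// pool/getRunner shape mirrors the diff, everything else is a stand-in.
import java.util.concurrent.{Executors, TimeUnit}

import com.google.common.util.concurrent.ThreadFactoryBuilder

object HistoryTaskSchedulingSketch {
  // Pool size of one guarantees the check task and the clean task never overlap.
  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
    .setNameFormat("history-task-%d").setDaemon(true).build())

  private def getRunner(operateFun: () => Unit): Runnable = new Runnable {
    override def run(): Unit =
      try operateFun() catch { case e: Exception => e.printStackTrace() }
  }

  def main(args: Array[String]): Unit = {
    val updateIntervalMs = 10 * 1000L                  // stand-in for UPDATE_INTERVAL_MS
    val cleanIntervalMs = TimeUnit.DAYS.toMillis(1)    // stand-in for CLEAN_INTERVAL_MS
    pool.scheduleAtFixedRate(getRunner(() => println("checking for new event logs")),
      0L, updateIntervalMs, TimeUnit.MILLISECONDS)
    pool.scheduleAtFixedRate(getRunner(() => println("cleaning expired event logs")),
      0L, cleanIntervalMs, TimeUnit.MILLISECONDS)
    Thread.sleep(25 * 1000)   // keep the daemon pool alive long enough to see a few checks
  }
}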

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala (2 additions, 2 deletions)

@@ -90,9 +90,9 @@ private[spark] class ApplicationInfo(
     }
   }
 
-  private val myMaxCores = desc.maxCores.getOrElse(defaultCores)
+  val requestedCores = desc.maxCores.getOrElse(defaultCores)
 
-  def coresLeft: Int = myMaxCores - coresGranted
+  def coresLeft: Int = requestedCores - coresGranted
 
   private var _retryCount = 0
 

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala (25 additions, 6 deletions)

@@ -50,12 +50,16 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     val workers = state.workers.sortBy(_.id)
     val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
 
-    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
-      "User", "State", "Duration")
+    val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
+      "Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
     val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
+    val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)
+
+    val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
+      "Submitted Time", "User", "State", "Duration")
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+    val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
+      completedApps)
 
     val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
       "Memory", "Main Class")
@@ -162,16 +166,23 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
-  private def appRow(app: ApplicationInfo): Seq[Node] = {
+  private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
     <tr>
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>
       </td>
      <td>
        <a href={app.desc.appUiUrl}>{app.desc.name}</a>
      </td>
+      {
+        if (active) {
+          <td>
+            {app.coresGranted}
+          </td>
+        }
+      }
       <td>
-        {app.coresGranted}
+        {app.requestedCores}
       </td>
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
@@ -183,6 +194,14 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
+  private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
+    appRow(app, active = true)
+  }
+
+  private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
+    appRow(app, active = false)
+  }
+
   private def driverRow(driver: DriverInfo): Seq[Node] = {
     <tr>
       <td>{driver.id} </td>
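
Active applications now get an extra "Cores in Use" column, while completed ones only show the cores they requested. Stripped of the XML literals, the row-building logic reduces to something like this sketch (AppInfo and the Seq[String] row type are illustrative stand-ins for the real ApplicationInfo and Seq[Node]):

// Non-XML sketch of the active/completed table split.
object MasterPageRowsSketch {
  case class AppInfo(id: String, name: String, coresGranted: Int, requestedCores: Int)

  val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use", "Cores Requested")
  val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested")

  private def appRow(app: AppInfo, active: Boolean): Seq[String] = {
    // Only running applications report how many cores they currently hold.
    val coresInUse = if (active) Seq(app.coresGranted.toString) else Seq.empty[String]
    Seq(app.id, app.name) ++ coresInUse ++ Seq(app.requestedCores.toString)
  }

  def activeAppRow(app: AppInfo): Seq[String] = appRow(app, active = true)
  def completeAppRow(app: AppInfo): Seq[String] = appRow(app, active = false)
}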

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala (2 additions, 3 deletions)

@@ -85,14 +85,13 @@ private[spark] class ExecutorRunner(
     var exitCode: Option[Int] = None
     if (process != null) {
       logInfo("Killing process!")
-      process.destroy()
-      process.waitFor()
       if (stdoutAppender != null) {
         stdoutAppender.stop()
       }
       if (stderrAppender != null) {
         stderrAppender.stop()
       }
+      process.destroy()
       exitCode = Some(process.waitFor())
     }
     worker ! ExecutorStateChanged(appId, execId, state, message, exitCode)
@@ -135,7 +134,7 @@ private[spark] class ExecutorRunner(
     logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
 
     builder.directory(executorDir)
-    builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
+    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
     // In case we are running this from within the Spark Shell, avoid creating a "scala"
     // parent process for the executor command
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala (2 additions, 2 deletions)

@@ -345,11 +345,11 @@ private[spark] class Worker(
     }
 
     // Create local dirs for the executor. These are passed to the executor via the
-    // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+    // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
     // application finishes.
     val appLocalDirs = appDirectories.get(appId).getOrElse {
       Utils.getOrCreateLocalRootDirs(conf).map { dir =>
-        Utils.createDirectory(dir).getAbsolutePath()
+        Utils.createDirectory(dir, namePrefix = "executor").getAbsolutePath()
       }.toSeq
     }
     appDirectories(appId) = appLocalDirs

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala (0 additions, 8 deletions)

@@ -203,7 +203,6 @@ class TaskMetrics extends Serializable {
       merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
       merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
       merged.incLocalBytesRead(depMetrics.localBytesRead)
-      merged.incLocalReadTime(depMetrics.localReadTime)
       merged.incRecordsRead(depMetrics.recordsRead)
     }
     _shuffleReadMetrics = Some(merged)
@@ -345,13 +344,6 @@ class ShuffleReadMetrics extends Serializable {
   private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
   private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
 
-  /**
-   * Time the task spent (in milliseconds) reading local shuffle blocks (from the local disk).
-   */
-  private var _localReadTime: Long = _
-  def localReadTime = _localReadTime
-  private[spark] def incLocalReadTime(value: Long) = _localReadTime += value
-
   /**
    * Shuffle data that was read from the local disk (as opposed to from a remote executor).
    */

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala (1 addition, 0 deletions)

@@ -993,6 +993,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
 
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      require(writer != null, "Unable to obtain RecordWriter")
       var recordsWritten = 0L
       try {
        while (iter.hasNext) {
