Merged
Changes from all commits
17 commits
b999fa4
[SPARK-17696][SPARK-12330][CORE] Partial backport of to branch-1.6.
drcrallen Sep 28, 2016
376545e
[SPARK-17721][MLLIB][BACKPORT] Fix for multiplying transposed SparseM…
bwahlgreen Oct 2, 2016
d3890de
[SPARK-15062][SQL] Backport fix list type infer serializer issue
brkyvz Oct 6, 2016
585c565
[SPARK-17850][CORE] Add a flag to ignore corrupt files (branch 1.6)
zsxwing Oct 13, 2016
18b173c
[SPARK-17678][REPL][BRANCH-1.6] Honor spark.replClassServer.port in s…
jerryshao Oct 13, 2016
903cc92
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Oct 14, 2016
745c5e7
[SPARK-17884][SQL] To resolve Null pointer exception when casting fro…
priyankagar Oct 14, 2016
0f57785
Prepare branch-1.6 for 1.6.3 release.
rxin Oct 17, 2016
7375bb0
Preparing Spark release v1.6.3
pwendell Oct 17, 2016
b95ac0d
Preparing development version 1.6.4-SNAPSHOT
pwendell Oct 17, 2016
4f9c026
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Oct 17, 2016
82e98f1
[SPARK-16078][SQL] Backport: from_utc_timestamp/to_utc_timestamp shou…
Oct 20, 2016
1e86074
Preparing Spark release v1.6.3-rc2
pwendell Nov 2, 2016
9136e26
Preparing development version 1.6.4-SNAPSHOT
pwendell Nov 2, 2016
8f25cb2
[SPARK-18553][CORE][BRANCH-1.6] Fix leak of TaskSetManager following …
JoshRosen Dec 1, 2016
70f271b
[SPARK-12446][SQL][BACKPORT-1.6] Add unit tests for JDBCRDD internal …
maropu Dec 3, 2016
91c9700
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Dec 6, 2016
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
Package: SparkR
Type: Package
Title: R frontend for Spark
Version: 1.6.1
Version: 1.6.4
Date: 2013-09-09
Author: The Apache Software Foundation
Maintainer: Shivaram Venkataraman <[email protected]>
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>1.6.2-csd-7-SNAPSHOT</version>
<version>1.6.3-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>1.6.2-csd-7-SNAPSHOT</version>
<version>1.6.3-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>1.6.2-csd-7-SNAPSHOT</version>
<version>1.6.3-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor

import java.net.URL
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.hadoop.conf.Configuration

@@ -45,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None

@@ -106,19 +108,23 @@ private[spark] class CoarseGrainedExecutorBackend(
}

case StopExecutor =>
stopping.set(true)
logInfo("Driver commanded a shutdown")
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
// a message to self to actually do the shutdown.
self.send(Shutdown)

case Shutdown =>
stopping.set(true)
executor.stop()
stop()
rpcEnv.shutdown()
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (driver.exists(_.address == remoteAddress)) {
if (stopping.get()) {
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
logError(s"Driver $remoteAddress disassociated! Shutting down.")
System.exit(1)
} else {
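Note: the change above guards onDisconnected with an AtomicBoolean so that a driver disconnect observed while the executor is already shutting down is logged rather than escalated to a fatal exit. A minimal sketch of the same pattern, with illustrative names rather than Spark's actual RPC types:

import java.util.concurrent.atomic.AtomicBoolean

// Sketch only: record the intent to stop before tearing anything down, so a
// disconnect seen afterwards can be attributed to the shutdown itself.
class ShutdownAwareEndpoint {
  private[this] val stopping = new AtomicBoolean(false)

  def requestStop(): Unit = {
    stopping.set(true)
    // ... then send a Shutdown message to self, stop the executor, shut down the RPC env
  }

  def onDisconnected(remoteAddress: String): Unit = {
    if (stopping.get()) {
      println(s"Peer $remoteAddress disconnected during shutdown; ignoring")
    } else {
      println(s"Peer $remoteAddress disassociated unexpectedly; treating as fatal")
      // the real backend calls System.exit(1) here
    }
  }
}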
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/package.scala
@@ -43,5 +43,5 @@ package org.apache

package object spark {
// For package docs only
val SPARK_VERSION = "1.6.2"
val SPARK_VERSION = "1.6.3"
}
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -139,6 +139,9 @@ class HadoopRDD[K, V](

private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

private val ignoreCorruptFiles =
sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -245,8 +248,7 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
case _: EOFException if ignoreCorruptFiles => finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@

package org.apache.spark.rdd

import java.io.EOFException
import java.text.SimpleDateFormat
import java.util.Date

@@ -84,6 +85,9 @@ class NewHadoopRDD[K, V](

private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

private val ignoreCorruptFiles =
sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)

def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
if (shouldCloneJobConf) {
@@ -171,7 +175,11 @@ class NewHadoopRDD[K, V](

override def hasNext: Boolean = {
if (!finished && !havePair) {
finished = !reader.nextKeyValue
try {
finished = !reader.nextKeyValue
} catch {
case _: EOFException if ignoreCorruptFiles => finished = true
}
if (finished) {
// Close and release the reader here; close() will also be called when the task
// completes, but for tasks that read from many files, it helps to release the
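Note: both HadoopRDD and NewHadoopRDD now read spark.files.ignoreCorruptFiles (default true in this backport), swallowing an EOFException from truncated input and ending the partition early. A small usage sketch, assuming an illustrative input path, showing how a job opts back into fail-fast behavior:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: with the flag left at its default, corrupt gzip splits are skipped;
// setting it to false makes the EOFException propagate and fail the task.
object IgnoreCorruptFilesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ignore-corrupt-files-example")
      .setMaster("local[*]")
      .set("spark.files.ignoreCorruptFiles", "false") // fail fast on corrupt input

    val sc = new SparkContext(conf)
    try {
      // Path is illustrative; the flag applies to any HadoopRDD/NewHadoopRDD read.
      println(sc.textFile("/data/events/*.gz").count())
    } finally {
      sc.stop()
    }
  }
}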
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl(
// Incrementing task IDs
val nextTaskId = new AtomicLong(0)

// Number of tasks running on each executor
private val executorIdToTaskCount = new HashMap[String, Int]
// IDs of the tasks running on each executor
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
@@ -254,7 +254,7 @@
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
executorIdToRunningTaskIds(execId).add(tid)
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
@@ -283,7 +283,7 @@
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
executorIdToRunningTaskIds.getOrElseUpdate(o.executorId, HashSet[Long]())
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
@@ -329,37 +329,34 @@
var failedExecutor: Option[String] = None
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)

if (executorIdToTaskCount.contains(execId)) {
removeExecutor(execId,
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
}
}
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { execId =>
if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
}
if (state == TaskState.LOST) {
// TaskState.LOST is only used by the Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
if (executorIdToRunningTaskIds.contains(execId)) {
val reason =
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")
removeExecutor(execId, reason)
failedExecutor = Some(execId)
}
}
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
if (TaskState.isFinished(state)) {
cleanupTaskState(tid)
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
if (state == TaskState.FINISHED) {
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates)")
"likely the result of receiving duplicate task finished status updates) or its " +
"executor has been marked as failed.")
.format(state, tid))
}
} catch {
@@ -468,7 +465,7 @@
var failedExecutor: Option[String] = None

synchronized {
if (executorIdToTaskCount.contains(executorId)) {
if (executorIdToRunningTaskIds.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logExecutorLoss(executorId, hostPort, reason)
removeExecutor(executorId, reason)
@@ -510,13 +507,31 @@
logError(s"Lost executor $executorId on $hostPort: $reason")
}

/**
* Cleans up the TaskScheduler's state for tracking the given task.
*/
private def cleanupTaskState(tid: Long): Unit = {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { executorId =>
executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
}
}

/**
* Remove an executor from all our data structures and mark it as lost. If the executor's loss
* reason is not yet known, do not yet remove its association with its host nor update the status
* of any running tasks, since the loss reason defines whether we'll fail those tasks.
*/
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
executorIdToTaskCount -= executorId
// The tasks on the lost executor may not send any more status updates (because the executor
// has been lost), so they should be cleaned up here.
executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
logDebug("Cleaning up TaskScheduler state for tasks " +
s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
// We do not notify the TaskSetManager of the task failures because that will
// happen below in the rootPool.executorLost() call.
taskIds.foreach(cleanupTaskState)
}

val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
Expand Down Expand Up @@ -554,11 +569,11 @@ private[spark] class TaskSchedulerImpl(
}

def isExecutorAlive(execId: String): Boolean = synchronized {
executorIdToTaskCount.contains(execId)
executorIdToRunningTaskIds.contains(execId)
}

def isExecutorBusy(execId: String): Boolean = synchronized {
executorIdToTaskCount.getOrElse(execId, -1) > 0
executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
}

// By default, rack is unknown
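Note: the scheduler change above replaces a per-executor running-task count with a per-executor set of task IDs, so that when an executor is lost the per-task state (taskIdToTaskSetManager, taskIdToExecutorId) can be cleaned up instead of leaking, per SPARK-18553. A standalone sketch of that bookkeeping, with illustrative names rather than TaskSchedulerImpl's full state:

import scala.collection.mutable.{HashMap, HashSet}

// Sketch only: a set of task IDs (unlike a plain count) lets the scheduler find
// exactly which tasks were running on a lost executor and drop their state.
object RunningTaskBookkeeping {
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  def taskLaunched(execId: String, tid: Long): Unit =
    executorIdToRunningTaskIds.getOrElseUpdate(execId, HashSet[Long]()).add(tid)

  def taskFinished(execId: String, tid: Long): Unit =
    executorIdToRunningTaskIds.get(execId).foreach(_.remove(tid))

  def isExecutorBusy(execId: String): Boolean =
    executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)

  // On executor loss, return the tasks whose per-task state still needs cleanup.
  def executorLost(execId: String): Set[Long] =
    executorIdToRunningTaskIds.remove(execId).map(_.toSet).getOrElse(Set.empty)
}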
62 changes: 61 additions & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@

package org.apache.spark

import java.io.{File, FileWriter}
import java.io._
import java.util.zip.GZIPOutputStream

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.PortableDataStream
@@ -540,4 +541,63 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}.collect()
assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
}

test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
val inputFile = File.createTempFile("input-", ".gz")
try {
// Create a corrupt gzip file
val byteOutput = new ByteArrayOutputStream()
val gzip = new GZIPOutputStream(byteOutput)
try {
gzip.write(Array[Byte](1, 2, 3, 4))
} finally {
gzip.close()
}
val bytes = byteOutput.toByteArray
val o = new FileOutputStream(inputFile)
try {
// It's corrupt since we only write half of bytes into the file.
o.write(bytes.take(bytes.length / 2))
} finally {
o.close()
}

// Spark job should ignore corrupt files by default
sc = new SparkContext("local", "test")
// Test HadoopRDD
assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
// Test NewHadoopRDD
assert {
sc.newAPIHadoopFile(
inputFile.toURI.toString,
classOf[NewTextInputFormat],
classOf[LongWritable],
classOf[Text]).collect().isEmpty
}
sc.stop()

// Reading a corrupt gzip file should throw EOFException
val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "false")
sc = new SparkContext("local", "test", conf)
// Test HadoopRDD
var e = intercept[SparkException] {
sc.textFile(inputFile.toURI.toString).collect()
}
assert(e.getCause.isInstanceOf[EOFException])
assert(e.getCause.getMessage === "Unexpected end of input stream")
// Test NewHadoopRDD
e = intercept[SparkException] {
sc.newAPIHadoopFile(
inputFile.toURI.toString,
classOf[NewTextInputFormat],
classOf[LongWritable],
classOf[Text]).collect()
}
assert(e.getCause.isInstanceOf[EOFException])
assert(e.getCause.getMessage === "Unexpected end of input stream")
} finally {
inputFile.delete()
}
}

}
core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -425,10 +425,11 @@ class StandaloneDynamicAllocationSuite
assert(executors.size === 2)

// simulate running a task on the executor
val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
val getMap =
PrivateMethod[mutable.HashMap[String, mutable.HashSet[Long]]]('executorIdToRunningTaskIds)
val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
val executorIdToTaskCount = taskScheduler invokePrivate getMap()
executorIdToTaskCount(executors.head) = 1
val executorIdToRunningTaskIds = taskScheduler invokePrivate getMap()
executorIdToRunningTaskIds(executors.head) = mutable.HashSet(1L)
// kill the busy executor without force; this should fail
assert(killExecutor(sc, executors.head, force = false))
apps = getApplications()
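Note: the updated test still drives the scheduler's private state through ScalaTest's PrivateMethodTester; only the symbol and the declared map type changed. A minimal sketch of that reflection pattern, using a hypothetical class rather than the scheduler:

import org.scalatest.PrivateMethodTester

// Sketch only: PrivateMethod looks a private member up by symbol and invokes it
// reflectively, which is why the test above only had to update the type and name.
object PrivateAccessExample extends PrivateMethodTester {
  class Counter {
    private def bump(n: Int): Int = n + 1
  }

  def main(args: Array[String]): Unit = {
    val bump = PrivateMethod[Int]('bump)
    val result = new Counter invokePrivate bump(41)
    println(result) // prints 42
  }
}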