
Commit b2318eb

Merge branch 'master' of git://git.apache.org/spark into fix-assembly-jarname

2 parents: 5fc1259 + c9ae79f
25 files changed (+395, -219 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 6 deletions

@@ -779,20 +779,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumuable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
    */
-  def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param)

   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
    * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
    * access the accumuable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
    */
-  def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param, Some(name))

   /**
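The swap only reorders the type parameters to match `AccumulableParam[R, T]`, where `R` is the accumulated result read on the driver and `T` is the element type tasks add. A minimal usage sketch, assuming a live `SparkContext` named `sc`; the param object and value types below are illustrative, not part of the commit:

    import org.apache.spark.AccumulableParam

    // R = Set[Int] (what the driver reads), T = Int (what tasks add with +=).
    object SetParam extends AccumulableParam[Set[Int], Int] {
      def addAccumulator(acc: Set[Int], elem: Int): Set[Int] = acc + elem
      def addInPlace(acc1: Set[Int], acc2: Set[Int]): Set[Int] = acc1 ++ acc2
      def zero(initialValue: Set[Int]): Set[Int] = Set.empty[Int]
    }

    val seen = sc.accumulable(Set.empty[Int])(SetParam)    // accumulable[R, T]
    sc.parallelize(1 to 100).foreach(x => seen += x % 10)  // tasks add T values
    println(seen.value)                                    // only the driver reads the R value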

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 15 additions & 10 deletions

@@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging {

   private def write(id: Long, value: Any) {
     val file = getFile(id)
-    val out: OutputStream = {
-      if (compress) {
-        compressionCodec.compressedOutputStream(new FileOutputStream(file))
-      } else {
-        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
+    val fileOutputStream = new FileOutputStream(file)
+    try {
+      val out: OutputStream = {
+        if (compress) {
+          compressionCodec.compressedOutputStream(fileOutputStream)
+        } else {
+          new BufferedOutputStream(fileOutputStream, bufferSize)
+        }
       }
+      val ser = SparkEnv.get.serializer.newInstance()
+      val serOut = ser.serializeStream(out)
+      serOut.writeObject(value)
+      serOut.close()
+      files += file
+    } finally {
+      fileOutputStream.close()
     }
-    val ser = SparkEnv.get.serializer.newInstance()
-    val serOut = ser.serializeStream(out)
-    serOut.writeObject(value)
-    serOut.close()
-    files += file
   }

   private def read[T: ClassTag](id: Long): T = {
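This change, and the two in FileSystemPersistenceEngine below, apply the same idea: open the underlying stream once, do the work in a `try`, and close the stream in a `finally`. A generic loan-pattern helper is one way to factor that out; the sketch below is illustrative only and not code from the commit:

    import java.io.Closeable

    // Open a resource, run `body` against it, and always close it,
    // even when `body` throws.
    def withResource[C <: Closeable, A](open: => C)(body: C => A): A = {
      val resource = open
      try {
        body(resource)
      } finally {
        resource.close()
      }
    }

    // e.g. withResource(new FileOutputStream(file)) { out => out.write(serialized) }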

core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala

Lines changed: 10 additions & 4 deletions

@@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine(
     val serialized = serializer.toBinary(value)

     val out = new FileOutputStream(file)
-    out.write(serialized)
-    out.close()
+    try {
+      out.write(serialized)
+    } finally {
+      out.close()
+    }
   }

   def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
-    dis.readFully(fileData)
-    dis.close()
+    try {
+      dis.readFully(fileData)
+    } finally {
+      dis.close()
+    }

     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 0 additions & 1 deletion

@@ -29,7 +29,6 @@ import scala.language.postfixOps

 import akka.actor._
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.commons.io.FileUtils

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 7 additions & 0 deletions

@@ -372,6 +372,13 @@ private[spark] class MesosSchedulerBackend(
     recordSlaveLost(d, slaveId, ExecutorExited(status))
   }

+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
+    driver.killTask(
+      TaskID.newBuilder()
+        .setValue(taskId.toString).build()
+    )
+  }
+
   // TODO: query Mesos for number of cores
   override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 15 additions & 1 deletion

@@ -73,7 +73,21 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
-    blockManager.dataSerializeStream(blockId, outputStream, values)
+    try {
+      try {
+        blockManager.dataSerializeStream(blockId, outputStream, values)
+      } finally {
+        // Close outputStream here because it should be closed before file is deleted.
+        outputStream.close()
+      }
+    } catch {
+      case e: Throwable =>
+        if (file.exists()) {
+          file.delete()
+        }
+        throw e
+    }
+
     val length = file.length

     val timeTaken = System.currentTimeMillis - startTime
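Here the inner `finally` guarantees the stream is closed, and the surrounding `catch` removes the partially written block file before rethrowing. A hedged standalone sketch of that control flow (the helper name is illustrative, not from the commit):

    import java.io.{File, FileOutputStream, OutputStream}

    // Write through a fresh stream; always close it, and delete the
    // half-written file if the write itself failed.
    def writeOrCleanUp(file: File)(write: OutputStream => Unit): Unit = {
      val out = new FileOutputStream(file)
      try {
        try {
          write(out)
        } finally {
          out.close()  // close first: the file must not be open when it is deleted
        }
      } catch {
        case e: Throwable =>
          if (file.exists()) {
            file.delete()
          }
          throw e
      }
    }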

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 10 additions & 10 deletions

@@ -35,8 +35,6 @@ import scala.util.control.{ControlThrowable, NonFatal}

 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.commons.io.FileUtils
-import org.apache.commons.io.filefilter.TrueFileFilter
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.log4j.PropertyConfigurator

@@ -710,18 +708,20 @@ private[spark] object Utils extends Logging {
    * Determines if a directory contains any files newer than cutoff seconds.
    *
    * @param dir must be the path to a directory, or IllegalArgumentException is thrown
-   * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
+   * @param cutoff measured in seconds. Returns true if there are any files or directories in the
+   *               given directory whose last modified time is later than this many seconds ago
    */
   def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
-    val currentTimeMillis = System.currentTimeMillis
     if (!dir.isDirectory) {
-      throw new IllegalArgumentException (dir + " is not a directory!")
-    } else {
-      val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
-      val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000))
-      val newFiles = files.filter { _.lastModified > cutoffTimeInMillis }
-      newFiles.nonEmpty
+      throw new IllegalArgumentException("$dir is not a directory!")
     }
+    val filesAndDirs = dir.listFiles()
+    val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000)
+
+    filesAndDirs.exists(_.lastModified() > cutoffTimeInMillis) ||
+    filesAndDirs.filter(_.isDirectory).exists(
+      subdir => doesDirectoryContainAnyNewFiles(subdir, cutoff)
+    )
   }

   /**
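The rewrite drops the commons-io dependency and recurses over subdirectories by hand, returning true as soon as any entry under `dir` was modified within the last `cutoff` seconds. A test-style usage sketch, assuming it runs somewhere under the `org.apache.spark` package (the `private[spark]` Utils object is not visible outside it); the file names are illustrative:

    import java.io.File
    import com.google.common.io.Files
    import org.apache.spark.util.Utils

    // A directory that just received a file should report new files
    // for a generous cutoff such as 60 seconds.
    val dir: File = Files.createTempDir()
    new File(dir, "fresh.txt").createNewFile()

    assert(Utils.doesDirectoryContainAnyNewFiles(dir, 60))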

dev/merge_spark_pr.py

Lines changed: 1 addition & 2 deletions

@@ -73,11 +73,10 @@ def fail(msg):


 def run_cmd(cmd):
+    print cmd
     if isinstance(cmd, list):
-        print " ".join(cmd)
         return subprocess.check_output(cmd)
     else:
-        print cmd
         return subprocess.check_output(cmd.split(" "))


docs/building-spark.md

Lines changed: 15 additions & 0 deletions

@@ -171,6 +171,21 @@ can be set to control the SBT build. For example:

     sbt/sbt -Pyarn -Phadoop-2.3 assembly

+# Testing with SBT
+
+Some of the tests require Spark to be packaged first, so always run `sbt/sbt assembly` the first time. The following is an example of a correct (build, test) sequence:
+
+    sbt/sbt -Pyarn -Phadoop-2.3 -Phive assembly
+    sbt/sbt -Pyarn -Phadoop-2.3 -Phive test
+
+To run only a specific test suite as follows:
+
+    sbt/sbt -Pyarn -Phadoop-2.3 -Phive "test-only org.apache.spark.repl.ReplSuite"
+
+To run test suites of a specific sub project as follows:
+
+    sbt/sbt -Pyarn -Phadoop-2.3 -Phive core/test
+
 # Speeding up Compilation with Zinc

 [Zinc](https://github.com/typesafehub/zinc) is a long-running server version of SBT's incremental

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ object ScalaReflection {
3333

3434
/** Converts Scala objects to catalyst rows / types */
3535
def convertToCatalyst(a: Any): Any = a match {
36-
case o: Option[_] => o.orNull
36+
case o: Option[_] => o.map(convertToCatalyst).orNull
3737
case s: Seq[_] => s.map(convertToCatalyst)
3838
case m: Map[_, _] => m.map { case (k, v) => convertToCatalyst(k) -> convertToCatalyst(v) }
3939
case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
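The one-line change matters when the `Option` wraps a value that itself needs conversion, such as a `Product`. A hedged before/after illustration, assuming `ScalaReflection` and `GenericRow` are accessible from the calling package; this snippet is not part of the commit:

    import org.apache.spark.sql.catalyst.ScalaReflection
    import org.apache.spark.sql.catalyst.expressions.GenericRow

    // Before: Some((1, "a")) was unwrapped to the raw tuple (1, "a").
    // After: the inner Product is recursively converted to a catalyst row.
    val converted = ScalaReflection.convertToCatalyst(Some((1, "a")))
    assert(converted.isInstanceOf[GenericRow])

    // None still becomes null via orNull.
    assert(ScalaReflection.convertToCatalyst(None) == null)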
