
Commit 97cb85c (merge commit; parents 2cdd009 and 0bbe7fa)

Commit message: Modified confliction of MimExcludes

39 files changed: +2287, -884 lines (5 of the 39 files are shown below)

core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala

Lines changed: 37 additions & 6 deletions

@@ -17,15 +17,17 @@
 
 package org.apache.spark.network
 
-import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
+import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.channels.FileChannel.MapMode
 
+import scala.util.Try
+
 import com.google.common.io.ByteStreams
 import io.netty.buffer.{ByteBufInputStream, ByteBuf}
 
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 
 
 /**
@@ -71,18 +73,47 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     try {
       channel = new RandomAccessFile(file, "r").getChannel
       channel.map(MapMode.READ_ONLY, offset, length)
+    } catch {
+      case e: IOException =>
+        Try(channel.size).toOption match {
+          case Some(fileLen) =>
+            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+          case None =>
+            throw new IOException(s"Error in opening $this", e)
+        }
     } finally {
       if (channel != null) {
-        channel.close()
+        Utils.tryLog(channel.close())
      }
     }
   }
 
   override def inputStream(): InputStream = {
-    val is = new FileInputStream(file)
-    is.skip(offset)
-    ByteStreams.limit(is, length)
+    var is: FileInputStream = null
+    try {
+      is = new FileInputStream(file)
+      is.skip(offset)
+      ByteStreams.limit(is, length)
+    } catch {
+      case e: IOException =>
+        if (is != null) {
+          Utils.tryLog(is.close())
+        }
+        Try(file.length).toOption match {
+          case Some(fileLen) =>
+            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+          case None =>
+            throw new IOException(s"Error in opening $this", e)
+        }
+      case e: Throwable =>
+        if (is != null) {
+          Utils.tryLog(is.close())
+        }
+        throw e
+    }
   }
+
+  override def toString: String = s"${getClass.getName}($file, $offset, $length)"
 }
 
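Note on the finally block: wrapping the close in Utils.tryLog (added in Utils.scala below) keeps a failing channel.close() from replacing the IOException raised by the read itself, since an exception thrown in a finally block silently discards the one already in flight. A minimal standalone sketch of that failure mode (the method name readThenClose is illustrative, not from the commit):

    // An exception thrown in finally masks the original exception from try.
    def readThenClose(): Unit = {
      try {
        throw new java.io.IOException("original read error")
      } finally {
        throw new IllegalStateException("close failed") // this one wins
      }
    }

    try readThenClose()
    catch { case t: Throwable => println(t) } // prints the IllegalStateException; the IOException is lost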

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion

@@ -1209,7 +1209,7 @@ class DAGScheduler(
              .format(job.jobId, stageId))
        } else if (jobsForStage.get.size == 1) {
          if (!stageIdToStage.contains(stageId)) {
-           logError("Missing Stage for stage with id $stageId")
+           logError(s"Missing Stage for stage with id $stageId")
          } else {
            // This is the only job that uses this stage, so fail the stage if it is running.
            val stage = stageIdToStage(stageId)
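The one-character fix above adds the s prefix that makes the string an interpolated literal; without it, Scala logs the characters $stageId verbatim instead of the stage's id:

    val stageId = 7
    println("Missing Stage for stage with id $stageId")  // Missing Stage for stage with id $stageId
    println(s"Missing Stage for stage with id $stageId") // Missing Stage for stage with id 7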

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 0 deletions

@@ -1304,6 +1304,20 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** Executes the given block in a Try, logging any uncaught exceptions. */
+  def tryLog[T](f: => T): Try[T] = {
+    try {
+      val res = f
+      scala.util.Success(res)
+    } catch {
+      case ct: ControlThrowable =>
+        throw ct
+      case t: Throwable =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        scala.util.Failure(t)
+    }
+  }
+
   /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
   def isFatalError(e: Throwable): Boolean = {
     e match {
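tryLog rethrows ControlThrowable (which Scala uses for control flow, e.g. scala.util.control.Breaks) and converts every other Throwable into a logged Failure. A self-contained sketch of the same pattern, with println standing in for logError:

    import scala.util.{Failure, Success, Try}
    import scala.util.control.ControlThrowable

    def tryLog[T](f: => T): Try[T] =
      try Success(f) catch {
        case ct: ControlThrowable => throw ct // control-flow throwables must propagate
        case t: Throwable =>
          println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
          Failure(t)
      }

    // A failing cleanup action becomes a logged Failure instead of an escaping exception:
    val closed = tryLog(throw new java.io.IOException("close failed"))
    assert(closed.isFailure)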

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 1 deletion

@@ -144,7 +144,7 @@ private[spark] class ExternalSorter[K, V, C](
     override def compare(a: K, b: K): Int = {
       val h1 = if (a == null) 0 else a.hashCode()
       val h2 = if (b == null) 0 else b.hashCode()
-      h1 - h2
+      if (h1 < h2) -1 else if (h1 == h2) 0 else 1
     }
   })
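This one-line change fixes the comparator bug that the new test below exercises: h1 - h2 overflows Int when the hash codes are far apart, so the comparator can report the wrong sign, and TimSort aborts with "Comparison method violates its general contract!". A quick demonstration:

    val h1 = Int.MinValue // hashCode() may return any Int
    val h2 = 1
    println(h1 < h2) // true: h1 is the smaller value
    println(h1 - h2) // 2147483647: the subtraction overflows and comes out positive,
                     // so the broken comparator claims h1 > h2
    println(Integer.compare(h1, h2)) // -1: overflow-free, equivalent to the fix above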

core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala

Lines changed: 55 additions & 0 deletions

@@ -24,6 +24,8 @@ import org.scalatest.{PrivateMethodTester, FunSuite}
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 
+import scala.util.Random
+
 class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester {
   private def createSparkConf(loadDefaults: Boolean): SparkConf = {
     val conf = new SparkConf(loadDefaults)
@@ -707,4 +709,57 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
       Some(agg), Some(new HashPartitioner(FEW_PARTITIONS)), None, None)
     assertDidNotBypassMergeSort(sorter4)
   }
+
+  test("sort without breaking sorting contracts") {
+    val conf = createSparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.01")
+    conf.set("spark.shuffle.manager", "sort")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    // Using wrongOrdering to show integer overflow introduced exception.
+    val rand = new Random(100L)
+    val wrongOrdering = new Ordering[String] {
+      override def compare(a: String, b: String) = {
+        val h1 = if (a == null) 0 else a.hashCode()
+        val h2 = if (b == null) 0 else b.hashCode()
+        h1 - h2
+      }
+    }
+
+    val testData = Array.tabulate(100000) { _ => rand.nextInt().toString }
+
+    val sorter1 = new ExternalSorter[String, String, String](
+      None, None, Some(wrongOrdering), None)
+    val thrown = intercept[IllegalArgumentException] {
+      sorter1.insertAll(testData.iterator.map(i => (i, i)))
+      sorter1.iterator
+    }
+
+    assert(thrown.getClass() === classOf[IllegalArgumentException])
+    assert(thrown.getMessage().contains("Comparison method violates its general contract"))
+    sorter1.stop()
+
+    // Using aggregation and external spill to make sure ExternalSorter using
+    // partitionKeyComparator.
+    def createCombiner(i: String) = ArrayBuffer(i)
+    def mergeValue(c: ArrayBuffer[String], i: String) = c += i
+    def mergeCombiners(c1: ArrayBuffer[String], c2: ArrayBuffer[String]) = c1 ++= c2
+
+    val agg = new Aggregator[String, String, ArrayBuffer[String]](
+      createCombiner, mergeValue, mergeCombiners)
+
+    val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]](
+      Some(agg), None, None, None)
+    sorter2.insertAll(testData.iterator.map(i => (i, i)))
+
+    // To validate the hash ordering of key
+    var minKey = Int.MinValue
+    sorter2.iterator.foreach { case (k, v) =>
+      val h = k.hashCode()
+      assert(h >= minKey)
+      minKey = h
+    }
+
+    sorter2.stop()
+  }
 }
