
Commit e1fd685

Author: Andrew Or
Merge branch 'master' of github.com:apache/spark into fix-concurrent-sql
2 parents: 661c28e + e1772d3

203 files changed: 1,732 additions and 592 deletions. Only a subset of the changed files is shown below.

common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 2 additions & 2 deletions
@@ -194,8 +194,8 @@ private TransportClient createClient(InetSocketAddress address) throws IOException
       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
       .option(ChannelOption.ALLOCATOR, pooledAllocator);

-    final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
-    final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
+    final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
+    final AtomicReference<Channel> channelRef = new AtomicReference<>();

     bootstrap.handler(new ChannelInitializer<SocketChannel>() {
       @Override
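Both replacements above swap explicit type arguments for the Java 7 diamond operator ("<>"), letting the compiler infer the type parameter from the left-hand side of the declaration. A minimal, self-contained sketch of the same pattern (the class and variable names here are illustrative, not taken from the patch):

import java.util.concurrent.atomic.AtomicReference;

public class DiamondExample {
  public static void main(String[] args) {
    // Pre-Java-7 style: the type argument is spelled out twice.
    AtomicReference<String> verbose = new AtomicReference<String>();

    // Java 7 diamond operator: <> is inferred as <String> from the declaration.
    AtomicReference<String> concise = new AtomicReference<>();

    concise.set("hello");
    System.out.println(concise.get()); // prints "hello"
  }
}

Both forms compile to the same bytecode; the change is purely cosmetic, which is why it can be applied across the codebase with no behavioral risk.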

common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java

Lines changed: 0 additions & 2 deletions
@@ -64,8 +64,6 @@ public class ChunkFetchIntegrationSuite {
   static ManagedBuffer bufferChunk;
   static ManagedBuffer fileChunk;

-  private TransportConf transportConf;
-
   @BeforeClass
   public static void setUp() throws Exception {
     int bufSize = 100000;

common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ public class RequestTimeoutIntegrationSuite {
   private TransportConf conf;

   // A large timeout that "shouldn't happen", for the sake of faulty tests not hanging forever.
-  private final int FOREVER = 60 * 1000;
+  private static final int FOREVER = 60 * 1000;

   @Before
   public void setUp() throws Exception {
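Making FOREVER static final gives the constant a single class-level copy instead of one per test instance, matching the Java convention that ALL_CAPS names denote shared constants. A minimal illustration (the class name is hypothetical):

public class TimeoutConstants {
  // One shared copy, initialized once at class-load time.
  static final int FOREVER = 60 * 1000;

  public static void main(String[] args) {
    System.out.println(FOREVER); // 60000
  }
}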

common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java

Lines changed: 2 additions & 0 deletions
@@ -122,6 +122,8 @@ public void run() {
     for (TransportClient client : clients) {
       client.close();
     }
+
+    factory.close();
   }

   @Test
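The added factory.close() ensures the test tears down the factory's shared resources (such as event-loop threads) after the per-connection clients are closed. A sketch of the same close-clients-then-factory idiom, using hypothetical Client and ClientFactory stand-ins rather than Spark's actual TransportClientFactory API:

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CleanupExample {
  // Hypothetical stand-ins for TransportClient and TransportClientFactory.
  static class Client implements Closeable {
    @Override public void close() { /* release this connection's resources */ }
  }

  static class ClientFactory implements Closeable {
    Client createClient() { return new Client(); }
    @Override public void close() { /* shut down shared threads and sockets */ }
  }

  public static void main(String[] args) throws IOException {
    ClientFactory factory = new ClientFactory();
    try {
      List<Client> clients = new ArrayList<>();
      clients.add(factory.createClient());
      for (Client client : clients) {
        client.close(); // close each connection first
      }
    } finally {
      factory.close(); // then release the factory's shared resources
    }
  }
}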

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

Lines changed: 2 additions & 1 deletion
@@ -44,7 +44,8 @@
  * <p>
  * Note: This is not designed for general use cases, should not be used outside SQL.
  */
-public final class UTF8String implements Comparable<UTF8String>, Externalizable, KryoSerializable {
+public final class UTF8String implements Comparable<UTF8String>, Externalizable, KryoSerializable,
+  Cloneable {

   // These are only updated by readExternal() or read()
   @Nonnull

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 2 additions & 2 deletions
@@ -84,9 +84,9 @@ final class ShuffleExternalSorter extends MemoryConsumer {
    * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
    * itself).
    */
-  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();
+  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();

-  private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
+  private final LinkedList<SpillInfo> spills = new LinkedList<>();

   /** Peak memory used by this sorter so far, in bytes. **/
   private long peakMemoryUsedBytes;

core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ public enum TaskSorting {

   private final Set<String> alternateNames;
   private TaskSorting(String... names) {
-    alternateNames = new HashSet<String>();
+    alternateNames = new HashSet<>();
     for (String n: names) {
       alternateNames.add(n);
     }

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 1 addition & 1 deletion
@@ -172,7 +172,7 @@ public void insertRecord(long recordPointer, long keyPrefix) {
     pos++;
   }

-  public final class SortedIterator extends UnsafeSorterIterator {
+  public final class SortedIterator extends UnsafeSorterIterator implements Cloneable {

     private final int numRecords;
     private int position;
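Like UTF8String above, SortedIterator now declares Cloneable, so a caller can take an independent copy of the iterator's mutable cursor instead of sharing one position across threads; given the fix-concurrent-sql branch name, that is plausibly the motivation, though the commit does not say so explicitly. A minimal sketch of the idiom, using a hypothetical CursorIterator rather than Spark's actual class:

public class CursorIterator implements Cloneable {
  private final int[] records;
  private int position; // the mutable cursor that clone() snapshots

  public CursorIterator(int[] records) { this.records = records; }

  public boolean hasNext() { return position < records.length; }
  public int next() { return records[position++]; }

  @Override
  public CursorIterator clone() {
    try {
      // Object.clone() copies 'position' and the 'records' reference, so the
      // copy iterates independently over the same backing data.
      return (CursorIterator) super.clone();
    } catch (CloneNotSupportedException e) {
      throw new AssertionError(e); // unreachable: this class is Cloneable
    }
  }

  public static void main(String[] args) {
    CursorIterator it = new CursorIterator(new int[] {1, 2, 3});
    it.next();                           // advance the original past 1
    CursorIterator snapshot = it.clone();
    System.out.println(it.next());       // 2
    System.out.println(snapshot.next()); // also 2: an independent cursor
  }
}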

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 29 additions & 23 deletions
@@ -376,8 +376,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
    * @param numReducers total number of reducers in the shuffle
    * @param fractionThreshold fraction of total map output size that a location must have
    *                          for it to be considered large.
-   *
-   * This method is not thread-safe.
    */
   def getLocationsWithLargestOutputs(
       shuffleId: Int,
@@ -386,28 +384,36 @@
       fractionThreshold: Double)
     : Option[Array[BlockManagerId]] = {

-    if (mapStatuses.contains(shuffleId)) {
-      val statuses = mapStatuses(shuffleId)
-      if (statuses.nonEmpty) {
-        // HashMap to add up sizes of all blocks at the same location
-        val locs = new HashMap[BlockManagerId, Long]
-        var totalOutputSize = 0L
-        var mapIdx = 0
-        while (mapIdx < statuses.length) {
-          val status = statuses(mapIdx)
-          val blockSize = status.getSizeForBlock(reducerId)
-          if (blockSize > 0) {
-            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
-            totalOutputSize += blockSize
+    val statuses = mapStatuses.get(shuffleId).orNull
+    if (statuses != null) {
+      statuses.synchronized {
+        if (statuses.nonEmpty) {
+          // HashMap to add up sizes of all blocks at the same location
+          val locs = new HashMap[BlockManagerId, Long]
+          var totalOutputSize = 0L
+          var mapIdx = 0
+          while (mapIdx < statuses.length) {
+            val status = statuses(mapIdx)
+            // status may be null here if we are called between registerShuffle, which creates an
+            // array with null entries for each output, and registerMapOutputs, which populates it
+            // with valid status entries. This is possible if one thread schedules a job which
+            // depends on an RDD which is currently being computed by another thread.
+            if (status != null) {
+              val blockSize = status.getSizeForBlock(reducerId)
+              if (blockSize > 0) {
+                locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+                totalOutputSize += blockSize
+              }
+            }
+            mapIdx = mapIdx + 1
+          }
+          val topLocs = locs.filter { case (loc, size) =>
+            size.toDouble / totalOutputSize >= fractionThreshold
+          }
+          // Return if we have any locations which satisfy the required threshold
+          if (topLocs.nonEmpty) {
+            return Some(topLocs.keys.toArray)
           }
-          mapIdx = mapIdx + 1
-        }
-        val topLocs = locs.filter { case (loc, size) =>
-          size.toDouble / totalOutputSize >= fractionThreshold
-        }
-        // Return if we have any locations which satisfy the required threshold
-        if (topLocs.nonEmpty) {
-          return Some(topLocs.map(_._1).toArray)
         }
       }
     }
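The rewrite above fixes two races in one method: it synchronizes on the statuses array so a scan cannot interleave with a bulk update, and it skips null slots, which legitimately exist between registerShuffle (which allocates an array of nulls) and registerMapOutputs (which fills it in). It also drops the now-stale "not thread-safe" scaladoc note and simplifies topLocs.map(_._1) to the equivalent topLocs.keys. A simplified Java sketch of the same read-side pattern; Status and the string locations are hypothetical stand-ins for MapStatus and BlockManagerId:

import java.util.HashMap;
import java.util.Map;

public class SynchronizedScanExample {
  // Hypothetical stand-in for MapStatus.
  static class Status {
    final String location;
    final long blockSize;
    Status(String location, long blockSize) {
      this.location = location;
      this.blockSize = blockSize;
    }
  }

  // Writers hold the array's monitor while filling slots; readers hold it
  // while scanning, so a scan never observes a half-applied bulk update.
  static Map<String, Long> sizesByLocation(Status[] statuses) {
    synchronized (statuses) {
      Map<String, Long> locs = new HashMap<>();
      for (Status status : statuses) {
        // A slot may still be null if the array was allocated but not yet
        // fully populated; skip it rather than crash.
        if (status != null && status.blockSize > 0) {
          locs.merge(status.location, status.blockSize, Long::sum);
        }
      }
      return locs;
    }
  }

  public static void main(String[] args) {
    Status[] statuses = new Status[3];
    statuses[0] = new Status("host-a", 100);
    statuses[2] = new Status("host-b", 50); // statuses[1] is still null
    System.out.println(sizesByLocation(statuses)); // e.g. {host-a=100, host-b=50}
  }
}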

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

 /**
  * Whether to submit, kill, or request the status of an application.
- * The latter two operations are currently supported only for standalone cluster mode.
+ * The latter two operations are currently supported only for standalone and Mesos cluster modes.
  */
 private[deploy] object SparkSubmitAction extends Enumeration {
   type SparkSubmitAction = Value
