Skip to content

Commit ef3ddae

Browse files
author
Andrew Or
committed
Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
2 parents 0ee67a2 + 4c42986 commit ef3ddae

File tree

15 files changed

+272
-23
lines changed

15 files changed

+272
-23
lines changed

LICENSE

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ SUCH DAMAGE.
754754

755755

756756
========================================================================
757-
For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
757+
For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
758758
========================================================================
759759
Copyright (C) 2008 The Android Open Source Project
760760

@@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
771771
limitations under the License.
772772

773773

774+
========================================================================
775+
For LimitedInputStream
776+
(network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
777+
========================================================================
778+
Copyright (C) 2007 The Guava Authors
779+
780+
Licensed under the Apache License, Version 2.0 (the "License");
781+
you may not use this file except in compliance with the License.
782+
You may obtain a copy of the License at
783+
784+
http://www.apache.org/licenses/LICENSE-2.0
785+
786+
Unless required by applicable law or agreed to in writing, software
787+
distributed under the License is distributed on an "AS IS" BASIS,
788+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
789+
See the License for the specific language governing permissions and
790+
limitations under the License.
791+
792+
774793
========================================================================
775794
BSD-style licenses
776795
========================================================================

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ object SparkEnv extends Logging {
287287

288288
// NB: blockManager is not valid until initialize() is called later.
289289
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
290-
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
290+
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
291291

292292
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
293293

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ private[spark] class BlockManager(
7171
val conf: SparkConf,
7272
mapOutputTracker: MapOutputTracker,
7373
shuffleManager: ShuffleManager,
74-
blockTransferService: BlockTransferService)
74+
blockTransferService: BlockTransferService,
75+
securityManager: SecurityManager)
7576
extends BlockDataManager with Logging {
7677

7778
val diskBlockManager = new DiskBlockManager(this, conf)
@@ -119,7 +120,8 @@ private[spark] class BlockManager(
119120
// Client to read other executors' shuffle files. This is either an external service, or just the
120121
// standard BlockTranserService to directly connect to other Executors.
121122
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
122-
new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf))
123+
new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
124+
securityManager.isAuthenticationEnabled())
123125
} else {
124126
blockTransferService
125127
}
@@ -170,9 +172,10 @@ private[spark] class BlockManager(
170172
conf: SparkConf,
171173
mapOutputTracker: MapOutputTracker,
172174
shuffleManager: ShuffleManager,
173-
blockTransferService: BlockTransferService) = {
175+
blockTransferService: BlockTransferService,
176+
securityManager: SecurityManager) = {
174177
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
175-
conf, mapOutputTracker, shuffleManager, blockTransferService)
178+
conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
176179
}
177180

178181
/**
@@ -223,7 +226,6 @@ private[spark] class BlockManager(
223226
return
224227
} catch {
225228
case e: Exception if i < MAX_ATTEMPTS =>
226-
val attemptsRemaining =
227229
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
228230
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
229231
Thread.sleep(SLEEP_TIME_SECS * 1000)

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
6262
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
6363
val transfer = new NioBlockTransferService(conf, securityMgr)
6464
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
65-
mapOutputTracker, shuffleManager, transfer)
65+
mapOutputTracker, shuffleManager, transfer, securityMgr)
6666
store.initialize("app-id")
6767
allStores += store
6868
store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
263263
when(failableTransfer.hostName).thenReturn("some-hostname")
264264
when(failableTransfer.port).thenReturn(1000)
265265
val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
266-
10000, conf, mapOutputTracker, shuffleManager, failableTransfer)
266+
10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
267267
failableStore.initialize("app-id")
268268
allStores += failableStore // so that this gets stopped after test
269269
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
7474
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
7575
val transfer = new NioBlockTransferService(conf, securityMgr)
7676
val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
77-
mapOutputTracker, shuffleManager, transfer)
77+
mapOutputTracker, shuffleManager, transfer, securityMgr)
7878
manager.initialize("app-id")
7979
manager
8080
}
@@ -795,7 +795,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
795795
// Use Java serializer so we can create an unserializable error.
796796
val transfer = new NioBlockTransferService(conf, securityMgr)
797797
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
798-
new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
798+
new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
799799

800800
// The put should fail since a1 is not serializable.
801801
class UnserializableClass

network/common/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<dependency>
5151
<groupId>com.google.guava</groupId>
5252
<artifactId>guava</artifactId>
53+
<version>11.0.2</version> <!-- yarn 2.4.0's version -->
5354
<scope>provided</scope>
5455
</dependency>
5556

network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.netty.channel.DefaultFileRegion;
3131

3232
import org.apache.spark.network.util.JavaUtils;
33+
import org.apache.spark.network.util.LimitedInputStream;
3334

3435
/**
3536
* A {@link ManagedBuffer} backed by a segment in a file.
@@ -101,7 +102,7 @@ public InputStream createInputStream() throws IOException {
101102
try {
102103
is = new FileInputStream(file);
103104
ByteStreams.skipFully(is, offset);
104-
return ByteStreams.limit(is, length);
105+
return new LimitedInputStream(is, length);
105106
} catch (IOException e) {
106107
try {
107108
if (is != null) {
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.util;
19+
20+
import java.io.FilterInputStream;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
24+
import com.google.common.base.Preconditions;
25+
26+
/**
27+
* Wraps a {@link InputStream}, limiting the number of bytes which can be read.
28+
*
29+
* This code is from Guava's 14.0 source code, because there is no compatible way to
30+
* use this functionality in both a Guava 11 environment and a Guava >14 environment.
31+
*/
32+
public final class LimitedInputStream extends FilterInputStream {
33+
private long left;
34+
private long mark = -1;
35+
36+
public LimitedInputStream(InputStream in, long limit) {
37+
super(in);
38+
Preconditions.checkNotNull(in);
39+
Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
40+
left = limit;
41+
}
42+
@Override public int available() throws IOException {
43+
return (int) Math.min(in.available(), left);
44+
}
45+
// it's okay to mark even if mark isn't supported, as reset won't work
46+
@Override public synchronized void mark(int readLimit) {
47+
in.mark(readLimit);
48+
mark = left;
49+
}
50+
@Override public int read() throws IOException {
51+
if (left == 0) {
52+
return -1;
53+
}
54+
int result = in.read();
55+
if (result != -1) {
56+
--left;
57+
}
58+
return result;
59+
}
60+
@Override public int read(byte[] b, int off, int len) throws IOException {
61+
if (left == 0) {
62+
return -1;
63+
}
64+
len = (int) Math.min(len, left);
65+
int result = in.read(b, off, len);
66+
if (result != -1) {
67+
left -= result;
68+
}
69+
return result;
70+
}
71+
@Override public synchronized void reset() throws IOException {
72+
if (!in.markSupported()) {
73+
throw new IOException("Mark not supported");
74+
}
75+
if (mark == -1) {
76+
throw new IOException("Mark not set");
77+
}
78+
in.reset();
79+
left = mark;
80+
}
81+
@Override public long skip(long n) throws IOException {
82+
n = Math.min(n, left);
83+
long skipped = in.skip(n);
84+
left -= skipped;
85+
return skipped;
86+
}
87+
}

network/shuffle/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
<dependency>
5252
<groupId>com.google.guava</groupId>
5353
<artifactId>guava</artifactId>
54+
<version>11.0.2</version> <!-- yarn 2.4.0's version -->
5455
<scope>provided</scope>
5556
</dependency>
5657

network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
126126
logger.trace("SASL client callback: setting realm");
127127
RealmCallback rc = (RealmCallback) callback;
128128
rc.setText(rc.getDefaultText());
129-
logger.info("Realm callback");
130129
} else if (callback instanceof RealmChoiceCallback) {
131130
// ignore (?)
132131
} else {

0 commit comments

Comments
 (0)