
Commit 28ee8a9

Merge branch 'master' into SPARK-22861

2 parents: 29d184c + fe65361

125 files changed: 12860 additions, 1635 deletions


common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java

Lines changed: 38 additions & 3 deletions
```diff
@@ -30,10 +30,10 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
-import io.netty.util.AbstractReferenceCounted;
 import org.apache.commons.crypto.stream.CryptoInputStream;
 import org.apache.commons.crypto.stream.CryptoOutputStream;
 
+import org.apache.spark.network.util.AbstractFileRegion;
 import org.apache.spark.network.util.ByteArrayReadableChannel;
 import org.apache.spark.network.util.ByteArrayWritableChannel;
 
@@ -161,7 +161,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     }
   }
 
-  private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
+  private static class EncryptedMessage extends AbstractFileRegion {
     private final boolean isByteBuf;
     private final ByteBuf buf;
     private final FileRegion region;
@@ -199,10 +199,45 @@ public long position() {
     }
 
     @Override
-    public long transfered() {
+    public long transferred() {
       return transferred;
     }
 
+    @Override
+    public EncryptedMessage touch(Object o) {
+      super.touch(o);
+      if (region != null) {
+        region.touch(o);
+      }
+      if (buf != null) {
+        buf.touch(o);
+      }
+      return this;
+    }
+
+    @Override
+    public EncryptedMessage retain(int increment) {
+      super.retain(increment);
+      if (region != null) {
+        region.retain(increment);
+      }
+      if (buf != null) {
+        buf.retain(increment);
+      }
+      return this;
+    }
+
+    @Override
+    public boolean release(int decrement) {
+      if (region != null) {
+        region.release(decrement);
+      }
+      if (buf != null) {
+        buf.release(decrement);
+      }
+      return super.release(decrement);
+    }
+
     @Override
     public long transferTo(WritableByteChannel target, long position) throws IOException {
       Preconditions.checkArgument(position == transfered(), "Invalid position.");
```
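
All three new overrides follow one pattern: EncryptedMessage wraps another reference-counted object (a ByteBuf or a FileRegion), so every touch, retain, and release on the wrapper is forwarded to keep the wrapped object's count in step. A minimal standalone sketch of the Netty reference-counting contract being forwarded here (illustrative, not part of the commit):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RefCountDemo {
  public static void main(String[] args) {
    ByteBuf buf = Unpooled.buffer(16); // a fresh buffer starts with refCnt == 1
    buf.retain(2);                     // refCnt == 3
    System.out.println(buf.refCnt());  // prints 3
    buf.release(2);                    // refCnt == 1
    // release() returns true once the count hits 0 and the buffer is deallocated
    System.out.println(buf.release()); // prints true
  }
}
```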

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java

Lines changed: 36 additions & 3 deletions
```diff
@@ -25,17 +25,17 @@
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.ReferenceCountUtil;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.AbstractFileRegion;
 
 /**
  * A wrapper message that holds two separate pieces (a header and a body).
  *
  * The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
  */
-class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
+class MessageWithHeader extends AbstractFileRegion {
 
   @Nullable private final ManagedBuffer managedBuffer;
   private final ByteBuf header;
@@ -91,7 +91,7 @@ public long position() {
   }
 
   @Override
-  public long transfered() {
+  public long transferred() {
     return totalBytesTransferred;
   }
 
@@ -160,4 +160,37 @@ private int writeNioBuffer(
 
     return ret;
   }
+
+  @Override
+  public MessageWithHeader touch(Object o) {
+    super.touch(o);
+    header.touch(o);
+    ReferenceCountUtil.touch(body, o);
+    return this;
+  }
+
+  @Override
+  public MessageWithHeader retain(int increment) {
+    super.retain(increment);
+    header.retain(increment);
+    ReferenceCountUtil.retain(body, increment);
+    if (managedBuffer != null) {
+      for (int i = 0; i < increment; i++) {
+        managedBuffer.retain();
+      }
+    }
+    return this;
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    header.release(decrement);
+    ReferenceCountUtil.release(body, decrement);
+    if (managedBuffer != null) {
+      for (int i = 0; i < decrement; i++) {
+        managedBuffer.release();
+      }
+    }
+    return super.release(decrement);
+  }
 }
```
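
One wrinkle worth noting: Spark's ManagedBuffer exposes only single-step retain() and release(), with no bulk variant, which is why the overrides above loop increment and decrement times to keep its count in lock-step with the Netty reference count. A hedged sketch of that adapter pattern, using illustrative names that are not Spark's:

```java
// Illustrative only: adapts a bulk retain(int)/release(int) contract onto an
// API that, like Spark's ManagedBuffer, can only step its count by one.
interface SingleStepCounted {
  void retain();
  void release();
}

final class BulkRefCountAdapter {
  private final SingleStepCounted target;

  BulkRefCountAdapter(SingleStepCounted target) {
    this.target = target;
  }

  void retain(int increment) {
    for (int i = 0; i < increment; i++) {
      target.retain(); // apply the bulk increment one step at a time
    }
  }

  void release(int decrement) {
    for (int i = 0; i < decrement; i++) {
      target.release();
    }
  }
}
```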

common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java

Lines changed: 38 additions & 3 deletions
```diff
@@ -32,8 +32,8 @@
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.FileRegion;
 import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.util.AbstractReferenceCounted;
 
+import org.apache.spark.network.util.AbstractFileRegion;
 import org.apache.spark.network.util.ByteArrayWritableChannel;
 import org.apache.spark.network.util.NettyUtils;
 
@@ -129,7 +129,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
   }
 
   @VisibleForTesting
-  static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
+  static class EncryptedMessage extends AbstractFileRegion {
 
     private final SaslEncryptionBackend backend;
     private final boolean isByteBuf;
@@ -183,10 +183,45 @@ public long position() {
      * Returns an approximation of the amount of data transferred. See {@link #count()}.
      */
     @Override
-    public long transfered() {
+    public long transferred() {
       return transferred;
     }
 
+    @Override
+    public EncryptedMessage touch(Object o) {
+      super.touch(o);
+      if (buf != null) {
+        buf.touch(o);
+      }
+      if (region != null) {
+        region.touch(o);
+      }
+      return this;
+    }
+
+    @Override
+    public EncryptedMessage retain(int increment) {
+      super.retain(increment);
+      if (buf != null) {
+        buf.retain(increment);
+      }
+      if (region != null) {
+        region.retain(increment);
+      }
+      return this;
+    }
+
+    @Override
+    public boolean release(int decrement) {
+      if (region != null) {
+        region.release(decrement);
+      }
+      if (buf != null) {
+        buf.release(decrement);
+      }
+      return super.release(decrement);
+    }
+
     /**
      * Transfers data from the original message to the channel, encrypting it in the process.
      *
```
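
This is the same forwarding pattern as in TransportCipher above: the SASL EncryptedMessage also wraps either a ByteBuf or a FileRegion, so its touch, retain, and release overrides keep the wrapped object's reference count in step with the wrapper's own.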
common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java (new file)

Lines changed: 53 additions & 0 deletions

```diff
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+public abstract class AbstractFileRegion extends AbstractReferenceCounted implements FileRegion {
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public final long transfered() {
+    return transferred();
+  }
+
+  @Override
+  public AbstractFileRegion retain() {
+    super.retain();
+    return this;
+  }
+
+  @Override
+  public AbstractFileRegion retain(int increment) {
+    super.retain(increment);
+    return this;
+  }
+
+  @Override
+  public AbstractFileRegion touch() {
+    super.touch();
+    return this;
+  }
+
+  @Override
+  public AbstractFileRegion touch(Object o) {
+    return this;
+  }
+}
```
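
To see what this helper buys concrete implementations, here is a hedged sketch (not part of the commit) of a minimal do-nothing subclass: only the new-style accessors and deallocate() need implementing, and the deprecated transfered() is bridged to transferred() automatically.

```java
import java.io.IOException;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.network.util.AbstractFileRegion;

// Illustrative only: an empty FileRegion built on the new helper.
class EmptyFileRegion extends AbstractFileRegion {
  @Override
  public long position() {
    return 0;
  }

  @Override
  public long transferred() {
    return 0;
  }

  @Override
  public long count() {
    return 0;
  }

  @Override
  public long transferTo(WritableByteChannel target, long position) throws IOException {
    return 0; // nothing to transfer
  }

  @Override
  protected void deallocate() {
    // no resources to free
  }
}
```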

common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -56,7 +56,7 @@ private void testServerToClient(Message msg) {
       NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);
 
     while (!serverChannel.outboundMessages().isEmpty()) {
-      clientChannel.writeInbound(serverChannel.readOutbound());
+      clientChannel.writeOneInbound(serverChannel.readOutbound());
    }
 
    assertEquals(1, clientChannel.inboundMessages().size());
@@ -72,7 +72,7 @@ private void testClientToServer(Message msg) {
       NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);
 
     while (!clientChannel.outboundMessages().isEmpty()) {
-      serverChannel.writeInbound(clientChannel.readOutbound());
+      serverChannel.writeOneInbound(clientChannel.readOutbound());
    }
 
    assertEquals(1, serverChannel.inboundMessages().size());
```
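
writeOneInbound, available in Netty 4.1's EmbeddedChannel, pushes exactly one message into the pipeline per call, whereas writeInbound also flushes the inbound side; presumably the switch keeps per-message handling explicit under the Netty 4.1 upgrade this merge brings in. A self-contained sketch of the round-trip idiom these tests use (illustrative, not from the commit):

```java
import io.netty.channel.embedded.EmbeddedChannel;

public class EmbeddedChannelDemo {
  public static void main(String[] args) {
    // No handlers installed: messages pass straight through to the queues.
    EmbeddedChannel server = new EmbeddedChannel();
    EmbeddedChannel client = new EmbeddedChannel();

    server.writeOutbound("a message");
    while (!server.outboundMessages().isEmpty()) {
      // Feed one message at a time into the client's pipeline.
      client.writeOneInbound(server.readOutbound());
    }
    System.out.println((String) client.readInbound()); // prints "a message"
  }
}
```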

common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java

Lines changed: 3 additions & 4 deletions
```diff
@@ -23,8 +23,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
+import org.apache.spark.network.util.AbstractFileRegion;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -108,7 +107,7 @@ private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exc
     return Unpooled.wrappedBuffer(channel.getData());
   }
 
-  private static class TestFileRegion extends AbstractReferenceCounted implements FileRegion {
+  private static class TestFileRegion extends AbstractFileRegion {
 
     private final int writeCount;
     private final int writesPerCall;
@@ -130,7 +129,7 @@ public long position() {
     }
 
     @Override
-    public long transfered() {
+    public long transferred() {
      return 8 * written;
    }
 
```

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 10 deletions
```diff
@@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
+import org.apache.spark.status.AppStatusStore
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -416,7 +416,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l))
+    _statusStore = AppStatusStore.createLiveStore(conf)
+    listenerBus.addToStatusQueue(_statusStore.listener.get)
 
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -445,14 +446,9 @@
         // For tests, do not enable the UI
         None
       }
-    _ui.foreach { ui =>
-      // Load any plugins that might want to modify the UI.
-      AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
-
-      // Bind the UI before starting the task scheduler to communicate
-      // the bound port to the cluster manager properly
-      ui.bind()
-    }
+    // Bind the UI before starting the task scheduler to communicate
+    // the bound port to the cluster manager properly
+    _ui.foreach(_.bind())
 
     _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
 
```
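The net effect of these SparkContext hunks: AppStatusStore.createLiveStore no longer takes a bus-registration callback, so the caller registers the store's listener on the status queue explicitly, and UI plugin loading moves out of SparkContext entirely (the history server discovers its plugins itself, as the FsHistoryProvider diff below shows).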
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -1271,7 +1271,7 @@ private[spark] object SparkSubmitUtils {
       // retrieve all resolved dependencies
       ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
         packagesDirectory.getAbsolutePath + File.separator +
-          "[organization]_[artifact]-[revision].[ext]",
+          "[organization]_[artifact]-[revision](-[classifier]).[ext]",
         retrieveOptions.setConfs(Array(ivyConfName)))
       resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     } finally {
```
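
In Ivy retrieve patterns, tokens wrapped in parentheses are optional: `(-[classifier])` expands only for artifacts that actually carry a classifier. So an artifact with classifier `tests` would land at, for example, `org_artifact-1.0-tests.jar`, while unclassified artifacts keep the old `org_artifact-1.0.jar` name instead of colliding on it.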

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 7 additions & 7 deletions
```diff
@@ -44,7 +44,6 @@ import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
-import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -322,15 +321,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       (new InMemoryStore(), true)
     }
 
+    val plugins = ServiceLoader.load(
+      classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
     val trackingStore = new ElementTrackingStore(kvstore, conf)
     if (needReplay) {
       val replayBus = new ReplayListenerBus()
       val listener = new AppStatusListener(trackingStore, conf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
       replayBus.addListener(listener)
-      AppStatusPlugin.loadPlugins().foreach { plugin =>
-        plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false)
-      }
+      for {
+        plugin <- plugins
+        listener <- plugin.createListeners(conf, trackingStore)
+      } replayBus.addListener(listener)
       try {
         val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
         replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
@@ -353,9 +355,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
       attempt.info.startTime.getTime(),
       attempt.info.appSparkVersion)
-    AppStatusPlugin.loadPlugins().foreach { plugin =>
-      plugin.setupUI(ui)
-    }
+    plugins.foreach(_.setupUI(ui))
 
     val loadedUI = LoadedAppUI(ui)
```
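
The plugin discovery above uses the standard java.util.ServiceLoader mechanism: implementations are listed in a META-INF/services/<interface-FQN> file on the classpath and instantiated reflectively. A generic, self-contained sketch of that pattern (the GreetingPlugin interface is hypothetical, not Spark's API):

```java
import java.util.ServiceLoader;

// Hypothetical service interface; implementations would be registered in a
// META-INF/services/GreetingPlugin file on the classpath.
interface GreetingPlugin {
  String greet();
}

public class PluginLoaderDemo {
  public static void main(String[] args) {
    ServiceLoader<GreetingPlugin> plugins = ServiceLoader.load(
        GreetingPlugin.class, Thread.currentThread().getContextClassLoader());
    for (GreetingPlugin plugin : plugins) {
      // Each discovered implementation is instantiated via its no-arg constructor.
      System.out.println(plugin.greet());
    }
  }
}
```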
