Skip to content

Commit 37e221c

Browse files
committed
Merge branch 'master' of github.com:apache/spark into SPARK-23937
2 parents 9bbaa3b + ac527b5 commit 37e221c

File tree

78 files changed

+1281
-549
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+1281
-549
lines changed

assembly/README

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command
99

1010
If you need to build an assembly for a different version of Hadoop the
1111
hadoop-version system property needs to be set as in this example:
12-
-Dhadoop.version=2.7.3
12+
-Dhadoop.version=2.7.7

common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public void close() throws IOException {
234234
* Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle
235235
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
236236
*/
237-
void closeIterator(LevelDBIterator it) throws IOException {
237+
void closeIterator(LevelDBIterator<?> it) throws IOException {
238238
synchronized (this._db) {
239239
DB _db = this._db.get();
240240
if (_db != null) {

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ private class StdChannelListener
318318
}
319319

320320
@Override
321-
public void operationComplete(Future future) throws Exception {
321+
public void operationComplete(Future<? super Void> future) throws Exception {
322322
if (future.isSuccess()) {
323323
if (logger.isTraceEnabled()) {
324324
long timeTaken = System.currentTimeMillis() - startTime;

common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,8 @@ public void handle(ResponseMessage message) throws Exception {
212212
if (entry != null) {
213213
StreamCallback callback = entry.getValue();
214214
if (resp.byteCount > 0) {
215-
StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
216-
callback);
215+
StreamInterceptor<ResponseMessage> interceptor = new StreamInterceptor<>(
216+
this, resp.streamId, resp.byteCount, callback);
217217
try {
218218
TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
219219
channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);

common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ private void encryptMore() throws IOException {
267267
int copied = byteRawChannel.write(buf.nioBuffer());
268268
buf.skipBytes(copied);
269269
} else {
270-
region.transferTo(byteRawChannel, region.transfered());
270+
region.transferTo(byteRawChannel, region.transferred());
271271
}
272272
cos.write(byteRawChannel.getData(), 0, byteRawChannel.length());
273273
cos.flush();

common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ private void nextChunk() throws IOException {
301301
int copied = byteChannel.write(buf.nioBuffer());
302302
buf.skipBytes(copied);
303303
} else {
304-
region.transferTo(byteChannel, region.transfered());
304+
region.transferTo(byteChannel, region.transferred());
305305
}
306306

307307
byte[] encrypted = backend.wrap(byteChannel.getData(), 0, byteChannel.length());

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,8 @@ public String getID() {
252252
}
253253
};
254254
if (req.bodyByteCount > 0) {
255-
StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
256-
req.bodyByteCount, wrappedCallback);
255+
StreamInterceptor<RequestMessage> interceptor = new StreamInterceptor<>(
256+
this, wrappedCallback.getID(), req.bodyByteCount, wrappedCallback);
257257
frameDecoder.setInterceptor(interceptor);
258258
} else {
259259
wrappedCallback.onComplete(wrappedCallback.getID());

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,11 @@ public void close() {
148148
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
149149
channelFuture = null;
150150
}
151-
if (bootstrap != null && bootstrap.group() != null) {
152-
bootstrap.group().shutdownGracefully();
151+
if (bootstrap != null && bootstrap.config().group() != null) {
152+
bootstrap.config().group().shutdownGracefully();
153153
}
154-
if (bootstrap != null && bootstrap.childGroup() != null) {
155-
bootstrap.childGroup().shutdownGracefully();
154+
if (bootstrap != null && bootstrap.config().childGroup() != null) {
155+
bootstrap.config().childGroup().shutdownGracefully();
156156
}
157157
bootstrap = null;
158158
}

common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.network.util;
1919

20-
import java.lang.reflect.Field;
2120
import java.util.concurrent.ThreadFactory;
2221

2322
import io.netty.buffer.PooledByteBufAllocator;
@@ -111,24 +110,14 @@ public static PooledByteBufAllocator createPooledByteBufAllocator(
111110
}
112111
return new PooledByteBufAllocator(
113112
allowDirectBufs && PlatformDependent.directBufferPreferred(),
114-
Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
115-
Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
116-
getPrivateStaticField("DEFAULT_PAGE_SIZE"),
117-
getPrivateStaticField("DEFAULT_MAX_ORDER"),
118-
allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
119-
allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
120-
allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
113+
Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
114+
Math.min(PooledByteBufAllocator.defaultNumDirectArena(), allowDirectBufs ? numCores : 0),
115+
PooledByteBufAllocator.defaultPageSize(),
116+
PooledByteBufAllocator.defaultMaxOrder(),
117+
allowCache ? PooledByteBufAllocator.defaultTinyCacheSize() : 0,
118+
allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
119+
allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
120+
allowCache ? PooledByteBufAllocator.defaultUseCacheForAllThreads() : false
121121
);
122122
}
123-
124-
/** Used to get defaults from Netty's private static fields. */
125-
private static int getPrivateStaticField(String name) {
126-
try {
127-
Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
128-
f.setAccessible(true);
129-
return f.getInt(null);
130-
} catch (Exception e) {
131-
throw new RuntimeException(e);
132-
}
133-
}
134123
}

common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ public void encode(ChannelHandlerContext ctx, FileRegion in, List<Object> out)
116116
throws Exception {
117117

118118
ByteArrayWritableChannel channel = new ByteArrayWritableChannel(Ints.checkedCast(in.count()));
119-
while (in.transfered() < in.count()) {
120-
in.transferTo(channel, in.transfered());
119+
while (in.transferred() < in.count()) {
120+
in.transferTo(channel, in.transferred());
121121
}
122122
out.add(Unpooled.wrappedBuffer(channel.getData()));
123123
}

0 commit comments

Comments
 (0)