From 14dbc71ac9be7d027b8d2d008f3a1350e5012dd1 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 27 Nov 2017 10:57:16 -0800 Subject: [PATCH 1/3] Upgrade Netty to 4.1.17 --- .../spark/network/crypto/TransportCipher.java | 57 +++++++++++++++++++ .../network/protocol/MessageWithHeader.java | 51 +++++++++++++++++ .../spark/network/sasl/SaslEncryption.java | 57 +++++++++++++++++++ .../protocol/MessageWithHeaderSuite.java | 28 +++++++++ .../org/apache/spark/storage/DiskStore.scala | 29 ++++++++++ dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- pom.xml | 2 +- 8 files changed, 225 insertions(+), 3 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java index 7376d1ddc481..5086ce220f30 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java @@ -203,6 +203,63 @@ public long transfered() { return transferred; } + @Override + public long transferred() { + return transferred; + } + + /** + * Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. + */ + @Override + public EncryptedMessage touch() { + super.touch(); + return this; + } + + @Override + public EncryptedMessage touch(Object o) { + if (region != null) { + region.touch(o); + } + if (buf != null) { + buf.touch(o); + } + return this; + } + + /** + * Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. + */ + @Override + public EncryptedMessage retain() { + super.retain(); + return this; + } + + @Override + public EncryptedMessage retain(int increment) { + super.retain(increment); + if (region != null) { + region.retain(increment); + } + if (buf != null) { + buf.retain(increment); + } + return this; + } + + @Override + public boolean release(int decrement) { + if (region != null) { + region.release(decrement); + } + if (buf != null) { + buf.release(decrement); + } + return super.release(decrement); + } + @Override public long transferTo(WritableByteChannel target, long position) throws IOException { Preconditions.checkArgument(position == transfered(), "Invalid position."); diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java index 4f8781b42a0e..99b12b785754 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java @@ -95,6 +95,11 @@ public long transfered() { return totalBytesTransferred; } + @Override + public long transferred() { + return totalBytesTransferred; + } + /** * This code is more complicated than you would think because we might require multiple * transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting. @@ -160,4 +165,50 @@ private int writeNioBuffer( return ret; } + + /** Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. */ + @Override + public MessageWithHeader touch() { + super.touch(); + return this; + } + + @Override + public MessageWithHeader touch(Object o) { + header.touch(o); + ReferenceCountUtil.touch(body, o); + return this; + } + + /** Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. */ + @Override + public MessageWithHeader retain() { + super.retain(); + return this; + } + + @Override + public MessageWithHeader retain(int increment) { + super.retain(increment); + header.retain(increment); + ReferenceCountUtil.retain(body, increment); + if (managedBuffer != null) { + for (int i = 0; i < increment; i++) { + managedBuffer.retain(); + } + } + return this; + } + + @Override + public boolean release(int decrement) { + header.release(decrement); + ReferenceCountUtil.release(body, decrement); + if (managedBuffer != null) { + for (int i = 0; i < decrement; i++) { + managedBuffer.release(); + } + } + return super.release(decrement); + } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java index 3d71ebaa7ea0..117968926b03 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java +++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java @@ -187,6 +187,63 @@ public long transfered() { return transferred; } + @Override + public long transferred() { + return transferred; + } + + /** + * Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. + */ + @Override + public EncryptedMessage touch() { + super.touch(); + return this; + } + + @Override + public EncryptedMessage touch(Object o) { + if (buf != null) { + buf.touch(o); + } + if (region != null) { + region.touch(o); + } + return this; + } + + /** + * Override this due to different return types of ReferenceCounted.retain and FileRegion.retain. + */ + @Override + public EncryptedMessage retain() { + super.retain(); + return this; + } + + @Override + public EncryptedMessage retain(int increment) { + super.retain(increment); + if (buf != null) { + buf.retain(increment); + } + if (region != null) { + region.retain(increment); + } + return this; + } + + @Override + public boolean release(int decrement) { + if (region != null) { + region.release(decrement); + } + if (buf != null) { + buf.release(decrement); + } + return super.release(decrement); + } + /** * Transfers data from the original message to the channel, encrypting it in the process. * diff --git a/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java index b341c5681e00..f1d6b29d6df3 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java @@ -134,6 +134,34 @@ public long transfered() { return 8 * written; } + @Override + public long transferred() { + return 8 * written; + } + + @Override + public TestFileRegion touch() { + super.touch(); + return this; + } + + @Override + public TestFileRegion touch(Object o) { + return this; + } + + @Override + public TestFileRegion retain() { + super.retain(); + return this; + } + + @Override + public TestFileRegion retain(int increment) { + super.retain(increment); + return this; + } + @Override public long transferTo(WritableByteChannel target, long position) throws IOException { for (int i = 0; i < writesPerCall; i++) { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 97abd92d4b70..76e54194abb6 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -279,6 +279,35 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: override def transfered(): Long = _transferred + override def transferred(): Long = _transferred + + /** + * Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. + */ + override def touch(): this.type = { + super.touch() + this + } + + override def touch(o: Object): this.type = { + this + } + + /** + * Override this due to different return types of ReferenceCounted.retain and FileRegion.retain. + */ + override def retain(): this.type = { + super.retain() + this + } + + override def retain(increment: Int): this.type = { + super.retain(increment) + this + } + + override def release(decrement: Int): Boolean = super.release(decrement) + override def transferTo(target: WritableByteChannel, pos: Long): Long = { assert(pos == transfered(), "Invalid position.") diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 21c8a7579638..7e63dae19255 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -146,7 +146,7 @@ metrics-jvm-3.1.5.jar minlog-1.3.0.jar mx4j-3.0.2.jar netty-3.9.9.Final.jar -netty-all-4.0.47.Final.jar +netty-all-4.1.17.Final.jar objenesis-2.1.jar opencsv-2.3.jar orc-core-1.4.1-nohive.jar diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 7173426c7bf7..3e0a52b50f27 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -147,7 +147,7 @@ metrics-jvm-3.1.5.jar minlog-1.3.0.jar mx4j-3.0.2.jar netty-3.9.9.Final.jar -netty-all-4.0.47.Final.jar +netty-all-4.1.17.Final.jar objenesis-2.1.jar opencsv-2.3.jar orc-core-1.4.1-nohive.jar diff --git a/pom.xml b/pom.xml index 0297311dd6e6..e69f509d236d 100644 --- a/pom.xml +++ b/pom.xml @@ -580,7 +580,7 @@ io.netty netty-all - 4.0.47.Final + 4.1.17.Final io.netty From 93632a071b5eef1f4d2882f547c43ed0750b1450 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 27 Nov 2017 14:27:00 -0800 Subject: [PATCH 2/3] Fix ProtocolSuite --- .../src/test/java/org/apache/spark/network/ProtocolSuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java index bb1c40c4b0e0..bc94f7ca63a9 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java @@ -56,7 +56,7 @@ private void testServerToClient(Message msg) { NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE); while (!serverChannel.outboundMessages().isEmpty()) { - clientChannel.writeInbound(serverChannel.readOutbound()); + clientChannel.writeOneInbound(serverChannel.readOutbound()); } assertEquals(1, clientChannel.inboundMessages().size()); @@ -72,7 +72,7 @@ private void testClientToServer(Message msg) { NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE); while (!clientChannel.outboundMessages().isEmpty()) { - serverChannel.writeInbound(clientChannel.readOutbound()); + serverChannel.writeOneInbound(clientChannel.readOutbound()); } assertEquals(1, serverChannel.inboundMessages().size()); From 96df5f26d163a4a17d8ab824995b57992afa6b8b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 8 Dec 2017 16:15:29 -0800 Subject: [PATCH 3/3] Add AbstractFileRegion --- .../spark/network/crypto/TransportCipher.java | 28 ++-------- .../network/protocol/MessageWithHeader.java | 24 ++------- .../spark/network/sasl/SaslEncryption.java | 28 ++-------- .../network/util/AbstractFileRegion.java | 53 +++++++++++++++++++ .../protocol/MessageWithHeaderSuite.java | 33 +----------- .../org/apache/spark/storage/DiskStore.scala | 36 ++----------- 6 files changed, 67 insertions(+), 135 deletions(-) create mode 100644 common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java index 5086ce220f30..e04524dde0a7 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java @@ -30,10 +30,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; -import io.netty.util.AbstractReferenceCounted; import org.apache.commons.crypto.stream.CryptoInputStream; import org.apache.commons.crypto.stream.CryptoOutputStream; +import org.apache.spark.network.util.AbstractFileRegion; import org.apache.spark.network.util.ByteArrayReadableChannel; import org.apache.spark.network.util.ByteArrayWritableChannel; @@ -161,7 +161,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } } - private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion { + private static class EncryptedMessage extends AbstractFileRegion { private final boolean isByteBuf; private final ByteBuf buf; private final FileRegion region; @@ -198,27 +198,14 @@ public long position() { return 0; } - @Override - public long transfered() { - return transferred; - } - @Override public long transferred() { return transferred; } - /** - * Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. - */ - @Override - public EncryptedMessage touch() { - super.touch(); - return this; - } - @Override public EncryptedMessage touch(Object o) { + super.touch(o); if (region != null) { region.touch(o); } @@ -228,15 +215,6 @@ public EncryptedMessage touch(Object o) { return this; } - /** - * Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. - */ - @Override - public EncryptedMessage retain() { - super.retain(); - return this; - } - @Override public EncryptedMessage retain(int increment) { super.retain(increment); diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java index 99b12b785754..897d0f9e4fb8 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java @@ -25,17 +25,17 @@ import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.channel.FileRegion; -import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCountUtil; import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.util.AbstractFileRegion; /** * A wrapper message that holds two separate pieces (a header and a body). * * The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion. */ -class MessageWithHeader extends AbstractReferenceCounted implements FileRegion { +class MessageWithHeader extends AbstractFileRegion { @Nullable private final ManagedBuffer managedBuffer; private final ByteBuf header; @@ -90,11 +90,6 @@ public long position() { return 0; } - @Override - public long transfered() { - return totalBytesTransferred; - } - @Override public long transferred() { return totalBytesTransferred; @@ -166,27 +161,14 @@ private int writeNioBuffer( return ret; } - /** Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. */ - @Override - public MessageWithHeader touch() { - super.touch(); - return this; - } - @Override public MessageWithHeader touch(Object o) { + super.touch(o); header.touch(o); ReferenceCountUtil.touch(body, o); return this; } - /** Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. */ - @Override - public MessageWithHeader retain() { - super.retain(); - return this; - } - @Override public MessageWithHeader retain(int increment) { super.retain(increment); diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java index 117968926b03..16ab4efcd4f5 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java +++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java @@ -32,8 +32,8 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.FileRegion; import io.netty.handler.codec.MessageToMessageDecoder; -import io.netty.util.AbstractReferenceCounted; +import org.apache.spark.network.util.AbstractFileRegion; import org.apache.spark.network.util.ByteArrayWritableChannel; import org.apache.spark.network.util.NettyUtils; @@ -129,7 +129,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) } @VisibleForTesting - static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion { + static class EncryptedMessage extends AbstractFileRegion { private final SaslEncryptionBackend backend; private final boolean isByteBuf; @@ -182,27 +182,14 @@ public long position() { /** * Returns an approximation of the amount of data transferred. See {@link #count()}. */ - @Override - public long transfered() { - return transferred; - } - @Override public long transferred() { return transferred; } - /** - * Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. - */ - @Override - public EncryptedMessage touch() { - super.touch(); - return this; - } - @Override public EncryptedMessage touch(Object o) { + super.touch(o); if (buf != null) { buf.touch(o); } @@ -212,15 +199,6 @@ public EncryptedMessage touch(Object o) { return this; } - /** - * Override this due to different return types of ReferenceCounted.retain and FileRegion.retain. - */ - @Override - public EncryptedMessage retain() { - super.retain(); - return this; - } - @Override public EncryptedMessage retain(int increment) { super.retain(increment); diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java b/common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java new file mode 100644 index 000000000000..8651297d97ec --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; + +public abstract class AbstractFileRegion extends AbstractReferenceCounted implements FileRegion { + + @Override + @SuppressWarnings("deprecation") + public final long transfered() { + return transferred(); + } + + @Override + public AbstractFileRegion retain() { + super.retain(); + return this; + } + + @Override + public AbstractFileRegion retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public AbstractFileRegion touch() { + super.touch(); + return this; + } + + @Override + public AbstractFileRegion touch(Object o) { + return this; + } +} diff --git a/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java index f1d6b29d6df3..ecb66fcf2ff7 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java @@ -23,8 +23,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.FileRegion; -import io.netty.util.AbstractReferenceCounted; +import org.apache.spark.network.util.AbstractFileRegion; import org.junit.Test; import org.mockito.Mockito; @@ -108,7 +107,7 @@ private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exc return Unpooled.wrappedBuffer(channel.getData()); } - private static class TestFileRegion extends AbstractReferenceCounted implements FileRegion { + private static class TestFileRegion extends AbstractFileRegion { private final int writeCount; private final int writesPerCall; @@ -129,39 +128,11 @@ public long position() { return 0; } - @Override - public long transfered() { - return 8 * written; - } - @Override public long transferred() { return 8 * written; } - @Override - public TestFileRegion touch() { - super.touch(); - return this; - } - - @Override - public TestFileRegion touch(Object o) { - return this; - } - - @Override - public TestFileRegion retain() { - super.retain(); - return this; - } - - @Override - public TestFileRegion retain(int increment) { - super.retain(increment); - return this; - } - @Override public long transferTo(WritableByteChannel target, long position) throws IOException { for (int i = 0; i < writesPerCall; i++) { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 76e54194abb6..39249d411b58 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -26,12 +26,11 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ListBuffer import com.google.common.io.Closeables -import io.netty.channel.{DefaultFileRegion, FileRegion} -import io.netty.util.AbstractReferenceCounted +import io.netty.channel.DefaultFileRegion import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging -import org.apache.spark.network.util.JavaUtils +import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils} import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.util.Utils import org.apache.spark.util.io.ChunkedByteBuffer @@ -266,7 +265,7 @@ private class EncryptedBlockData( } private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long) - extends AbstractReferenceCounted with FileRegion { + extends AbstractFileRegion { private var _transferred = 0L @@ -277,37 +276,8 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: override def position(): Long = 0 - override def transfered(): Long = _transferred - override def transferred(): Long = _transferred - /** - * Override this due to different return types of ReferenceCounted.touch and FileRegion.touch. - */ - override def touch(): this.type = { - super.touch() - this - } - - override def touch(o: Object): this.type = { - this - } - - /** - * Override this due to different return types of ReferenceCounted.retain and FileRegion.retain. - */ - override def retain(): this.type = { - super.retain() - this - } - - override def retain(increment: Int): this.type = { - super.retain(increment) - this - } - - override def release(decrement: Int): Boolean = super.release(decrement) - override def transferTo(target: WritableByteChannel, pos: Long): Long = { assert(pos == transfered(), "Invalid position.")