Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.AbstractReferenceCounted;
import org.apache.commons.crypto.stream.CryptoInputStream;
import org.apache.commons.crypto.stream.CryptoOutputStream;

import org.apache.spark.network.util.AbstractFileRegion;
import org.apache.spark.network.util.ByteArrayReadableChannel;
import org.apache.spark.network.util.ByteArrayWritableChannel;

Expand Down Expand Up @@ -161,7 +161,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
}

private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
private static class EncryptedMessage extends AbstractFileRegion {
private final boolean isByteBuf;
private final ByteBuf buf;
private final FileRegion region;
Expand Down Expand Up @@ -199,10 +199,45 @@ public long position() {
}

@Override
public long transfered() {
public long transferred() {
return transferred;
}

@Override
public EncryptedMessage touch(Object o) {
super.touch(o);
if (region != null) {
region.touch(o);
}
if (buf != null) {
buf.touch(o);
}
return this;
}

@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (region != null) {
region.retain(increment);
}
if (buf != null) {
buf.retain(increment);
}
return this;
}

@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
Preconditions.checkArgument(position == transfered(), "Invalid position.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.AbstractFileRegion;

/**
* A wrapper message that holds two separate pieces (a header and a body).
*
* The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
*/
class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
class MessageWithHeader extends AbstractFileRegion {

@Nullable private final ManagedBuffer managedBuffer;
private final ByteBuf header;
Expand Down Expand Up @@ -91,7 +91,7 @@ public long position() {
}

@Override
public long transfered() {
public long transferred() {
return totalBytesTransferred;
}

Expand Down Expand Up @@ -160,4 +160,37 @@ private int writeNioBuffer(

return ret;
}

@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
header.touch(o);
ReferenceCountUtil.touch(body, o);
return this;
}

@Override
public MessageWithHeader retain(int increment) {
super.retain(increment);
header.retain(increment);
ReferenceCountUtil.retain(body, increment);
if (managedBuffer != null) {
for (int i = 0; i < increment; i++) {
managedBuffer.retain();
}
}
return this;
}

@Override
public boolean release(int decrement) {
header.release(decrement);
ReferenceCountUtil.release(body, decrement);
if (managedBuffer != null) {
for (int i = 0; i < decrement; i++) {
managedBuffer.release();
}
}
return super.release(decrement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.AbstractReferenceCounted;

import org.apache.spark.network.util.AbstractFileRegion;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.NettyUtils;

Expand Down Expand Up @@ -129,7 +129,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
}

@VisibleForTesting
static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
static class EncryptedMessage extends AbstractFileRegion {

private final SaslEncryptionBackend backend;
private final boolean isByteBuf;
Expand Down Expand Up @@ -183,10 +183,45 @@ public long position() {
* Returns an approximation of the amount of data transferred. See {@link #count()}.
*/
@Override
public long transfered() {
public long transferred() {
return transferred;
}

@Override
public EncryptedMessage touch(Object o) {
super.touch(o);
if (buf != null) {
buf.touch(o);
}
if (region != null) {
region.touch(o);
}
return this;
}

@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (buf != null) {
buf.retain(increment);
}
if (region != null) {
region.retain(increment);
}
return this;
}

@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

/**
* Transfers data from the original message to the channel, encrypting it in the process.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.util;

import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

public abstract class AbstractFileRegion extends AbstractReferenceCounted implements FileRegion {

@Override
@SuppressWarnings("deprecation")
public final long transfered() {
return transferred();
}

@Override
public AbstractFileRegion retain() {
super.retain();
return this;
}

@Override
public AbstractFileRegion retain(int increment) {
super.retain(increment);
return this;
}

@Override
public AbstractFileRegion touch() {
super.touch();
return this;
}

@Override
public AbstractFileRegion touch(Object o) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private void testServerToClient(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!serverChannel.outboundMessages().isEmpty()) {
clientChannel.writeInbound(serverChannel.readOutbound());
clientChannel.writeOneInbound(serverChannel.readOutbound());
}

assertEquals(1, clientChannel.inboundMessages().size());
Expand All @@ -72,7 +72,7 @@ private void testClientToServer(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!clientChannel.outboundMessages().isEmpty()) {
serverChannel.writeInbound(clientChannel.readOutbound());
serverChannel.writeOneInbound(clientChannel.readOutbound());
}

assertEquals(1, serverChannel.inboundMessages().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import org.apache.spark.network.util.AbstractFileRegion;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -108,7 +107,7 @@ private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exc
return Unpooled.wrappedBuffer(channel.getData());
}

private static class TestFileRegion extends AbstractReferenceCounted implements FileRegion {
private static class TestFileRegion extends AbstractFileRegion {

private final int writeCount;
private final int writesPerCall;
Expand All @@ -130,7 +129,7 @@ public long position() {
}

@Override
public long transfered() {
public long transferred() {
return 8 * written;
}

Expand Down
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

import com.google.common.io.Closeables
import io.netty.channel.{DefaultFileRegion, FileRegion}
import io.netty.util.AbstractReferenceCounted
import io.netty.channel.DefaultFileRegion

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.util.Utils
import org.apache.spark.util.io.ChunkedByteBuffer
Expand Down Expand Up @@ -266,7 +265,7 @@ private class EncryptedBlockData(
}

private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long)
extends AbstractReferenceCounted with FileRegion {
extends AbstractFileRegion {

private var _transferred = 0L

Expand All @@ -277,7 +276,7 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize:

override def position(): Long = 0

override def transfered(): Long = _transferred
override def transferred(): Long = _transferred

override def transferTo(target: WritableByteChannel, pos: Long): Long = {
assert(pos == transfered(), "Invalid position.")
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ metrics-jvm-3.1.5.jar
minlog-1.3.0.jar
mx4j-3.0.2.jar
netty-3.9.9.Final.jar
netty-all-4.0.47.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
orc-core-1.4.1-nohive.jar
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ metrics-jvm-3.1.5.jar
minlog-1.3.0.jar
mx4j-3.0.2.jar
netty-3.9.9.Final.jar
netty-all-4.0.47.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
opencsv-2.3.jar
orc-core-1.4.1-nohive.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.47.Final</version>
<version>4.1.17.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Expand Down