diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 166564c70922b..2b435f2078778 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -198,7 +198,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final ScheduledPing scheduledPing; private final TimeValue pingSchedule; protected final ThreadPool threadPool; - private final BigArrays bigArrays; + protected final BigArrays bigArrays; protected final NetworkService networkService; protected final Set profileSettings; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 0655a6d871197..c48318ab4e447 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.transport; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -26,6 +25,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -35,6 +35,7 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.mocksocket.MockSocket; @@ -153,19 +154,20 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx if (msgSize == -1) { socket.getOutputStream().flush(); } else { - BytesStreamOutput output = new BytesStreamOutput(); - final byte[] buffer = new byte[msgSize]; - input.readFully(buffer); - output.write(minimalHeader); - output.writeInt(msgSize); - output.write(buffer); - final BytesReference bytes = output.bytes(); - if (TcpTransport.validateMessageHeader(bytes)) { - InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - messageReceived(bytes.slice(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE, msgSize), - mockChannel, mockChannel.profile, remoteAddress, msgSize); - } else { - // ping message - we just drop all stuff + try (BytesStreamOutput output = new ReleasableBytesStreamOutput(msgSize, bigArrays)) { + final byte[] buffer = new byte[msgSize]; + input.readFully(buffer); + output.write(minimalHeader); + output.writeInt(msgSize); + output.write(buffer); + final BytesReference bytes = output.bytes(); + if (TcpTransport.validateMessageHeader(bytes)) { + InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + messageReceived(bytes.slice(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE, msgSize), + mockChannel, mockChannel.profile, remoteAddress, msgSize); + } else { + // ping message - we just drop all stuff + } } } }