diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 545055fadf019..324b7ebe506a1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -179,7 +179,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final CircuitBreakerService circuitBreakerService; private final Version version; protected final ThreadPool threadPool; - private final BigArrays bigArrays; + protected final BigArrays bigArrays; protected final NetworkService networkService; protected final Set profileSettings; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index d1977da97951f..6d636e557a9de 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -18,15 +18,17 @@ */ package org.elasticsearch.transport; -import org.elasticsearch.cli.SuppressForbidden; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cli.SuppressForbidden; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -35,15 +37,16 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.mocksocket.MockSocket; -import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.threadpool.ThreadPool; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.Closeable; +import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; @@ -139,24 +142,26 @@ protected void doRun() throws Exception { private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException { Socket socket = mockChannel.activeChannel; - byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE]; - int firstByte = input.read(); - if (firstByte == -1) { + byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE]; + try { + input.readFully(minimalHeader); + } catch (EOFException eof) { throw new IOException("Connection reset by peer"); } - minimalHeader[0] = (byte) firstByte; - minimalHeader[1] = (byte) input.read(); - int msgSize = input.readInt(); + + // Read message length will throw stream corrupted exception if the marker bytes incorrect + int msgSize = TcpTransport.readMessageLength(new BytesArray(minimalHeader)); if (msgSize == -1) { socket.getOutputStream().flush(); } else { - BytesStreamOutput output = new BytesStreamOutput(); final byte[] buffer = new byte[msgSize]; input.readFully(buffer); - output.write(minimalHeader); - output.writeInt(msgSize); - output.write(buffer); - consumeNetworkReads(mockChannel, output.bytes()); + int expectedSize = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE + msgSize; + try (BytesStreamOutput output = new ReleasableBytesStreamOutput(expectedSize, bigArrays)) { + output.write(minimalHeader); + output.write(buffer); + consumeNetworkReads(mockChannel, output.bytes()); + } } }