From f05062b87c74691c79db2319663bde8559633f95 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 8 Nov 2018 18:30:40 -0700 Subject: [PATCH] Improve MockTcpTransport memory usage The MockTcpTransport is not friendly in regards to memory usage. It must allocate multiple byte arrays for every message. This improves the memory situation by failing fast if the message is improperly formatted. Additionally, it uses reusable big arrays for at least half of the allocated byte arrays. --- .../elasticsearch/transport/TcpTransport.java | 2 +- .../transport/MockTcpTransport.java | 35 +++++++++++-------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 545055fadf019..324b7ebe506a1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -179,7 +179,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final CircuitBreakerService circuitBreakerService; private final Version version; protected final ThreadPool threadPool; - private final BigArrays bigArrays; + protected final BigArrays bigArrays; protected final NetworkService networkService; protected final Set profileSettings; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index d1977da97951f..6d636e557a9de 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -18,15 +18,17 @@ */ package org.elasticsearch.transport; -import org.elasticsearch.cli.SuppressForbidden; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cli.SuppressForbidden; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -35,15 +37,16 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.mocksocket.MockSocket; -import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.threadpool.ThreadPool; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.Closeable; +import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; @@ -139,24 +142,26 @@ protected void doRun() throws Exception { private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException { Socket socket = mockChannel.activeChannel; - byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE]; - int firstByte = input.read(); - if (firstByte == -1) { + byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE]; + try { + input.readFully(minimalHeader); + } catch (EOFException eof) { throw new IOException("Connection reset by peer"); } - minimalHeader[0] = (byte) firstByte; - minimalHeader[1] = (byte) input.read(); - int msgSize = input.readInt(); + + // Read message length will throw stream corrupted exception if the marker bytes incorrect + int msgSize = TcpTransport.readMessageLength(new BytesArray(minimalHeader)); if (msgSize == -1) { socket.getOutputStream().flush(); } else { - BytesStreamOutput output = new BytesStreamOutput(); final byte[] buffer = new byte[msgSize]; input.readFully(buffer); - output.write(minimalHeader); - output.writeInt(msgSize); - output.write(buffer); - consumeNetworkReads(mockChannel, output.bytes()); + int expectedSize = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE + msgSize; + try (BytesStreamOutput output = new ReleasableBytesStreamOutput(expectedSize, bigArrays)) { + output.write(minimalHeader); + output.write(buffer); + consumeNetworkReads(mockChannel, output.bytes()); + } } }