Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final CircuitBreakerService circuitBreakerService;
private final Version version;
protected final ThreadPool threadPool;
private final BigArrays bigArrays;
protected final BigArrays bigArrays;
protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.cli.SuppressForbidden;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cli.SuppressForbidden;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -35,15 +37,16 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -139,24 +142,26 @@ protected void doRun() throws Exception {

private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException {
Socket socket = mockChannel.activeChannel;
byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE];
int firstByte = input.read();
if (firstByte == -1) {
byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE];
try {
input.readFully(minimalHeader);
} catch (EOFException eof) {
throw new IOException("Connection reset by peer");
}
minimalHeader[0] = (byte) firstByte;
minimalHeader[1] = (byte) input.read();
int msgSize = input.readInt();

// Read message length will throw stream corrupted exception if the marker bytes incorrect
int msgSize = TcpTransport.readMessageLength(new BytesArray(minimalHeader));
if (msgSize == -1) {
socket.getOutputStream().flush();
} else {
BytesStreamOutput output = new BytesStreamOutput();
final byte[] buffer = new byte[msgSize];
input.readFully(buffer);
output.write(minimalHeader);
output.writeInt(msgSize);
output.write(buffer);
consumeNetworkReads(mockChannel, output.bytes());
int expectedSize = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE + msgSize;
try (BytesStreamOutput output = new ReleasableBytesStreamOutput(expectedSize, bigArrays)) {
output.write(minimalHeader);
output.write(buffer);
consumeNetworkReads(mockChannel, output.bytes());
}
}
}

Expand Down