Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions core/src/main/java/org/elasticsearch/transport/TcpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@

package org.elasticsearch.transport;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.unit.TimeValue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -80,6 +80,22 @@ public interface TcpChannel extends Releasable {
*/
boolean isOpen();

/**
* Returns the local address for this channel.
*
* @return the local address of this channel.
*/
InetSocketAddress getLocalAddress();

/**
* Sends a tcp message to the channel. The listener will be executed once the send process has been
* completed.
*
* @param reference to send to channel
* @param listener to execute upon send completion
*/
void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener);

/**
* Closes the channel.
*
Expand Down
126 changes: 58 additions & 68 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public final class TcpTransportChannel<Channel extends TcpChannel> implements TransportChannel {
private final TcpTransport<Channel> transport;
public final class TcpTransportChannel implements TransportChannel {
private final TcpTransport transport;
private final Version version;
private final String action;
private final long requestId;
private final String profileName;
private final long reservedBytes;
private final AtomicBoolean released = new AtomicBoolean();
private final String channelType;
private final Channel channel;
private final TcpChannel channel;

TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action,
long requestId, Version version, String profileName, long reservedBytes) {
this.version = version;
this.channel = channel;
Expand Down Expand Up @@ -97,7 +97,7 @@ public Version getVersion() {
return version;
}

public Channel getChannel() {
public TcpChannel getChannel() {
return channel;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -172,57 +171,23 @@ public void testEnsureVersionCompatibility() {

public void testCompressRequest() throws IOException {
final boolean compressed = randomBoolean();
final AtomicBoolean called = new AtomicBoolean(false);
Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100));
ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName());
AtomicReference<IOException> exceptionReference = new AtomicReference<>();
AtomicReference<BytesReference> messageCaptor = new AtomicReference<>();
try {
TcpTransport<FakeChannel> transport = new TcpTransport<FakeChannel>(
TcpTransport transport = new TcpTransport(
"test", Settings.builder().put("transport.tcp.compress", compressed).build(), threadPool,
new BigArrays(Settings.EMPTY, null), null, null, null) {
@Override
protected InetSocketAddress getLocalAddress(FakeChannel o) {
return null;
}

@Override
protected FakeChannel bind(String name, InetSocketAddress address) throws IOException {
return null;
}

@Override
protected void sendMessage(FakeChannel o, BytesReference reference, ActionListener listener) {
try {
StreamInput streamIn = reference.streamInput();
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
int len = streamIn.readInt();
long requestId = streamIn.readLong();
assertEquals(42, requestId);
byte status = streamIn.readByte();
Version version = Version.fromId(streamIn.readInt());
assertEquals(Version.CURRENT, version);
assertEquals(compressed, TransportStatus.isCompress(status));
called.compareAndSet(false, true);
if (compressed) {
final int bytesConsumed = TcpHeader.HEADER_SIZE;
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
.streamInput(streamIn);
}
threadPool.getThreadContext().readHeaders(streamIn);
assertEquals("foobar", streamIn.readString());
Req readReq = new Req("");
readReq.readFrom(streamIn);
assertEquals(request.value, readReq.value);
} catch (IOException e) {
exceptionReference.set(e);
}
}

@Override
protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout,
ActionListener<FakeChannel> connectListener) throws IOException {
FakeChannel fakeChannel = new FakeChannel();
return fakeChannel;
ActionListener<TcpChannel> connectListener) throws IOException {
return new FakeChannel(messageCaptor);
}

@Override
Expand All @@ -233,25 +198,54 @@ public long getNumOpenServerConnections() {
@Override
public NodeChannels getConnection(DiscoveryNode node) {
int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();
ArrayList<FakeChannel> fakeChannels = new ArrayList<>(numConnections);
ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; ++i) {
fakeChannels.add(new FakeChannel());
fakeChannels.add(new FakeChannel(messageCaptor));
}
return new NodeChannels(node, fakeChannels, MockTcpTransport.LIGHT_PROFILE, Version.CURRENT);
}
};

DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
Transport.Connection connection = transport.getConnection(node);
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
assertTrue(called.get());
assertNull("IOException while sending message.", exceptionReference.get());

BytesReference reference = messageCaptor.get();
assertNotNull(reference);

StreamInput streamIn = reference.streamInput();
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
int len = streamIn.readInt();
long requestId = streamIn.readLong();
assertEquals(42, requestId);
byte status = streamIn.readByte();
Version version = Version.fromId(streamIn.readInt());
assertEquals(Version.CURRENT, version);
assertEquals(compressed, TransportStatus.isCompress(status));
if (compressed) {
final int bytesConsumed = TcpHeader.HEADER_SIZE;
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
.streamInput(streamIn);
}
threadPool.getThreadContext().readHeaders(streamIn);
assertEquals("foobar", streamIn.readString());
Req readReq = new Req("");
readReq.readFrom(streamIn);
assertEquals(request.value, readReq.value);

} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

private static final class FakeChannel implements TcpChannel {

private final AtomicReference<BytesReference> messageCaptor;

FakeChannel(AtomicReference<BytesReference> messageCaptor) {
this.messageCaptor = messageCaptor;
}

@Override
public void close() {
}
Expand All @@ -268,6 +262,16 @@ public void setSoLinger(int value) throws IOException {
public boolean isOpen() {
return false;
}

@Override
public InetSocketAddress getLocalAddress() {
return null;
}

@Override
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
messageCaptor.set(reference);
}
}

private static final class Req extends TransportRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
Expand All @@ -57,6 +56,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;

Expand All @@ -79,7 +79,7 @@
* longer. Med is for the typical search / single doc index. And High for things like cluster state. Ping is reserved for
* sending out ping requests to other nodes.
*/
public class Netty4Transport extends TcpTransport<NettyTcpChannel> {
public class Netty4Transport extends TcpTransport {

static {
Netty4Utils.setup();
Expand Down Expand Up @@ -249,7 +249,7 @@ public long getNumOpenServerConnections() {
}

@Override
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<NettyTcpChannel> listener)
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> listener)
throws IOException {
ChannelFuture channelFuture = bootstrap.connect(node.getAddress().address());
Channel channel = channelFuture.channel();
Expand Down Expand Up @@ -279,28 +279,6 @@ protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectT
return nettyChannel;
}

@Override
protected void sendMessage(NettyTcpChannel channel, BytesReference reference, ActionListener<NettyTcpChannel> listener) {
final ChannelFuture future = channel.getLowLevelChannel().writeAndFlush(Netty4Utils.toByteBuf(reference));
future.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(channel);
} else {
final Throwable cause = f.cause();
Netty4Utils.maybeDie(cause);
logger.warn((Supplier<?>) () ->
new ParameterizedMessage("write and flush on the network layer failed (channel: {})", channel), cause);
assert cause instanceof Exception;
listener.onFailure((Exception) cause);
}
});
}

@Override
protected InetSocketAddress getLocalAddress(NettyTcpChannel channel) {
return (InetSocketAddress) channel.getLowLevelChannel().localAddress();
}

@Override
protected NettyTcpChannel bind(String name, InetSocketAddress address) {
Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@
package org.elasticsearch.transport.netty4;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpChannel;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

public class NettyTcpChannel implements TcpChannel {
Expand All @@ -48,10 +53,6 @@ public class NettyTcpChannel implements TcpChannel {
});
}

public Channel getLowLevelChannel() {
return channel;
}

@Override
public void close() {
channel.close();
Expand All @@ -71,4 +72,28 @@ public void setSoLinger(int value) {
public boolean isOpen() {
return channel.isOpen();
}

@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}

@Override
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));
future.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(this);
} else {
final Throwable cause = f.cause();
Netty4Utils.maybeDie(cause);
assert cause instanceof Exception;
listener.onFailure((Exception) cause);
}
});
}

public Channel getLowLevelChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;

Expand Down Expand Up @@ -108,7 +109,8 @@ public ExceptionThrowingNetty4Transport(
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
}

protected String handleRequest(NettyTcpChannel channel, String profileName,
@Override
protected String handleRequest(TcpChannel channel, String profileName,
StreamInput stream, long requestId, int messageLengthBytes, Version version,
InetSocketAddress remoteAddress, byte status) throws IOException {
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
Expand Down
Loading