Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions core/src/main/java/org/elasticsearch/transport/TcpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -61,7 +60,7 @@ public interface TcpChannel extends Releasable {
*
* @param listener to be executed
*/
void addCloseListener(ActionListener<TcpChannel> listener);
void addCloseListener(ActionListener<Void> listener);


/**
Expand Down Expand Up @@ -94,7 +93,7 @@ public interface TcpChannel extends Releasable {
* @param reference to send to channel
* @param listener to execute upon send completion
*/
void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener);
void sendMessage(BytesReference reference, ActionListener<Void> listener);

/**
* Closes the channel.
Expand All @@ -114,10 +113,10 @@ static <C extends TcpChannel> void closeChannel(C channel, boolean blocking) {
*/
static <C extends TcpChannel> void closeChannels(List<C> channels, boolean blocking) {
if (blocking) {
ArrayList<ActionFuture<TcpChannel>> futures = new ArrayList<>(channels.size());
ArrayList<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
for (final C channel : channels) {
if (channel.isOpen()) {
PlainActionFuture<TcpChannel> closeFuture = PlainActionFuture.newFuture();
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(closeFuture);
channel.close();
futures.add(closeFuture);
Expand All @@ -136,15 +135,14 @@ static <C extends TcpChannel> void closeChannels(List<C> channels, boolean block
* @param discoveryNode the node for the pending connections
* @param connectionFutures representing the pending connections
* @param connectTimeout to wait for a connection
* @param <C> the type of channel
* @throws ConnectTransportException if one of the connections fails
*/
static <C extends TcpChannel> void awaitConnected(DiscoveryNode discoveryNode, List<ActionFuture<C>> connectionFutures,
TimeValue connectTimeout) throws ConnectTransportException {
static void awaitConnected(DiscoveryNode discoveryNode, List<ActionFuture<Void>> connectionFutures, TimeValue connectTimeout)
throws ConnectTransportException {
Exception connectionException = null;
boolean allConnected = true;

for (ActionFuture<C> connectionFuture : connectionFutures) {
for (ActionFuture<Void> connectionFuture : connectionFutures) {
try {
connectionFuture.get(connectTimeout.getMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
Expand All @@ -169,8 +167,8 @@ static <C extends TcpChannel> void awaitConnected(DiscoveryNode discoveryNode, L
}
}

static void blockOnFutures(List<ActionFuture<TcpChannel>> futures) {
for (ActionFuture<TcpChannel> future : futures) {
static void blockOnFutures(List<ActionFuture<Void>> futures) {
for (ActionFuture<Void> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
Expand Down
20 changes: 10 additions & 10 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ protected void doRunInLifecycle() throws Exception {
for (TcpChannel channel : channels.getChannels()) {
internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) {
@Override
protected void innerInnerOnResponse(TcpChannel channel) {
protected void innerInnerOnResponse(Void v) {
successfulPings.inc();
}

Expand Down Expand Up @@ -595,10 +595,10 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
int numConnections = connectionProfile.getNumConnections();
assert numConnections > 0 : "A connection profile must be configured with at least one connection";
List<TcpChannel> channels = new ArrayList<>(numConnections);
List<ActionFuture<TcpChannel>> connectionFutures = new ArrayList<>(numConnections);
List<ActionFuture<Void>> connectionFutures = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; ++i) {
try {
PlainActionFuture<TcpChannel> connectFuture = PlainActionFuture.newFuture();
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
connectionFutures.add(connectFuture);
TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture);
channels.add(channel);
Expand Down Expand Up @@ -940,7 +940,7 @@ protected final void doStop() {
for (Map.Entry<String, List<TcpChannel>> entry : serverChannels.entrySet()) {
String profile = entry.getKey();
List<TcpChannel> channels = entry.getValue();
ActionListener<TcpChannel> closeFailLogger = ActionListener.wrap(c -> {},
ActionListener<Void> closeFailLogger = ActionListener.wrap(c -> {},
e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e));
channels.forEach(c -> c.addCloseListener(closeFailLogger));
TcpChannel.closeChannels(channels, true);
Expand Down Expand Up @@ -1016,7 +1016,7 @@ protected void onException(TcpChannel channel, Exception e) {
BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8));
final SendMetricListener closeChannel = new SendMetricListener(message.length()) {
@Override
protected void innerInnerOnResponse(TcpChannel channel) {
protected void innerInnerOnResponse(Void v) {
TcpChannel.closeChannel(channel, false);
}

Expand Down Expand Up @@ -1060,7 +1060,7 @@ protected void serverAcceptedChannel(TcpChannel channel) {
* @return the pending connection
* @throws IOException if an I/O exception occurs while opening the channel
*/
protected abstract TcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> connectListener)
protected abstract TcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
throws IOException;

/**
Expand Down Expand Up @@ -1686,20 +1686,20 @@ protected final void ensureOpen() {
/**
* This listener increments the transmitted bytes metric on success.
*/
private abstract class SendMetricListener extends NotifyOnceListener<TcpChannel> {
private abstract class SendMetricListener extends NotifyOnceListener<Void> {
private final long messageSize;

private SendMetricListener(long messageSize) {
this.messageSize = messageSize;
}

@Override
protected final void innerOnResponse(org.elasticsearch.transport.TcpChannel object) {
protected final void innerOnResponse(Void object) {
transmittedBytesMetric.inc(messageSize);
innerInnerOnResponse(object);
}

protected abstract void innerInnerOnResponse(org.elasticsearch.transport.TcpChannel object);
protected abstract void innerInnerOnResponse(Void object);
}

private final class SendListener extends SendMetricListener {
Expand All @@ -1715,7 +1715,7 @@ private SendListener(TcpChannel channel, Releasable optionalReleasable, Runnable
}

@Override
protected void innerInnerOnResponse(TcpChannel channel) {
protected void innerInnerOnResponse(Void v) {
release();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ protected FakeChannel bind(String name, InetSocketAddress address) throws IOExce
}

@Override
protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout,
ActionListener<TcpChannel> connectListener) throws IOException {
protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
throws IOException {
return new FakeChannel(messageCaptor);
}

Expand Down Expand Up @@ -251,7 +251,7 @@ public void close() {
}

@Override
public void addCloseListener(ActionListener<TcpChannel> listener) {
public void addCloseListener(ActionListener<Void> listener) {
}

@Override
Expand All @@ -269,7 +269,7 @@ public InetSocketAddress getLocalAddress() {
}

@Override
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
messageCaptor.set(reference);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public long getNumOpenServerConnections() {
}

@Override
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> listener)
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> listener)
throws IOException {
ChannelFuture channelFuture = bootstrap.connect(node.getAddress().address());
Channel channel = channelFuture.channel();
Expand All @@ -264,7 +264,7 @@ protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectT

channelFuture.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(nettyChannel);
listener.onResponse(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
public class NettyTcpChannel implements TcpChannel {

private final Channel channel;
private final CompletableFuture<TcpChannel> closeContext = new CompletableFuture<>();
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();

NettyTcpChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(this);
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Expand All @@ -59,7 +59,7 @@ public void close() {
}

@Override
public void addCloseListener(ActionListener<TcpChannel> listener) {
public void addCloseListener(ActionListener<Void> listener) {
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
}

Expand All @@ -79,11 +79,11 @@ public InetSocketAddress getLocalAddress() {
}

@Override
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));
future.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(this);
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
Netty4Utils.maybeDie(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
}

@Override
protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> connectListener)
protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
throws IOException {
InetSocketAddress address = node.getAddress().address();
final MockSocket socket = new MockSocket();
Expand All @@ -186,7 +186,7 @@ protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeo
MockChannel channel = new MockChannel(socket, address, "none", (c) -> {});
channel.loopRead(executor);
success = true;
connectListener.onResponse(channel);
connectListener.onResponse(null);
return channel;
} finally {
if (success == false) {
Expand Down Expand Up @@ -231,7 +231,7 @@ public final class MockChannel implements Closeable, TcpChannel {
private final String profile;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final Closeable onClose;
private final CompletableFuture<TcpChannel> closeFuture = new CompletableFuture<>();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

/**
* Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic.
Expand Down Expand Up @@ -356,14 +356,14 @@ public String toString() {
public void close() {
try {
close0();
closeFuture.complete(this);
closeFuture.complete(null);
} catch (IOException e) {
closeFuture.completeExceptionally(e);
}
}

@Override
public void addCloseListener(ActionListener<TcpChannel> listener) {
public void addCloseListener(ActionListener<Void> listener) {
closeFuture.whenComplete(ActionListener.toBiConsumer(listener));
}

Expand All @@ -386,14 +386,14 @@ public InetSocketAddress getLocalAddress() {
}

@Override
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
try {
synchronized (this) {
OutputStream outputStream = new BufferedOutputStream(activeChannel.getOutputStream());
reference.writeTo(outputStream);
outputStream.flush();
}
listener.onResponse(this);
listener.onResponse(null);
} catch (IOException e) {
listener.onFailure(e);
onException(this, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.transport.nio;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -32,7 +31,6 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
Expand Down Expand Up @@ -95,22 +93,11 @@ protected NioServerSocketChannel bind(String name, InetSocketAddress address) th
}

@Override
protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> connectListener)
protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
throws IOException {
NioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
openChannels.clientChannelOpened(channel);
// TODO: Temporary conversion due to types
channel.addConnectListener(new ActionListener<NioChannel>() {
@Override
public void onResponse(NioChannel nioChannel) {
connectListener.onResponse(nioChannel);
}

@Override
public void onFailure(Exception e) {
connectListener.onFailure(e);
}
});
channel.addConnectListener(connectListener);
return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;

import java.io.IOException;
Expand All @@ -33,10 +32,10 @@
public class WriteOperation {

private final NioSocketChannel channel;
private final ActionListener<NioChannel> listener;
private final ActionListener<Void> listener;
private final NetworkBytesReference[] references;

public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener<NioChannel> listener) {
public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener<Void> listener) {
this.channel = channel;
this.listener = listener;
this.references = toArray(bytesReference);
Expand All @@ -46,7 +45,7 @@ public NetworkBytesReference[] getByteReferences() {
return references;
}

public ActionListener<NioChannel> getListener() {
public ActionListener<Void> getListener() {
return listener;
}

Expand Down
Loading