Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d8cdf08
WIP
Tim-Brooks Oct 23, 2017
a5a30ca
WIP
Tim-Brooks Oct 23, 2017
e5d4c21
Fix mock transport tests
Tim-Brooks Oct 24, 2017
044f6bf
WIP
Tim-Brooks Oct 24, 2017
068bde0
DWIP
Tim-Brooks Oct 25, 2017
dbc2c85
WIP
Tim-Brooks Oct 25, 2017
701e3a8
Fix nio client tests
Tim-Brooks Oct 25, 2017
ac4cfc3
WIP
Tim-Brooks Oct 25, 2017
b63eb9b
Merge branch 'master' into introduce_tcp_connection
Tim-Brooks Oct 25, 2017
8f85ba5
WIP
Tim-Brooks Oct 25, 2017
c887e7d
Cleanup futures
Tim-Brooks Oct 26, 2017
4c5d0d6
Cleanup channel closing
Tim-Brooks Oct 26, 2017
57a506d
Work on handling accepting
Tim-Brooks Oct 26, 2017
8fa658d
Cleanup
Tim-Brooks Oct 26, 2017
1028126
Add license
Tim-Brooks Oct 26, 2017
172f9eb
Work on cleaning up close futures
Tim-Brooks Oct 27, 2017
cea0253
Cleanups
Tim-Brooks Oct 27, 2017
3565108
Add back method
Tim-Brooks Oct 30, 2017
9dc88dc
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks Nov 2, 2017
10fb412
Remove futures
Tim-Brooks Nov 2, 2017
9e7903d
Merge branch 'master' into introduce_tcp_connection
Tim-Brooks Nov 8, 2017
8e32b7b
Drop parameterization
Tim-Brooks Nov 13, 2017
1f1fd43
Fix test
Tim-Brooks Nov 13, 2017
8d8df72
make changes based on review
Tim-Brooks Nov 13, 2017
c57d919
Cleanup code
Tim-Brooks Nov 13, 2017
22bef9c
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks Nov 13, 2017
c664c07
use completablefuture
Tim-Brooks Nov 13, 2017
d58678c
Make getter public
Tim-Brooks Nov 14, 2017
3a79a16
Changes based on review
Tim-Brooks Nov 14, 2017
bb89a97
Don't wrap throwable
Tim-Brooks Nov 14, 2017
9fd9984
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks Nov 15, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,28 @@ public void onFailure(Exception e) {
};
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding runnable when the response (or failure) is received.
*
* @param runnable the runnable that will be called in event of success or failure
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the runnable when received
*/
static <Response> ActionListener<Response> wrap(Runnable runnable) {
return new ActionListener<Response>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just return wrap(runnable:run, e -> runnable.run());?

@Override
public void onResponse(Response response) {
runnable.run();
}

@Override
public void onFailure(Exception e) {
runnable.run();
}
};
}

/**
* Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception
* the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.support;

import org.elasticsearch.action.ActionListener;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

public class ListenerExecutionContext<V> implements ActionListener<V> {

private static Object NULL_VALUE = new Object();

private final ConcurrentLinkedQueue<ActionListener<V>> listeners = new ConcurrentLinkedQueue<>();
private final AtomicReference<Object> result = new AtomicReference<>(null);

@Override
public void onResponse(V value) {
if (result.compareAndSet(null, value != null ? value : NULL_VALUE)) {
ActionListener<V> listener;
while ((listener = listeners.poll()) != null) {
listener.onResponse(value);
}
}
}

@Override
public void onFailure(Exception e) {
if (e == null) {
throw new IllegalArgumentException("Exception cannot be null");
}
if (result.compareAndSet(null, e)) {
ActionListener<V> listener;
while ((listener = listeners.poll()) != null) {
listener.onFailure(e);
}
}
}

public boolean isDone() {
return result.get() != null;
}

public void addListener(ActionListener<V> listener) {
internalAddListener(listener);
}

@SuppressWarnings("unchecked")
private void internalAddListener(ActionListener<V> listener) {
listeners.offer(listener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a race here. We might execute this listener twice. I think we should keep it simple and just use synchronize and make a fully copy of the list every time we add a listener. I don't think this is perf critical. Let's make the state only mutable under a mutex otherwise it's too complicated. That way you can also just use ActionListener#onResponse and ActionListener#onFailure


Object result = this.result.get();
if (result != null) {
if (listeners.remove(listener)) {
if (result instanceof Exception) {
listener.onFailure((Exception) result);
} else if (result == NULL_VALUE) {
listener.onResponse(null);
} else {
listener.onResponse((V) result);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ private ConnectionTypeHandle(int offset, int length, Set<TransportRequestOptions
* Returns one of the channels out configured for this handle. The channel is selected in a round-robin
* fashion.
*/
<T> T getChannel(T[] channels) {
<T> T getChannel(List<T> channels) {
if (length == 0) {
throw new IllegalStateException("can't select channel size is 0 for types: " + types);
}
assert channels.length >= offset + length : "illegal size: " + channels.length + " expected >= " + (offset + length);
return channels[offset + Math.floorMod(counter.incrementAndGet(), length)];
assert channels.size() >= offset + length : "illegal size: " + channels.size() + " expected >= " + (offset + length);
return channels.get(offset + Math.floorMod(counter.incrementAndGet(), length));
}

/**
Expand All @@ -223,5 +223,4 @@ Set<TransportRequestOptions.Type> getTypes() {
return types;
}
}

}
35 changes: 35 additions & 0 deletions core/src/main/java/org/elasticsearch/transport/TcpChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import org.elasticsearch.action.ActionListener;

import java.io.IOException;

public interface TcpChannel<C extends TcpChannel<C>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that down the road we will get rid of the generics here and add all the necessary method to it that TcpTransport needs, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs javadocs :)


void closeAsync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this can simply implement AutoCloseable and we document that is might be async and the close listeners will be called upon closing.


void addCloseListener(ActionListener<C> listener);

void setSoLinger(int value) throws IOException;

boolean isOpen();
}
137 changes: 137 additions & 0 deletions core/src/main/java/org/elasticsearch/transport/TcpChannelUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TcpChannelUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move these static methods on to TcpChannel I don't think we need this util class?


public static <C extends TcpChannel<C>> void closeChannel(C channel, boolean blocking) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just delegate to closeChannels with a Collections#singletonList or can we even do it on the caller side?

if (channel.isOpen()) {
if (blocking) {
PlainActionFuture<C> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(closeFuture);
channel.closeAsync();
blockOnFutures(Collections.singletonList(closeFuture));
} else {
channel.closeAsync();
}
}
}

public static <C extends TcpChannel<C>> void closeChannels(List<C> channels, boolean blocking) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all these need javadocs too

if (blocking) {
ArrayList<ActionFuture<C>> futures = new ArrayList<>(channels.size());
for (final C channel : channels) {
if (channel.isOpen()) {
PlainActionFuture<C> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(closeFuture);
channel.closeAsync();
futures.add(closeFuture);
}
}
blockOnFutures(futures);
} else {
channels.forEach(c -> {
if (c.isOpen()) {
c.closeAsync();
}
});
}
}

public static <C extends TcpChannel<C>> void closeServerChannels(String profile, List<C> channels, Logger logger) {
ArrayList<ActionFuture<C>> futures = new ArrayList<>(channels.size());
for (final C channel : channels) {
if (channel.isOpen()) {
PlainActionFuture<C> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(ActionListener.wrap(c -> {
},
e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e)));
channel.addCloseListener(closeFuture);
channel.closeAsync();
futures.add(closeFuture);
}
}

blockOnFutures(futures);
}

public static <C extends TcpChannel<C>> void finishConnection(DiscoveryNode discoveryNode, List<ActionFuture<C>> connectionFutures,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about calling this awaitConnected?

TimeValue connectTimeout) throws ConnectTransportException {
Exception connectionException = null;
boolean allConnected = true;

for (ActionFuture<C> connectionFuture : connectionFutures) {
try {
connectionFuture.get(connectTimeout.getMillis(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
allConnected = false;
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
} catch (ExecutionException e) {
allConnected = false;
connectionException = (Exception) e.getCause();
break;
}
}

if (allConnected == false) {
if (connectionException == null) {
throw new ConnectTransportException(discoveryNode, "connect_timeout[" + connectTimeout + "]");
} else {
throw new ConnectTransportException(discoveryNode, "connect_exception", connectionException);
}
}
}


private static <C extends TcpChannel<C>> void blockOnFutures(List<ActionFuture<C>> futures) {
for (ActionFuture<C> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
// Ignore as we already attached a listener to log
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
}
}
}
}
Loading