-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Add TcpChannel to unify Transport implementations #27132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Tim-Brooks
merged 31 commits into
elastic:master
from
Tim-Brooks:introduce_tcp_connection
Nov 15, 2017
Merged
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit
Hold shift + click to select a range
d8cdf08
WIP
Tim-Brooks a5a30ca
WIP
Tim-Brooks e5d4c21
Fix mock transport tests
Tim-Brooks 044f6bf
WIP
Tim-Brooks 068bde0
DWIP
Tim-Brooks dbc2c85
WIP
Tim-Brooks 701e3a8
Fix nio client tests
Tim-Brooks ac4cfc3
WIP
Tim-Brooks b63eb9b
Merge branch 'master' into introduce_tcp_connection
Tim-Brooks 8f85ba5
WIP
Tim-Brooks c887e7d
Cleanup futures
Tim-Brooks 4c5d0d6
Cleanup channel closing
Tim-Brooks 57a506d
Work on handling accepting
Tim-Brooks 8fa658d
Cleanup
Tim-Brooks 1028126
Add license
Tim-Brooks 172f9eb
Work on cleaning up close futures
Tim-Brooks cea0253
Cleanups
Tim-Brooks 3565108
Add back method
Tim-Brooks 9dc88dc
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks 10fb412
Remove futures
Tim-Brooks 9e7903d
Merge branch 'master' into introduce_tcp_connection
Tim-Brooks 8e32b7b
Drop parameterization
Tim-Brooks 1f1fd43
Fix test
Tim-Brooks 8d8df72
make changes based on review
Tim-Brooks c57d919
Cleanup code
Tim-Brooks 22bef9c
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks c664c07
use completablefuture
Tim-Brooks d58678c
Make getter public
Tim-Brooks 3a79a16
Changes based on review
Tim-Brooks bb89a97
Don't wrap throwable
Tim-Brooks 9fd9984
Merge remote-tracking branch 'upstream/master' into introduce_tcp_con…
Tim-Brooks File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.function.BiConsumer; | ||
| import java.util.function.Consumer; | ||
|
|
||
| /** | ||
|
|
@@ -69,6 +70,50 @@ public void onFailure(Exception e) { | |
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Creates a listener that listens for a response (or failure) and executes the | ||
| * corresponding runnable when the response (or failure) is received. | ||
| * | ||
| * @param runnable the runnable that will be called in event of success or failure | ||
| * @param <Response> the type of the response | ||
| * @return a listener that listens for responses and invokes the runnable when received | ||
| */ | ||
| static <Response> ActionListener<Response> wrap(Runnable runnable) { | ||
| return new ActionListener<Response>() { | ||
| @Override | ||
| public void onResponse(Response response) { | ||
| runnable.run(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(Exception e) { | ||
| runnable.run(); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Converts a listener to a {@link BiConsumer} for compatibility with the {@link java.util.concurrent.CompletableFuture} | ||
| * api. | ||
| * | ||
| * @param listener that will be wrapped | ||
| * @param <Response> the type of the response | ||
| * @return a bi consumer that will complete the wrapped listener | ||
| */ | ||
| static <Response> BiConsumer<Response, Throwable> toBiConsumer(ActionListener<Response> listener) { | ||
| return (tcpChannel, throwable) -> { | ||
| if (throwable == null) { | ||
| listener.onResponse(tcpChannel); | ||
| } else { | ||
| if (throwable instanceof Exception) { | ||
| listener.onFailure((Exception) throwable); | ||
| } else { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should hide the non-exception. this will be an error and in this case we need to rethrow it? @jasontedor WDYT? |
||
| listener.onFailure(new Exception(throwable)); | ||
| } | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception | ||
| * the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
169 changes: 169 additions & 0 deletions
169
core/src/main/java/org/elasticsearch/transport/TcpChannel.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| /* | ||
| * Licensed to Elasticsearch under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.elasticsearch.transport; | ||
|
|
||
| import org.apache.logging.log4j.Logger; | ||
| import org.apache.logging.log4j.message.ParameterizedMessage; | ||
| import org.elasticsearch.action.ActionFuture; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.support.PlainActionFuture; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.common.lease.Releasable; | ||
| import org.elasticsearch.common.lease.Releasables; | ||
| import org.elasticsearch.common.unit.TimeValue; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
|
|
||
|
|
||
| /** | ||
| * This is a tcp channel representing a single channel connection to another node. It is the base channel | ||
| * abstraction used by the {@link TcpTransport} and {@link TransportService}. All tcp transport | ||
| * implementations must return channels that adhere to the required method contracts. | ||
| */ | ||
| public interface TcpChannel extends Releasable { | ||
|
|
||
| /** | ||
| * Closes the channel. This might be an asynchronous process. There is notguarantee that the channel | ||
| * will be closed when this method returns. Use the {@link #addCloseListener(ActionListener)} method | ||
| * to implement logic that depends on knowing when the channel is closed. | ||
| */ | ||
| void close(); | ||
|
|
||
| /** | ||
| * Adds a listener that will be executed when the channel is closed. If the channel is still open when | ||
| * this listener is added, the listener will be executed by the thread that eventually closes the | ||
| * channel. If the channel is already closed when the listener is added the listener will immediately be | ||
| * executed by the thread that is attempting to add the listener. | ||
| * | ||
| * @param listener to be executed | ||
| */ | ||
| void addCloseListener(ActionListener<TcpChannel> listener); | ||
|
|
||
|
|
||
| /** | ||
| * This sets the low level socket option {@link java.net.StandardSocketOptions} SO_LINGER on a channel. | ||
| * | ||
| * @param value to set for SO_LINGER | ||
| * @throws IOException that can be throw by the low level socket implementation | ||
| */ | ||
| void setSoLinger(int value) throws IOException; | ||
|
|
||
|
|
||
| /** | ||
| * Indicates whether a channel is currently open | ||
| * | ||
| * @return boolean indicating if channel is open | ||
| */ | ||
| boolean isOpen(); | ||
|
|
||
| /** | ||
| * Closes the channel. | ||
| * | ||
| * @param channel to close | ||
| * @param blocking indicates if we should block on channel close | ||
| */ | ||
| static <C extends TcpChannel> void closeChannel(C channel, boolean blocking) { | ||
| closeChannels(Collections.singletonList(channel), blocking); | ||
| } | ||
|
|
||
| /** | ||
| * Closes the channels. | ||
| * | ||
| * @param channels to close | ||
| * @param blocking indicates if we should block on channel close | ||
| */ | ||
| static <C extends TcpChannel> void closeChannels(List<C> channels, boolean blocking) { | ||
| if (blocking) { | ||
| ArrayList<ActionFuture<TcpChannel>> futures = new ArrayList<>(channels.size()); | ||
| for (final C channel : channels) { | ||
| if (channel.isOpen()) { | ||
| PlainActionFuture<TcpChannel> closeFuture = PlainActionFuture.newFuture(); | ||
| channel.addCloseListener(closeFuture); | ||
| channel.close(); | ||
| futures.add(closeFuture); | ||
| } | ||
| } | ||
| blockOnFutures(futures); | ||
| } else { | ||
| Releasables.close(channels); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Awaits for all of the pending connections to complete. Will throw an exception if at least one of the | ||
| * connections fails. | ||
| * | ||
| * @param discoveryNode the node for the pending connections | ||
| * @param connectionFutures representing the pending connections | ||
| * @param connectTimeout to wait for a connection | ||
| * @param <C> the type of channel | ||
| * @throws ConnectTransportException if one of the connections fails | ||
| */ | ||
| static <C extends TcpChannel> void awaitConnected(DiscoveryNode discoveryNode, List<ActionFuture<C>> connectionFutures, | ||
| TimeValue connectTimeout) throws ConnectTransportException { | ||
| Exception connectionException = null; | ||
| boolean allConnected = true; | ||
|
|
||
| for (ActionFuture<C> connectionFuture : connectionFutures) { | ||
| try { | ||
| connectionFuture.get(connectTimeout.getMillis(), TimeUnit.MILLISECONDS); | ||
| } catch (TimeoutException e) { | ||
| allConnected = false; | ||
| break; | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new IllegalStateException(e); | ||
| } catch (ExecutionException e) { | ||
| allConnected = false; | ||
| connectionException = (Exception) e.getCause(); | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| if (allConnected == false) { | ||
| if (connectionException == null) { | ||
| throw new ConnectTransportException(discoveryNode, "connect_timeout[" + connectTimeout + "]"); | ||
| } else { | ||
| throw new ConnectTransportException(discoveryNode, "connect_exception", connectionException); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| static void blockOnFutures(List<ActionFuture<TcpChannel>> futures) { | ||
| for (ActionFuture<TcpChannel> future : futures) { | ||
| try { | ||
| future.get(); | ||
| } catch (ExecutionException e) { | ||
| // Ignore as we are only interested in waiting for the close process to complete. Logging | ||
| // close exceptions happens elsewhere. | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new IllegalStateException("Future got interrupted", e); | ||
| } | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just
return wrap(runnable:run, e -> runnable.run());?