-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Add connectToNodeAsExtension in TransportService #6866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
de8b8b8
0003dcc
4d87934
98b1af0
e1c754b
1ef93fc
38ca9da
5ebed1c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -165,7 +165,7 @@ public void close() {} | |
| * Build the service. | ||
| * | ||
| * @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings | ||
| * * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}. | ||
| * * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}. | ||
| */ | ||
| public TransportService( | ||
| Settings settings, | ||
|
|
@@ -397,6 +397,15 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException { | |
| connectToNode(node, (ConnectionProfile) null); | ||
| } | ||
|
|
||
| /** | ||
| * Connect to the specified node as an extension with the default connection profile | ||
| * | ||
| * @param node the node to connect to | ||
| */ | ||
| public void connectToNodeAsExtension(DiscoveryNode node, String extensionUniqueId) throws ConnectTransportException { | ||
| connectToNodeAsExtension(node, (ConnectionProfile) null, extensionUniqueId); | ||
| } | ||
|
|
||
| // We are skipping node validation for extensibility as extensionNode and opensearchNode(LocalNode) will have different ephemeral id's | ||
| public void connectToExtensionNode(final DiscoveryNode node) { | ||
| PlainActionFuture.get(fut -> connectToExtensionNode(node, (ConnectionProfile) null, ActionListener.map(fut, x -> null))); | ||
|
|
@@ -412,6 +421,19 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection | |
| PlainActionFuture.get(fut -> connectToNode(node, connectionProfile, ActionListener.map(fut, x -> null))); | ||
| } | ||
|
|
||
| /** | ||
| * Connect to the specified node with the given connection profile | ||
| * | ||
| * @param node the node to connect to | ||
| * @param connectionProfile the connection profile to use when connecting to this node | ||
| * @param extensionUniqueIq the id of the extension | ||
| */ | ||
| public void connectToNodeAsExtension(final DiscoveryNode node, ConnectionProfile connectionProfile, String extensionUniqueIq) { | ||
| PlainActionFuture.get( | ||
| fut -> connectToNodeAsExtension(node, connectionProfile, extensionUniqueIq, ActionListener.map(fut, x -> null)) | ||
| ); | ||
| } | ||
|
|
||
| public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile) { | ||
| PlainActionFuture.get(fut -> connectToExtensionNode(node, connectionProfile, ActionListener.map(fut, x -> null))); | ||
| } | ||
|
|
@@ -447,6 +469,33 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection | |
| connectionManager.connectToNode(node, connectionProfile, connectionValidator(node), listener); | ||
| } | ||
|
|
||
| /** | ||
| * Connect to the specified node as an extension with the given connection profile. | ||
| * The ActionListener will be called on the calling thread or the generic thread pool. | ||
| * | ||
| * @param node the node to connect to | ||
| * @param connectionProfile the connection profile to use when connecting to this node | ||
| * @param extensionUniqueId the id of the extension | ||
| * @param listener the action listener to notify | ||
| */ | ||
| public void connectToNodeAsExtension( | ||
| final DiscoveryNode node, | ||
| ConnectionProfile connectionProfile, | ||
| String extensionUniqueId, | ||
| ActionListener<Void> listener | ||
| ) { | ||
| if (isLocalNode(node)) { | ||
| listener.onResponse(null); | ||
| return; | ||
| } | ||
| connectionManager.connectToNode( | ||
| node, | ||
| connectionProfile, | ||
| connectionValidatorForExtensionConnectingToNode(node, extensionUniqueId), | ||
| listener | ||
| ); | ||
| } | ||
|
|
||
| public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Void> listener) { | ||
| if (isLocalNode(node)) { | ||
| listener.onResponse(null); | ||
|
|
@@ -470,6 +519,25 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n | |
| }; | ||
| } | ||
|
|
||
| public ConnectionManager.ConnectionValidator connectionValidatorForExtensionConnectingToNode( | ||
| DiscoveryNode node, | ||
| String extensionUniqueId | ||
| ) { | ||
| return (newConnection, actualProfile, listener) -> { | ||
| // We don't validate cluster names to allow for CCS connections. | ||
| threadPool.getThreadContext().putHeader("extension_unique_id", extensionUniqueId); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am confused on how using the thread context will work here. The action futures are all going to be working asynchronously so how can we inject to the thread context at this point?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm trying to model this like cross-cluster which uses a thread context header to store the cluster name. See: https://github.com/opensearch-project/security/blob/main/src/main/java/org/opensearch/security/transport/SecurityInterceptor.java#L131-L132 With this, during the handshake it would inject the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This uses the |
||
| handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { | ||
| final DiscoveryNode remote = resp.discoveryNode; | ||
|
|
||
| if (node.equals(remote) == false) { | ||
| throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); | ||
| } | ||
|
|
||
| return null; | ||
| })); | ||
| }; | ||
| } | ||
|
|
||
| public ConnectionManager.ConnectionValidator extensionConnectionValidator(DiscoveryNode node) { | ||
| return (newConnection, actualProfile, listener) -> { | ||
| // We don't validate cluster names to allow for CCS connections. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the line that makes this different than
connectToNode. This injectsextension_unique_idinto the threadcontext for the handshake which allows the security plugin to identify that the transport request was initiated by an extensions. See companion PR in security repo: opensearch-project/security#2599