Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- [Extensions] Moving Extensions APIs to protobuf serialization. ([#6960](https://github.com/opensearch-project/OpenSearch/pull/6960))
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void close() {}
* Build the service.
*
* @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
*/
public TransportService(
Settings settings,
Expand Down Expand Up @@ -400,6 +400,15 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
connectToNode(node, (ConnectionProfile) null);
}

/**
* Connect to the specified node as an extension with the default connection profile
*
* @param node the node to connect to
*/
public void connectToNodeAsExtension(DiscoveryNode node, String extensionUniqueId) throws ConnectTransportException {
connectToNodeAsExtension(node, (ConnectionProfile) null, extensionUniqueId);
}

// We are skipping node validation for extensibility as extensionNode and opensearchNode(LocalNode) will have different ephemeral id's
public void connectToExtensionNode(final DiscoveryNode node) {
PlainActionFuture.get(fut -> connectToExtensionNode(node, (ConnectionProfile) null, ActionListener.map(fut, x -> null)));
Expand All @@ -415,6 +424,19 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
PlainActionFuture.get(fut -> connectToNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
}

/**
* Connect to the specified node with the given connection profile
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param extensionUniqueIq the id of the extension
*/
public void connectToNodeAsExtension(final DiscoveryNode node, ConnectionProfile connectionProfile, String extensionUniqueIq) {
PlainActionFuture.get(
fut -> connectToNodeAsExtension(node, connectionProfile, extensionUniqueIq, ActionListener.map(fut, x -> null))
);
}

public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
PlainActionFuture.get(fut -> connectToExtensionNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
}
Expand Down Expand Up @@ -450,6 +472,33 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
connectionManager.connectToNode(node, connectionProfile, connectionValidator(node), listener);
}

/**
* Connect to the specified node as an extension with the given connection profile.
* The ActionListener will be called on the calling thread or the generic thread pool.
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param extensionUniqueId the id of the extension
* @param listener the action listener to notify
*/
public void connectToNodeAsExtension(
final DiscoveryNode node,
ConnectionProfile connectionProfile,
String extensionUniqueId,
ActionListener<Void> listener
) {
if (isLocalNode(node)) {
listener.onResponse(null);
return;
}
connectionManager.connectToNode(
node,
connectionProfile,
connectionValidatorForExtensionConnectingToNode(node, extensionUniqueId),
listener
);
}

public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Void> listener) {
if (isLocalNode(node)) {
listener.onResponse(null);
Expand All @@ -473,6 +522,25 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n
};
}

public ConnectionManager.ConnectionValidator connectionValidatorForExtensionConnectingToNode(
DiscoveryNode node,
String extensionUniqueId
) {
return (newConnection, actualProfile, listener) -> {
// We don't validate cluster names to allow for CCS connections.
threadPool.getThreadContext().putHeader("extension_unique_id", extensionUniqueId);
handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> {
final DiscoveryNode remote = resp.discoveryNode;

if (node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}

return null;
}));
};
}

public ConnectionManager.ConnectionValidator extensionConnectionValidator(DiscoveryNode node) {
return (newConnection, actualProfile, listener) -> {
// We don't validate cluster names to allow for CCS connections.
Expand Down