From 2d4f7be5bd225a4d75c080b43f33be9ed5cb0258 Mon Sep 17 00:00:00 2001 From: Ryan Bogan Date: Wed, 14 Dec 2022 20:56:02 +0000 Subject: [PATCH 1/4] Renaming tests to be consistent with main branch Signed-off-by: Ryan Bogan --- .../extensions/ExtensionsManager.java | 6 ++-- .../rest/RestSendToExtensionAction.java | 12 ++++---- .../extensions/ExtensionsManagerTests.java | 30 +++++++++---------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 86293d38fcfd7..e32c437e4a953 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -65,7 +65,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; /** - * The main class for orchestrating Extension communication with the OpenSearch Node. + * The main class for managing Extension communication with the OpenSearch Node. * * @opensearch.internal */ @@ -445,7 +445,7 @@ private void onIndexModule(IndexModule indexModule, DiscoveryNode extensionNode) final CompletableFuture inProgressFuture = new CompletableFuture<>(); final CompletableFuture inProgressIndexNameFuture = new CompletableFuture<>(); - final TransportResponseHandler indicesModuleNameResponseHandler = new TransportResponseHandler< + final TransportResponseHandler extensionBooleanResponseHandler = new TransportResponseHandler< ExtensionBooleanResponse>() { @Override public void handleResponse(ExtensionBooleanResponse response) { @@ -497,7 +497,7 @@ public void beforeIndexRemoved( extensionNode, INDICES_EXTENSION_NAME_ACTION_NAME, new IndicesModuleRequest(indexModule), - indicesModuleNameResponseHandler + extensionBooleanResponseHandler ); /* * Making async synchronous for now. diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java index b1b508805d9fe..4f4df77d274c5 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java @@ -34,7 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -124,7 +124,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC emptyList(), false ); - final CountDownLatch inProgressLatch = new CountDownLatch(1); + final CompletableFuture inProgressFuture = new CompletableFuture<>(); final TransportResponseHandler restExecuteOnExtensionResponseHandler = new TransportResponseHandler< RestExecuteOnExtensionResponse>() { @@ -145,16 +145,16 @@ public void handleResponse(RestExecuteOnExtensionResponse response) { if (response.isContentConsumed()) { request.content(); } - inProgressLatch.countDown(); + inProgressFuture.complete(response); } @Override public void handleException(TransportException exp) { - logger.debug("REST request failed", exp); + logger.error("REST request failed", exp); // Status is already defaulted to 500 (INTERNAL_SERVER_ERROR) byte[] responseBytes = ("Request failed: " + exp.getMessage()).getBytes(StandardCharsets.UTF_8); restExecuteOnExtensionResponse.setContent(responseBytes); - inProgressLatch.countDown(); + inProgressFuture.completeExceptionally(exp); } @Override @@ -175,7 +175,7 @@ public String executor() { restExecuteOnExtensionResponseHandler ); try { - inProgressLatch.await(ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); + inProgressFuture.get(ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException e) { return channel -> channel.sendResponse( new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, "No response from extension to request.") diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index c63e3d8423527..d222be1b35c06 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -215,18 +215,18 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } - public void testExtensionsDiscovery() throws Exception { + public void testDiscover() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - List expectedExtensionsList = new ArrayList(); + List expectedUninitializedExtensions = new ArrayList(); String expectedUniqueId = "uniqueid0"; Version expectedVersion = Version.fromString("2.0.0"); ExtensionDependency expectedDependency = new ExtensionDependency(expectedUniqueId, expectedVersion); - expectedExtensionsList.add( + expectedUninitializedExtensions.add( new DiscoveryExtensionNode( "firstExtension", "uniqueid1", @@ -250,7 +250,7 @@ public void testExtensionsDiscovery() throws Exception { ) ); - expectedExtensionsList.add( + expectedUninitializedExtensions.add( new DiscoveryExtensionNode( "secondExtension", "uniqueid2", @@ -273,10 +273,10 @@ public void testExtensionsDiscovery() throws Exception { List.of(expectedDependency) ) ); - assertEquals(expectedExtensionsList.size(), extensionsManager.getExtensionIdMap().values().size()); - assertEquals(List.of(expectedDependency), expectedExtensionsList.get(1).getDependencies()); - assertTrue(expectedExtensionsList.containsAll(extensionsManager.getExtensionIdMap().values())); - assertTrue(extensionsManager.getExtensionIdMap().values().containsAll(expectedExtensionsList)); + assertEquals(expectedUninitializedExtensions.size(), extensionsManager.getExtensionIdMap().values().size()); + assertEquals(List.of(expectedDependency), expectedUninitializedExtensions.get(1).getDependencies()); + assertTrue(expectedUninitializedExtensions.containsAll(extensionsManager.getExtensionIdMap().values())); + assertTrue(extensionsManager.getExtensionIdMap().values().containsAll(expectedUninitializedExtensions)); } public void testNonUniqueExtensionsDiscovery() throws Exception { @@ -288,9 +288,9 @@ public void testNonUniqueExtensionsDiscovery() throws Exception { ExtensionsManager extensionsManager = new ExtensionsManager(settings, emptyExtensionDir); - List expectedExtensionsList = new ArrayList(); + List expectedUninitializedExtensions = new ArrayList(); - expectedExtensionsList.add( + expectedUninitializedExtensions.add( new DiscoveryExtensionNode( "firstExtension", "uniqueid1", @@ -313,10 +313,10 @@ public void testNonUniqueExtensionsDiscovery() throws Exception { Collections.emptyList() ) ); - assertEquals(expectedExtensionsList.size(), extensionsManager.getExtensionIdMap().values().size()); - assertTrue(expectedExtensionsList.containsAll(extensionsManager.getExtensionIdMap().values())); - assertTrue(extensionsManager.getExtensionIdMap().values().containsAll(expectedExtensionsList)); - assertTrue(expectedExtensionsList.containsAll(emptyList())); + assertEquals(expectedUninitializedExtensions.size(), extensionsManager.getExtensionIdMap().values().size()); + assertTrue(expectedUninitializedExtensions.containsAll(extensionsManager.getExtensionIdMap().values())); + assertTrue(extensionsManager.getExtensionIdMap().values().containsAll(expectedUninitializedExtensions)); + assertTrue(expectedUninitializedExtensions.containsAll(emptyList())); } public void testDiscoveryExtension() throws Exception { @@ -418,7 +418,7 @@ public void testEmptyExtensionsFile() throws Exception { expectThrows(IOException.class, () -> new ExtensionsManager(settings, emptyExtensionDir)); } - public void testExtensionsInitialize() throws Exception { + public void testInitialize() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); initialize(extensionsManager); From 0eeddf2d3dfb5eb2755cd8e07a64496abe52fd31 Mon Sep 17 00:00:00 2001 From: Ryan Bogan Date: Wed, 14 Dec 2022 20:58:55 +0000 Subject: [PATCH 2/4] Update CHANGELOG Signed-off-by: Ryan Bogan --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e99cd4a6cf5e..f92bd656a691a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -134,6 +134,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Renaming to match merge to main branch ([5362](https://github.com/opensearch-project/OpenSearch/pull/5362)) - Updated settings registration changes to reflect main ([5532](https://github.com/opensearch-project/OpenSearch/pull/5532)) - Added dependency information to Extensions ([#5438](https://github.com/opensearch-project/OpenSearch/pull/5438)) + - Renaming tests to be consistent with main branch ([#5571](https://github.com/opensearch-project/OpenSearch/pull/5571)) ## [Unreleased 2.x] ### Added From a973607b17195c2e6c659b2829d481959d68bdce Mon Sep 17 00:00:00 2001 From: Ryan Bogan Date: Thu, 15 Dec 2022 22:52:26 +0000 Subject: [PATCH 3/4] Removed NamedWriteableRegistry from extensions and added TODO comments Signed-off-by: Ryan Bogan --- .../NamedWriteableRegistryParseRequest.java | 99 ----------- .../NamedWriteableRegistryResponse.java | 92 ---------- .../ExtensionNamedWriteableRegistry.java | 153 ---------------- .../extensions/ExtensionsManager.java | 21 +-- ...WriteableRegistryParseResponseHandler.java | 47 ----- ...NamedWriteableRegistryResponseHandler.java | 144 --------------- .../rest/RestSendToExtensionAction.java | 1 + .../extensions/ExtensionsManagerTests.java | 165 ------------------ 8 files changed, 4 insertions(+), 718 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/common/io/stream/NamedWriteableRegistryParseRequest.java delete mode 100644 server/src/main/java/org/opensearch/common/io/stream/NamedWriteableRegistryResponse.java delete mode 100644 server/src/main/java/org/opensearch/extensions/ExtensionNamedWriteableRegistry.java delete mode 100644 server/src/main/java/org/opensearch/extensions/NamedWriteableRegistryParseResponseHandler.java delete mode 100644 server/src/main/java/org/opensearch/extensions/NamedWriteableRegistryResponseHandler.java diff --git a/server/src/main/java/org/opensearch/common/io/stream/NamedWriteableRegistryParseRequest.java b/server/src/main/java/org/opensearch/common/io/stream/NamedWriteableRegistryParseRequest.java deleted file mode 100644 index dac746841bcf4..0000000000000 --- a/server/src/main/java/org/opensearch/common/io/stream/NamedWriteableRegistryParseRequest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.io.stream; - -import org.opensearch.transport.TransportRequest; -import java.io.IOException; -import java.util.Arrays; -import java.util.Objects; - -/** - * Extensibility support for Named Writeable Registry: request to extensions to parse context - * - * @opensearch.internal - */ -public class NamedWriteableRegistryParseRequest extends TransportRequest { - - private final Class categoryClass; - private byte[] context; - - /** - * @param categoryClass Class category for this parse request - * @param context StreamInput object to convert into a byte array and transport to the extension - * @throws IllegalArgumentException if context bytes could not be read - */ - public NamedWriteableRegistryParseRequest(Class categoryClass, StreamInput context) { - try { - byte[] streamInputBytes = context.readAllBytes(); - this.categoryClass = categoryClass; - this.context = Arrays.copyOf(streamInputBytes, streamInputBytes.length); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid context", e); - } - } - - /** - * @param in StreamInput from which class fields are read from - * @throws IllegalArgumentException if the fully qualified class name is invalid and the class object cannot be generated at runtime - */ - @SuppressWarnings("unchecked") - public NamedWriteableRegistryParseRequest(StreamInput in) throws IOException { - super(in); - try { - this.categoryClass = (Class) Class.forName(in.readString()); - this.context = in.readByteArray(); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Category class definition not found", e); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(categoryClass.getName()); - out.writeByteArray(context); - } - - @Override - public String toString() { - return "NamedWriteableRegistryParseRequest{" - + "categoryClass=" - + categoryClass.getName() - + ", context=" - + context.toString() - + " }"; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NamedWriteableRegistryParseRequest that = (NamedWriteableRegistryParseRequest) o; - return Objects.equals(categoryClass, that.categoryClass) && Objects.equals(context, that.context); - } - - @Override - public int hashCode() { - return Objects.hash(categoryClass, context); - } - - /** - * Returns the class instance of the category class sent over by the SDK - */ - public Class getCategoryClass() { - return this.categoryClass; - } - - /** - * Returns a copy of a byte array that a {@link Writeable.Reader} will be applied to. This byte array is generated from a {@link StreamInput} instance and transported to the SDK for deserialization. - */ - public byte[] getContext() { - return Arrays.copyOf(this.context, this.context.length); - } -} diff --git a/server/src/main/java/org/opensearch/common/io/stream/NamedWriteableRegistryResponse.java b/server/src/main/java/org/opensearch/common/io/stream/NamedWriteableRegistryResponse.java deleted file mode 100644 index 83d765cad9fdc..0000000000000 --- a/server/src/main/java/org/opensearch/common/io/stream/NamedWriteableRegistryResponse.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.io.stream; - -import org.opensearch.transport.TransportResponse; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -/** - * Extensibility support for Named Writeable Registry: response from extensions for name writeable registry entries - * - * @opensearch.internal - */ -public class NamedWriteableRegistryResponse extends TransportResponse { - - private final Map> registry; - - /** - * @param registry Map of writeable names and their associated category class - */ - public NamedWriteableRegistryResponse(Map> registry) { - this.registry = new HashMap<>(registry); - } - - /** - * @param in StreamInput from which map entries of writeable names and their associated category classes are read from - * @throws IllegalArgumentException if the fully qualified class name is invalid and the class object cannot be generated at runtime - */ - public NamedWriteableRegistryResponse(StreamInput in) throws IOException { - super(in); - // Stream output for registry map begins with a variable integer that tells us the number of entries being sent across the wire - Map> registry = new HashMap<>(); - int registryEntryCount = in.readVInt(); - for (int i = 0; i < registryEntryCount; i++) { - try { - String name = in.readString(); - @SuppressWarnings("unchecked") - Class categoryClass = (Class) Class.forName(in.readString()); - registry.put(name, categoryClass); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Category class definition not found", e); - } - } - - this.registry = registry; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - // Stream out registry size prior to streaming out registry entries - out.writeVInt(this.registry.size()); - for (Map.Entry> entry : registry.entrySet()) { - out.writeString(entry.getKey()); // Unique named writeable name - out.writeString(entry.getValue().getName()); // Fully qualified category class name - } - } - - @Override - public String toString() { - return "NamedWritableRegistryResponse{" + "registry=" + registry.toString() + "}"; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NamedWriteableRegistryResponse that = (NamedWriteableRegistryResponse) o; - return Objects.equals(registry, that.registry); - } - - @Override - public int hashCode() { - return Objects.hash(registry); - } - - /** - * Returns a map of writeable names and their associated category class - */ - public Map> getRegistry() { - return Collections.unmodifiableMap(this.registry); - } - -} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionNamedWriteableRegistry.java b/server/src/main/java/org/opensearch/extensions/ExtensionNamedWriteableRegistry.java deleted file mode 100644 index 17e11adaa0849..0000000000000 --- a/server/src/main/java/org/opensearch/extensions/ExtensionNamedWriteableRegistry.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.extensions; - -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.io.stream.NamedWriteable; -import org.opensearch.extensions.ExtensionsManager.OpenSearchRequestType; -import org.opensearch.transport.TransportService; - -/** - * API for Named Writeables for extensions - * - * @opensearch.internal - */ -public class ExtensionNamedWriteableRegistry { - - private static final Logger logger = LogManager.getLogger(ExtensionNamedWriteableRegistry.class); - - private Map, Map>> extensionNamedWriteableRegistry; - private List extensionsInitializedList; - private TransportService transportService; - - /** - * Initializes a new ExtensionNamedWriteableRegistry - * - * @param extensionsInitializedList List of DiscoveryExtensionNodes to send requests to - * @param transportService Service that facilitates transport requests - */ - public ExtensionNamedWriteableRegistry(List extensionsInitializedList, TransportService transportService) { - this.extensionsInitializedList = extensionsInitializedList; - this.extensionNamedWriteableRegistry = new HashMap<>(); - this.transportService = transportService; - - getNamedWriteables(); - } - - /** - * Iterates through all discovered extensions, sends transport requests for named writeables and consolidates all entires into a central named writeable registry for extensions. - */ - public void getNamedWriteables() { - // Retrieve named writeable registry entries from each extension - for (DiscoveryNode extensionNode : extensionsInitializedList) { - try { - Map, Map>> extensionRegistry = - getNamedWriteables(extensionNode); - if (extensionRegistry.isEmpty() == false) { - this.extensionNamedWriteableRegistry.putAll(extensionRegistry); - } - } catch (UnknownHostException e) { - logger.error(e.toString()); - } - } - - // TODO : Invoke during the consolidation of named writeables within Node.java and return extension entries there - // (https://github.com/opensearch-project/OpenSearch/issues/4067) - } - - /** - * Sends a transport request for named writeables to an extension, identified by the given DiscoveryNode, and processes the response into registry entries - * - * @param extensionNode DiscoveryNode identifying the extension - * @throws UnknownHostException if connection to the extension node failed - * @return A map of category classes and their associated names and readers for this discovery node - */ - private Map, Map>> getNamedWriteables( - DiscoveryNode extensionNode - ) throws UnknownHostException { - NamedWriteableRegistryResponseHandler namedWriteableRegistryResponseHandler = new NamedWriteableRegistryResponseHandler( - extensionNode, - transportService, - ExtensionsManager.REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE - ); - try { - logger.info("Sending extension request type: " + ExtensionsManager.REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY); - transportService.sendRequest( - extensionNode, - ExtensionsManager.REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY, - new OpenSearchRequest(OpenSearchRequestType.REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY), - namedWriteableRegistryResponseHandler - ); - } catch (Exception e) { - logger.error(e.toString()); - } - - return namedWriteableRegistryResponseHandler.getExtensionRegistry(); - } - - /** - * Iterates through list of discovered extensions and returns the callback method associated with the given category class and name - * - * @param categoryClass Class that the Writeable object extends - * @param name Unique name identifiying the Writeable object - * @throws IllegalArgumentException if there is no reader associated with the given category class and name - * @return A map of the discovery node and its associated extension reader - */ - public Map getExtensionReader(Class categoryClass, String name) { - - ExtensionReader reader = null; - DiscoveryNode extension = null; - - // The specific extension that the reader is associated with is not known, must iterate through all of them - for (DiscoveryNode extensionNode : extensionsInitializedList) { - reader = getExtensionReader(extensionNode, categoryClass, name); - if (reader != null) { - extension = extensionNode; - break; - } - } - - // At this point, if reader does not exist throughout all extensionNodes, named writeable is not registered, throw exception - if (reader == null) { - throw new IllegalArgumentException("Unknown NamedWriteable [" + categoryClass.getName() + "][" + name + "]"); - } - return Collections.singletonMap(extension, reader); - } - - /** - * Returns the callback method associated with the given extension node, category class and name - * - * @param extensionNode Discovery Node identifying the extension associated with the category class and name - * @param categoryClass Class that the Writeable object extends - * @param name Unique name identifying the Writeable object - * @return The extension reader - */ - private ExtensionReader getExtensionReader(DiscoveryNode extensionNode, Class categoryClass, String name) { - ExtensionReader reader = null; - Map, Map> categoryMap = this.extensionNamedWriteableRegistry.get( - extensionNode - ); - if (categoryMap != null) { - Map readerMap = categoryMap.get(categoryClass); - if (readerMap != null) { - reader = readerMap.get(name); - } - } - return reader; - } - -} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index e32c437e4a953..d0d78c9057526 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -127,7 +127,6 @@ public static enum OpenSearchRequestType { private CustomSettingsRequestHandler customSettingsRequestHandler; private TransportService transportService; private ClusterService clusterService; - private ExtensionNamedWriteableRegistry namedWriteableRegistry; private ExtensionActionListener listener; private ExtensionActionListenerHandler listenerHandler; private Settings environmentSettings; @@ -150,7 +149,6 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept // will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized this.transportService = null; this.clusterService = null; - this.namedWriteableRegistry = null; this.client = null; this.extensionTransportActionsHandler = null; @@ -353,13 +351,12 @@ private void loadExtension(Extension extension) throws IOException { } /** - * Iterate through all extensions and initialize them. Initialized extensions will be added to the {@link #extensions}, and the {@link #namedWriteableRegistry} will be initialized. + * Iterate through all extensions and initialize them. Initialized extensions will be added to the {@link #extensions}. */ public void initialize() { for (DiscoveryExtensionNode extension : extensionIdMap.values()) { initializeExtension(extension); } - this.namedWriteableRegistry = new ExtensionNamedWriteableRegistry(extensions, transportService); } private void initializeExtension(DiscoveryExtensionNode extension) { @@ -499,9 +496,7 @@ public void beforeIndexRemoved( new IndicesModuleRequest(indexModule), extensionBooleanResponseHandler ); - /* - * Making async synchronous for now. - */ + // TODO: make asynchronous inProgressIndexNameFuture.get(100, TimeUnit.SECONDS); logger.info("Received ack response from Extension"); } catch (Exception e) { @@ -533,9 +528,7 @@ public String executor() { new IndicesModuleRequest(indexModule), indicesModuleResponseHandler ); - /* - * Making asynchronous for now. - */ + // TODO: make asynchronous inProgressFuture.get(100, TimeUnit.SECONDS); logger.info("Received response from Extension"); } catch (Exception e) { @@ -658,10 +651,6 @@ public ClusterService getClusterService() { return clusterService; } - public ExtensionNamedWriteableRegistry getNamedWriteableRegistry() { - return namedWriteableRegistry; - } - public ExtensionActionListener getListener() { return listener; } @@ -682,8 +671,4 @@ public NodeClient getClient() { return client; } - public void setNamedWriteableRegistry(ExtensionNamedWriteableRegistry namedWriteableRegistry) { - this.namedWriteableRegistry = namedWriteableRegistry; - } - } diff --git a/server/src/main/java/org/opensearch/extensions/NamedWriteableRegistryParseResponseHandler.java b/server/src/main/java/org/opensearch/extensions/NamedWriteableRegistryParseResponseHandler.java deleted file mode 100644 index 721cfcb7b656f..0000000000000 --- a/server/src/main/java/org/opensearch/extensions/NamedWriteableRegistryParseResponseHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.extensions; - -import java.io.IOException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportResponseHandler; - -/** - * Response handler for NamedWriteableRegistryParse Requests - * - * @opensearch.internal - */ -public class NamedWriteableRegistryParseResponseHandler implements TransportResponseHandler { - private static final Logger logger = LogManager.getLogger(NamedWriteableRegistryParseResponseHandler.class); - - @Override - public ExtensionBooleanResponse read(StreamInput in) throws IOException { - return new ExtensionBooleanResponse(in); - } - - @Override - public void handleResponse(ExtensionBooleanResponse response) { - logger.info("response {}", response.getStatus()); - } - - @Override - public void handleException(TransportException exp) { - logger.error(new ParameterizedMessage("NamedWriteableRegistryParseRequest failed"), exp); - } - - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } -} diff --git a/server/src/main/java/org/opensearch/extensions/NamedWriteableRegistryResponseHandler.java b/server/src/main/java/org/opensearch/extensions/NamedWriteableRegistryResponseHandler.java deleted file mode 100644 index e9e42d61a53da..0000000000000 --- a/server/src/main/java/org/opensearch/extensions/NamedWriteableRegistryResponseHandler.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.extensions; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.io.stream.NamedWriteable; -import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest; -import org.opensearch.common.io.stream.NamedWriteableRegistryResponse; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportResponseHandler; -import org.opensearch.transport.TransportService; - -/** - * Response handler for NamedWriteableRegistry Requests - * - * @opensearch.internal - */ -public class NamedWriteableRegistryResponseHandler implements TransportResponseHandler { - private static final Logger logger = LogManager.getLogger(NamedWriteableRegistryResponseHandler.class); - - private final Map, Map>> extensionRegistry; - private final DiscoveryNode extensionNode; - private final TransportService transportService; - private final String requestType; - - /** - * Instantiates a new NamedWriteableRegistry response handler - * - * @param extensionNode Discovery Node identifying the extension associated with the category class and name - * @param transportService The transport service communicating with the SDK - * @param requestType The type of request that OpenSearch will send to the SDK - */ - public NamedWriteableRegistryResponseHandler(DiscoveryNode extensionNode, TransportService transportService, String requestType) { - this.extensionRegistry = new HashMap(); - this.extensionNode = extensionNode; - this.transportService = transportService; - this.requestType = requestType; - } - - /** - * @return A map of the given DiscoveryNode and its inner named writeable registry map - */ - public Map, Map>> getExtensionRegistry() { - return Collections.unmodifiableMap(this.extensionRegistry); - } - - /** - * Transports a StreamInput, converted into a byte array, and associated category class to the given extension, identified by its discovery node - * - * @param extensionNode Discovery Node identifying the extension associated with the category class and name - * @param categoryClass Class that the Writeable object extends - * @param context StreamInput object to convert into a byte array and transport to the extension - * @throws UnknownHostException if connection to the extension node failed - */ - public void parseNamedWriteable(DiscoveryNode extensionNode, Class categoryClass, StreamInput context) - throws UnknownHostException { - NamedWriteableRegistryParseResponseHandler namedWriteableRegistryParseResponseHandler = - new NamedWriteableRegistryParseResponseHandler(); - try { - logger.info("Sending extension request type: " + requestType); - transportService.sendRequest( - extensionNode, - requestType, - new NamedWriteableRegistryParseRequest(categoryClass, context), - namedWriteableRegistryParseResponseHandler - ); - } catch (Exception e) { - logger.error(e.toString()); - } - } - - @Override - public NamedWriteableRegistryResponse read(StreamInput in) throws IOException { - return new NamedWriteableRegistryResponse(in); - } - - @Override - public void handleResponse(NamedWriteableRegistryResponse response) { - - logger.info("response {}", response); - logger.info("EXTENSION [" + extensionNode.getName() + "] returned " + response.getRegistry().size() + " entries"); - - if (response.getRegistry().isEmpty() == false) { - - // Extension has sent over entries to register, initialize inner category map - Map, Map> categoryMap = new HashMap<>(); - - // Reader map associated with this current category - Map readers = null; - Class currentCategory = null; - - for (Map.Entry> entry : response.getRegistry().entrySet()) { - - String name = entry.getKey(); - Class categoryClass = entry.getValue(); - if (currentCategory != categoryClass) { - // After first pass, readers and current category are set - if (currentCategory != null) { - categoryMap.put(currentCategory, readers); - } - readers = new HashMap<>(); - currentCategory = categoryClass; - } - - // Add name and callback method reference to inner reader map, - ExtensionReader callBack = (en, cc, context) -> parseNamedWriteable(en, cc, (StreamInput) context); - readers.put(name, callBack); - } - - // Handle last category and reader entry - categoryMap.put(currentCategory, readers); - - // Attach extension node to categoryMap - extensionRegistry.put(extensionNode, categoryMap); - } - } - - @Override - public void handleException(TransportException exp) { - logger.error(new ParameterizedMessage("OpenSearchRequest failed"), exp); - } - - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } -} diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java index 4f4df77d274c5..45d6c6ae23fc4 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java @@ -175,6 +175,7 @@ public String executor() { restExecuteOnExtensionResponseHandler ); try { + // TODO: make asynchronous inProgressFuture.get(ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException e) { return channel -> channel.sendResponse( diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index d222be1b35c06..37b9eaf11d407 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -20,9 +20,7 @@ import static org.mockito.Mockito.verify; import static org.opensearch.test.ClusterServiceUtils.createClusterService; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.net.InetAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -33,8 +31,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -55,12 +51,7 @@ import org.opensearch.common.io.PathUtils; import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.common.io.stream.InputStreamStreamInput; -import org.opensearch.common.io.stream.NamedWriteable; import org.opensearch.common.io.stream.NamedWriteableRegistry; -import org.opensearch.common.io.stream.NamedWriteableRegistryResponse; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -736,162 +727,6 @@ public void testRegisterHandler() throws Exception { verify(mockTransportService, times(9)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any()); } - private static class Example implements NamedWriteable { - public static final String INVALID_NAME = "invalid_name"; - public static final String NAME = "example"; - private final String message; - - Example(String message) { - this.message = message; - } - - Example(StreamInput in) throws IOException { - this.message = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(message); - } - - @Override - public String getWriteableName() { - return NAME; - } - - @Override - public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) { - return false; - } - Example that = (Example) o; - return Objects.equals(message, that.message); - } - - @Override - public int hashCode() { - return Objects.hash(message); - } - } - - public void testGetNamedWriteables() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - initialize(extensionsManager); - - try ( - MockLogAppender mockLogAppender = MockLogAppender.createForLoggers( - LogManager.getLogger(NamedWriteableRegistryResponseHandler.class) - ) - ) { - - mockLogAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "OpenSearchRequest Failure", - "org.opensearch.extensions.NamedWriteableRegistryResponseHandler", - Level.ERROR, - "OpenSearchRequest failed" - ) - ); - List extensionsList = new ArrayList<>(extensionsManager.getExtensionIdMap().values()); - extensionsManager.setNamedWriteableRegistry(new ExtensionNamedWriteableRegistry(extensionsList, transportService)); - extensionsManager.getNamedWriteableRegistry().getNamedWriteables(); - mockLogAppender.assertAllExpectationsMatched(); - } - } - - public void testNamedWriteableRegistryResponseHandler() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - initialize(extensionsManager); - - List extensionsList = new ArrayList<>(extensionsManager.getExtensionIdMap().values()); - DiscoveryNode extensionNode = extensionsList.get(0); - String requestType = ExtensionsManager.REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY; - - // Create response to pass to response handler - Map> responseRegistry = new HashMap<>(); - responseRegistry.put(Example.NAME, Example.class); - NamedWriteableRegistryResponse response = new NamedWriteableRegistryResponse(responseRegistry); - - NamedWriteableRegistryResponseHandler responseHandler = new NamedWriteableRegistryResponseHandler( - extensionNode, - transportService, - requestType - ); - responseHandler.handleResponse(response); - - // Ensure that response entries have been processed correctly into their respective maps - Map, Map>> extensionsRegistry = responseHandler - .getExtensionRegistry(); - assertEquals(extensionsRegistry.size(), 1); - - Map, Map> categoryMap = extensionsRegistry.get(extensionNode); - assertEquals(categoryMap.size(), 1); - - Map readerMap = categoryMap.get(Example.class); - assertEquals(readerMap.size(), 1); - - ExtensionReader callback = readerMap.get(Example.NAME); - assertNotNull(callback); - } - - public void testGetExtensionReader() throws IOException { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - - extensionsManager.setNamedWriteableRegistry( - spy(new ExtensionNamedWriteableRegistry(extensionsManager.getExtensions(), transportService)) - ); - - Exception e = expectThrows( - Exception.class, - () -> extensionsManager.getNamedWriteableRegistry().getExtensionReader(Example.class, Example.NAME) - ); - assertEquals(e.getMessage(), "Unknown NamedWriteable [" + Example.class.getName() + "][" + Example.NAME + "]"); - verify(extensionsManager.getNamedWriteableRegistry(), times(1)).getExtensionReader(any(), any()); - } - - public void testParseNamedWriteables() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); - initialize(extensionsManager); - - String requestType = ExtensionsManager.REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE; - List extensionsList = new ArrayList<>(extensionsManager.getExtensionIdMap().values()); - DiscoveryNode extensionNode = extensionsList.get(0); - Class categoryClass = Example.class; - - // convert context into an input stream then stream input for mock - byte[] context = new byte[0]; - InputStream inputStream = new ByteArrayInputStream(context); - StreamInput in = new InputStreamStreamInput(inputStream); - - try ( - MockLogAppender mockLogAppender = MockLogAppender.createForLoggers( - LogManager.getLogger(NamedWriteableRegistryParseResponseHandler.class) - ) - ) { - - mockLogAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "NamedWriteableRegistryParseRequest Failure", - "org.opensearch.extensions.NamedWriteableRegistryParseResponseHandler", - Level.ERROR, - "NamedWriteableRegistryParseRequest failed" - ) - ); - - NamedWriteableRegistryResponseHandler responseHandler = new NamedWriteableRegistryResponseHandler( - extensionNode, - transportService, - requestType - ); - responseHandler.parseNamedWriteable(extensionNode, categoryClass, in); - mockLogAppender.assertAllExpectationsMatched(); - } - } - public void testOnIndexModule() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); From bc35f7491d6ac958bfd9f5ae5cfde1574ff95c51 Mon Sep 17 00:00:00 2001 From: Ryan Bogan Date: Fri, 16 Dec 2022 17:51:21 +0000 Subject: [PATCH 4/4] Rename ExtensionBooleanResponse to AcknowledgedResponse Signed-off-by: Ryan Bogan --- ...leanResponse.java => AcknowledgedResponse.java} | 10 +++++----- .../AddSettingsUpdateConsumerRequestHandler.java | 4 ++-- .../extensions/ExtensionActionListenerHandler.java | 8 ++++---- .../opensearch/extensions/ExtensionsManager.java | 14 +++++++------- .../extensions/UpdateSettingsResponseHandler.java | 8 ++++---- .../action/ExtensionTransportActionsHandler.java | 8 ++++---- .../extensions/ExtensionResponseTests.java | 6 +++--- .../extensions/ExtensionsManagerTests.java | 6 +++--- .../ExtensionTransportActionsHandlerTests.java | 14 ++++++++------ 9 files changed, 40 insertions(+), 38 deletions(-) rename server/src/main/java/org/opensearch/extensions/{ExtensionBooleanResponse.java => AcknowledgedResponse.java} (81%) diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionBooleanResponse.java b/server/src/main/java/org/opensearch/extensions/AcknowledgedResponse.java similarity index 81% rename from server/src/main/java/org/opensearch/extensions/ExtensionBooleanResponse.java rename to server/src/main/java/org/opensearch/extensions/AcknowledgedResponse.java index fc5855ea50a68..be7eb9c03076e 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionBooleanResponse.java +++ b/server/src/main/java/org/opensearch/extensions/AcknowledgedResponse.java @@ -19,18 +19,18 @@ * * @opensearch.internal */ -public class ExtensionBooleanResponse extends TransportResponse { +public class AcknowledgedResponse extends TransportResponse { private final boolean status; /** * @param status Boolean indicating the status of the parse request sent to the SDK */ - public ExtensionBooleanResponse(boolean status) { + public AcknowledgedResponse(boolean status) { this.status = status; } - public ExtensionBooleanResponse(StreamInput in) throws IOException { + public AcknowledgedResponse(StreamInput in) throws IOException { super(in); this.status = in.readBoolean(); } @@ -42,14 +42,14 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "ExtensionBooleanResponse{" + "status=" + this.status + "}"; + return "AcknowledgedResponse{" + "status=" + this.status + "}"; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - ExtensionBooleanResponse that = (ExtensionBooleanResponse) o; + AcknowledgedResponse that = (AcknowledgedResponse) o; return Objects.equals(this.status, that.status); } diff --git a/server/src/main/java/org/opensearch/extensions/AddSettingsUpdateConsumerRequestHandler.java b/server/src/main/java/org/opensearch/extensions/AddSettingsUpdateConsumerRequestHandler.java index caaf7cc19ab31..791482aad0432 100644 --- a/server/src/main/java/org/opensearch/extensions/AddSettingsUpdateConsumerRequestHandler.java +++ b/server/src/main/java/org/opensearch/extensions/AddSettingsUpdateConsumerRequestHandler.java @@ -52,7 +52,7 @@ public AddSettingsUpdateConsumerRequestHandler( * Handles a {@link AddSettingsUpdateConsumerRequest}. * * @param addSettingsUpdateConsumerRequest The request to handle. - * @return A {@link ExtensionBooleanResponse} indicating success. + * @return A {@link AcknowledgedResponse} indicating success. * @throws Exception if the request is not handled properly. */ TransportResponse handleAddSettingsUpdateConsumerRequest(AddSettingsUpdateConsumerRequest addSettingsUpdateConsumerRequest) @@ -86,6 +86,6 @@ TransportResponse handleAddSettingsUpdateConsumerRequest(AddSettingsUpdateConsum status = false; } - return new ExtensionBooleanResponse(status); + return new AcknowledgedResponse(status); } } diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionActionListenerHandler.java b/server/src/main/java/org/opensearch/extensions/ExtensionActionListenerHandler.java index 7edae0fa7a943..ceba1e1f65000 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionActionListenerHandler.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionActionListenerHandler.java @@ -29,15 +29,15 @@ public ExtensionActionListenerHandler(ExtensionActionListener listener) { * Handles a {@link ExtensionActionListenerOnFailureRequest}. * * @param request The request to handle. - * @return A {@link ExtensionBooleanResponse} indicating success or failure. + * @return A {@link AcknowledgedResponse} indicating success or failure. */ - public ExtensionBooleanResponse handleExtensionActionListenerOnFailureRequest(ExtensionActionListenerOnFailureRequest request) { + public AcknowledgedResponse handleExtensionActionListenerOnFailureRequest(ExtensionActionListenerOnFailureRequest request) { try { listener.onFailure(new OpenSearchException(request.getFailureException())); - return new ExtensionBooleanResponse(true); + return new AcknowledgedResponse(true); } catch (Exception e) { logger.error(e.getMessage()); - return new ExtensionBooleanResponse(false); + return new AcknowledgedResponse(false); } } } diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index d0d78c9057526..f04488ba1908c 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -440,12 +440,12 @@ public void onIndexModule(IndexModule indexModule) throws UnknownHostException { private void onIndexModule(IndexModule indexModule, DiscoveryNode extensionNode) throws UnknownHostException { logger.info("onIndexModule index:" + indexModule.getIndex()); final CompletableFuture inProgressFuture = new CompletableFuture<>(); - final CompletableFuture inProgressIndexNameFuture = new CompletableFuture<>(); + final CompletableFuture inProgressIndexNameFuture = new CompletableFuture<>(); - final TransportResponseHandler extensionBooleanResponseHandler = new TransportResponseHandler< - ExtensionBooleanResponse>() { + final TransportResponseHandler acknowledgedResponseHandler = new TransportResponseHandler< + AcknowledgedResponse>() { @Override - public void handleResponse(ExtensionBooleanResponse response) { + public void handleResponse(AcknowledgedResponse response) { logger.info("ACK Response" + response); inProgressIndexNameFuture.complete(response); } @@ -461,8 +461,8 @@ public String executor() { } @Override - public ExtensionBooleanResponse read(StreamInput in) throws IOException { - return new ExtensionBooleanResponse(in); + public AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); } }; @@ -494,7 +494,7 @@ public void beforeIndexRemoved( extensionNode, INDICES_EXTENSION_NAME_ACTION_NAME, new IndicesModuleRequest(indexModule), - extensionBooleanResponseHandler + acknowledgedResponseHandler ); // TODO: make asynchronous inProgressIndexNameFuture.get(100, TimeUnit.SECONDS); diff --git a/server/src/main/java/org/opensearch/extensions/UpdateSettingsResponseHandler.java b/server/src/main/java/org/opensearch/extensions/UpdateSettingsResponseHandler.java index fbb30baebcfaa..be8f43b5cfce6 100644 --- a/server/src/main/java/org/opensearch/extensions/UpdateSettingsResponseHandler.java +++ b/server/src/main/java/org/opensearch/extensions/UpdateSettingsResponseHandler.java @@ -22,16 +22,16 @@ * * @opensearch.internal */ -public class UpdateSettingsResponseHandler implements TransportResponseHandler { +public class UpdateSettingsResponseHandler implements TransportResponseHandler { private static final Logger logger = LogManager.getLogger(UpdateSettingsResponseHandler.class); @Override - public ExtensionBooleanResponse read(StreamInput in) throws IOException { - return new ExtensionBooleanResponse(in); + public AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); } @Override - public void handleResponse(ExtensionBooleanResponse response) { + public void handleResponse(AcknowledgedResponse response) { logger.info("response {}", response.getStatus()); } diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java index bdc30cb46340a..ac3ec6630634a 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java @@ -14,7 +14,7 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.extensions.DiscoveryExtensionNode; -import org.opensearch.extensions.ExtensionBooleanResponse; +import org.opensearch.extensions.AcknowledgedResponse; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.RegisterTransportActionsRequest; import org.opensearch.threadpool.ThreadPool; @@ -82,7 +82,7 @@ public DiscoveryExtensionNode getExtension(String action) { * Handles a {@link RegisterTransportActionsRequest}. * * @param transportActionsRequest The request to handle. - * @return A {@link ExtensionBooleanResponse} indicating success. + * @return A {@link AcknowledgedResponse} indicating success. */ public TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) { /* @@ -96,9 +96,9 @@ public TransportResponse handleRegisterTransportActionsRequest(RegisterTransport } } catch (Exception e) { logger.error("Could not register Transport Action " + e); - return new ExtensionBooleanResponse(false); + return new AcknowledgedResponse(false); } - return new ExtensionBooleanResponse(true); + return new AcknowledgedResponse(true); } /** diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionResponseTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionResponseTests.java index 321de3c5efe6a..3b2993cb164c0 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionResponseTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionResponseTests.java @@ -15,9 +15,9 @@ public class ExtensionResponseTests extends OpenSearchTestCase { - public void testExtensionBooleanResponse() throws Exception { + public void testAcknowledgedResponse() throws Exception { boolean response = true; - ExtensionBooleanResponse booleanResponse = new ExtensionBooleanResponse(response); + AcknowledgedResponse booleanResponse = new AcknowledgedResponse(response); assertEquals(response, booleanResponse.getStatus()); @@ -25,7 +25,7 @@ public void testExtensionBooleanResponse() throws Exception { booleanResponse.writeTo(out); out.flush(); try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) { - booleanResponse = new ExtensionBooleanResponse(in); + booleanResponse = new AcknowledgedResponse(in); assertEquals(response, booleanResponse.getStatus()); } diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 37b9eaf11d407..3b905ca018d09 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -532,7 +532,7 @@ public void testHandleActionListenerOnFailureRequest() throws Exception { ExtensionActionListenerOnFailureRequest listenerFailureRequest = new ExtensionActionListenerOnFailureRequest("Test failure"); assertEquals( - ExtensionBooleanResponse.class, + AcknowledgedResponse.class, extensionsManager.getListenerHandler().handleExtensionActionListenerOnFailureRequest(listenerFailureRequest).getClass() ); assertEquals("Test failure", extensionsManager.getListener().getExceptionList().get(0).getMessage()); @@ -665,9 +665,9 @@ public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { ); TransportResponse response = extensionsManager.getAddSettingsUpdateConsumerRequestHandler() .handleAddSettingsUpdateConsumerRequest(addSettingsUpdateConsumerRequest); - assertEquals(ExtensionBooleanResponse.class, response.getClass()); + assertEquals(AcknowledgedResponse.class, response.getClass()); // Should fail as component settings are not registered within cluster settings - assertEquals(false, ((ExtensionBooleanResponse) response).getStatus()); + assertEquals(false, ((AcknowledgedResponse) response).getStatus()); } public void testUpdateSettingsRequest() throws Exception { diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java index 646f027c2b45a..45ec5e827f5f8 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -20,7 +20,7 @@ import org.opensearch.common.transport.TransportAddress; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.extensions.DiscoveryExtensionNode; -import org.opensearch.extensions.ExtensionBooleanResponse; +import org.opensearch.extensions.AcknowledgedResponse; import org.opensearch.extensions.RegisterTransportActionsRequest; import org.opensearch.extensions.rest.RestSendToExtensionActionTests; import org.opensearch.indices.breaker.NoneCircuitBreakerService; @@ -132,13 +132,14 @@ public void testRegisterAction() { public void testRegisterTransportActionsRequest() { String action = "test-action"; RegisterTransportActionsRequest request = new RegisterTransportActionsRequest("uniqueid1", Map.of(action, TransportAction.class)); - ExtensionBooleanResponse response = (ExtensionBooleanResponse) extensionTransportActionsHandler - .handleRegisterTransportActionsRequest(request); + AcknowledgedResponse response = (AcknowledgedResponse) extensionTransportActionsHandler.handleRegisterTransportActionsRequest( + request + ); assertTrue(response.getStatus()); assertEquals(discoveryExtension, extensionTransportActionsHandler.getExtension(action)); // Test duplicate action registration - response = (ExtensionBooleanResponse) extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request); + response = (AcknowledgedResponse) extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request); assertFalse(response.getStatus()); } @@ -166,8 +167,9 @@ public void testSendTransportRequestToExtension() throws InterruptedException { "uniqueid1", Map.of(action, TransportAction.class) ); - ExtensionBooleanResponse response = (ExtensionBooleanResponse) extensionTransportActionsHandler - .handleRegisterTransportActionsRequest(registerRequest); + AcknowledgedResponse response = (AcknowledgedResponse) extensionTransportActionsHandler.handleRegisterTransportActionsRequest( + registerRequest + ); assertTrue(response.getStatus()); ExtensionActionResponse extensionResponse = extensionTransportActionsHandler.sendTransportRequestToExtension(request);