From 8a39f44db638a68e8968d4d113aca52f7627f70a Mon Sep 17 00:00:00 2001 From: Rai Date: Wed, 28 Aug 2024 12:51:07 +0530 Subject: [PATCH 01/12] Add limit on number of processors in Ingest pipelines Signed-off-by: Rai --- CHANGELOG.md | 1 + .../ingest/SimulateExecutionService.java | 1 + .../SimulatePipelineTransportAction.java | 3 ++ .../common/settings/ClusterSettings.java | 2 + .../ingest/IngestPipelineValidator.java | 54 +++++++++++++++++++ .../org/opensearch/ingest/IngestService.java | 4 ++ .../cluster/service/ClusterServiceTests.java | 13 +++++ 7 files changed, 78 insertions(+) create mode 100644 server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cd030290d4de..d43fa6ab31308 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) +- Add limit on number of processors in Ingest pipelines([#15460](https://github.com/opensearch-project/OpenSearch/issues/15460)). ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java index c7c0f21eb0876..818100539849a 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java @@ -91,6 +91,7 @@ void executeDocument( } public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { + threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { final AtomicInteger counter = new AtomicInteger(); final List responses = new CopyOnWriteArrayList<>( diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java index 4753679d370af..22075076f0cf3 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java @@ -38,6 +38,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -88,6 +89,8 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe return; } + IngestPipelineValidator.validateIngestPipeline(simulateRequest.getPipeline(), ingestService.getClusterService()); + executionService.execute(simulateRequest, listener); } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8daf9125bb27e..4d605dcf5d819 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -132,6 +132,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.store.IndicesStore; +import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.monitor.fs.FsHealthService; import org.opensearch.monitor.fs.FsService; import org.opensearch.monitor.jvm.JvmGcMonitorService; @@ -405,6 +406,7 @@ public void apply(Settings value, Settings current, Settings previous) { ClusterService.USER_DEFINED_METADATA, ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS, SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java b/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java new file mode 100644 index 0000000000000..ff3abee9f5a11 --- /dev/null +++ b/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Setting; + +import java.util.List; + +/** + * This class contains methods to validate the ingest pipeline. + */ +public class IngestPipelineValidator { + + /** + * Defines the limit for the number of processors which can run on a given document during ingestion. + */ + public static final Setting MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting( + "cluster.ingest.max_number_processors", + Integer.MAX_VALUE, + 1, + Integer.MAX_VALUE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Validates that the number of compound processors in the pipeline does not exceed the configured limit. + * + * @param pipeline + * @param clusterService + */ + public static void validateIngestPipeline(Pipeline pipeline, ClusterService clusterService) { + + List processors = pipeline.getCompoundProcessor().getProcessors(); + int maxNumberOfIngestProcessorsAllowed = clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS); + + if (processors.size() > maxNumberOfIngestProcessorsAllowed) { + throw new IllegalStateException( + "Cannot use more than the maximum processors allowed. Number of processors configured is [" + + processors.size() + + "] which exceeds the maximum allowed configuration of [" + + maxNumberOfIngestProcessorsAllowed + + "] processors." + ); + } + } +} diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 938ca7493926e..d614badcaea2d 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -494,6 +494,9 @@ void validatePipeline(Map ingestInfos, PutPipelineReq Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2(); Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); + + IngestPipelineValidator.validateIngestPipeline(pipeline, clusterService); + List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { for (Map.Entry entry : ingestInfos.entrySet()) { @@ -1099,6 +1102,7 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) { processorFactories, scriptService ); + IngestPipelineValidator.validateIngestPipeline(newPipeline, clusterService); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); if (previous == null) { diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java index 4d88683826af7..f7e6d065dca67 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java @@ -10,6 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.junit.After; @@ -37,4 +38,16 @@ public void testDeprecatedGetMasterServiceBWC() { assertThat(masterService, equalTo(clusterManagerService)); } } + + public void testUpdateMaxIngestProcessorCountSetting() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + // verify defaults + assertEquals(Integer.MAX_VALUE, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); + + // verify update max processor + Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build(); + clusterSettings.applySettings(newSettings); + assertEquals(3, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); + } } From 99eaf02e0904fd74b9bb2e2570bbd032d364198f Mon Sep 17 00:00:00 2001 From: Rai Date: Wed, 28 Aug 2024 20:00:31 +0530 Subject: [PATCH 02/12] Resolved the comments Signed-off-by: Rai --- .../ingest/SimulateExecutionService.java | 1 - .../SimulatePipelineTransportAction.java | 3 +- .../common/settings/ClusterSettings.java | 4 +- .../ingest/IngestPipelineValidator.java | 54 ------------------- .../org/opensearch/ingest/IngestService.java | 37 ++++++++++++- .../cluster/service/ClusterServiceTests.java | 13 ----- .../opensearch/ingest/IngestServiceTests.java | 13 +++++ 7 files changed, 51 insertions(+), 74 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java index 818100539849a..c7c0f21eb0876 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java @@ -91,7 +91,6 @@ void executeDocument( } public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { - threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { final AtomicInteger counter = new AtomicInteger(); final List responses = new CopyOnWriteArrayList<>( diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java index 22075076f0cf3..eff038b109aea 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java @@ -38,7 +38,6 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -89,7 +88,7 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe return; } - IngestPipelineValidator.validateIngestPipeline(simulateRequest.getPipeline(), ingestService.getClusterService()); + ingestService.validateProcessorCountForIngestPipeline(simulateRequest.getPipeline()); executionService.execute(simulateRequest, listener); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 4d605dcf5d819..251e9124218b1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -132,7 +132,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.store.IndicesStore; -import org.opensearch.ingest.IngestPipelineValidator; +import org.opensearch.ingest.IngestService; import org.opensearch.monitor.fs.FsHealthService; import org.opensearch.monitor.fs.FsService; import org.opensearch.monitor.jvm.JvmGcMonitorService; @@ -406,7 +406,7 @@ public void apply(Settings value, Settings current, Settings previous) { ClusterService.USER_DEFINED_METADATA, ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, - IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS, + IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS, SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java b/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java deleted file mode 100644 index ff3abee9f5a11..0000000000000 --- a/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ingest; - -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Setting; - -import java.util.List; - -/** - * This class contains methods to validate the ingest pipeline. - */ -public class IngestPipelineValidator { - - /** - * Defines the limit for the number of processors which can run on a given document during ingestion. - */ - public static final Setting MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting( - "cluster.ingest.max_number_processors", - Integer.MAX_VALUE, - 1, - Integer.MAX_VALUE, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - - /** - * Validates that the number of compound processors in the pipeline does not exceed the configured limit. - * - * @param pipeline - * @param clusterService - */ - public static void validateIngestPipeline(Pipeline pipeline, ClusterService clusterService) { - - List processors = pipeline.getCompoundProcessor().getProcessors(); - int maxNumberOfIngestProcessorsAllowed = clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS); - - if (processors.size() > maxNumberOfIngestProcessorsAllowed) { - throw new IllegalStateException( - "Cannot use more than the maximum processors allowed. Number of processors configured is [" - + processors.size() - + "] which exceeds the maximum allowed configuration of [" - + maxNumberOfIngestProcessorsAllowed - + "] processors." - ); - } - } -} diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index d614badcaea2d..99b0774df7009 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -62,6 +62,7 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.common.regex.Regex; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -107,6 +108,18 @@ public class IngestService implements ClusterStateApplier, ReportingService MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting( + "cluster.ingest.max_number_processors", + Integer.MAX_VALUE, + 1, + Integer.MAX_VALUE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private static final Logger logger = LogManager.getLogger(IngestService.class); private final ClusterService clusterService; @@ -123,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService processorFactories(List ingestPlugins, Processor.Parameters parameters) { @@ -495,7 +515,7 @@ void validatePipeline(Map ingestInfos, PutPipelineReq Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2(); Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); - IngestPipelineValidator.validateIngestPipeline(pipeline, clusterService); + validateProcessorCountForIngestPipeline(pipeline); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { @@ -510,6 +530,20 @@ void validatePipeline(Map ingestInfos, PutPipelineReq ExceptionsHelper.rethrowAndSuppress(exceptions); } + public void validateProcessorCountForIngestPipeline(Pipeline pipeline) { + List processors = pipeline.getCompoundProcessor().getProcessors(); + + if (processors.size() > maxIngestProcessorCount) { + throw new IllegalStateException( + "Cannot use more than the maximum processors allowed. Number of processors being configured is [" + + processors.size() + + "] which exceeds the maximum allowed configuration of [" + + maxIngestProcessorCount + + "] processors." + ); + } + } + public void executeBulkRequest( int numberOfActionRequests, Iterable> actionRequests, @@ -1102,7 +1136,6 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) { processorFactories, scriptService ); - IngestPipelineValidator.validateIngestPipeline(newPipeline, clusterService); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); if (previous == null) { diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java index f7e6d065dca67..4d88683826af7 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java @@ -10,7 +10,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.junit.After; @@ -38,16 +37,4 @@ public void testDeprecatedGetMasterServiceBWC() { assertThat(masterService, equalTo(clusterManagerService)); } } - - public void testUpdateMaxIngestProcessorCountSetting() { - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - - // verify defaults - assertEquals(Integer.MAX_VALUE, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); - - // verify update max processor - Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build(); - clusterSettings.applySettings(newSettings); - assertEquals(3, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); - } } diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 1f4b1d635d438..e4e21b7654e79 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -58,6 +58,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.metrics.OperationStats; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.xcontent.XContentType; @@ -2058,6 +2059,18 @@ public void testPrepareBatches_different_index_pipeline() { assertEquals(4, batches.size()); } + public void testUpdateMaxIngestProcessorCountSetting() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + // verify defaults + assertEquals(Integer.MAX_VALUE, clusterSettings.get(IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); + + // verify update max processor + Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build(); + clusterSettings.applySettings(newSettings); + assertEquals(3, clusterSettings.get(IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); + } + private IngestService.IndexRequestWrapper createIndexRequestWrapper(String index, List pipelines) { IndexRequest indexRequest = new IndexRequest(index); return new IngestService.IndexRequestWrapper(0, indexRequest, pipelines, true); From bcf489851eab498f5da9fd22bb6c4c08211a624a Mon Sep 17 00:00:00 2001 From: Rai Date: Thu, 29 Aug 2024 12:00:36 +0530 Subject: [PATCH 03/12] Updated the UTs Signed-off-by: Rai --- CHANGELOG.md | 1 + .../opensearch/ingest/IngestServiceTests.java | 42 +++++++++++++++++-- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 49792c178dbf2..fbe30119cb0ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - Add limit on number of processors in Ingest pipelines([#15460](https://github.com/opensearch-project/OpenSearch/issues/15460)). - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) +- Add limit on number of processors in Ingest pipelines([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index e4e21b7654e79..fa241876f86cd 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -152,8 +152,10 @@ public void setup() { public void testIngestPlugin() { Client client = mock(Client.class); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); IngestService ingestService = new IngestService( - mock(ClusterService.class), + clusterService, threadPool, null, null, @@ -187,8 +189,10 @@ public void testIngestPluginDuplicate() { public void testExecuteIndexPipelineDoesNotExist() { Client client = mock(Client.class); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); IngestService ingestService = new IngestService( - mock(ClusterService.class), + clusterService, threadPool, null, null, @@ -721,6 +725,32 @@ public void testValidate() throws Exception { ingestService.validatePipeline(ingestInfos, putRequest); } + public void testValidateProcessorCountForIngestPipelineThrowsException(){ + IngestService ingestService = createWithProcessors(); + PutPipelineRequest putRequest = new PutPipelineRequest( + "_id", + new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," + + "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}" + ), + MediaTypeRegistry.JSON + ); + + DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("_node_id2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + Map ingestInfos = new HashMap<>(); + ingestInfos.put(node1, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); + ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set")))); + + Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 1).build(); + ingestService.getClusterService().getClusterSettings().applySettings(newSettings); + + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> ingestService.validatePipeline(ingestInfos, putRequest) + ); + } + public void testExecuteIndexPipelineExistsButFailedParsing() { IngestService ingestService = createWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> new AbstractProcessor("mock", "description") { @@ -1507,8 +1537,10 @@ public Map getProcessors(Processor.Parameters paramet // Create ingest service: Client client = mock(Client.class); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); IngestService ingestService = new IngestService( - mock(ClusterService.class), + clusterService, threadPool, null, null, @@ -2106,7 +2138,9 @@ private static IngestService createWithProcessors(Map ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); - return new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(new IngestPlugin() { + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + return new IngestService(clusterService, threadPool, null, null, null, Collections.singletonList(new IngestPlugin() { @Override public Map getProcessors(final Processor.Parameters parameters) { return processors; From 2b3f7c808c7214b9907ffe7510be49ad81cc4dec Mon Sep 17 00:00:00 2001 From: anand kumar rai Date: Thu, 29 Aug 2024 12:21:59 +0530 Subject: [PATCH 04/12] Update CHANGELOG.md Signed-off-by: anand kumar rai --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fbe30119cb0ef..3eae2218a90bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381)) - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) -- Add limit on number of processors in Ingest pipelines([#15460](https://github.com/opensearch-project/OpenSearch/issues/15460)). - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) - Add limit on number of processors in Ingest pipelines([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). From 2390dbc72e5e960314abaaed8d604aaa8225f521 Mon Sep 17 00:00:00 2001 From: Rai Date: Thu, 29 Aug 2024 13:30:42 +0530 Subject: [PATCH 05/12] Java spotless fix Signed-off-by: Rai --- .../opensearch/ingest/IngestServiceTests.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index fa241876f86cd..cc0a78ce761a9 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -153,7 +153,9 @@ public void setup() { public void testIngestPlugin() { Client client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); IngestService ingestService = new IngestService( clusterService, threadPool, @@ -190,7 +192,9 @@ public void testIngestPluginDuplicate() { public void testExecuteIndexPipelineDoesNotExist() { Client client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); IngestService ingestService = new IngestService( clusterService, threadPool, @@ -725,7 +729,7 @@ public void testValidate() throws Exception { ingestService.validatePipeline(ingestInfos, putRequest); } - public void testValidateProcessorCountForIngestPipelineThrowsException(){ + public void testValidateProcessorCountForIngestPipelineThrowsException() { IngestService ingestService = createWithProcessors(); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", @@ -745,10 +749,7 @@ public void testValidateProcessorCountForIngestPipelineThrowsException(){ Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 1).build(); ingestService.getClusterService().getClusterSettings().applySettings(newSettings); - IllegalStateException e = expectThrows( - IllegalStateException.class, - () -> ingestService.validatePipeline(ingestInfos, putRequest) - ); + IllegalStateException e = expectThrows(IllegalStateException.class, () -> ingestService.validatePipeline(ingestInfos, putRequest)); } public void testExecuteIndexPipelineExistsButFailedParsing() { @@ -1538,7 +1539,9 @@ public Map getProcessors(Processor.Parameters paramet // Create ingest service: Client client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); IngestService ingestService = new IngestService( clusterService, threadPool, @@ -2139,7 +2142,9 @@ private static IngestService createWithProcessors(Map when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); return new IngestService(clusterService, threadPool, null, null, null, Collections.singletonList(new IngestPlugin() { @Override public Map getProcessors(final Processor.Parameters parameters) { From 0a30192ea42ab6733433b20df7c3e3cc9e728832 Mon Sep 17 00:00:00 2001 From: Rai Date: Fri, 30 Aug 2024 19:01:34 +0530 Subject: [PATCH 06/12] Added a check on the onfailures processor Signed-off-by: Rai --- .../org/opensearch/ingest/IngestService.java | 2 +- .../opensearch/ingest/IngestServiceTests.java | 97 ++++++++++++++++++- 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 99b0774df7009..0315a960dae92 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -531,7 +531,7 @@ void validatePipeline(Map ingestInfos, PutPipelineReq } public void validateProcessorCountForIngestPipeline(Pipeline pipeline) { - List processors = pipeline.getCompoundProcessor().getProcessors(); + List processors = pipeline.flattenAllProcessors(); if (processors.size() > maxIngestProcessorCount) { throw new IllegalStateException( diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index cc0a78ce761a9..e1e1ea561284b 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -749,7 +749,102 @@ public void testValidateProcessorCountForIngestPipelineThrowsException() { Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 1).build(); ingestService.getClusterService().getClusterSettings().applySettings(newSettings); - IllegalStateException e = expectThrows(IllegalStateException.class, () -> ingestService.validatePipeline(ingestInfos, putRequest)); + expectThrows(IllegalStateException.class, () -> ingestService.validatePipeline(ingestInfos, putRequest)); + } + + public void testValidateProcessorCountForWithNestedOnFailureProcessorThrowsException() { + IngestService ingestService = createWithProcessors(); + PutPipelineRequest putRequest = new PutPipelineRequest( + "_id", + new BytesArray( + "{\n" + + " \"processors\": [\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"timestamp_field_1\",\n" + + " \"value\": \"value\",\n" + + " \"on_failure\": [\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"ingest_error1\",\n" + + " \"value\": \"failed\",\n" + + " \"tag\": \"tagggg\",\n" + + " \"on_failure\": [\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"ingest_error1\",\n" + + " \"value\": \"failed\",\n" + + " \"tag\": \"tagggg\",\n" + + " \"on_failure\": [\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"ingest_error1\",\n" + + " \"value\": \"failed\",\n" + + " \"tag\": \"tagggg\",\n" + + " \"on_failure\": [\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"ingest_error1\",\n" + + " \"value\": \"failed\",\n" + + " \"tag\": \"tagggg\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"ingest_error2\",\n" + + " \"value\": \"failed\",\n" + + " \"tag\": \"tagggg\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"ingest_error2\",\n" + + " \"value\": \"failed\",\n" + + " \"tag\": \"tagggg\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"ingest_error2\",\n" + + " \"value\": \"failed\",\n" + + " \"tag\": \"tagggg\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " {\n" + + " \"set\": {\n" + + " \"field\": \"ingest_error2\",\n" + + " \"value\": \"failed\",\n" + + " \"tag\": \"tagggg\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + " ]\n" + + "}" + ), + MediaTypeRegistry.JSON + ); + + DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("_node_id2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + Map ingestInfos = new HashMap<>(); + ingestInfos.put(node1, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); + ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set")))); + + Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 7).build(); + ingestService.getClusterService().getClusterSettings().applySettings(newSettings); + + expectThrows(IllegalStateException.class, () -> ingestService.validatePipeline(ingestInfos, putRequest)); } public void testExecuteIndexPipelineExistsButFailedParsing() { From d01e0740d26e306c6f60ca79bb0137369e7cb274 Mon Sep 17 00:00:00 2001 From: Rai Date: Mon, 2 Sep 2024 11:26:52 +0530 Subject: [PATCH 07/12] added test for StimulateExecutionService Signed-off-by: Rai --- .../ingest/SimulateExecutionService.java | 8 +++++- .../SimulatePipelineTransportAction.java | 4 +-- .../ingest/SimulateExecutionServiceTests.java | 27 ++++++++++++++++++- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java index c7c0f21eb0876..459466f8c8ab6 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java @@ -36,6 +36,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.ingest.CompoundProcessor; import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.IngestService; import org.opensearch.ingest.Pipeline; import org.opensearch.threadpool.ThreadPool; @@ -56,9 +57,11 @@ class SimulateExecutionService { private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT; private final ThreadPool threadPool; + private final IngestService ingestService; - SimulateExecutionService(ThreadPool threadPool) { + SimulateExecutionService(ThreadPool threadPool, IngestService ingestService) { this.threadPool = threadPool; + this.ingestService = ingestService; } void executeDocument( @@ -91,6 +94,9 @@ void executeDocument( } public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { + + ingestService.validateProcessorCountForIngestPipeline(request.getPipeline()); + threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { final AtomicInteger counter = new AtomicInteger(); final List responses = new CopyOnWriteArrayList<>( diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java index eff038b109aea..5eeb09c4d50c0 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java @@ -69,7 +69,7 @@ public SimulatePipelineTransportAction( (Writeable.Reader) SimulatePipelineRequest::new ); this.ingestService = ingestService; - this.executionService = new SimulateExecutionService(threadPool); + this.executionService = new SimulateExecutionService(threadPool, ingestService); } @Override @@ -88,8 +88,6 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe return; } - ingestService.validateProcessorCountForIngestPipeline(simulateRequest.getPipeline()); - executionService.execute(simulateRequest, listener); } } diff --git a/server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java index a5a082286f123..fd0a6ad36c31a 100644 --- a/server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java @@ -39,6 +39,7 @@ import org.opensearch.ingest.DropProcessor; import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.IngestProcessorException; +import org.opensearch.ingest.IngestService; import org.opensearch.ingest.Pipeline; import org.opensearch.ingest.Processor; import org.opensearch.ingest.RandomDocumentPicks; @@ -67,6 +68,8 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; public class SimulateExecutionServiceTests extends OpenSearchTestCase { @@ -75,11 +78,13 @@ public class SimulateExecutionServiceTests extends OpenSearchTestCase { private TestThreadPool threadPool; private SimulateExecutionService executionService; private IngestDocument ingestDocument; + private IngestService ingestService; @Before public void setup() { + ingestService = mock(IngestService.class); threadPool = new TestThreadPool(SimulateExecutionServiceTests.class.getSimpleName()); - executionService = new SimulateExecutionService(threadPool); + executionService = new SimulateExecutionService(threadPool, ingestService); ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); } @@ -400,6 +405,26 @@ public String getType() { } } + public void testValidateProcessorCountForIngestPipelineThrowsException() { + + int numDocs = randomIntBetween(1, 64); + List documents = new ArrayList<>(numDocs); + for (int id = 0; id < numDocs; id++) { + documents.add(new IngestDocument("_index", Integer.toString(id), null, 0L, VersionType.INTERNAL, new HashMap<>())); + } + + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor()); + SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false); + + AtomicReference responseHolder = new AtomicReference<>(); + AtomicReference errorHolder = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + doThrow(new IllegalStateException()).when(ingestService).validateProcessorCountForIngestPipeline(pipeline); + + expectThrows(IllegalStateException.class, () -> executionService.execute(request, ActionListener.wrap(response -> {}, e -> {}))); + } + private static void assertVerboseResult( SimulateProcessorResult result, String expectedPipelineId, From eda0b1ae32e58ab30b2cbb819253a997435f2933 Mon Sep 17 00:00:00 2001 From: Rai Date: Mon, 2 Sep 2024 11:35:42 +0530 Subject: [PATCH 08/12] added test for StimulateExecutionService Signed-off-by: Rai --- .../action/ingest/SimulateExecutionServiceTests.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java index fd0a6ad36c31a..a26afeee0f912 100644 --- a/server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java @@ -416,10 +416,6 @@ public void testValidateProcessorCountForIngestPipelineThrowsException() { Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor()); SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false); - AtomicReference responseHolder = new AtomicReference<>(); - AtomicReference errorHolder = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - doThrow(new IllegalStateException()).when(ingestService).validateProcessorCountForIngestPipeline(pipeline); expectThrows(IllegalStateException.class, () -> executionService.execute(request, ActionListener.wrap(response -> {}, e -> {}))); From abe72991c5921cfcc1afacf8099ecfe4517fe02e Mon Sep 17 00:00:00 2001 From: anand kumar rai Date: Mon, 2 Sep 2024 13:47:07 +0530 Subject: [PATCH 09/12] Update CHANGELOG.md Signed-off-by: anand kumar rai --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df45b26c3a635..7d4b3fdaf8a48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) -- Add limit on number of processors in Ingest pipelines([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). +- Add limit on number of processors in Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) - Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454)) - [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) From f41573b3777af060323facf01bb9d4fb3f6839fd Mon Sep 17 00:00:00 2001 From: Rai Date: Mon, 2 Sep 2024 23:43:32 +0530 Subject: [PATCH 10/12] Trigger build Signed-off-by: Rai --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c08eb26021b34..9f9843e931399 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) -- Add limit on number of processors in Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). +- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) - Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454)) - [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) From cf83ca7918527246383e4736bb774ddb612e317d Mon Sep 17 00:00:00 2001 From: Rai Date: Tue, 3 Sep 2024 00:58:52 +0530 Subject: [PATCH 11/12] rebase Signed-off-by: Rai --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f9843e931399..ad6ad7269060b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) -- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). +- Add a limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) - Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454)) - [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) From 9149544e29679f29a94a14742101e1d9fe2b9e7b Mon Sep 17 00:00:00 2001 From: Rai Date: Tue, 3 Sep 2024 02:36:19 +0530 Subject: [PATCH 12/12] Retry build Signed-off-by: Rai --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad6ad7269060b..9f9843e931399 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) -- Add a limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). +- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)). - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) - Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454)) - [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))