Skip to content

Commit 25f539e

Browse files
original-brownbearjakelandis
authored andcommitted
INGEST: Allow Repeated Invocation of Pipeline (#33419)
* Allows repeated, non-recursive invocation of the same pipeline
1 parent cf0bb4c commit 25f539e

File tree

2 files changed

+23
-3
lines changed

2 files changed

+23
-3
lines changed

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,20 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception {
113113
"Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage()
114114
);
115115
}
116+
117+
public void testAllowsRepeatedPipelineInvocations() throws Exception {
118+
String innerPipelineId = "inner";
119+
IngestService ingestService = mock(IngestService.class);
120+
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
121+
Map<String, Object> outerConfig = new HashMap<>();
122+
outerConfig.put("pipeline", innerPipelineId);
123+
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
124+
Pipeline inner = new Pipeline(
125+
innerPipelineId, null, null, new CompoundProcessor()
126+
);
127+
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
128+
Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig);
129+
outerProc.execute(testIngestDocument);
130+
outerProc.execute(testIngestDocument);
131+
}
116132
}

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -649,10 +649,14 @@ private static Object deepCopy(Object value) {
649649
* @throws Exception On exception in pipeline execution
650650
*/
651651
public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
652-
if (this.executedPipelines.add(pipeline) == false) {
653-
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
652+
try {
653+
if (this.executedPipelines.add(pipeline) == false) {
654+
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
655+
}
656+
return pipeline.execute(this);
657+
} finally {
658+
executedPipelines.remove(pipeline);
654659
}
655-
return pipeline.execute(this);
656660
}
657661

658662
@Override

0 commit comments

Comments
 (0)